new change in architecture, more modular, allow to export article’s documents
This commit is contained in:
parent
8981bbfda6
commit
a7901c2325
@ -1,5 +1,6 @@
|
||||
# SPIP website to plain Markdown files converter, Copyright (C) 2023 Guilhem Fauré
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from os import makedirs
|
||||
from os.path import basename, splitext
|
||||
from re import Pattern, finditer, search
|
||||
@ -11,9 +12,9 @@ from peewee import (
|
||||
BigIntegerField,
|
||||
DateTimeField,
|
||||
DoesNotExist,
|
||||
IntegerField,
|
||||
)
|
||||
from slugify import slugify
|
||||
from typing_extensions import Self
|
||||
from yaml import dump
|
||||
|
||||
from spip2md.config import CFG
|
||||
@ -41,6 +42,9 @@ from spip2md.spip_models import (
|
||||
)
|
||||
from spip2md.style import BLUE, BOLD, GREEN, WARNING_STYLE, YELLOW, esc
|
||||
|
||||
# Define recursive list type
|
||||
RecursiveList = list["str | RecursiveList"]
|
||||
|
||||
# Define logger for this file’s logs
|
||||
LOG = logging.getLogger(CFG.logname + ".models")
|
||||
|
||||
@ -58,7 +62,8 @@ class SpipInterface:
|
||||
_status: bool
|
||||
# Additional fields
|
||||
_id: BigAutoField | int = 0 # same ID attribute name for all objects
|
||||
_depth: IntegerField | int # Equals `profondeur` for sections
|
||||
# _depth: IntegerField | int # Equals `profondeur` for sections
|
||||
_depth: int # Equals `profondeur` for sections
|
||||
_fileprefix: str # String to prepend to written files
|
||||
_parentdir: str # Path from output dir to direct parent
|
||||
_style: tuple[int, ...] # _styles to apply to some elements of printed output
|
||||
@ -242,6 +247,22 @@ class WritableObject(SpipInterface):
|
||||
self.style_print(output + str(message), indent=None)
|
||||
return output + str(message)
|
||||
|
||||
# Perform all the write steps of this object
|
||||
def write_all(
|
||||
self, parentdepth: int, parentdir: str, index: int, total: int
|
||||
) -> RecursiveList:
|
||||
LOG.debug(f"Writing {type(self).__name__} `{self._title}`")
|
||||
output: RecursiveList = []
|
||||
self._depth = parentdepth + 1
|
||||
self._parentdir = parentdir
|
||||
for m in self.begin_message(index, total):
|
||||
output.append(m)
|
||||
try:
|
||||
output[-1] += self.end_message(self.write())
|
||||
except Exception as err:
|
||||
output[-1] += self.end_message(err)
|
||||
return output
|
||||
|
||||
|
||||
class Document(WritableObject, NormalizedDocument):
|
||||
class Meta:
|
||||
@ -294,7 +315,7 @@ class RedactionalObject(WritableObject):
|
||||
# Function specific logger
|
||||
log = logging.getLogger(CFG.logname + ".models.translate_multi")
|
||||
text: str = getattr(self, spipattr) # Get text of attribute
|
||||
log.debug(f"Begin translation of `{self._title}` `{spipattr}`")
|
||||
log.debug(f"Translating <multi> blocks of `{self._title}` `{spipattr}`")
|
||||
# Handle <multi> multi language blocks
|
||||
translations: dict[str, str] = {} # Dict such as lang: text
|
||||
original_translation: str = text
|
||||
@ -318,7 +339,7 @@ class RedactionalObject(WritableObject):
|
||||
f"Add {lang.group(1)} translation of `{self._title}`: {trans}"
|
||||
)
|
||||
translations[lang.group(1)] = lang.group(2)
|
||||
# Iterate over translations, creating translated sub-WritableObjects if needed
|
||||
# Iterate over translations, adding translated attributes to translations dict
|
||||
for lang, translation in translations.items():
|
||||
if lang in CFG.export_languages:
|
||||
if lang not in self._translations:
|
||||
@ -341,6 +362,7 @@ class RedactionalObject(WritableObject):
|
||||
mapping: tuple,
|
||||
obj_type: type[NormalizedSection | NormalizedArticle | NormalizedDocument],
|
||||
) -> str:
|
||||
LOG.debug(f"Convert {mapping}s links of `{self._title}` as {obj_type}")
|
||||
for id_link, path_link in mapping:
|
||||
# print(f"Looking for links like {id_link}")
|
||||
for match in id_link.finditer(text):
|
||||
@ -370,50 +392,39 @@ class RedactionalObject(WritableObject):
|
||||
return self._fileprefix + "." + self.lang + "." + CFG.export_filetype
|
||||
|
||||
def convert_title(self) -> str:
|
||||
LOG.debug(f"Convert title of currently untitled {type(self).__name__}")
|
||||
if hasattr(self, "_title"):
|
||||
LOG.debug(
|
||||
"convert_title() call"
|
||||
+ f" but {type(self).__name__} {self._title}._title is already set"
|
||||
)
|
||||
LOG.debug(f"{type(self).__name__} {self._title}._title is already set")
|
||||
return self._title
|
||||
if self.titre is None:
|
||||
LOG.debug(f"convert_title() call but {type(self).__name__}.title is None")
|
||||
LOG.debug(f"{type(self).__name__}.title is None")
|
||||
return ""
|
||||
if len(self.titre) == 0:
|
||||
LOG.debug(f"convert_title() call but {type(self).__name__}.title is empty")
|
||||
LOG.debug(f"{type(self).__name__}.title is empty")
|
||||
return ""
|
||||
self._title = self.titre.strip() # Define temporary title to use in functions
|
||||
self._title = self.translate_multi("titre", "_title")
|
||||
LOG.debug(f"`{self._title}` current translations: {self._translations}")
|
||||
return self.convert_field(self._title)
|
||||
|
||||
def convert_text(self) -> str:
|
||||
LOG.debug(f"Convert text of `{self._title}`")
|
||||
if hasattr(self, "_text"):
|
||||
LOG.debug(
|
||||
"convert_text() call"
|
||||
+ f" but {type(self).__name__} {self._title}._text is already set"
|
||||
)
|
||||
LOG.debug(f"{type(self).__name__} {self._title}._text is already set")
|
||||
return self._text
|
||||
if self.texte is None:
|
||||
LOG.debug(
|
||||
"convert_text() call"
|
||||
+ f" but {type(self).__name__} {self._title}.text is None"
|
||||
)
|
||||
LOG.debug(f"{type(self).__name__} {self._title}.text is None")
|
||||
return ""
|
||||
if len(self.texte) == 0:
|
||||
LOG.debug(
|
||||
"convert_text() call"
|
||||
+ f" but {type(self).__name__} {self._title}.text is empty"
|
||||
)
|
||||
LOG.debug(f"{type(self).__name__} {self._title}.text is empty")
|
||||
return ""
|
||||
text: str = self.translate_multi("texte", "_title")
|
||||
LOG.debug(f"`{self._title}` current translations: {self._translations}")
|
||||
text = self.replace_links(text, DOCUMENT_LINK, Document)
|
||||
text = self.replace_links(text, ARTICLE_LINK, Article)
|
||||
text = self.replace_links(text, SECTION_LINK, Section)
|
||||
return self.convert_field(text)
|
||||
|
||||
def convert_extra(self) -> str:
|
||||
LOG.debug(f"Convert extra of `{self._title}`")
|
||||
if hasattr(self, "_extra"):
|
||||
return self._extra
|
||||
if self.extra is None:
|
||||
@ -426,9 +437,8 @@ class RedactionalObject(WritableObject):
|
||||
return self.convert_field(text)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._translations: dict[str, dict[str, str]] = {} # prevent inherithance
|
||||
# for lang in CFG.export_languages:
|
||||
# self._translations[lang] = {} # Initialize keys for export langugaes
|
||||
# Initialise translation dict as empty, in the form lang: attr: value
|
||||
self._translations: dict[str, dict[str, str]] = {}
|
||||
super().__init__(*args, **kwargs)
|
||||
# Initialize converted fields beginning with underscore
|
||||
self._choosen_language = self.langue_choisie == "oui"
|
||||
@ -436,7 +446,8 @@ class RedactionalObject(WritableObject):
|
||||
self._extra = self.convert_extra()
|
||||
|
||||
# Get related documents
|
||||
def documents(self) -> list[Document]:
|
||||
def documents(self) -> tuple[Document]:
|
||||
LOG.debug(f"Initialize documents of `{self._title}`")
|
||||
documents = (
|
||||
Document.select()
|
||||
.join(
|
||||
@ -449,6 +460,7 @@ class RedactionalObject(WritableObject):
|
||||
|
||||
# Get the YAML frontmatter string
|
||||
def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str:
|
||||
# LOG.debug(f"Write frontmatter of `{self._title}`")
|
||||
meta: dict[str, Any] = {
|
||||
"lang": self.lang,
|
||||
"translationKey": self.id_trad,
|
||||
@ -468,6 +480,7 @@ class RedactionalObject(WritableObject):
|
||||
|
||||
# Get file text content
|
||||
def content(self) -> str:
|
||||
# LOG.debug(f"Write content of `{self._title}`")
|
||||
# Start the content with frontmatter
|
||||
body: str = "---\n" + self.frontmatter() + "---"
|
||||
# Add the title as a Markdown h1
|
||||
@ -491,15 +504,44 @@ class RedactionalObject(WritableObject):
|
||||
f.write(self.content())
|
||||
return self.dest_path()
|
||||
|
||||
# Output information about file that was just exported
|
||||
def end_message(self, message: str | Exception) -> str:
|
||||
output: str = super().end_message(message)
|
||||
# Write eventual translations of self
|
||||
# Output translated self objects
|
||||
def translations(self) -> list[Self]:
|
||||
translations: list[Self] = []
|
||||
LOG.debug(f"`{self._title}` contains translations: `{self._translations}`")
|
||||
for lang, translation in self._translations.items():
|
||||
LOG.debug(f"Writing {lang} translation of section `{self._title}`")
|
||||
self.style_print(f"{lang}: " + translation["_title"])
|
||||
# translated.end_message(translated.write())
|
||||
for lang, translated_attrs in self._translations.items():
|
||||
LOG.debug(f"Instanciating {lang} translation of section `{self._title}`")
|
||||
# Copy itself (with every attribute) as a base for the translated object
|
||||
translation: Self = deepcopy(self)
|
||||
# Replace the lang & the translations attributes of the translated object
|
||||
translation.lang = lang
|
||||
translation._translations = {}
|
||||
# Replace the translated attributes of the translated object
|
||||
for attr, value in translated_attrs.values():
|
||||
setattr(translation, attr, value)
|
||||
return translations
|
||||
|
||||
# Get the children of this object
|
||||
def children(self) -> tuple[tuple[WritableObject], ...]:
|
||||
return (self.documents(),)
|
||||
|
||||
# Write all the children of this object
|
||||
def write_children(self) -> RecursiveList:
|
||||
LOG.debug(f"Writing children of {type(self).__name__} `{self._title}`")
|
||||
output: RecursiveList = []
|
||||
for children in self.children():
|
||||
total = len(children)
|
||||
for i, obj in enumerate(children):
|
||||
output.append(
|
||||
obj.write_all(self._depth, self.dest_directory(), i, total)
|
||||
)
|
||||
return output
|
||||
|
||||
# Perform all the write steps of this object
|
||||
def write_all(
|
||||
self, parentdepth: int, parentdir: str, index: int, total: int
|
||||
) -> RecursiveList:
|
||||
output: RecursiveList = super().write_all(parentdepth, parentdir, index, total)
|
||||
output.append(self.write_children())
|
||||
return output
|
||||
|
||||
|
||||
@ -547,6 +589,7 @@ class Article(RedactionalObject, NormalizedArticle):
|
||||
return body
|
||||
|
||||
def authors(self) -> list[SpipAuteurs]:
|
||||
LOG.debug(f"Initialize authors of `{self._title}`")
|
||||
return (
|
||||
SpipAuteurs.select()
|
||||
.join(
|
||||
@ -573,7 +616,8 @@ class Section(RedactionalObject, NormalizedSection):
|
||||
return super().frontmatter(meta)
|
||||
|
||||
# Get articles of this section
|
||||
def articles(self) -> list[Article]:
|
||||
def articles(self) -> tuple[Article]:
|
||||
LOG.debug(f"Initialize articles of `{self._title}`")
|
||||
return (
|
||||
Article.select()
|
||||
.where((Article.id_rubrique == self._id) & (Article.lang == self.lang))
|
||||
@ -581,52 +625,14 @@ class Section(RedactionalObject, NormalizedSection):
|
||||
# .limit(limit)
|
||||
)
|
||||
|
||||
def write_tree(self, index: int, total: int) -> list[str | list[Any]]:
|
||||
# Define logger for this method’s logs
|
||||
log = logging.getLogger(CFG.logname + ".models.write_tree")
|
||||
# Define dictionary output to diplay
|
||||
output: list[str | list[Any]] = []
|
||||
# Print & add to output the message before the section write
|
||||
for m in self.begin_message(index, total):
|
||||
output.append(m)
|
||||
# Get this section’s articles & documents
|
||||
articles: list[Article] = self.articles()
|
||||
documents: list[Document] = self.documents()
|
||||
# Write this section & print the finish message of the section writing
|
||||
output[-1] += self.end_message(self.write())
|
||||
|
||||
# Write this section’s articles and documents
|
||||
def write_loop(objects: list[Article] | list[Document]) -> list[str]:
|
||||
output: list[str] = []
|
||||
total = len(objects)
|
||||
for i, obj in enumerate(objects):
|
||||
obj._depth = self._depth + 1
|
||||
obj._parentdir = self.dest_directory()
|
||||
for m in obj.begin_message(i, total):
|
||||
output.append(m)
|
||||
try:
|
||||
output[-1] += obj.end_message(obj.write())
|
||||
except Exception as err:
|
||||
output[-1] += obj.end_message(err)
|
||||
return output
|
||||
|
||||
log.debug(f"Export section {index} `{self._title}` articles")
|
||||
output.append(write_loop(articles))
|
||||
log.debug(f"Export section {index} `{self._title}` documents")
|
||||
output.append(write_loop(documents))
|
||||
|
||||
# Get all child section of this section
|
||||
log.debug(f"Initialize subsections of `{self._title}`")
|
||||
child_sections: tuple[Section, ...] = (
|
||||
# Get subsections of this section
|
||||
def sections(self) -> tuple[Self]:
|
||||
LOG.debug(f"Initialize subsections of `{self._title}`")
|
||||
return (
|
||||
Section.select()
|
||||
.where(Section.id_parent == self._id)
|
||||
.order_by(Section.date.desc())
|
||||
)
|
||||
nb: int = len(child_sections)
|
||||
# Do the same for subsections (write their entire subtree)
|
||||
for i, s in enumerate(child_sections):
|
||||
log.debug(f"Begin exporting section {i}/{nb} `{s._title}`")
|
||||
s._parentdir = self.dest_directory()
|
||||
output.append(s.write_tree(i, nb))
|
||||
log.debug(f"Finished exporting section {i}/{nb} `{s._title}`")
|
||||
return output
|
||||
|
||||
def children(self) -> tuple[tuple[WritableObject], ...]:
|
||||
return (self.articles(),) + super().children() + (self.sections(),)
|
||||
|
@ -5,20 +5,21 @@ from os.path import isfile
|
||||
from shutil import rmtree
|
||||
|
||||
from spip2md.config import CFG
|
||||
from spip2md.extended_models import Section
|
||||
from spip2md.extended_models import RecursiveList, Section
|
||||
from spip2md.spip_models import DB
|
||||
from spip2md.style import BOLD, esc
|
||||
|
||||
# Define parent ID of level 0 sections
|
||||
ROOTID = 0
|
||||
# Define loggers for this file
|
||||
ROOTLOG = logging.getLogger(CFG.logname + ".root")
|
||||
LIBLOG = logging.getLogger(CFG.logname + ".lib")
|
||||
|
||||
|
||||
# Write the level 0 sections and their subtrees
|
||||
def write_root_tree(parent_dir: str) -> list[str | list]:
|
||||
# Define logger for this method’s logs
|
||||
log = logging.getLogger(CFG.logname + ".write_root_tree")
|
||||
def write_root(parent_dir: str) -> RecursiveList:
|
||||
# Define dictionary output to diplay
|
||||
output: list[str | list] = []
|
||||
output: RecursiveList = []
|
||||
# Print starting message
|
||||
print(
|
||||
f"""\
|
||||
@ -28,7 +29,7 @@ into the directory {esc(BOLD)}{parent_dir}{esc()}, \
|
||||
as database user {esc(BOLD)}{CFG.db_user}{esc()}
|
||||
"""
|
||||
)
|
||||
log.debug("Initialize root sections")
|
||||
ROOTLOG.debug("Initialize root sections")
|
||||
# Get all sections of parentID ROOTID
|
||||
child_sections: tuple[Section, ...] = (
|
||||
Section.select()
|
||||
@ -38,11 +39,10 @@ as database user {esc(BOLD)}{CFG.db_user}{esc()}
|
||||
nb: int = len(child_sections)
|
||||
# Write each subsections (write their entire subtree)
|
||||
for i, s in enumerate(child_sections):
|
||||
log.debug(f"Begin exporting section {i}/{nb} {s._title}")
|
||||
s._parentdir = CFG.output_dir
|
||||
output.append(s.write_tree(i, nb))
|
||||
ROOTLOG.debug(f"Begin exporting section {i}/{nb} {s._title}")
|
||||
output.append(s.write_all(-1, CFG.output_dir, i, nb))
|
||||
print() # Break line between level 0 sections in output
|
||||
log.debug(f"Finished exporting section {i}/{nb} {s._title}")
|
||||
ROOTLOG.debug(f"Finished exporting section {i}/{nb} {s._title}")
|
||||
return output
|
||||
|
||||
|
||||
@ -63,7 +63,7 @@ def summarize(
|
||||
if depth == -1:
|
||||
print(
|
||||
f"""\
|
||||
Exported a total of {esc(BOLD)}{leaves}{esc()} Markdown files, \
|
||||
Exported a total of {esc(BOLD)}{leaves}{esc()} files, \
|
||||
stored into {esc(BOLD)}{branches}{esc()} directories"""
|
||||
)
|
||||
# Warn about issued warnings in log file
|
||||
@ -115,6 +115,6 @@ def cli():
|
||||
DB.connect()
|
||||
|
||||
# Write everything while printing the output human-readably
|
||||
summarize(write_root_tree(CFG.output_dir))
|
||||
summarize(write_root(CFG.output_dir))
|
||||
|
||||
DB.close() # Close the connection with the database
|
||||
|
Loading…
Reference in New Issue
Block a user