refactor: more modular, extended classes don’t modify constructors. Started to properly translate <multi> blocks, but still buggy

This commit is contained in:
Guilhem Fauré 2023-05-31 15:11:38 +02:00
parent 35829285bf
commit fe71b8fea3
3 changed files with 355 additions and 314 deletions

View File

@ -2,11 +2,11 @@
import logging import logging
from os import makedirs from os import makedirs
from os.path import basename, splitext from os.path import basename, splitext
from re import finditer, search from re import Pattern, finditer, search
from shutil import copyfile from shutil import copyfile
from typing import Any, Match, Optional from typing import Any, Optional
from peewee import BigAutoField, DateTimeField, DoesNotExist, ModelSelect from peewee import DateTimeField, DoesNotExist
from slugify import slugify from slugify import slugify
from yaml import dump from yaml import dump
@ -15,7 +15,7 @@ from spip2md.regexmaps import (
ARTICLE_LINK, ARTICLE_LINK,
BLOAT, BLOAT,
DOCUMENT_LINK, DOCUMENT_LINK,
HTMLTAG, HTMLTAGS,
ISO_UTF, ISO_UTF,
MULTILANG_BLOCK, MULTILANG_BLOCK,
MULTILANGS, MULTILANGS,
@ -36,56 +36,150 @@ from spip2md.spip_models import (
from spip2md.style import BLUE, BOLD, GREEN, WARNING_STYLE, YELLOW, esc from spip2md.style import BLUE, BOLD, GREEN, WARNING_STYLE, YELLOW, esc
class SpipWritable: class SpipNormalized:
# From SPIP database
texte: str texte: str
lang: str lang: str
titre: str titre: str
descriptif: str descriptif: str
profondeur: int statut: str
style: tuple[int, ...] # profondeur: int
# Custom
obj_id: int = 0 # database ID of object, but same attribute name for all objects
depth: int # Equals `profondeur` for sections
fileprefix: str # String to prepend to written files
parentdir: str # Path from output dir to direct parent
style: tuple[int, ...] # Styles to apply to some elements of printed output
# Returns the first detected language & instantiate a new object for the nexts def status(self) -> bool:
return self.statut == "publie"
def dest_directory(self, prepend: str = "", append: str = "") -> str:
raise NotImplementedError(
f"Subclasses need to implement directory(), params:{prepend}{append}"
)
def dest_filename(self, prepend: str = "", append: str = "") -> str:
raise NotImplementedError(
f"Subclasses need to implement dest_filename(), params:{prepend}{append}"
)
def dest_path(self) -> str:
return self.dest_directory() + self.dest_filename()
class NormalizedSection(SpipNormalized, SpipRubriques):
fileprefix: str = "_index"
style = (BOLD, GREEN) # Sections accent color is green
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_id = self.id_rubrique
self.depth = self.profondeur
class NormalizedArticle(SpipNormalized, SpipArticles):
fileprefix: str = "index"
style = (BOLD, YELLOW) # Articles accent color is yellow
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_id = self.id_article
class NormalizedDocument(SpipNormalized, SpipDocuments):
fileprefix: str = ""
style = (BOLD, BLUE) # Documents accent color is blue
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_id = self.id_document
class WritableObject(SpipNormalized):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Set the lang attribute of self to the first one detected
# Then, if theres other langs remaining, instanciate a new object with same
# input text but stripped of first lang
# Then returns the text of first detected language
# WARNING currently only supports ONE <multi> block per text
def translate_multi(self, text: str) -> str: def translate_multi(self, text: str) -> str:
# Create a lang: text dict # Memoize self title
translations: dict[str, str] = {"default": text} title: str = self.title()
# Keep the first lang in default translation, then # First translation found, with eventual preexisting text
# for each langs of <multi> blocks, add its text to the corresponding dict key current_translation: str = text
for block in MULTILANG_BLOCK.finditer(translations["default"]): next_text: str = text # <multi> block(s) without first lang
for i, lang in enumerate(MULTILANGS.finditer(block.group(1))): block = MULTILANG_BLOCK.search(text)
if i == 0: if block is not None:
translations["default"] = translations["default"].replace( lang = MULTILANGS.search(block.group(1))
block.group(), lang.group(2) if lang is not None:
) # set current lang to found first lang
if lang.group(1) in translations: self.lang = lang.group(1)
translations[lang.group(1)] += lang.group(2) # replace multi blocks of current text with first lang
else: current_translation = current_translation.replace(
translations[lang.group(1)] = lang.group(2) block.group(), lang.group(2)
# Logs the translation )
title: str = self.titre.strip() # Log the translation
translated: str = lang.group(2)[:50].strip() translated: str = lang.group(2)[:60].strip()
logging.info(f"{lang.group(1)} translation of {title}: {translated}") logging.info(
# Instantiate & write translated f"{title} lang becomes {self.lang}, with text {translated}"
# for lang, translation in translations.items(): )
# if lang == "non existant lang": # remove first lang from next_text
# new_lang = self.__init__( next_text = next_text.replace(lang.group(), "")
# texte=translation, else:
# lang=lang, # Log the unexpected situation
# titre=self.titre, logging.warning(
# descriptif=self.descriptif, f"Unexpected empty <multi> block in {title}, deleting it anyway"
# profondeur=self.profondeur, )
# style=self.style, # Do the same for the next text
# ) next_block = MULTILANG_BLOCK.search(next_text)
# Return the translations dict if next_block is not None:
# return translations next_lang = MULTILANGS.search(next_block.group(1))
if next_lang is not None:
# If there is a remaining lang
# Instantiate & write a similar object with modified text & lang
logging.info(f"Instanciate {next_lang.group(1)} translation of {title}")
next_lang_obj: WritableObject = type(self)(
texte=next_text,
lang=next_lang.group(1),
titre=self.titre,
descriptif=self.descriptif,
)
next_lang_obj.style = self.style
next_lang_obj.depth = self.depth
next_lang_obj.parentdir = self.dest_directory()
# WARNING the output will appear in terminal & logfile but wont return
next_lang_obj.begin_message(0, 0) # WARNING wrong counter
try:
next_lang_obj.end_message(next_lang_obj.write())
except Exception as err:
next_lang_obj.end_message(err)
# Return the first detected language # Return the first detected language
return translations["default"] return current_translation
# Apply different mappings to a text field, like SPIP to Markdown or encoding # Apply a mapping from regex maps
def convert(self, text: str, clean_html: bool = True) -> str: @staticmethod
if len(text) == 0: def apply_mapping(text: str, mapping: tuple) -> str:
# print("Empty text") if type(mapping) == tuple and len(mapping) > 0:
return "" if type(mapping[0]) == tuple and len(mapping[0]) > 0:
if type(mapping[0][0]) == Pattern:
for old, new in mapping:
text = old.sub(new, text)
else:
for old, new in mapping:
text = text.replace(old, new)
elif type(mapping[0]) == Pattern:
for old in mapping:
text = old.sub("", text)
else:
for old in mapping:
text = old.replace("", text)
return text
# Warn about unknown chars & replace them with config defined replacement
def warn_unknown(self, text: str, unknown_mapping: tuple) -> str:
# Return unknown char surrounded by context_length chars # Return unknown char surrounded by context_length chars
def unknown_chars_context(text: str, char: str, context_len: int = 24) -> str: def unknown_chars_context(text: str, char: str, context_len: int = 24) -> str:
context: str = r".{0," + str(context_len) + r"}" context: str = r".{0," + str(context_len) + r"}"
@ -98,22 +192,7 @@ class SpipWritable:
else: else:
return char return char
# Convert SPIP syntax to Markdown for char in unknown_mapping:
for spip, markdown in SPIP_MARKDOWN:
text = spip.sub(markdown, text)
# Remove useless text
for bloat in BLOAT:
text = bloat.sub("", text)
# Convert broken ISO encoding to UTF
for iso, utf in ISO_UTF:
text = text.replace(iso, utf)
# Handle <multi> multi language blocks
text = self.translate_multi(text)
# Delete remaining HTML tags in body WARNING
if clean_html:
text = HTMLTAG.sub("", text)
# Warn about unknown chars
for char in UNKNOWN_ISO:
lastend: int = 0 lastend: int = 0
for match in finditer("(" + char + ")+", text): for match in finditer("(" + char + ")+", text):
context: str = unknown_chars_context(text[lastend:], char) context: str = unknown_chars_context(text[lastend:], char)
@ -128,21 +207,32 @@ class SpipWritable:
lastend = match.end() lastend = match.end()
return text return text
def __init__(self, *args, **kwargs): # Apply needed methods on text fields
super().__init__(*args, **kwargs) def convert_field(self, field: Optional[str], clean_html: bool = True) -> str:
if self.titre is not None: if field is None:
# print(f"Convert titre from {type(self)} {self.titre}") return ""
self.titre: str = self.convert(self.titre) if len(field) == 0:
if self.descriptif is not None: return ""
# print(f"Convert descriptif from {type(self)} {self.titre}") # Convert SPIP syntax to Markdown
self.descriptif: str = self.convert(self.descriptif) field = self.apply_mapping(field, SPIP_MARKDOWN)
# Remove useless text
field = self.apply_mapping(field, BLOAT)
# Convert broken ISO encoding to UTF
field = self.apply_mapping(field, ISO_UTF)
if clean_html:
# Delete remaining HTML tags in body WARNING
field = self.apply_mapping(field, HTMLTAGS)
# Warn about unknown chars
field = self.warn_unknown(field, UNKNOWN_ISO)
return field.strip() # Strip whitespaces around text
def filename(self, date: bool = False) -> str: def title(self) -> str:
raise NotImplementedError( return self.convert_field(self.titre)
f"Subclasses need to implement filename(), date: {date}"
)
# Print one or more string(s) in which special elements are stylized def description(self) -> str:
return self.convert_field(self.descriptif)
# Print one or more line(s) in which special elements are stylized
def style_print(self, string: str, indent: bool = True, end: str = "\n") -> str: def style_print(self, string: str, indent: bool = True, end: str = "\n") -> str:
stylized: str = string stylized: str = string
for o in SPECIAL_OUTPUT: for o in SPECIAL_OUTPUT:
@ -150,38 +240,35 @@ class SpipWritable:
for w in WARNING_OUTPUT: for w in WARNING_OUTPUT:
stylized = w.sub(esc(*WARNING_STYLE) + r"\1" + esc(), stylized) stylized = w.sub(esc(*WARNING_STYLE) + r"\1" + esc(), stylized)
if indent: if indent:
stylized = " " * self.profondeur + stylized stylized = " " * self.depth + stylized
print(stylized, end=end) print(stylized, end=end)
# Return the stylized string # Return the stylized string
return stylized return stylized
# Print the message telling what is going to be done
def begin_message(self, index: int, limit: int, step: int = 100) -> list[str]: def begin_message(self, index: int, limit: int, step: int = 100) -> list[str]:
output: list[str] = [] output: list[str] = []
# Output the remaining number of objects to export every step object # Output the remaining number of objects to export every step object
if index % step == 0: if index % step == 0:
output.append(f"Exporting {limit-index}") output.append(f"Exporting {limit-index}")
output[-1] += f" level {self.profondeur}" output[-1] += f" level {self.depth}"
s: str = "s" if limit - index > 1 else "" s: str = "s" if limit - index > 1 else ""
output[-1] += f" {type(self).__name__}{s}" output[-1] += f" {type(self).__name__}{s}"
# Print the output as the program goes # Print the output as the program goes
self.style_print(output[-1]) self.style_print(output[-1])
# Output the counter & title of the object being exported # Output the counter & title of the object being exported
output.append(f"{index + 1}. ") output.append(f"{index + 1}. ")
if self.titre is None: if len(self.title()) == 0:
output[-1] += "MISSING NAME"
elif len(self.titre) == 0:
output[-1] += "EMPTY NAME" output[-1] += "EMPTY NAME"
else: else:
output[-1] += self.titre.strip(" ") output[-1] += self.title()
# Print the output as the program goes # Print the output as the program goes
self.style_print(output[-1], end="") self.style_print(output[-1], end="")
return output return output
# Write object to output destination # Write object to output destination
def write(self, parent_dir: str) -> str: def write(self) -> str:
raise NotImplementedError( raise NotImplementedError("Subclasses need to implement write()")
f"Subclasses need to implement write(), export dir: {parent_dir}"
)
# Output information about file that was just exported # Output information about file that was just exported
def end_message(self, message: str | Exception) -> str: def end_message(self, message: str | Exception) -> str:
@ -193,141 +280,113 @@ class SpipWritable:
return output + str(message) return output + str(message)
class Document(SpipWritable, SpipDocuments): class Document(WritableObject, NormalizedDocument):
# Documents accent color is blue
style = (BOLD, BLUE)
class Meta: class Meta:
table_name: str = "spip_documents" table_name: str = "spip_documents"
def __init__(self, *args, **kwargs): # Get source name of this file
super().__init__(*args, **kwargs) def src_path(self, data_dir: Optional[str] = None) -> str:
self.statut: str = "false" if self.statut == "publie" else "true" if data_dir is None:
return CFG.data_dir + self.fichier
return data_dir + self.fichier
# Get slugified name of this file # Get directory of this object
def filename(self, date: bool = False) -> str: def dest_directory(self, prepend: str = "", append: str = "/") -> str:
name_type: tuple[str, str] = splitext(basename(str(self.fichier))) return self.parentdir + prepend + slugify(self.titre, max_length=100) + append
return (
slugify( # Get destination slugified name of this file
(self.date_publication + "-" if date else "") + name_type[0], def dest_filename(self, prepend: str = "", append: str = "") -> str:
max_length=100, name, filetype = splitext(basename(str(self.fichier)))
) return slugify(prepend + name, max_length=100) + append + filetype
+ name_type[1]
)
# Write document to output destination # Write document to output destination
def write(self, parent_dir: str) -> str: def write(self) -> str:
# Define file source and destination
src: str = CFG.data_dir + self.fichier
dest: str = parent_dir + self.filename()
# Copy the document from its SPIP location to the new location # Copy the document from its SPIP location to the new location
copyfile(src, dest) return copyfile(self.src_path(), self.dest_path())
return dest
class SpipObject(SpipWritable): class RedactionalObject(WritableObject):
object_id: BigAutoField
id_trad: int id_trad: int
id_rubrique: int
date: DateTimeField date: DateTimeField
maj: str maj: str
id_secteur: int id_secteur: int
descriptif: str
extra: str extra: str
langue_choisie: str
# Custom
prefix: str = "index"
def convert(self, text: str, clean_html: bool = True) -> str: def replace_links(
if len(text) == 0: self,
# print("Empty text") text: str,
return "" mapping: tuple,
obj_type: type[NormalizedSection | NormalizedArticle | NormalizedDocument],
def found_replace(path_link: str, doc: Any, text: str, match: Match) -> str: ) -> str:
# TODO get relative path for id_link, path_link in mapping:
if len(match.group(1)) > 0:
repl: str = path_link.format(match.group(1), doc.filename())
else:
repl: str = path_link.format(doc.titre, doc.filename())
logging.info(f"Translating link to {repl}")
return text.replace(match.group(), repl)
def not_found_warn(path_link: str, text: str, match: Match) -> str:
logging.warn(f"No object for link {match.group()} in {self.titre}")
return text.replace(match.group(), path_link.format("", "NOT FOUND"), 1)
for id_link, path_link in DOCUMENT_LINK:
# print(f"Looking for links like {id_link}") # print(f"Looking for links like {id_link}")
for match in id_link.finditer(text): for match in id_link.finditer(text):
logging.info(f"Found document link {match.group()} in {self.titre}") logging.info(f"Found document link {match.group()} in {self.titre}")
try: try:
doc: Document = Document.get(Document.id_document == match.group(2)) o: obj_type = obj_type.get(obj_type.obj_id == match.group(2))
text = found_replace(path_link, doc, text, match) # TODO get relative path
if len(match.group(1)) > 0:
repl: str = path_link.format(match.group(1), o.dest_path())
else:
repl: str = path_link.format(o.titre, o.dest_path())
logging.info(f"Translating link to {repl}")
text = text.replace(match.group(), repl)
except DoesNotExist: except DoesNotExist:
text = not_found_warn(path_link, text, match) logging.warn(f"No object for link {match.group()} in {self.titre}")
for id_link, path_link in ARTICLE_LINK: text = text.replace(
# print(f"Looking for links like {id_link}") match.group(), path_link.format("", "NOT FOUND"), 1
for match in id_link.finditer(text):
logging.info(f"Found article link {match.group()} in {self.titre}")
try:
art: Article = Article.get(Article.id_article == match.group(2))
text = found_replace(path_link, art, text, match)
except DoesNotExist:
text = not_found_warn(path_link, text, match)
for id_link, path_link in SECTION_LINK:
# print(f"Looking for links like {id_link}")
for match in id_link.finditer(text):
logging.info(f"Found section link {match.group()} in {self.titre}")
try:
section: Rubrique = Rubrique.get(
Rubrique.id_rubrique == match.group(2)
) )
text = found_replace(path_link, section, text, match) return text
except DoesNotExist:
text = not_found_warn(path_link, text, match)
return super().convert(text, clean_html)
def __init__(self, *args, **kwargs): def text(self) -> str:
super().__init__(*args, **kwargs) if self.texte is None:
# Common fields that need conversions return ""
if self.texte is not None: if len(self.texte) == 0:
# print(f"Convert texte from {type(self)} {self.titre}") return ""
# print(f"First 500 chars: {self.texte[:500]}") text: str = self.texte
self.texte: str = self.convert(self.texte) # Handle <multi> multi language blocks
if self.extra is not None: text = self.translate_multi(text)
# print(f"Convert extra from {type(self)} {self.titre}") # Replace ID based SPIP links with relative path links
# print(f"First 500 chars: {self.extra[:500]}") text = self.replace_links(text, DOCUMENT_LINK, Document)
self.extra: str = self.convert(self.extra) text = self.replace_links(text, ARTICLE_LINK, Article)
self.statut: str = "false" if self.statut == "publie" else "true" text = self.replace_links(text, SECTION_LINK, Section)
self.langue_choisie: str = "false" if self.langue_choisie == "oui" else "true" return self.convert_field(text)
# Define file prefix (needs to be redefined for sections)
self.prefix = "index" def ext(self) -> str:
if self.extra is None:
return ""
if len(self.extra) == 0:
return ""
text: str = self.extra
text = self.replace_links(text, ARTICLE_LINK, Article)
text = self.replace_links(text, SECTION_LINK, Section)
return self.convert_field(text)
def choosen_language(self) -> bool:
return self.langue_choisie == "oui"
# Get related documents # Get related documents
def documents(self) -> ModelSelect: def documents(self) -> list[Document]:
documents = ( documents = (
Document.select() Document.select()
.join( .join(
SpipDocumentsLiens, SpipDocumentsLiens,
on=(Document.id_document == SpipDocumentsLiens.id_document), on=(Document.id_document == SpipDocumentsLiens.id_document),
) )
.where(SpipDocumentsLiens.id_objet == self.object_id) .where(SpipDocumentsLiens.id_objet == self.obj_id)
) )
return documents return documents
# Get related articles
def articles(self) -> ModelSelect:
return (
Article.select()
.where(Article.id_rubrique == self.object_id)
.order_by(Article.date.desc())
# .limit(limit)
)
# Get slugified directory of this object # Get slugified directory of this object
def dir_slug(self, include_date: bool = False, end_slash: bool = True) -> str: def dest_directory(self, prepend: str = "", append: str = "/") -> str:
date: str = self.date + "-" if include_date else "" return self.parentdir + prepend + slugify(self.titre, max_length=100) + append
slash: str = "/" if end_slash else ""
return slugify(date + self.titre, max_length=100) + slash
# Get filename of this object # Get filename of this object
def filename(self) -> str: def dest_filename(self) -> str:
return self.prefix + "." + self.lang + "." + CFG.export_filetype return self.prefix + "." + self.lang + "." + CFG.export_filetype
# Get the YAML frontmatter string # Get the YAML frontmatter string
@ -342,7 +401,7 @@ class SpipObject(SpipWritable):
"description": self.descriptif, "description": self.descriptif,
# Debugging # Debugging
"spip_id_secteur": self.id_secteur, "spip_id_secteur": self.id_secteur,
"spip_id": self.object_id, "spip_id": self.obj_id,
} }
if append is not None: if append is not None:
return dump(meta | append, allow_unicode=True) return dump(meta | append, allow_unicode=True)
@ -354,52 +413,48 @@ class SpipObject(SpipWritable):
# Start the content with frontmatter # Start the content with frontmatter
body: str = "---\n" + self.frontmatter() + "---" body: str = "---\n" + self.frontmatter() + "---"
# Add the title as a Markdown h1 # Add the title as a Markdown h1
if self.titre is not None and len(self.titre) > 0 and CFG.prepend_h1: if len(self.title()) > 0 and CFG.prepend_h1:
body += "\n\n# " + self.titre body += "\n\n# " + self.title()
# If there is a text, add the text preceded by two line breaks # If there is a text, add the text preceded by two line breaks
if self.texte is not None and len(self.texte) > 0: if len(self.text()) > 0:
# Remove remaining HTML after & append to body # Remove remaining HTML after & append to body
body += "\n\n" + self.texte body += "\n\n" + self.text()
# Same with an "extra" section # Same with an "extra" section
if self.extra is not None and len(self.extra) > 0: if len(self.ext()) > 0:
body += "\n\n# EXTRA\n\n" + self.extra body += "\n\n# EXTRA\n\n" + self.ext()
return body return body
# Write object to output destination # Write object to output destination
def write(self, parent_dir: str) -> str: def write(self) -> str:
# Define actual export directory
directory: str = parent_dir + self.dir_slug()
# Make a directory for this object if there isnt # Make a directory for this object if there isnt
makedirs(directory, exist_ok=True) makedirs(self.dest_directory(), exist_ok=True)
# Define actual export path
path: str = directory + self.filename()
# Write the content of this object into a file named as self.filename() # Write the content of this object into a file named as self.filename()
with open(path, "w") as f: with open(self.dest_path(), "w") as f:
f.write(self.content()) f.write(self.content())
return path return self.dest_path()
class Article(SpipObject, SpipArticles): class Article(RedactionalObject, NormalizedArticle):
# Articles accent color is yellow
style = (BOLD, YELLOW)
class Meta: class Meta:
table_name: str = "spip_articles" table_name: str = "spip_articles"
def __init__(self, *args, **kwargs): def surtitle(self) -> str:
super().__init__(*args, **kwargs) return self.convert_field(str(self.surtitre))
# More conversions needed for articles
if self.surtitre is not None: def subtitle(self) -> str:
self.surtitre: str = self.convert(self.surtitre) return self.convert_field(str(self.soustitre))
if self.soustitre is not None:
self.soustitre: str = self.convert(self.soustitre) def caption(self) -> str:
if self.chapo is not None: return self.convert_field(str(self.chapo))
self.chapo: str = self.convert(self.chapo)
if self.ps is not None: def postscriptum(self) -> str:
self.ps: str = self.convert(self.ps) return self.convert_field(str(self.ps))
self.accepter_forum: str = "true" if self.accepter_forum == "oui" else "false"
# ID def ublog(self) -> str:
self.object_id = self.id_article return self.convert_field(str(self.microblog))
def accept_forum(self) -> bool:
return self.accepter_forum == "oui"
def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str: def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str:
meta: dict[str, Any] = { meta: dict[str, Any] = {
@ -420,14 +475,14 @@ class Article(SpipObject, SpipArticles):
def content(self) -> str: def content(self) -> str:
body: str = super().content() body: str = super().content()
# If there is a caption, add the caption followed by a hr # If there is a caption, add the caption followed by a hr
if len(str(self.chapo)) > 0: if len(self.caption()) > 0:
body += "\n\n" + self.chapo + "\n\n***" body += "\n\n" + self.caption() + "\n\n***"
# PS # PS
if len(str(self.ps)) > 0: if len(self.postscriptum()) > 0:
body += "\n\n# POST-SCRIPTUM\n\n" + self.ps body += "\n\n# POST-SCRIPTUM\n\n" + self.postscriptum()
# Microblog # Microblog
if len(str(self.microblog)) > 0: if len(self.ublog()) > 0:
body += "\n\n# MICROBLOGGING\n\n" + self.microblog body += "\n\n# MICROBLOGGING\n\n" + self.ublog()
return body return body
def authors(self) -> list[SpipAuteurs]: def authors(self) -> list[SpipAuteurs]:
@ -437,24 +492,14 @@ class Article(SpipObject, SpipArticles):
SpipAuteursLiens, SpipAuteursLiens,
on=(SpipAuteurs.id_auteur == SpipAuteursLiens.id_auteur), on=(SpipAuteurs.id_auteur == SpipAuteursLiens.id_auteur),
) )
.where(SpipAuteursLiens.id_objet == self.id_article) .where(SpipAuteursLiens.id_objet == self.obj_id)
) )
class Rubrique(SpipObject, SpipRubriques): class Section(RedactionalObject, NormalizedSection):
# Sections accent color is green
style = (BOLD, GREEN)
class Meta: class Meta:
table_name: str = "spip_rubriques" table_name: str = "spip_rubriques"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# ID
self.object_id = self.id_rubrique
# File prefix
self.prefix = "_index"
def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str: def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str:
meta: dict[str, Any] = { meta: dict[str, Any] = {
# Debugging # Debugging
@ -466,31 +511,38 @@ class Rubrique(SpipObject, SpipRubriques):
else: else:
return super().frontmatter(meta) return super().frontmatter(meta)
def write_tree( # Get articles of this section
self, parent_dir: str, index: int, total: int def articles(self) -> list[Article]:
) -> list[str | list[Any]]: return (
Article.select()
.where(Article.id_rubrique == self.obj_id)
.order_by(Article.date.desc())
# .limit(limit)
)
def write_tree(self, index: int, total: int) -> list[str | list[Any]]:
# Define dictionary output to diplay # Define dictionary output to diplay
output: list[str | list[Any]] = [] output: list[str | list[Any]] = []
# Print & add to output the message before the section write
for m in self.begin_message(index, total): for m in self.begin_message(index, total):
output.append(m) output.append(m)
# Get this sections articles documents # Get this sections articles & documents
articles = self.articles() articles: list[Article] = self.articles()
documents = self.documents() documents: list[Document] = self.documents()
# Write this section # Write this section & print the finish message of the section writing
output[-1] += self.end_message(self.write(parent_dir)) output[-1] += self.end_message(self.write())
# Redefine parent_dir for subtree elements
parent_dir = parent_dir + self.dir_slug()
# Write this sections articles and documents # Write this sections articles and documents
def write_loop(objects: ModelSelect) -> list[str]: def write_loop(objects: list[Article] | list[Document]) -> list[str]:
output: list[str] = [] output: list[str] = []
total = len(objects) total = len(objects)
for i, obj in enumerate(objects): for i, obj in enumerate(objects):
obj.profondeur = self.profondeur + 1 obj.depth = self.depth + 1
obj.parentdir = self.dest_directory()
for m in obj.begin_message(i, total): for m in obj.begin_message(i, total):
output.append(m) output.append(m)
try: try:
output[-1] += obj.end_message(obj.write(parent_dir)) output[-1] += obj.end_message(obj.write())
except Exception as err: except Exception as err:
output[-1] += obj.end_message(err) output[-1] += obj.end_message(err)
return output return output
@ -498,51 +550,15 @@ class Rubrique(SpipObject, SpipRubriques):
output.append(write_loop(articles)) output.append(write_loop(articles))
output.append(write_loop(documents)) output.append(write_loop(documents))
# Get all child section of self # Get all child section of this section
child_sections: ModelSelect = ( child_sections: list[Section] = (
Rubrique.select() Section.select()
.where(Rubrique.id_parent == self.id_rubrique) .where(Section.id_parent == self.obj_id)
.order_by(Rubrique.date.desc()) .order_by(Section.date.desc())
) )
nb: int = len(child_sections) nb: int = len(child_sections)
# Do the same for subsections (write their entire subtree) # Do the same for subsections (write their entire subtree)
for i, s in enumerate(child_sections): for i, s in enumerate(child_sections):
output.append(s.write_tree(parent_dir, i, nb)) s.parentdir = self.dest_directory()
return output output.append(s.write_tree(i, nb))
class RootRubrique(Rubrique):
class Meta:
table_name: str = "spip_rubriques"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 0 ID
self.id_rubrique = 0
# self.object_id = 0
self.profondeur = 0
def write_tree(self, parent_dir: str) -> list[str | list]:
# Define dictionary output to diplay
output: list[str | list] = []
# Print starting message
print(
f"""\
Begin exporting {esc(BOLD)}{CFG.db}@{CFG.db_host}{esc()} SPIP database to plain \
Markdown+YAML files,
into the directory {esc(BOLD)}{parent_dir}{esc()}, \
as database user {esc(BOLD)}{CFG.db_user}{esc()}
"""
)
# Get all child section of self
child_sections: ModelSelect = (
Rubrique.select()
.where(Rubrique.id_parent == self.id_rubrique)
.order_by(Rubrique.date.desc())
)
nb: int = len(child_sections)
# Do the same for subsections (write their entire subtree)
for i, s in enumerate(child_sections):
output.append(s.write_tree(parent_dir, i, nb))
print() # Break line for level 1
return output return output

View File

@ -4,14 +4,47 @@ from os import makedirs, remove
from os.path import isfile from os.path import isfile
from shutil import rmtree from shutil import rmtree
from peewee import ModelSelect
from spip2md.config import CFG from spip2md.config import CFG
from spip2md.extended_models import RootRubrique from spip2md.extended_models import Section
from spip2md.spip_models import DB from spip2md.spip_models import DB
from spip2md.style import BOLD, esc from spip2md.style import BOLD, esc
# Define parent ID of level 0 sections
ROOTID = 0
# Count on outputted tree
def count_output( # Write the level 0 sections and their subtrees
def write_root_tree(parent_dir: str) -> list[str | list]:
# Define dictionary output to diplay
output: list[str | list] = []
# Print starting message
print(
f"""\
Begin exporting {esc(BOLD)}{CFG.db}@{CFG.db_host}{esc()} SPIP database to plain \
Markdown+YAML files,
into the directory {esc(BOLD)}{parent_dir}{esc()}, \
as database user {esc(BOLD)}{CFG.db_user}{esc()}
"""
)
# Get all sections of parentID ROOTID
child_sections: list[Section] = (
Section.select()
.where(Section.id_parent == ROOTID)
.order_by(Section.date.desc())
)
nb: int = len(child_sections)
# Write each subsections (write their entire subtree)
for i, s in enumerate(child_sections):
s.parentdir = CFG.output_dir
output.append(s.write_tree(i, nb))
print() # Break line between level 0 sections in output
return output
# Count on outputted tree & print results if finished
def summarize(
tree: list[str | list[str | list]], tree: list[str | list[str | list]],
indent: str = " ", indent: str = " ",
depth: int = -1, depth: int = -1,
@ -20,11 +53,19 @@ def count_output(
) -> tuple[int, int]: ) -> tuple[int, int]:
for sub in tree: for sub in tree:
if type(sub) == list: if type(sub) == list:
branches, leaves = count_output( branches, leaves = summarize(sub, indent, depth + 1, branches + 1, leaves)
sub, indent, depth + 1, branches + 1, leaves
)
elif type(sub) == str: elif type(sub) == str:
leaves += 1 leaves += 1
# End message only if its the root one
if depth == -1:
print(
f"""\
Exported a total of {esc(BOLD)}{leaves}{esc()} Markdown files, \
stored into {esc(BOLD)}{branches}{esc()} directories"""
)
# Warn about issued warnings in log file
if isfile(CFG.logfile):
print(f"\nWarnings and informations in {esc(BOLD)}{CFG.logfile}{esc()}")
return (branches, leaves) return (branches, leaves)
@ -40,18 +81,6 @@ def init_logging() -> None:
) )
# Summary message at the end of the program
def summary(branches: int, leaves: int) -> None:
print(
f"""\
Exported a total of {esc(BOLD)}{leaves}{esc()} Markdown files, \
stored into {esc(BOLD)}{branches}{esc()} directories"""
)
# Warn about issued warnings in log file
if isfile(CFG.logfile):
print(f"\nTake a look at warnings and infos in {esc(BOLD)}{CFG.logfile}{esc()}")
# Clear the output dir if needed & create a new # Clear the output dir if needed & create a new
def clear_output() -> None: def clear_output() -> None:
if CFG.clear_output: if CFG.clear_output:
@ -59,10 +88,6 @@ def clear_output() -> None:
makedirs(CFG.output_dir, exist_ok=True) makedirs(CFG.output_dir, exist_ok=True)
# Define the virtual id=0 section
ROOT = RootRubrique()
# To execute when script is directly executed as a script # To execute when script is directly executed as a script
def cli(): def cli():
# def cli(*addargv: str): # def cli(*addargv: str):
@ -84,6 +109,6 @@ def cli():
DB.connect() DB.connect()
# Write everything while printing the output human-readably # Write everything while printing the output human-readably
summary(*count_output(ROOT.write_tree(CFG.output_dir))) summarize(write_root_tree(CFG.output_dir))
DB.close() # Close the connection with the database DB.close() # Close the connection with the database

View File

@ -177,7 +177,7 @@ BLOAT = (
) )
# Matches against every HTML tag # Matches against every HTML tag
HTMLTAG = compile(r"<\/?.*?>\s*", S) HTMLTAGS = (compile(r"<\/?.*?>\s*", S),)
# ((Broken ISO 8859-1 encoding, Proper UTF equivalent encoding), …) # ((Broken ISO 8859-1 encoding, Proper UTF equivalent encoding), …)