custom logger name, private vars in constructors for converted fields, various fixes

Guilhem Fauré 2023-06-01 15:11:39 +02:00
parent 123ae5945b
commit 003b21fb3b
3 changed files with 145 additions and 118 deletions
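
The core pattern in this commit: values converted from raw SPIP fields are no longer recomputed through method calls but are computed once in __init__ and cached in underscore-prefixed attributes. A minimal sketch of the idea, with an illustrative class and a placeholder convert_field (not the actual spip2md code):

class Example:
    def convert_field(self, field: str) -> str:
        # Placeholder conversion; the real one strips markup, handles <multi>, etc.
        return field.strip() if field else ""

    def __init__(self, titre: str, statut: str):
        # Raw database fields
        self.titre = titre
        self.statut = statut
        # Converted fields, computed once and stored as "private" attributes
        self._title: str = self.convert_field(self.titre)
        self._status: bool = self.statut == "publie"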

View File

@ -28,6 +28,7 @@ class Configuration:
clear_output: bool = False # Remove the output dir, if it exists, before running
clear_log: bool = False # Clear the log before every run instead of appending to it
logfile: str = "spip2md.log" # File where logs will be written, relative to wd
logname: str = "spip2md" # Name used to label logs
loglevel: str = "WARNING" # Minimum severity of logs written in logfile
remove_html: bool = True # Should spip2md remove all HTML tags
max_articles_export: int = 1000 # TODO reimplement
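# Note on the new logname setting: it becomes the root of a logger hierarchy;
# modules then ask for child loggers under it, as models.py does below with
# logging.getLogger(CFG.logname + ".models"). A rough, illustrative sketch:
import logging

LOG = logging.getLogger("spip2md" + ".models")  # child of the "spip2md" logger
# Records emitted here propagate up the dotted-name hierarchy and end up in the
# handlers attached by logging.basicConfig in main.py (see the last file below).
LOG.warning("written to spip2md.log once basicConfig has run")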

View File

@ -43,24 +43,28 @@ from spip2md.spip_models import (
)
from spip2md.style import BLUE, BOLD, GREEN, WARNING_STYLE, YELLOW, esc
# Define logger for this file's logs
LOG = logging.getLogger(CFG.logname + ".models")
class SpipNormalized:
class SpipInterface:
# From SPIP database
texte: str
lang: str
titre: str
descriptif: str
statut: str
# profondeur: int
# Custom
obj_id: BigAutoField | int = 0 # same ID attribute name for all objects
depth: IntegerField | int # Equals `profondeur` for sections
fileprefix: str # String to prepend to written files
parentdir: str # Path from output dir to direct parent
style: tuple[int, ...] # Styles to apply to some elements of printed output
def status(self) -> bool:
return self.statut == "publie"
profondeur: int
# Converted fields
_title: str
_status: bool
# Additional fields
_id: BigAutoField | int = 0 # same ID attribute name for all objects
_depth: IntegerField | int # Equals `profondeur` for sections
_fileprefix: str # String to prepend to written files
_parentdir: str # Path from output dir to direct parent
_style: tuple[int, ...] # Styles to apply to some elements of printed output
# memo: dict[str, str] = {} # Memoize values
def dest_directory(self, prepend: str = "", append: str = "") -> str:
raise NotImplementedError(
@ -76,60 +80,59 @@ class SpipNormalized:
return self.dest_directory() + self.dest_filename()
class NormalizedSection(SpipNormalized, SpipRubriques):
fileprefix: str = "_index"
style = (BOLD, GREEN) # Sections accent color is green
class NormalizedSection(SpipInterface, SpipRubriques):
_fileprefix: str = "_index"
_style = (BOLD, GREEN) # Sections accent color is green
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_id = self.id_rubrique
self.depth = self.profondeur
self._id = self.id_rubrique
self._depth = self.profondeur
class NormalizedArticle(SpipNormalized, SpipArticles):
fileprefix: str = "index"
style = (BOLD, YELLOW) # Articles accent color is yellow
class NormalizedArticle(SpipInterface, SpipArticles):
_fileprefix: str = "index"
_style = (BOLD, YELLOW) # Articles accent color is yellow
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_id = self.id_article
self._id = self.id_article
class NormalizedDocument(SpipNormalized, SpipDocuments):
fileprefix: str = ""
style = (BOLD, BLUE) # Documents accent color is blue
class NormalizedDocument(SpipInterface, SpipDocuments):
_fileprefix: str = ""
_style = (BOLD, BLUE) # Documents accent color is blue
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_id = self.id_document
self._id = self.id_document
class WritableObject(SpipNormalized):
class WritableObject(SpipInterface):
translations: dict[str, Self]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Detect every language present in <multi> blocks of text
# For each language in <multi> block, output a new object with the translation
def translate_multi(self, text: str) -> dict[str, str]:
# title: str = self.title() # Memoize self title # WARNING recurses
title: str = self.titre.strip() # Memoize self title # WARNING recurses
translations: dict[str, str] = {self.lang: text} # Dict such as lang: text
# For each lang in the <multi> blocks, add its text to the corresponding dict key
for block in MULTILANG_BLOCK.finditer(text):
for lang in MULTILANGS.finditer(block.group(1)):
# To log the translation
trans: str = lang.group(2)[:50].strip()
if lang.group(1) == self.lang:
LOG.debug(f"Keeping {lang.group(1)} of {self._title}: {trans}")
translations[self.lang] = translations[self.lang].replace(
block.group(), lang.group(2)
)
elif lang.group(1) in translations:
LOG.debug(f"Appending to {lang.group(1)} of {self._title}: {trans}")
translations[lang.group(1)] += lang.group(2)
else:
LOG.debug(
f"Adding {lang.group(1)} translation of {self._title}: {trans}"
)
translations[lang.group(1)] = lang.group(2)
# Logs the translation
translated: str = lang.group(2)[:50].strip()
logging.info(f"{title} {lang.group(1)} translation: {translated}")
return translations
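# For reference, translate_multi above splits SPIP <multi> markup into one text
# per language. A standalone sketch of that mechanism with illustrative regexes;
# the real MULTILANG_BLOCK and MULTILANGS patterns used by spip2md may differ:
import re

MULTILANG_BLOCK = re.compile(r"<multi>(.+?)</multi>", re.S)
MULTILANGS = re.compile(r"\[(\w+)\]\s*(.+?)\s*(?=\[\w+\]|\Z)", re.S)

text = "Intro <multi>[fr]Bonjour[en]Hello</multi> outro"
for block in MULTILANG_BLOCK.finditer(text):
    for lang in MULTILANGS.finditer(block.group(1)):
        print(lang.group(1), "->", lang.group(2))  # fr -> Bonjour, then en -> Hello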
# Apply a mapping from regex maps
@ -169,11 +172,11 @@ class WritableObject(SpipNormalized):
lastend: int = 0
for match in finditer("(" + char + ")+", text):
context: str = unknown_chars_context(text[lastend:], char)
logging.warn(
f"Unknown char {char} found in {self.titre[:40]} at: {context}"
LOG.warn(
f"Unknown char {char} found in {self._title[:40]} at: {context}"
)
if CFG.unknown_char_replacement is not None:
logging.warn(
LOG.warn(
f"Replacing {match.group()} with {CFG.unknown_char_replacement}"
)
text = text.replace(match.group(), CFG.unknown_char_replacement, 1)
@ -199,21 +202,25 @@ class WritableObject(SpipNormalized):
field = self.warn_unknown(field, UNKNOWN_ISO)
return field.strip() # Strip whitespaces around text
def title(self) -> str:
def convert_title(self) -> str:
return self.convert_field(self.titre)
def description(self) -> str:
return self.convert_field(self.descriptif)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize converted fields as underscore-prefixed attributes
self._title: str = self.convert_title()
self._description: str = self.convert_field(self.descriptif)
self._status = self.statut == "publie"
# Print one or more line(s) in which special elements are stylized
def style_print(self, string: str, indent: bool = True, end: str = "\n") -> str:
def _style_print(self, string: str, indent: bool = True, end: str = "\n") -> str:
stylized: str = string
for o in SPECIAL_OUTPUT:
stylized = o.sub(esc(*self.style) + r"\1" + esc(), stylized)
stylized = o.sub(esc(*self._style) + r"\1" + esc(), stylized)
for w in WARNING_OUTPUT:
stylized = w.sub(esc(*WARNING_STYLE) + r"\1" + esc(), stylized)
if indent:
stylized = " " * self.depth + stylized
stylized = " " * self._depth + stylized
print(stylized, end=end)
# Return the stylized string
return stylized
@ -226,20 +233,20 @@ class WritableObject(SpipNormalized):
# Output the remaining number of objects to export, once every step objects
if index % step == 0:
output.append(f"Exporting {limit-index}")
output[-1] += f" level {self.depth}"
output[-1] += f" level {self._depth}"
s: str = "s" if limit - index > 1 else ""
output[-1] += f" {type(self).__name__}{s}"
# Print the output as the program goes
self.style_print(output[-1])
self._style_print(output[-1])
# Output the counter & title of the object being exported
output.append(f"{index + 1}. ")
output.append(prepend)
if len(self.title()) == 0:
if len(self._title) == 0:
output[-1] += "EMPTY NAME"
else:
output[-1] += self.title()
output[-1] += self._title
# Print the output as the program goes
self.style_print(output[-1], end="")
self._style_print(output[-1], end="")
return output
# Write object to output destination
@ -252,7 +259,7 @@ class WritableObject(SpipNormalized):
if type(message) is not str:
output += "ERROR "
# Print the output as the program goes
self.style_print(output + str(message), indent=False)
self._style_print(output + str(message), indent=False)
return output + str(message)
@ -267,14 +274,21 @@ class Document(WritableObject, NormalizedDocument):
return data_dir + self.fichier
# Get directory of this object
def dest_directory(self, prepend: str = "", append: str = "/") -> str:
return self.parentdir + prepend + slugify(self.titre, max_length=100) + append
def dest_directory(self, prepend: str = "", append: str = "") -> str:
return self._parentdir + prepend + slugify(self._title, max_length=100) + append
# Get destination slugified name of this file
def dest_filename(self, prepend: str = "", append: str = "") -> str:
name, filetype = splitext(basename(str(self.fichier)))
return slugify(prepend + name, max_length=100) + append + filetype
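# Both destination helpers above lean on slugify for filesystem-safe names; a
# quick illustration, assuming the python-slugify package (which provides the
# max_length keyword used here):
from slugify import slugify

slugify("Là-bas, les décisions !", max_length=100)  # -> "la-bas-les-decisions"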
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize converted fields as underscore-prefixed attributes
# self._src_path = self.src_path()
# self._dest_directory = self.dest_directory()
# self._dest_filename = self.dest_filename()
# Write document to output destination
def write(self) -> str:
# Copy the document from its SPIP location to the new location
@ -302,50 +316,71 @@ class RedactionalObject(WritableObject):
for id_link, path_link in mapping:
# print(f"Looking for links like {id_link}")
for match in id_link.finditer(text):
logging.info(f"Found document link {match.group()} in {self.titre}")
LOG.debug(f"Found document link {match.group()} in {self._title}")
try:
o: obj_type = obj_type.get(obj_type.obj_id == match.group(2))
o: obj_type = obj_type.get(obj_type._id == match.group(2))
# TODO get relative path
if len(match.group(1)) > 0:
repl: str = path_link.format(match.group(1), o.dest_path())
else:
repl: str = path_link.format(o.titre, o.dest_path())
logging.info(f"Translating link to {repl}")
repl: str = path_link.format(o._title, o.dest_path())
LOG.debug(f"Translating link to {repl}")
text = text.replace(match.group(), repl)
except DoesNotExist:
logging.warn(f"No object for link {match.group()} in {self.titre}")
LOG.warn(f"No object for link {match.group()} in {self._title}")
text = text.replace(
match.group(), path_link.format("", "NOT FOUND"), 1
)
return text
def title(self) -> str:
if self.texte is None:
# Get slugified directory of this object
def dest_directory(self, prepend: str = "", append: str = "/") -> str:
return self._parentdir + prepend + slugify(self._title, max_length=100) + append
# Get filename of this object
def dest_filename(self) -> str:
return self.prefix + "." + self.lang + "." + CFG.export_filetype
def convert_title(self) -> str:
# Use the memoized value first if it exists
if hasattr(self, "_title"):
return self._title
if self.titre is None:
return ""
if len(self.texte) == 0:
if len(self.titre) == 0:
return ""
text: str = self.texte
text: str = self.titre
self._title = self.titre.strip() # Define temporary title for translate
# Handle <multi> multi-language blocks
for lang, translation in self.translate_multi(text):
LOG.debug(f"Begin translation of {text.strip()} title")
for lang, translation in self.translate_multi(text).items():
if lang == self.lang:
text = translation
else:
LOG.debug(f"Not setting current article title to {lang} {translation}")
continue
self.translations: dict[str, Self] = {}
self.translations[lang] = deepcopy(self)
self.translations[lang].titre = translation
LOG.debug(f"Setting current article title to {text.strip()}")
return self.convert_field(text)
def text(self) -> str:
def convert_text(self) -> str:
# Use the memoized value first if it exists
if hasattr(self, "_text"):
return self._text
if self.texte is None:
return ""
if len(self.texte) == 0:
return ""
text: str = self.texte
# Handle <multi> multi-language blocks
LOG.debug(f"Begin translation of {text.strip()} text")
for lang, translation in self.translate_multi(text):
if lang == self.lang:
text = translation
else:
continue
self.translations: dict[str, Self] = {}
self.translations[lang] = deepcopy(self)
self.translations[lang].texte = translation
@ -355,7 +390,10 @@ class RedactionalObject(WritableObject):
text = self.replace_links(text, SECTION_LINK, Section)
return self.convert_field(text)
def ext(self) -> str:
def convert_extra(self) -> str:
# Use the memoized value first if it exists
if hasattr(self, "_extra"):
return self._extra
if self.extra is None:
return ""
if len(self.extra) == 0:
@ -365,8 +403,12 @@ class RedactionalObject(WritableObject):
text = self.replace_links(text, SECTION_LINK, Section)
return self.convert_field(text)
def choosen_language(self) -> bool:
return self.langue_choisie == "oui"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize converted fields as underscore-prefixed attributes
self._choosen_language = self.langue_choisie == "oui"
self._text = self.convert_text()
self._extra = self.convert_extra()
# Get related documents
def documents(self) -> list[Document]:
@ -376,31 +418,23 @@ class RedactionalObject(WritableObject):
SpipDocumentsLiens,
on=(Document.id_document == SpipDocumentsLiens.id_document),
)
.where(SpipDocumentsLiens.id_objet == self.obj_id)
.where(SpipDocumentsLiens.id_objet == self._id)
)
return documents
# Get slugified directory of this object
def dest_directory(self, prepend: str = "", append: str = "/") -> str:
return self.parentdir + prepend + slugify(self.titre, max_length=100) + append
# Get filename of this object
def dest_filename(self) -> str:
return self.prefix + "." + self.lang + "." + CFG.export_filetype
# Get the YAML frontmatter string
def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str:
meta: dict[str, Any] = {
"lang": self.lang,
"translationKey": self.id_trad,
"title": self.titre,
"title": self._title,
"publishDate": self.date,
"lastmod": self.maj,
"draft": self.statut,
"description": self.descriptif,
"draft": self._status,
"description": self._description,
# Debugging
"spip_id_secteur": self.id_secteur,
"spip_id": self.obj_id,
"spip_id": self._id,
}
if append is not None:
return dump(meta | append, allow_unicode=True)
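# For reference, yaml.dump renders the dict above as Hugo-style frontmatter
# roughly like this (values invented for illustration; dump sorts keys
# alphabetically by default):
#
#   description: Summary of the article
#   draft: true
#   lang: fr
#   lastmod: 2023-06-01 09:00:00
#   publishDate: 2023-05-31 10:12:00
#   spip_id: 42
#   spip_id_secteur: 2
#   title: Mon premier article
#   translationKey: 3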
@ -412,15 +446,15 @@ class RedactionalObject(WritableObject):
# Start the content with frontmatter
body: str = "---\n" + self.frontmatter() + "---"
# Add the title as a Markdown h1
if len(self.title()) > 0 and CFG.prepend_h1:
body += "\n\n# " + self.title()
if len(self._title) > 0 and CFG.prepend_h1:
body += "\n\n# " + self._title
# If there is text, add it preceded by two line breaks
if len(self.text()) > 0:
if len(self._text) > 0:
# Remove remaining HTML after & append to body
body += "\n\n" + self.text()
body += "\n\n" + self._text
# Same with an "extra" section
if len(self.ext()) > 0:
body += "\n\n# EXTRA\n\n" + self.ext()
if len(self._extra) > 0:
body += "\n\n# EXTRA\n\n" + self._extra
return body
# Write object to output destination
@ -437,23 +471,15 @@ class Article(RedactionalObject, NormalizedArticle):
class Meta:
table_name: str = "spip_articles"
def surtitle(self) -> str:
return self.convert_field(str(self.surtitre))
def subtitle(self) -> str:
return self.convert_field(str(self.soustitre))
def caption(self) -> str:
return self.convert_field(str(self.chapo))
def postscriptum(self) -> str:
return self.convert_field(str(self.ps))
def ublog(self) -> str:
return self.convert_field(str(self.microblog))
def accept_forum(self) -> bool:
return self.accepter_forum == "oui"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize converted fields as underscore-prefixed attributes
self._accept_forum = self.accepter_forum == "oui"
self._surtitle = self.convert_field(str(self.surtitre))
self._subtitle = self.convert_field(str(self.soustitre))
self._caption = self.convert_field(str(self.chapo))
self._ps = self.convert_field(str(self.ps))
self._microblog = self.convert_field(str(self.microblog))
def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str:
meta: dict[str, Any] = {
@ -474,14 +500,14 @@ class Article(RedactionalObject, NormalizedArticle):
def content(self) -> str:
body: str = super().content()
# If there is a caption, add it followed by a horizontal rule
if len(self.caption()) > 0:
body += "\n\n" + self.caption() + "\n\n***"
if len(self._caption) > 0:
body += "\n\n" + self._caption + "\n\n***"
# PS
if len(self.postscriptum()) > 0:
body += "\n\n# POST-SCRIPTUM\n\n" + self.postscriptum()
if len(self._ps) > 0:
body += "\n\n# POST-SCRIPTUM\n\n" + self._ps
# Microblog
if len(self.ublog()) > 0:
body += "\n\n# MICROBLOGGING\n\n" + self.ublog()
if len(self._microblog) > 0:
body += "\n\n# MICROBLOGGING\n\n" + self._microblog
return body
def authors(self) -> list[SpipAuteurs]:
@ -491,7 +517,7 @@ class Article(RedactionalObject, NormalizedArticle):
SpipAuteursLiens,
on=(SpipAuteurs.id_auteur == SpipAuteursLiens.id_auteur),
)
.where(SpipAuteursLiens.id_objet == self.obj_id)
.where(SpipAuteursLiens.id_objet == self._id)
)
@ -514,7 +540,7 @@ class Section(RedactionalObject, NormalizedSection):
def articles(self) -> list[Article]:
return (
Article.select()
.where((Article.id_rubrique == self.obj_id) & (Article.lang == self.lang))
.where((Article.id_rubrique == self._id) & (Article.lang == self.lang))
.order_by(Article.date.desc())
# .limit(limit)
)
@ -536,8 +562,8 @@ class Section(RedactionalObject, NormalizedSection):
output: list[str] = []
total = len(objects)
for i, obj in enumerate(objects):
obj.depth = self.depth + 1
obj.parentdir = self.dest_directory()
obj._depth = self._depth + 1
obj._parentdir = self.dest_directory()
for m in obj.begin_message(i, total):
output.append(m)
try:
@ -552,12 +578,12 @@ class Section(RedactionalObject, NormalizedSection):
# Get all child sections of this section
child_sections: tuple[Section, ...] = (
Section.select()
.where(Section.id_parent == self.obj_id)
.where(Section.id_parent == self._id)
.order_by(Section.date.desc())
)
nb: int = len(child_sections)
# Do the same for subsections (write their entire subtree)
for i, s in enumerate(child_sections):
s.parentdir = self.dest_directory()
s._parentdir = self.dest_directory()
output.append(s.write_tree(i, nb))
return output

View File

@ -35,7 +35,7 @@ as database user {esc(BOLD)}{CFG.db_user}{esc()}
nb: int = len(child_sections)
# Write each subsection (write its entire subtree)
for i, s in enumerate(child_sections):
s.parentdir = CFG.output_dir
s._parentdir = CFG.output_dir
output.append(s.write_tree(i, nb))
print() # Break line between level 0 sections in output
return output
@ -68,16 +68,16 @@ stored into {esc(BOLD)}{branches}{esc()} directories"""
# Clear the previous log file if needed, then configure logging
def init_logging() -> None:
def init_logging(**kwargs) -> None:
if CFG.clear_log and isfile(CFG.logfile):
remove(CFG.logfile)
logging.basicConfig(
format="%(levelname)s:%(message)s",
filename=CFG.logfile,
encoding="utf-8",
level=CFG.loglevel,
encoding="utf-8", filename=CFG.logfile, level=CFG.loglevel, **kwargs
)
# return logging.getLogger(CFG.logname)
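# The reworked init_logging forwards extra keyword arguments straight to
# logging.basicConfig, so callers can tweak the setup without editing CFG.
# Possible call sites, under that assumption (filemode and force are standard
# basicConfig keywords):
init_logging()              # defaults from CFG: file, level, format
init_logging(filemode="w")  # truncate the log file at startup
init_logging(force=True)    # reconfigure even if logging was already set up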
# Clear the output dir if needed & create a new one
def clear_output() -> None: