diff --git a/spip2md/config.py b/spip2md/config.py index 6caaa88..489cea9 100644 --- a/spip2md/config.py +++ b/spip2md/config.py @@ -31,14 +31,14 @@ class Configuration: unknown_char_replacement: str = "??" # Replaces unknown characters clear_log: bool = True # Clear log before every run instead of appending to clear_output: bool = True # Remove eventual output dir before running - conflict_strategy: str = "prepend id" # Prepend or append : date, id or counter ignore_pattern: list[str] = [] # Ignore objects of which title match logfile: str = "log-spip2md.log" # File where logs will be written, relative to wd loglevel: str = "WARNING" # Minimum criticity of logs written in logfile logname: str = "spip2md" # Labelling of logs export_filetype: str = "md" # Extension of exported text files - max_articles_export: int = 1000 # TODO reimplement - max_sections_export: int = 500 # TODO reimplement + title_max_length: int = 40 # Maximum length of a single title for directory names + # max_articles_export: int = 1000 # TODO reimplement + # max_sections_export: int = 500 # TODO reimplement def __init__(self, config_file: Optional[str] = None): if config_file is not None: diff --git a/spip2md/extended_models.py b/spip2md/extended_models.py index bc59ad4..8f74fac 100644 --- a/spip2md/extended_models.py +++ b/spip2md/extended_models.py @@ -1,6 +1,6 @@ # SPIP website to plain Markdown files converter, Copyright (C) 2023 Guilhem Fauré import logging -from os import listdir, makedirs +from os import listdir, mkdir from os.path import basename, isfile, splitext from re import I, Match, Pattern, finditer, match, search from shutil import copyfile @@ -50,7 +50,7 @@ DeepDict = dict[str, "list[DeepDict] | list[str] | str"] LOG = logging.getLogger(CFG.logname + ".models") -class SpipInterface: +class WritableObject: # From SPIP database texte: str lang: str @@ -59,34 +59,16 @@ class SpipInterface: statut: str profondeur: int # Converted fields - _title: str + _storage_title: str # Title with which directories names are built _draft: bool # Additional fields _id: BigAutoField | int = 0 # same ID attribute name for all objects - # _id: BigIntegerField | int = 0 # same ID attribute name for all objects - # _depth: IntegerField | int # Equals `profondeur` for sections _depth: int # Equals `profondeur` for sections _fileprefix: str # String to prepend to written files - _parentdir: str # Path from output dir to direct parent - _storage_parentdir: Optional[str] = None - _storage_title: Optional[str] = None - _url: Optional[str] = None # In case URL in frontmatter different of dest dir + _storage_parentdir: str # Path from output dir to direct parent _style: tuple[int, ...] # _styles to apply to some elements of printed output - # memo: dict[str, str] = {} # Memoïze values + _storage_title_append: int = 0 # Append a number to storage title if > 0 - def dest_directory(self) -> str: - raise NotImplementedError("Subclasses need to implement directory()") - - def dest_filename(self, prepend: str = "", append: str = "") -> str: - raise NotImplementedError( - f"Subclasses need to implement dest_filename(), params:{prepend}{append}" - ) - - def dest_path(self) -> str: - return self.dest_directory() + self.dest_filename() - - -class WritableObject(SpipInterface): # Apply a mapping from regex maps @staticmethod def apply_mapping(text: str, mapping: tuple) -> str: @@ -162,10 +144,21 @@ class WritableObject(SpipInterface): # Apply post-init conversions and cancel the export if self not of the right lang def convert(self) -> None: - self._title = self.convert_field(self.titre) + self._storage_title = self.convert_field(self.titre) if not CFG.export_drafts and self._draft: raise DontExportDraftError(f"{self.titre} is a draft, cancelling export") + def dest_directory(self) -> str: + raise NotImplementedError("Subclasses need to implement directory()") + + def dest_filename(self, prepend: str = "", append: str = "") -> str: + raise NotImplementedError( + f"Subclasses need to implement dest_filename(), params:{prepend}{append}" + ) + + def dest_path(self) -> str: + return self.dest_directory() + self.dest_filename() + # Print one or more line(s) in which special elements are stylized def style_print( self, string: str, indent: Optional[str] = " ", end: str = "\n" @@ -194,10 +187,10 @@ class WritableObject(SpipInterface): self.style_print(counter) # Output the counter & title of the object being exported msg: str = f"{index + 1}. " - if len(self._title) == 0: + if len(self._storage_title) == 0: msg += "EMPTY NAME" else: - msg += self._title + msg += self._storage_title # Print the output as the program goes # LOG.debug(f"Begin exporting {type(self).__name__} {output[-1]}") self.style_print(msg, end="") @@ -225,20 +218,24 @@ class WritableObject(SpipInterface): def write_all( self, parentdepth: int, - parentdir: str, + storage_parentdir: str, index: int, total: int, - storage_parentdir: Optional[str] = None, + parenturl: str, ) -> str: - LOG.debug(f"Writing {type(self).__name__} `{self._title}`") + LOG.debug(f"Writing {type(self).__name__} `{self._storage_title}`") self._depth = parentdepth + 1 - self._parentdir = parentdir - if storage_parentdir: - self._storage_parentdir = storage_parentdir + self._storage_parentdir = storage_parentdir + self._parenturl = parenturl output: str = self.begin_message(index, total) try: output += self.end_message(self.write()) - except Exception as err: + except ( + LangNotFoundError, + DontExportDraftError, + IgnoredPatternError, + FileNotFoundError, + ) as err: output += self.end_message(err) return output @@ -264,9 +261,9 @@ class Document(WritableObject, SpipDocuments): def dest_directory(self, prepend: str = "", append: str = "") -> str: _id: str = str(self._id) + "-" if CFG.prepend_id else "" return ( - self._parentdir + self._storage_parentdir + prepend - + slugify(_id + self._title, max_length=100) + + slugify(_id + self._storage_title, max_length=100) + append ) @@ -284,15 +281,21 @@ class Document(WritableObject, SpipDocuments): def write_all( self, parentdepth: int, - parentdir: str, + storage_parentdir: str, index: int, total: int, - forcedlang: str, - storage_parentdir: Optional[str], + forcedlang: Optional[str] = None, + parenturl: str = "", ) -> str: self.convert() # Apply post-init conversions + LOG.debug( + f"Document {self._storage_title} doesn’t care about forcedlang {forcedlang}" + ) + LOG.debug( + f"Document {self._storage_title} doesn’t care about parenturl {parenturl}" + ) return super().write_all( - parentdepth, parentdir, index, total, storage_parentdir + parentdepth, storage_parentdir, index, total, parenturl ) @@ -319,10 +322,14 @@ class RedactionalObject(WritableObject): langue_choisie: str # Converted _text: str + _url_title: str # Title in metadata of articles + _parenturl: str # URL relative to lang to direct parent # Get rid of other lang than forced in text and modify lang to forced if found - def translate_multi(self, forced_lang: str, text: str) -> str: - LOG.debug(f"Translating blocks of `{self._title}`") + def translate_multi( + self, forced_lang: str, text: str, change_lang: bool = True + ) -> str: + # LOG.debug(f"Translating blocks of `{self._url_title}`") # for each blocks, keep only forced lang lang: Optional[Match[str]] = None for block in MULTILANG_BLOCK.finditer(text): @@ -331,16 +338,17 @@ class RedactionalObject(WritableObject): # Log the translation trans: str = lang.group(1)[:50].strip() LOG.debug( - f"Keeping {forced_lang} translation of `{self._title}`: " - + f"`{trans}`, becoming its new lang" + f"Keeping {forced_lang} translation of `{self._url_title}`: " + + f"`{trans}`" ) - self.lang = forced_lang # So write-all will not be cancelled - if self.id_trad == 0: # Assign translation key to id so hugo can link - self.id_trad = self._id + if change_lang: + self.lang = forced_lang # So write-all will not be cancelled + if self.id_trad == 0: # Assign translation key to id for Hugo + self.id_trad = self._id # Replace the mutli blocks with the text in the proper lang text = text.replace(block.group(), lang.group(1)) if lang is None: - LOG.debug(f"{forced_lang} not found in `{self._title}`") + LOG.debug(f"{forced_lang} not found in `{self._url_title}`") return text def replace_links(self, text: str) -> str: @@ -391,7 +399,7 @@ class RedactionalObject(WritableObject): for link, getobj, repl in LinkMappings(): # LOG.debug(f"Looking for {link} in {text}") for m in link.finditer(text): - LOG.debug(f"Found internal link {m.group()} in {self._title}") + LOG.debug(f"Found internal link {m.group()} in {self._url_title}") try: LOG.debug(f"Searching for object of id {m.group(2)} with {getobj}") o: "Document | Article | Section" = getobj(int(m.group(2))) @@ -406,54 +414,45 @@ class RedactionalObject(WritableObject): print(repl, m.group(1), o.dest_filename()) raise err else: - repl = repl.format(o._title, o.dest_filename()) - LOG.debug(f"Translate link {m.group()} to {repl} in {self._title}") + repl = repl.format(o._storage_title, o.dest_filename()) + LOG.debug( + f"Translate link {m.group()} to {repl} in {self._url_title}" + ) text = text.replace(m.group(), repl) except DoesNotExist: - LOG.warn(f"No object for link {m.group()} in {self._title}") + LOG.warn(f"No object for link {m.group()} in {self._url_title}") text = text.replace(m.group(), repl.format("", "NOT FOUND"), 1) return text - # Modify this object’s title to prevent filename conflicts - def conflict_title(self, conflict: str) -> None: - if CFG.conflict_strategy == "prepend id": - title: str = str(self._id) + "_" + self._title - elif CFG.conflict_strategy == "append id": - title: str = self._title + "_" + str(self._id) - elif CFG.conflict_strategy == "prepend counter": - m = match(r"([0-9]+)_" + self._title, conflict) - if m is not None: - title: str = str(int(m.group(1)) + 1) + "_" + self._title - else: - title: str = "1_" + self._title - else: # Defaults to append counter - m = match(self._title + r"_([0-9]+)$", conflict) - if m is not None: - title: str = self._title + "_" + str(int(m.group(1)) + 1) - else: - title: str = self._title + "_1" - LOG.debug(f"Rewriting {self._title} title to {title}") - self._title = title + # Get this object url, or none if it’s the same as directory + def url(self) -> str: + _id: str = str(self._id) + "-" if CFG.prepend_id else "" + counter: str = ( + "_" + str(self._storage_title_append) + if self._storage_title_append > 0 + else "" + ) + # Return none if url will be the same as directory + return ( + self._parenturl + + slugify(_id + self._url_title, max_length=CFG.title_max_length) + + counter + + r"/" + ) # Get slugified directory of this object def dest_directory(self) -> str: _id: str = str(self._id) + "-" if CFG.prepend_id else "" - slug: str = slugify(_id + self._title, max_length=100) - directory: str = self._parentdir + slug - if self._storage_title is not None or self._storage_parentdir is not None: - self._url = directory - directory: str = ( - self._storage_parentdir - if self._storage_parentdir is not None - else self._parentdir - + slugify( - _id + self._storage_title - if self._storage_title is not None - else self._title, - max_length=100, - ) - ) - return directory + r"/" + counter: str = ( + "_" + str(self._storage_title_append) + if self._storage_title_append > 0 + else "" + ) + directory: str = self._storage_parentdir + slugify( + _id + self._storage_title, + max_length=CFG.title_max_length, + ) + return directory + counter + r"/" # Get filename of this object def dest_filename(self) -> str: @@ -462,64 +461,84 @@ class RedactionalObject(WritableObject): def convert_title(self, forced_lang: str) -> None: LOG.debug(f"Convert title of currently untitled {type(self).__name__}") if hasattr(self, "_title"): - LOG.debug(f"{type(self).__name__} {self._title} _title is already set") + LOG.debug(f"{type(self).__name__} {self._url_title} _title is already set") return if self.titre is None: LOG.debug(f"{type(self).__name__} title is None") - self._title = "" + self._url_title = "" return if len(self.titre) == 0: LOG.debug(f"{type(self).__name__} title is empty") - self._title = "" + self._url_title = "" return - self._title = self.titre.strip() - # Keep storage language title to store it - if CFG.storage_language is not None and CFG.storage_language != forced_lang: - self._storage_title = self.translate_multi( - CFG.storage_language, self._title - ) - self._storage_title = self.convert_field(self._storage_title) - self._title = self.translate_multi(forced_lang, self._title) - LOG.debug(f"Convert internal links of {self.lang} `{self._title}` title") - self._title = self.replace_links(self._title) - LOG.debug(f"Apply conversions to {self.lang} `{self._title}` title") - self._title = self.convert_field(self._title) + self._url_title = self.titre.strip() + # Set storage title to language of storage lang if different + storage_lang: str = ( + CFG.storage_language if CFG.storage_language is not None else forced_lang + ) + LOG.debug( + f"Searching for {storage_lang} in blocks of `{self._url_title}`" + + " storage title" + ) + self._storage_title = self.translate_multi( + storage_lang, + self._url_title, + False, + ) + LOG.debug( + f"Searching for {forced_lang} in blocks of `{self._url_title}`" + + " URL title" + ) + self._url_title = self.translate_multi(forced_lang, self._url_title) + LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` title") + self._storage_title = self.replace_links(self._storage_title) + self._url_title = self.replace_links(self._url_title) + LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` title") + self._storage_title = self.convert_field(self._storage_title) + self._url_title = self.convert_field(self._url_title) + for p in CFG.ignore_pattern: + for title in (self._storage_title, self._url_title): + m = match(p, title, I) + if m is not None: + raise IgnoredPatternError( + f"{self._url_title} matches with ignore pattern {p}, ignoring" + ) def convert_text(self, forced_lang: str) -> None: - LOG.debug(f"Convert text of `{self._title}`") + LOG.debug(f"Convert text of `{self._url_title}`") if hasattr(self, "_text"): - LOG.debug(f"{type(self).__name__} {self._title} _text is already set") + LOG.debug(f"{type(self).__name__} {self._url_title} _text is already set") return if self.texte is None: - LOG.debug(f"{type(self).__name__} {self._title} text is None") + LOG.debug(f"{type(self).__name__} {self._url_title} text is None") self._text = "" return if len(self.texte) == 0: - LOG.debug(f"{type(self).__name__} {self._title} text is empty") + LOG.debug(f"{type(self).__name__} {self._url_title} text is empty") self._text = "" return self._text = self.translate_multi(forced_lang, self.texte.strip()) - LOG.debug(f"Convert internal links of {self.lang} `{self._title}` text") + LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` text") self._text = self.replace_links(self._text) - LOG.debug(f"Apply conversions to {self.lang} `{self._title}` text") + LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` text") self._text = self.convert_field(self._text) def convert_extra(self) -> None: - LOG.debug(f"Convert extra of `{self._title}`") + LOG.debug(f"Convert extra of `{self._url_title}`") if hasattr(self, "_extra"): - LOG.debug(f"{type(self).__name__} {self._title} _extra is already set") + LOG.debug(f"{type(self).__name__} {self._url_title} _extra is already set") return if self.extra is None: - LOG.debug(f"{type(self).__name__} {self._title} extra is None") + LOG.debug(f"{type(self).__name__} {self._url_title} extra is None") self._extra = "" return if len(self.extra) == 0: - LOG.debug(f"{type(self).__name__} {self._title} extra is empty") + LOG.debug(f"{type(self).__name__} {self._url_title} extra is empty") self._extra = "" return - LOG.debug(f"Convert internal links of {self.lang} `{self._title}` extra") + LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` extra") self._extra = self.replace_links(self._extra) - LOG.debug(f"Apply conversions to {self.lang} `{self._title}` extra") + LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` extra") self._extra = self.convert_field(self._extra) def __init__(self, *args, **kwargs): @@ -529,7 +548,7 @@ class RedactionalObject(WritableObject): # Get related documents def documents(self) -> tuple[Document]: - LOG.debug(f"Initialize documents of `{self._title}`") + LOG.debug(f"Initialize documents of `{self._url_title}`") documents = ( Document.select() .join( @@ -546,7 +565,7 @@ class RedactionalObject(WritableObject): meta: dict[str, Any] = { "lang": self.lang, "translationKey": self.id_trad, - "title": self._title, + "title": self._url_title, "publishDate": self.date, "lastmod": self.maj, "draft": self._draft, @@ -555,8 +574,9 @@ class RedactionalObject(WritableObject): "spip_id_secteur": self.id_secteur, "spip_id": self._id, } - if self._url is not None: - meta = meta | {"url": self._url} + # Add url if different of directory + if self.url() not in self.dest_directory(): + meta = meta | {"url": self.url()} if append is not None: return dump(meta | append, allow_unicode=True) else: @@ -568,8 +588,8 @@ class RedactionalObject(WritableObject): # Start the content with frontmatter body: str = "---\n" + self.frontmatter() + "---" # Add the title as a Markdown h1 - if len(self._title) > 0 and CFG.prepend_h1: - body += "\n\n# " + self._title + if self._url_title is not None and len(self._url_title) > 0 and CFG.prepend_h1: + body += "\n\n# " + self._url_title # If there is a text, add the text preceded by two line breaks if len(self._text) > 0: # Remove remaining HTML after & append to body @@ -584,9 +604,8 @@ class RedactionalObject(WritableObject): self, children: tuple[Document] | tuple[Any], forcedlang: str, - storage_parentdir: Optional[str] = None, ) -> list[str]: - LOG.debug(f"Writing documents of {type(self).__name__} `{self._title}`") + LOG.debug(f"Writing documents of {type(self).__name__} `{self._url_title}`") output: list[str] = [] total = len(children) i = 0 @@ -599,40 +618,54 @@ class RedactionalObject(WritableObject): i, total, forcedlang, - storage_parentdir, + self.url(), ) ) i += 1 - except LangNotFoundError as err: - LOG.debug(err) - except DontExportDraftError as err: - LOG.debug(err) - except IgnoredPatternError as err: + except ( + LangNotFoundError, + DontExportDraftError, + IgnoredPatternError, + ) as err: LOG.debug(err) return output # Write object to output destination def write(self) -> str: # Make a directory for this object if there isn’t - directory: str = self.dest_directory() - try: - makedirs(directory) - except FileExistsError: - # Create a new directory if write is about to overwrite an existing file - # or to write into a directory without the same fileprefix - for file in listdir(directory): - LOG.debug( - f"Testing if {type(self).__name__} `{self.dest_path()}` of prefix " - + f"{self._fileprefix} can be written along with `{file}` " - + f"of prefix `{file.split('.')[0]}` in `{self.dest_directory()}`" - ) - if isfile(directory + file) and ( - directory + file == self.dest_path() - or file.split(".")[0] != self._fileprefix - ): - self.conflict_title(directory.split("/")[-1]) - makedirs(self.dest_directory()) - break + # If it cannot for incompatibility, try until it can + incompatible: bool = True + while incompatible: + directory: str = self.dest_directory() + try: + mkdir(directory) + break + except FileExistsError: + # If not stated incompatible with the following, will write in this dir + incompatible = False + # Create a new directory if write is about to overwrite an existing file + # or to write into a directory without the same fileprefix + for file in listdir(directory): + if isfile(directory + file): + LOG.debug( + f"Can {type(self).__name__} `{self.dest_path()}` of prefix " + + f"{self._fileprefix} and suffix {CFG.export_filetype}" + + f" be written along with `{file}` of prefix " + + f"`{file.split('.')[0]}` and suffix {file.split('.')[-1]}" + + f"` in {self.dest_directory()}` ?" + ) + # Resolve conflict at first incompatible file encountered + if directory + file == self.dest_path() or ( + file.split(".")[-1] == CFG.export_filetype + and file.split(".")[0] != self._fileprefix + ): + LOG.debug( + f"No, incrementing counter of {self.dest_directory()}" + ) + self._storage_title_append += 1 + incompatible = True + break + # Write the content of this object into a file named as self.filename() with open(self.dest_path(), "w") as f: f.write(self.content()) @@ -641,17 +674,11 @@ class RedactionalObject(WritableObject): # Apply post-init conversions and cancel the export if self not of the right lang def convert(self, forced_lang: str) -> None: self.convert_title(forced_lang) - for p in CFG.ignore_pattern: - m = match(p, self._title, I) - if m is not None: - raise IgnoredPatternError( - f"{self._title} is matching with ignore pattern {p}, ignoring" - ) self.convert_text(forced_lang) self.convert_extra() if self.lang != forced_lang: raise LangNotFoundError( - f"`{self._title}` lang is {self.lang} instead of the wanted" + f"`{self._url_title}` lang is {self.lang} instead of the wanted" + f" {forced_lang} and it don’t contains" + f" {forced_lang} translation in Markup either" ) @@ -705,7 +732,7 @@ class Article(RedactionalObject, SpipArticles): return body def authors(self) -> list[SpipAuteurs]: - LOG.debug(f"Initialize authors of `{self._title}`") + LOG.debug(f"Initialize authors of `{self._url_title}`") return ( SpipAuteurs.select() .join( @@ -719,20 +746,18 @@ class Article(RedactionalObject, SpipArticles): def write_all( self, parentdepth: int, - parentdir: str, + storage_parentdir: str, index: int, total: int, forced_lang: str, - storage_parentdir: Optional[str] = None, + parenturl: str, ) -> DeepDict: self.convert(forced_lang) return { "msg": super().write_all( - parentdepth, parentdir, index, total, storage_parentdir - ), - "documents": self.write_children( - self.documents(), forced_lang, storage_parentdir + parentdepth, storage_parentdir, index, total, parenturl ), + "documents": self.write_children(self.documents(), forced_lang), } @@ -756,7 +781,7 @@ class Section(RedactionalObject, SpipRubriques): # Get articles of this section def articles(self, limit: int = 10**6) -> tuple[Article]: - LOG.debug(f"Initialize articles of `{self._title}`") + LOG.debug(f"Initialize articles of `{self._url_title}`") return ( Article.select() .where(Article.id_rubrique == self._id) @@ -766,7 +791,7 @@ class Section(RedactionalObject, SpipRubriques): # Get subsections of this section def sections(self, limit: int = 10**6) -> tuple[Self]: - LOG.debug(f"Initialize subsections of `{self._title}`") + LOG.debug(f"Initialize subsections of `{self._url_title}`") return ( Section.select() .where(Section.id_parent == self._id) @@ -783,24 +808,18 @@ class Section(RedactionalObject, SpipRubriques): def write_all( self, parentdepth: int, - parentdir: str, + storage_parentdir: str, index: int, total: int, forced_lang: str, - storage_parentdir: Optional[str] = None, + parenturl: str = "", ) -> DeepDict: self.convert(forced_lang) return { "msg": super().write_all( - parentdepth, parentdir, index, total, storage_parentdir - ), - "documents": self.write_children( - self.documents(), forced_lang, storage_parentdir - ), - "articles": self.write_children( - self.articles(), forced_lang, storage_parentdir - ), - "sections": self.write_children( - self.sections(), forced_lang, storage_parentdir + parentdepth, storage_parentdir, index, total, parenturl ), + "documents": self.write_children(self.documents(), forced_lang), + "articles": self.write_children(self.articles(), forced_lang), + "sections": self.write_children(self.sections(), forced_lang), } diff --git a/spip2md/lib.py b/spip2md/lib.py index cfad9e3..be20c0c 100644 --- a/spip2md/lib.py +++ b/spip2md/lib.py @@ -58,7 +58,9 @@ as database user {esc(BOLD)}{CFG.db_user}{esc()} except IgnoredPatternError as err: ROOTLOG.debug(err) # Log the message print() # Break line between level 0 sections in output - ROOTLOG.debug(f"Finished exporting {lang} root section {i}/{nb} {s._title}") + ROOTLOG.debug( + f"Finished exporting {lang} root section {i}/{nb} {s._url_title}" + ) return {"sections": buffer}