From 23d8702dbedb8b4c766ba0df7ec90c04bcabec5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilhem=20Faur=C3=A9?= Date: Thu, 22 Jun 2023 14:11:03 +0200 Subject: [PATCH] init foundations --- spip2md/cli.py | 52 +-- spip2md/config.py | 10 +- spip2md/convert.py | 1081 +++++++------------------------------------- spip2md/write.py | 45 +- 4 files changed, 171 insertions(+), 1017 deletions(-) diff --git a/spip2md/cli.py b/spip2md/cli.py index 69c58de..cd0e81d 100644 --- a/spip2md/cli.py +++ b/spip2md/cli.py @@ -56,15 +56,21 @@ def esc(*args: int) -> str: # Extend Site class to add terminal output capabilities class PrintableSite(WritableSite): - def write_all(self) -> None: - pass + def write(self) -> str: + return "write path" + + +# Initialize DB database connection from config +def init_db(cfg: Configuration): + DB.init( # type: ignore + cfg.db, host=cfg.db_host, user=cfg.db_user, password=cfg.db_pass + ) def main(*argv: str): cfg = Configuration(*argv) # Get the configuration - # Initialize the database with settings from CFG - DB.init(cfg.db, host=cfg.db_host, user=cfg.db_user, password=cfg.db_pass) + init_db(cfg) # Eventually remove already existing output dir if cfg.clear_output: @@ -73,40 +79,4 @@ def main(*argv: str): with DB: # Connect to the database where SPIP site is stored in this block # Write everything while printing the output human-readably - PrintableSite(cfg).write_all() - - -# def summarize( -# tree: dict[Any, Any] | list[Any], -# depth: int = -1, -# prevkey: Optional[str] = None, -# counter: Optional[dict[str, int]] = None, -# ) -> dict[str, int]: -# if counter is None: -# counter = {} -# # __import__("pprint").pprint(tree) # DEBUG -# if type(tree) == dict: -# for key, sub in tree.items(): -# if type(sub) == list: -# counter = summarize(sub, depth + 1, key, counter) -# # if type of sub is str, it’s just the name, don’t count -# if type(tree) == list: -# for sub in tree: -# if prevkey is not None: -# if prevkey not in counter: -# counter[prevkey] = 0 -# counter[prevkey] += 1 -# if type(sub) == dict: -# counter = summarize(sub, depth + 1, None, counter) -# -# # End message only if it’s the root one -# if depth == -1: -# LOG.debug(tree) -# totals: str = "" -# for key, val in counter.items(): -# totals += f"{esc(BOLD)}{val}{esc()} {key}, " -# print(f"Exported a total of {totals[:-2]}") -# # Warn about issued warnings in log file -# if isfile(LOGFILE): -# print(f"Check out warnings and infos in {esc(BOLD)}{LOGFILE}{esc()}") -# return counter + PrintableSite(cfg).write() diff --git a/spip2md/config.py b/spip2md/config.py index 12b6d1f..486bfa3 100644 --- a/spip2md/config.py +++ b/spip2md/config.py @@ -23,7 +23,7 @@ from yaml import Loader, load # Global configuration object class Configuration: - config_file: Optional[str] = None # Location of the config file + # config_file: Optional[str] = None # Location of the config file name: str = "spip2md" # Name of program, notably used in logs @@ -80,7 +80,7 @@ class Configuration: # Return the first path that actually exists for path in config_locations: if isfile(path): - self.config_file = path + # self.config_file = path return path # If not found, raise error raise FileNotFoundError @@ -88,7 +88,9 @@ class Configuration: def __init__(self, *argv: str): try: # Read config from config file - with open(self._find_config_file(*argv)) as f: + with open(self._find_config_file(*argv[1:])) as f: + # Tell user about config + print(f"Read configuration file from {f.name}") config = load(f.read(), Loader=Loader) # Assign configuration for each 
attribute in config file for attr in config: @@ -100,7 +102,5 @@ class Configuration: setattr(self, attr, directory) else: setattr(self, attr, config[attr]) - # Tell user about config - print(f"Successfully read configuration file from {self.config_file}") except FileNotFoundError: print("No configuration file found, using defaults") diff --git a/spip2md/convert.py b/spip2md/convert.py index 41c4ef3..d82115e 100644 --- a/spip2md/convert.py +++ b/spip2md/convert.py @@ -18,218 +18,37 @@ This file contains the core classes of spip2md that models internal objects of s and methods to convert them to Markdown + YAML, static site structure """ import logging -from os import listdir, mkdir -from os.path import basename, isfile, splitext -from re import I, Match, Pattern, finditer, match, search -from re import error as re_error -from shutil import copyfile -from typing import Any, Optional +from os.path import basename, splitext -from peewee import ( - BigAutoField, - BigIntegerField, - DateTimeField, - DoesNotExist, -) from slugify import slugify -from yaml import dump from spip2md.config import Configuration -from spip2md.regexmaps import ( - ARTICLE_LINK, - BLOAT, - CONFIG_LANGS, - DOCUMENT_LINK, - HTMLTAGS, - IMAGE_LINK, - ISO_UTF, - MULTILANG_BLOCK, - SECTION_LINK, - SPECIAL_OUTPUT, - SPIP_MARKDOWN, - UNKNOWN_ISO, - WARNING_OUTPUT, -) from spip2md.spip_models import ( SpipArticles, SpipAuteurs, SpipAuteursLiens, SpipDocuments, SpipDocumentsLiens, + SpipMots, + SpipMotsLiens, SpipRubriques, ) -# Declare exceptions -class IgnoredPatternError(Exception): - pass +# Unique Spip object ID +class ObjId: + _id: int + _type: type + def __init__(self, obj_id: int, type_string: str) -> None: + self._id = obj_id + self._type_str = type_string -class LangNotFoundError(Exception): - pass + def __hash__(self): + return hash((self._id, self._type_str)) - -class DontExportDraftError(Exception): - pass - - -class DontExportEmptyError(Exception): - pass - - -# class ConvertableObject: -# # From SPIP database -# texte: str -# lang: str -# titre: str -# descriptif: str -# statut: str -# profondeur: int -# # Converted fields -# _storage_title: str # Title with which directories names are built -# _draft: bool -# # Additional fields -# _id: BigAutoField | int = 0 # same ID attribute name for all objects -# _depth: int # Equals `profondeur` for sections -# _fileprefix: str # String to prepend to written files -# _storage_parentdir: str # Path from output dir to direct parent -# _style: tuple[int, ...] 
# _styles to apply to some elements of printed output -# _storage_title_append: int = 0 # Append a number to storage title if > 0 -# -# # Warn about unknown chars & replace them with config defined replacement -# def warn_unknown(self, text: str, unknown_mapping: tuple) -> str: -# # Return unknown char surrounded by context_length chars -# def unknown_chars_context(text: str, char: str, context_len: int = 24) -> str: -# context: str = r".{0," + str(context_len) + r"}" -# m = search( -# context + r"(?=" + char + r")" + char + context, -# text, -# ) -# if m is not None: -# return m.group() -# else: -# return char -# -# for char in unknown_mapping: -# lastend: int = 0 -# for m in finditer("(" + char + ")+", text): -# context: str = unknown_chars_context(text[lastend:], char) -# LOG.warn( -# f"Unknown char {char} in file {self.dest_path()} at: {context}" -# ) -# if CFG.unknown_char_replacement is not None: -# LOG.warn( -# f"Replacing {m.group()} with {CFG.unknown_char_replacement}" -# ) -# text = text.replace(m.group(), CFG.unknown_char_replacement, 1) -# lastend = m.end() -# return text -# -# # Apply post-init conversions and cancel the export if self not of the right lang -# def convert(self, forced_lang: Optional[str] = None) -> None: -# self._storage_title = self.convert_field(self.titre) -# # Warn about unknown chars -# self._storage_title = self.warn_unknown(self._storage_title, UNKNOWN_ISO) -# if not CFG.export_drafts and self._draft: -# raise DontExportDraftError(f"{self.titre} is a draft, cancelling export") -# -# def dest_directory(self) -> str: -# raise NotImplementedError("Subclasses need to implement directory()") -# -# def dest_filename(self, prepend: str = "", append: str = "") -> str: -# raise NotImplementedError( -# f"Subclasses need to implement dest_filename(), params:{prepend}{append}" -# ) -# -# def dest_path(self) -> str: -# return self.dest_directory() + self.dest_filename() -# -# # Print one or more line(s) in which special elements are stylized -# def style_print( -# self, string: str, indent: Optional[str] = " ", end: str = "\n" -# ) -> str: -# stylized: str = string -# for o in SPECIAL_OUTPUT: -# stylized = o.sub(esc(*self._style) + r"\1" + esc(), stylized) -# for w in WARNING_OUTPUT: -# stylized = w.sub(esc(*WARNING_STYLE) + r"\1" + esc(), stylized) -# if indent is not None and len(indent) > 0: -# stylized = indent * self._depth + stylized -# print(stylized, end=end) -# # Return the stylized string in case -# return stylized -# -# # Print the message telling what is going to be done -# def begin_message(self, index: int, limit: int, step: int = 100) -> str: -# # Output the remaining number of objects to export every step object -# if index % step == 0 and limit > 0: -# counter: str = f"Exporting {limit-index} level {self._depth}" -# s: str = "s" if limit - index > 1 else "" -# if hasattr(self, "lang"): -# counter += f" {self.lang}" -# counter += f" {type(self).__name__}{s}" -# # Print the output as the program goes -# self.style_print(counter) -# # Output the counter & title of the object being exported -# msg: str = f"{index + 1}. 
" -# if len(self._storage_title) == 0: -# msg += "EMPTY NAME" -# else: -# msg += self._storage_title -# # Print the output as the program goes -# # LOG.debug(f"Begin exporting {type(self).__name__} {output[-1]}") -# self.style_print(msg, end="") -# return msg -# -# # Write object to output destination -# def write(self) -> str: -# raise NotImplementedError("Subclasses need to implement write()") -# -# # Output information about file that was just exported -# def end_message(self, message: str | Exception) -> str: -# output: str = " -> " -# if type(message) is FileNotFoundError: -# output += "ERROR: NOT FOUND: " -# elif type(message) is DoesNotExist: -# output += "ERROR: NO DESTINATION DIR: " -# elif type(message) is DontExportDraftError: -# output += "ERROR: NOT EXPORTING DRAFT: " -# elif type(message) is DontExportEmptyError: -# output += "ERROR: NOT EXPORTING EMPTY: " -# elif type(message) is not str: -# output += "ERROR: UNKNOWN: " -# # Print the output as the program goes -# # LOG.debug(f"Finished exporting {type(self).__name__}: {message}") -# self.style_print(output + str(message), indent=None) -# return output + str(message) -# -# # Perform all the write steps of this object -# def write_all( -# self, -# parentdepth: int, -# storage_parentdir: str, -# index: int, -# total: int, -# parenturl: str, -# forced_lang: Optional[str] = None, -# ) -> str: -# self._depth = parentdepth + 1 -# self._storage_parentdir = storage_parentdir -# self._parenturl = parenturl -# self.convert(forced_lang) # Post init convertions -# LOG.debug(f"Writing {type(self).__name__} `{self._storage_title}`") -# output: str = self.begin_message(index, total) -# try: -# output += self.end_message(self.write()) -# except ( -# LangNotFoundError, -# DontExportDraftError, -# DontExportEmptyError, -# IgnoredPatternError, -# FileNotFoundError, -# ) as err: -# output += self.end_message(err) -# return output + def __eq__(self, other: "ObjId"): + return (self._id, self._type_str) == (other._id, other._type_str) class ConvertableDocument: @@ -240,119 +59,60 @@ class ConvertableDocument: _src: str # URL _slug: str = "" # URL - # _fileprefix: str = "" - # _style = (BOLD, CYAN) # Documents accent color is blue + class Meta: + table_name: str = "spip_document" # Define the name of the Spip DB table def __init__(self, spip_obj: SpipDocuments, cfg: Configuration): self._log_c = logging.getLogger(cfg.name + ".convert.document") + self._cfg = cfg self._spip_obj = spip_obj - self._id = spip_obj.id_document + self._id = int(spip_obj.id_document) # type: ignore # Define source name of this file self._src = cfg.data_dir + spip_obj.fichier # Define destination name of this file name, filetype = splitext(basename(str(spip_obj.fichier))) prepend: str = str(spip_obj.id_document) + "-" if self._cfg.prepend_id else "" - return slugify(prepend + name, max_length=cfg.title_max_length) + filetype - - # Get directory of this object - # def dest_directory(self, prepend: str = "", append: str = "") -> str: - # _id: str = str(self._id) + "-" if self._cfg.prepend_id else "" - # return ( - # self._storage_parentdir - # + prepend - # + slugify(_id + self._storage_title, max_length=100) - # + append - # ) - - # Get destination slugified name of this file - # def dest_filename(self, prepend: str = "", append: str = "") -> str: - # name, filetype = splitext(basename(str(self._src))) - # return slugify(prepend + name, max_length=100) + append + filetype - - # Write document to output destination - # def write(self) -> str: - # # Copy the document from it’s 
SPIP location to the new location - # return copyfile(self.src_path(), self.dest_path()) - - # Perform all the write steps of this object - # def write_all( - # self, - # parentdepth: int, - # storage_parentdir: str, - # index: int, - # total: int, - # forcedlang: Optional[str] = None, - # parenturl: str = "", - # ) -> str: - # # self.convert() # Apply post-init conversions - # LOG.debug( - # f"Document {self._storage_title} don’t care about forcedlang {forcedlang}" - # ) - # LOG.debug( - # f"Document {self._storage_title} doesn’t care about parenturl {parenturl}" - # ) - # return super().write_all( - # parentdepth, storage_parentdir, index, total, parenturl - # ) + self._slug = slugify(prepend + name, max_length=cfg.title_max_length) + filetype -class ConvertableArticle: - # id_trad: BigIntegerField | BigAutoField | int - # id_rubrique: BigAutoField | int - # date: DateTimeField | str - # date: DateTimeField - # maj: str - # id_secteur: BigIntegerField | int - # extra: str - # langue_choisie: str +class ConvertableRedactional: _log_c: logging.Logger # Logger for conversion operations _cfg: Configuration # Global configuration - _children: tuple[ - "ConvertableSection | ConvertableArticle | ConvertableDocument", ... - ] # sub-sections, documents, articles - _spip_obj: SpipArticles # The Spip Article this is representing + _spip_obj: SpipArticles | SpipRubriques # The Spip Article this is representing + _depth: int # Depth + _children: dict[ + ObjId, "ConvertableDocument | ConvertableRedactional" + ] = {} # documents + _index: dict[ObjId, ObjId] = {} # Index of the next-hop subsection to ObjId obj + _id: int _lang: str - _fileprefix: str = "index" - # Converted fields - _surtitle: str # Content - _title: str # Content - _subtitle: str # Content - _description: str # Content - _caption: str # Content - _extra: str # Content - _text: str # Content - _slug: str # URL - _meta: dict[str, str | int | bool | None] # Metadata dictionary + _authors: tuple[SpipAuteurs, ...] + _tags: tuple[SpipMots, ...] 
- # _style = (BOLD, YELLOW) # Articles accent color is yellow - - def documents(self, limit: int = 10**3): - self._log_c.debug("Initialize documents") - return ( - SpipDocuments.select() - .join( - SpipDocumentsLiens, - on=(SpipDocuments.id_document == SpipDocumentsLiens.id_document), + # Initialize documents related to self + def documents(self, limit: int = 10**3) -> dict[ObjId, ConvertableDocument]: + print( + "Initialize documents.\n" + + f"Section: {self._spip_obj.titre}, Depth : {self._depth}" + ) + documents = [ + ConvertableDocument(doc, self._cfg) + for doc in ( + SpipDocuments.select() + .join( + SpipDocumentsLiens, + on=(SpipDocuments.id_document == SpipDocumentsLiens.id_document), + ) + .where(SpipDocumentsLiens.id_objet == self._id) + .limit(limit) ) - .where(SpipDocumentsLiens.id_objet == self._id) - .limit(limit) - ) - - def children(self): - self._children = tuple( - ConvertableDocument(d, self._cfg) for d in self.documents() - ) - - def __init__(self, spip_obj: SpipArticles, forced_lang: str, cfg: Configuration): - self._log_c = logging.getLogger(cfg.name + ".convert.article") - self._spip_obj = spip_obj - self._id = spip_obj.id_article - self._lang = forced_lang - self._draft = spip_obj.statut != "publie" - self.children() + ] + # Store them mutably + return {ObjId(d._id, "document"): d for d in documents} + # Initialize self authors def authors(self) -> tuple[SpipAuteurs, ...]: - self._log_c.debug("Initialize authors") + print("Initialize authors") return ( SpipAuteurs.select() .join( @@ -362,674 +122,139 @@ class ConvertableArticle: .where(SpipAuteursLiens.id_objet == self._id) ) - # # Get the YAML frontmatter string - # def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str: - # # LOG.debug(f"Write frontmatter of `{self._title}`") - # meta: dict[str, Any] = { - # "lang": self.lang, - # "translationKey": self.id_trad if self.id_trad != 0 else self._id, - # "title": self._url_title, - # "publishDate": self.date, - # "lastmod": self.maj, - # "draft": self._draft, - # "description": self._description, - # } - # # Add debugging meta if needed - # if CFG.debug_meta: - # meta = meta | { - # "spip_id": self._id, - # "spip_id_secteur": self.id_secteur, - # } - # # Add url if different of directory - # if self.url() not in self.dest_directory(): - # meta = meta | {"url": self.url()} - # if append is not None: - # return dump(meta | append, allow_unicode=True) - # else: - # return dump(meta, allow_unicode=True) - - # Build metadata dictionary that can be outputted as YAML - def metadata(self): - self._meta = { - "lang": str(self.lang), - "translationKey": int( - self._spip_obj.id_trad if self._spip_obj.id_trad != 0 else self._id - ), - "title": str(self._title), - "publishDate": str(self._spip_obj.date), - "lastmod": str(self._spip_obj.maj), - "draft": self._draft, - "description": self._description, - } - # Add debugging meta if needed - if self._cfg.debug: - self._meta |= { - "id": int(self._id), - "spip_id_secteur": int(self._spip_obj.id_secteur), - } - - # Apply a mapping from regex maps - @staticmethod - def apply_mapping(text: str, mapping: tuple, keep_markup: bool = True) -> str: - if type(mapping) == tuple and len(mapping) > 0: - if type(mapping[0]) == tuple and len(mapping[0]) > 0: - if type(mapping[0][0]) == Pattern: # Mostly for syntax conversion - for old, new in mapping: - if keep_markup: - text = old.sub(new, text) - else: - try: - text = old.sub(r"\1", text) - except re_error: - text = old.sub("", text) - else: - for old, new in mapping: # Mostly for 
broken encoding - text = text.replace(old, new) - elif type(mapping[0]) == Pattern: - for old in mapping: - text = old.sub("", text) - else: - for old in mapping: - text = text.replace(old, "") - return text - - # Apply needed methods on text fields - def convert_field(self, field: str) -> str: - if field is None: - return "" - if len(field) == 0: - return "" - # Convert SPIP syntax to Markdown - field = self.apply_mapping(field, SPIP_MARKDOWN) - # Remove useless text - field = self.apply_mapping(field, BLOAT) - # Convert broken ISO encoding to UTF - field = self.apply_mapping(field, ISO_UTF) - return field.strip() # Strip whitespaces around text - - # Convert Spip syntax to Markdown on self then each children - def convert(self): - self._title = self.convert_field(str(self._spip_obj.titre)) - self._surtitle = self.convert_field(str(self._spip_obj.surtitre)) - self._subtitle = self.convert_field(str(self._spip_obj.soustitre)) - self._caption = self.convert_field(str(self._spip_obj.chapo)) - self._description = self.convert_field(str(self._spip_obj.descriptif)) - self._text = self.convert_field(str(self._spip_obj.texte)) - self._extra = self.convert_field(str(self._spip_obj.extra)) - # Useless but keep them - # self._ps = self.convert_field(str(self._spip_obj.ps)) - # self._microblog = self.convert_field(str(self._spip_obj.microblog)) - # self._accept_forum = self._spip_obj.accepter_forum == "oui" - # self._choosen_language = self._spip_obj.langue_choisie == "oui" - - # Get rid of other lang than forced in text and modify lang to forced if found - def translate_multi( - self, forced_lang: str, text: str, change_lang: bool = True - ) -> str: - # LOG.debug(f"Translating blocks of `{self._url_title}`") - # for each blocks, keep only forced lang - lang: Optional[Match[str]] = None - for block in MULTILANG_BLOCK.finditer(text): - lang = CONFIG_LANGS[forced_lang].search(block.group(1)) - if lang is not None: - # Log the translation - trans: str = lang.group(1)[:50].strip() - self._log_c.debug( - f"Keeping {forced_lang} translation of `{self._url_title}`: " - + f"`{trans}`" - ) - if change_lang: - self.lang = forced_lang # So write-all will not be cancelled - # Replace the mutli blocks with the text in the proper lang - text = text.replace(block.group(), lang.group(1)) - if lang is None: - self._log_c.debug(f"{forced_lang} not found") - return text - - # Keep only one relevant language for self then each children - def translate(self): - if self._lang != self._spip_obj.lang: # WARNING not the solution - raise LangNotFoundError( - f"`{self._url_title}` lang is {self.lang} instead of the wanted" - + f" {self._lang} and it don’t contains" - + f" {self._lang} translation in Markup either" + # Initialize self tags + def tags(self) -> tuple[SpipMots]: + print("Initialize tags") + return ( + SpipMots.select() + .join( + SpipMotsLiens, + on=(SpipMots.id_mot == SpipMotsLiens.id_mot), ) - # Define slug - self._slug = ( - slugify(self._title, max_length=self._cfg.title_max_length) - + "/" - + self._fileprefix - + "." - + self.lang - + "." 
- + self._cfg.export_filetype + .where(SpipMotsLiens.id_objet == self._id) ) - # WARNING symplify a lot this - def replace_links(self, text: str) -> str: - class LinkMappings: - _link_types = IMAGE_LINK, DOCUMENT_LINK, SECTION_LINK, ARTICLE_LINK - def __iter__(self): - self._type_cursor = 0 - self._link_cursor = -1 - return self +class ConvertableArticle(ConvertableRedactional): + _fileprefix: str = "index" + _children: dict[ObjId, ConvertableDocument] = {} # documents + # Converted fields + _surtitle: str # Content + _title: str # Content + _subtitle: str # Content + _description: str # Content + _caption: str # Content + _extra: str # Content + _text: str # Content + _slug: str # URL - @staticmethod - def getdocument(obj_id: int) -> Document: - doc: Document = Document.get(Document.id_document == obj_id) - doc.convert() - return doc + class Meta: + table_name: str = "spip_articles" # Define the name of the Spip DB table - @staticmethod - def getsection(obj_id: int) -> Section: - sec: Section = Section.get(Section.id_rubrique == obj_id) - sec.convert(self.lang) - return sec - - @staticmethod - def getarticle(obj_id: int) -> Article: - art: Article = Article.get(Article.id_article == obj_id) - art.convert(self.lang) - return art - - _obj_getters = getdocument, getdocument, getsection, getarticle - - def __next__(self): - self._link_cursor += 1 - # If we reach end of current link type, pass to the beginning of next - if self._link_cursor >= len(self._link_types[self._type_cursor]): - self._link_cursor = 0 - self._type_cursor += 1 - - if self._type_cursor >= len(self._link_types): - raise StopIteration - - return ( - self._link_types[self._type_cursor][self._link_cursor], - self._obj_getters[self._type_cursor], - "!" if self._type_cursor == 0 else "", - ) - - for link, getobj, prepend in LinkMappings(): - # LOG.debug(f"Looking for {link} in {text}") - for m in link.finditer(text): - LOG.debug(f"Found internal link {m.group()} in {self._url_title}") - try: - LOG.debug( - f"Searching for object of id {m.group(2)} with " - + getobj.__name__ - ) - o: "Document | Article | Section" = getobj(int(m.group(2))) - # TODO get full relative path for sections and articles - # TODO rewrite links markup (bold/italic) after stripping - if len(m.group(1)) > 0: - repl = f"{prepend}[{m.group(1)}]({o.dest_filename()})" - else: - repl = f"{prepend}[{o._storage_title}]({o.dest_filename()})" - LOG.debug( - f"Translate link {m.group()} to {repl} in {self._url_title}" - ) - text = text.replace(m.group(), repl) - except DoesNotExist: - LOG.warn(f"No object for link {m.group()} in {self._url_title}") - text = text.replace(m.group(), prepend + "[](NOT FOUND)", 1) - return text - - # Repair internal links & embeds for self then each children - def link(self): - pass - - # Get file text content - def content(self) -> str: - # LOG.debug(f"Write content of `{self._title}`") - # Start the content with frontmatter - body: str = "---\n" + self.frontmatter() + "---" - # Add the title as a Markdown h1 - if self._url_title is not None and len(self._url_title) > 0 and CFG.prepend_h1: - body += "\n\n# " + self._url_title - # If there is a text, add the text preceded by two line breaks - if len(self._text) > 0: - # Remove remaining HTML after & append to body - body += "\n\n" + self._text - elif not CFG.export_empty: - raise DontExportEmptyError - # Same with an "extra" section - if len(self._extra) > 0: - body += "\n\n# EXTRA\n\n" + self._extra - return body - - def clean(self): - # Delete remaining HTML tags if needed - if 
self._cfg.remove_html: - self._title = self.apply_mapping( - self._title, HTMLTAGS, self._cfg.metadata_markup - ) - self._surtitle = self.apply_mapping( - self._surtitle, HTMLTAGS, self._cfg.metadata_markup - ) - self._subtitle = self.apply_mapping( - self._subtitle, HTMLTAGS, self._cfg.metadata_markup - ) - self._caption = self.apply_mapping(self._caption, HTMLTAGS) - self._text = self.apply_mapping(self._text, HTMLTAGS) - self._extra = self.apply_mapping(self._extra, HTMLTAGS) - - def convert_title(self, forced_lang: str) -> None: - LOG.debug(f"Convert title of currently untitled {type(self).__name__}") - if hasattr(self, "_title"): - LOG.debug(f"{type(self).__name__} {self._url_title} _title is already set") - return - if self.titre is None: - LOG.debug(f"{type(self).__name__} title is None") - self._url_title = "" - return - if len(self.titre) == 0: - LOG.debug(f"{type(self).__name__} title is empty") - self._url_title = "" - return - self._url_title = self.titre.strip() - # Set storage title to language of storage lang if different - storage_lang: str = ( - CFG.storage_language if CFG.storage_language is not None else forced_lang - ) - LOG.debug( - f"Searching for {storage_lang} in blocks of `{self._url_title}`" - + " storage title" - ) - self._storage_title = self.translate_multi( - storage_lang, - self._url_title, - False, - ) - LOG.debug( - f"Searching for {forced_lang} in blocks of `{self._url_title}`" - + " URL title" - ) - self._url_title = self.translate_multi(forced_lang, self._url_title) - LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` title") - self._storage_title = self.replace_links(self._storage_title) - self._url_title = self.replace_links(self._url_title) - LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` title") - self._storage_title = self.convert_field(self._storage_title) - self._url_title = self.convert_field(self._url_title, CFG.metadata_markup) - for p in CFG.ignore_patterns: - for title in (self._storage_title, self._url_title): - m = match(p, title, I) - if m is not None: - raise IgnoredPatternError( - f"{self._url_title} matches with ignore pattern {p}, ignoring" - ) - # Warn about unknown chars - self._storage_title = self.warn_unknown(self._storage_title, UNKNOWN_ISO) - self._url_title = self.warn_unknown(self._url_title, UNKNOWN_ISO) - - def convert_text(self, forced_lang: str) -> None: - LOG.debug(f"Convert text of `{self._url_title}`") - if hasattr(self, "_text"): - LOG.debug(f"{type(self).__name__} {self._url_title} _text is already set") - return - if self.texte is None: - LOG.debug(f"{type(self).__name__} {self._url_title} text is None") - self._text = "" - return - if len(self.texte) == 0: - LOG.debug(f"{type(self).__name__} {self._url_title} text is empty") - self._text = "" - return - self._text = self.translate_multi(forced_lang, self.texte.strip()) - LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` text") - self._text = self.replace_links(self._text) - LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` text") - self._text = self.convert_field(self._text) - # Warn about unknown chars - self._text = self.warn_unknown(self._text, UNKNOWN_ISO) - - def convert_extra(self) -> None: - LOG.debug(f"Convert extra of `{self._url_title}`") - if hasattr(self, "_extra"): - LOG.debug(f"{type(self).__name__} {self._url_title} _extra is already set") - return - if self.extra is None: - LOG.debug(f"{type(self).__name__} {self._url_title} extra is None") - self._extra = "" - return - if 
len(self.extra) == 0: - LOG.debug(f"{type(self).__name__} {self._url_title} extra is empty") - self._extra = "" - return - LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` extra") - self._extra = self.replace_links(self._extra) - LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` extra") - self._extra = self.convert_field(self._extra, CFG.metadata_markup) - # Warn about unknown chars - self._extra = self.warn_unknown(self._extra, UNKNOWN_ISO) - - # # Get this object url, or none if it’s the same as directory - # def url(self) -> str: - # _id: str = str(self._id) + "-" if CFG.prepend_id else "" - # counter: str = ( - # "_" + str(self._storage_title_append) - # if self._storage_title_append > 0 - # else "" - # ) - # # Return none if url will be the same as directory - # return ( - # self._parenturl - # + slugify(_id + self._url_title, max_length=CFG.title_max_length) - # + counter - # + r"/" - # ) - - # # Get slugified directory of this object - # def dest_directory(self) -> str: - # _id: str = str(self._id) + "-" if CFG.prepend_id else "" - # counter: str = ( - # "_" + str(self._storage_title_append) - # if self._storage_title_append > 0 - # else "" - # ) - # directory: str = self._storage_parentdir + slugify( - # _id + self._storage_title, - # max_length=CFG.title_max_length, - # ) - # return directory + counter + r"/" - # - # # Get filename of this object - # def dest_filename(self) -> str: - # return self._fileprefix + "." + self.lang + "." + CFG.export_filetype - - # # Write all the documents of this object - # def write_children( - # self, - # children: tuple[Document] | tuple[Any], - # forcedlang: str, - # ) -> list[str]: - # LOG.debug(f"Writing documents of {type(self).__name__} `{self._url_title}`") - # output: list[str] = [] - # total = len(children) - # i = 0 - # for obj in children: - # try: - # output.append( - # obj.write_all( - # self._depth, - # self.dest_directory(), - # i, - # total, - # forcedlang, - # self.url(), - # ) - # ) - # i += 1 - # except ( - # LangNotFoundError, - # DontExportDraftError, - # DontExportEmptyError, - # IgnoredPatternError, - # ) as err: - # LOG.debug(err) - # return output - - # # Write object to output destination - # def write(self) -> str: - # # Make a directory for this object if there isn’t - # # If it cannot for incompatibility, try until it can - # incompatible: bool = True - # while incompatible: - # directory: str = self.dest_directory() - # try: - # mkdir(directory) - # break - # except FileExistsError: - # # If not stated incompatible with the following, will write in this dir - # incompatible = False - # # Create a new directory if write is about to overwrite an existing file - # # or to write into a directory without the same fileprefix - # for file in listdir(directory): - # if isfile(directory + file): - # LOG.debug( - # f"Can {type(self).__name__} `{self.dest_path()}` of prefix " - # + f"{self._fileprefix} and suffix {CFG.export_filetype}" - # + f" be written along with `{file}` of prefix " - # + f"`{file.split('.')[0]}` and suffix {file.split('.')[-1]}" - # + f"` in {self.dest_directory()}` ?" 
- # ) - # # Resolve conflict at first incompatible file encountered - # if directory + file == self.dest_path() or ( - # file.split(".")[-1] == CFG.export_filetype - # and file.split(".")[0] != self._fileprefix - # ): - # LOG.debug( - # f"No, incrementing counter of {self.dest_directory()}" - # ) - # self._storage_title_append += 1 - # incompatible = True - # break - # - # # Write the content of this object into a file named as self.filename() - # with open(self.dest_path(), "w") as f: - # f.write(self.content()) - # return self.dest_path() - - def frontmatter(self, append: Optional[dict[str, Any]] = None) -> str: - meta: dict[str, Any] = { - # Article specific - "summary": self.chapo, - "surtitle": self.surtitre, - "subtitle": self.soustitre, - "date": self.date_redac, - "authors": [author.nom for author in self.authors()], - } - # Add debugging meta if needed - if CFG.debug_meta: - meta = meta | {"spip_id_rubrique": self.id_rubrique} - if append is not None: - return super().frontmatter(meta | append) - else: - return super().frontmatter(meta) - - # def content(self) -> str: - # body: str = super().content() - # # If there is a caption, add the caption followed by a hr - # if len(self._caption) > 0: - # body += "\n\n" + self._caption + "\n\n***" - # # PS - # if len(self._ps) > 0: - # body += "\n\n# POST-SCRIPTUM\n\n" + self._ps - # # Microblog - # if len(self._microblog) > 0: - # body += "\n\n# MICROBLOGGING\n\n" + self._microblog - # return body - - # Perform all the write steps of this object - # def write_all( - # self, - # parentdepth: int, - # storage_parentdir: str, - # index: int, - # total: int, - # forced_lang: str, - # parenturl: str, - # ) -> DeepDict: - # # self.convert(forced_lang) - # return { - # "msg": super().write_all( - # parentdepth, storage_parentdir, index, total, parenturl - # ), - # "documents": self.write_children(self.documents(), forced_lang), - # } + def __init__(self, spip_obj: SpipArticles, cfg: Configuration, depth: int): + self._log_c = logging.getLogger(cfg.name + ".convert.article") + self._cfg = cfg + self._spip_obj = spip_obj + self._id = int(spip_obj.id_article) # type: ignore # Peewee types not defined + self._lang = str(spip_obj.lang) + self._depth = depth + self._draft = spip_obj.statut != "publie" + self._children |= self.documents() # Retreive documents & add them to the index # Define Section as an Article that can contain other Articles or Sections -class ConvertableSection(ConvertableArticle): - # _fileprefix: str = "_index" - # _style = (BOLD, GREEN) # Sections accent color is green +class ConvertableSection(ConvertableRedactional): + _fileprefix: str = "_index" # Prefix of written Markdown files + # sub-sections, documents, articles + _children: dict[ + ObjId, "ConvertableSection | ConvertableArticle | ConvertableDocument" + ] = {} - _log_c: logging.Logger # Logger for conversion operations - _cfg: Configuration # Global configuration - _children: tuple[ - "ConvertableSection | ConvertableArticle | ConvertableDocument", ... 
- ] # sub-sections, documents, articles - _section: SpipRubriques - _lang: str + class Meta: + table_name: str = "spip_rubriques" # Define the name of the Spip DB table # Get articles of this section - def articles(self, limit: int = 10**6) -> tuple[SpipArticles, ...]: - self._log_c.debug("Initialize articles") - return ( - SpipArticles.select() - .where(SpipArticles.id_rubrique == self._id) - .order_by(SpipArticles.date.desc()) - .limit(limit) + def articles(self, limit: int = 10**6): + print( + "Initialize articles.\n" + + f"Section: {self._spip_obj.titre}, Depth : {self._depth}" ) + articles = [ + ConvertableArticle(art, self._cfg, self._depth) + for art in ( + SpipArticles.select() + .where(SpipArticles.id_rubrique == self._id) + .order_by(SpipArticles.date.desc()) + .limit(limit) + ) + ] + # Add these articles children to self index + for art in articles: + for doc in art._children: + self._index[ObjId(doc._id, "document")] = ObjId(art._id, "article") + # Store them mutably + return {ObjId(a._id, "article"): a for a in articles} # Get subsections of this section - def sections(self, limit: int = 10**6) -> tuple[SpipRubriques, ...]: - self._log_c.debug("Initialize subsections") - return ( - SpipRubriques.select() - .where(SpipRubriques.id_parent == self._id) - .order_by(SpipRubriques.date.desc()) - .limit(limit) + def sections(self, limit: int = 10**6): + print( + "Initialize subsections.\n" + + f"Section: {self._spip_obj.titre}, Depth : {self._depth}" ) + sections = [ + ConvertableSection(sec, self._cfg, self._depth) + for sec in ( + SpipRubriques.select() + .where(SpipRubriques.id_parent == self._id) + .order_by(SpipRubriques.date.desc()) + .limit(limit) + ) + ] + # Add these sections’s indexes to self index, replacing next hop with section + for sec in sections: + self._index |= {obj: ObjId(sec._id, "section") for obj in sec._index.keys()} + # Store them mutably + return {ObjId(s._id, "section"): s for s in sections} - # Initialize children - def children(self): - self._children = tuple( - ConvertableArticle(a, self._lang, self._cfg) for a in self.articles() - ) + tuple(ConvertableSection(s, self._lang, self._cfg) for s in self.sections()) - - def __init__(self, section: SpipRubriques, forced_lang: str, cfg: Configuration): + def __init__(self, spip_obj: SpipRubriques, cfg: Configuration, parent_depth: int): self._log_c = logging.getLogger(cfg.name + ".convert.section") - self._section = section - self._id = section.id_rubrique - self._lang = forced_lang - self.children() - - # Get relational metadata in a static form for self then earch children - def metadata(self): - super().metadata() - for c in self._children: - c.metadata() - - # Convert Spip syntax to Markdown on self then each children - def convert(self): - super().convert() - for c in self._children: - c.convert() - - # Keep only one relevant language for self then each children - def translate(self): - super().translate() - for c in self._children: - c.translate() - - # Repair internal links & embeds for self then each children - def link(self): - super().link() - for c in self._children: - c.link() - - # Perform last cleaning steps - def clean(self): - # Add documents to children - self._children = self._children + tuple( - ConvertableDocument(d, self._cfg) for d in self.documents() - ) - super().clean() - for c in self._children: - c.clean() - - # def frontmatter(self, add: Optional[dict[str, Any]] = None) -> str: - # meta: dict[str, Any] = {} - # # Add debugging meta if needed - # if CFG.debug_meta: - # meta = meta | { 
- # "spip_id_parent": self.id_parent, - # "spip_profondeur": self.profondeur, - # } - # if add is not None: - # meta = meta | add - # return super().frontmatter(meta) - - # def __init__(self, *args, **kwargs): - # super().__init__(*args, **kwargs) - # self._id = self.id_rubrique - # self._depth = self.profondeur - - # Perform all the write steps of this object - # def write_all( - # self, - # parentdepth: int, - # storage_parentdir: str, - # index: int, - # total: int, - # forced_lang: str, - # parenturl: str = "", - # ) -> DeepDict: - # # self.convert(forced_lang) - # return { - # "msg": super().write_all( - # parentdepth, storage_parentdir, index, total, parenturl - # ), - # "documents": self.write_children(self.documents(), forced_lang), - # "articles": self.write_children(self.articles(), forced_lang), - # "sections": self.write_children(self.sections(), forced_lang), - # } + self._cfg = cfg + self._spip_obj = spip_obj + self._id = int(spip_obj.id_rubrique) # type: ignore + self._lang = str(spip_obj.lang) + self._depth = parent_depth + 1 + self._children |= self.documents() + self._children |= self.articles() + self._children |= self.sections() # The "root" element representing the whole converted site class ConvertableSite: _log_c: logging.Logger # Logger for conversion operations _cfg: Configuration # Global configuration - _children: tuple[ConvertableSection, ...] # Root sections - # _children: list[SpipDocuments | SpipArticles | SpipRubriques] + _children: dict[ObjId, ConvertableSection] = {} # Root sections + _index: dict[ObjId, ObjId] = {} # Routing table to nested objects - _root_id: int = 0 # Parent ID of root sections + _id: int = 0 # Parent ID of root sections + _depth: int = 0 # Depth - def children(self): - self._log_c.debug("Initialize root sections") - # Store each level 0 sections - # Language specified in database can differ from markup, se we force a language - # (we will remove irrelevant ones further) - for lang in self._cfg.export_languages: - # Get all sections of parentID root_id - sections: tuple[SpipRubriques, ...] 
= ( + def sections(self) -> dict[ObjId, ConvertableSection]: + print("Initialize ROOT sections") + # Get all sections of parentID root_id + sections = [ + ConvertableSection(sec, self._cfg, self._depth) + for sec in ( SpipRubriques.select() - .where(SpipRubriques.id_parent == self._root_id) + .where(SpipRubriques.id_parent == self._id) .order_by(SpipRubriques.date.desc()) ) - self._children = tuple( - ConvertableSection(s, lang, self._cfg) for s in sections - ) + ] + # Add these sections’s indexes to self index, replacing next hop with section + for sec in sections: + self._index |= {obj: ObjId(sec._id, "section") for obj in sec._index.keys()} + return {ObjId(s._id, "section"): s for s in sections} def __init__(self, cfg: Configuration) -> None: self._log_c = logging.getLogger(cfg.name + ".convert.site") self._cfg = cfg - self.children() - - # Get relational metadata in a static form for earch children - def metadata(self): - for c in self._children: - c.metadata() - - # Convert Spip syntax to Markdown on each children - def convert(self): - for c in self._children: - c.convert() - - # Keep only one relevant language for each children - def translate(self): - for c in self._children: - c.translate() - - # Repair internal links & embeds for each children - def link(self): - for c in self._children: - c.link() - - # Perform last cleaning steps - def clean(self): - for c in self._children: - c.clean() + self._children |= self.sections() diff --git a/spip2md/write.py b/spip2md/write.py index 3f0c6c5..17a560a 100644 --- a/spip2md/write.py +++ b/spip2md/write.py @@ -20,46 +20,5 @@ from spip2md.convert import ConvertableSite class WritableSite(ConvertableSite): - def write(self): - pass - - -# # Write the root sections and their subtrees -# def write_root(parent_dir: str, parent_id: int = 0) -> DeepDict: -# # Print starting message -# print( -# f"""\ -# Begin exporting {esc(BOLD)}{CFG.db}@{CFG.db_host}{esc()} SPIP database to plain \ -# Markdown+YAML files, -# into the directory {esc(BOLD)}{parent_dir}{esc()}, \ -# as database user {esc(BOLD)}{CFG.db_user}{esc()} -# """ -# ) -# buffer: list[DeepDict] = [] # Define temporary storage for output -# # Write each sections (write their entire subtree) for each export language -# # Language specified in database can differ from markup, se we force a language -# # and remove irrelevant ones at each looping -# for lang in CFG.export_languages: -# ROOTLOG.debug("Initialize root sections") -# # Get all sections of parentID ROOTID -# child_sections: tuple[Section, ...] = ( -# Section.select() -# .where(Section.id_parent == parent_id) -# .order_by(Section.date.desc()) -# ) -# nb: int = len(child_sections) -# for i, s in enumerate(child_sections): -# ROOTLOG.debug(f"Begin exporting {lang} root section {i}/{nb}") -# try: -# buffer.append(s.write_all(-1, CFG.output_dir, i, nb, lang)) -# except LangNotFoundError as err: -# ROOTLOG.debug(err) # Log the message -# except DontExportDraftError as err: # If not CFG.export_drafts -# ROOTLOG.debug(err) # Log the message -# except IgnoredPatternError as err: -# ROOTLOG.debug(err) # Log the message -# print() # Break line between level 0 sections in output -# ROOTLOG.debug( -# f"Finished exporting {lang} root section {i}/{nb} {s._url_title}" -# ) -# return {"sections": buffer} + def write(self) -> str: + return "write path"
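
Notes on the new structure introduced by this patch (illustrative sketches, not part of the diff):

Taken together, the four files now chain as follows: config.Configuration locates and reads the YAML configuration, cli.init_db() points the peewee DB handle at it, and cli.main() drives the export through PrintableSite -> WritableSite -> ConvertableSite, whose write() is still a placeholder. A minimal driver sketch of that flow, assuming only what the diff shows — the import location of DB is not visible in this patch and is a guess here:

from spip2md.cli import PrintableSite, init_db
from spip2md.config import Configuration
from spip2md.spip_models import DB  # assumed location of the peewee database handle


def run(*argv: str) -> None:
    cfg = Configuration(*argv)  # find and read the YAML config file, print its path
    init_db(cfg)  # configure the DB handle from cfg.db, cfg.db_host, cfg.db_user, cfg.db_pass
    with DB:  # stay connected to the SPIP database for the whole export
        # PrintableSite extends WritableSite, which extends ConvertableSite;
        # its write() currently returns the placeholder string "write path".
        print(PrintableSite(cfg).write())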
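
The core of the convert.py rewrite is the ObjId key: a hashable (id, type) pair used both for the _children dictionaries and for the _index "routing table", in which every nested object maps to the next-hop child (articles index their documents, and each parent section re-points those entries at the child section itself). Below is a self-contained sketch of that lookup behaviour; ObjId is adapted from the patch (the sketch annotates _type_str: str, whereas the diff declares _type: type but assigns _type_str, and adds an isinstance guard), while next_hop() and the concrete IDs are invented for illustration only:

class ObjId:
    _id: int
    _type_str: str

    def __init__(self, obj_id: int, type_string: str) -> None:
        self._id = obj_id
        self._type_str = type_string

    def __hash__(self) -> int:
        return hash((self._id, self._type_str))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ObjId):
            return NotImplemented
        return (self._id, self._type_str) == (other._id, other._type_str)


# An article indexes its own documents...
article_id = ObjId(3, "article")
article_index = {ObjId(7, "document"): article_id}

# ...and its parent section merges that index, replacing the next hop with itself,
# much like ConvertableSection.articles()/sections() do with `self._index |= {...}`.
section_id = ObjId(1, "section")
section_index = {obj: section_id for obj in article_index}


def next_hop(index: dict[ObjId, ObjId], target: ObjId) -> "ObjId | None":
    # Illustrative helper: which child to descend into to reach `target`.
    return index.get(target)


assert next_hop(article_index, ObjId(7, "document")) == article_id
assert next_hop(section_index, ObjId(7, "document")) == section_id

Because __hash__ and __eq__ agree on the (_id, _type_str) pair, a freshly built ObjId finds the existing entry, which is what lets _index act as a flat routing table over the whole tree.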
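
One Python detail to keep in mind with these classes: _children and _index are declared as class-level dict defaults (= {}) on ConvertableRedactional, ConvertableArticle, ConvertableSection and ConvertableSite, and __init__ then updates them with `self._children |= ...`. Because dict |= mutates in place, that updates the single dict shared by every instance of the class rather than a per-instance one. A short, self-contained demonstration with hypothetical Node classes standing in for the converters:

class Node:
    children: dict[int, str] = {}  # class-level default, like _children/_index

    def __init__(self, key: int, name: str) -> None:
        self.children |= {key: name}  # |= mutates the shared class dict in place


a = Node(1, "first")
b = Node(2, "second")
assert a.children is b.children  # both instances share the same dict object
assert a.children == {1: "first", 2: "second"}


class FixedNode:
    def __init__(self, key: int, name: str) -> None:
        self.children: dict[int, str] = {}  # fresh dict per instance
        self.children |= {key: name}


c = FixedNode(1, "first")
d = FixedNode(2, "second")
assert c.children == {1: "first"}
assert d.children == {2: "second"}

Creating the dictionaries inside __init__, as FixedNode does, would give each converted section or site object its own children and routing table.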