better handling of filenames conflicts + differences between path and url, cleaning, removed interface

This commit is contained in:
Guilhem Fauré 2023-06-13 13:45:16 +02:00
parent be4b5166d7
commit df7e0df1cf
3 changed files with 200 additions and 179 deletions

View File

@ -31,14 +31,14 @@ class Configuration:
unknown_char_replacement: str = "??" # Replaces unknown characters
clear_log: bool = True # Clear log before every run instead of appending to
clear_output: bool = True # Remove eventual output dir before running
conflict_strategy: str = "prepend id" # Prepend or append : date, id or counter
ignore_pattern: list[str] = [] # Ignore objects of which title match
logfile: str = "log-spip2md.log" # File where logs will be written, relative to wd
loglevel: str = "WARNING" # Minimum criticity of logs written in logfile
logname: str = "spip2md" # Labelling of logs
export_filetype: str = "md" # Extension of exported text files
max_articles_export: int = 1000 # TODO reimplement
max_sections_export: int = 500 # TODO reimplement
title_max_length: int = 40 # Maximum length of a single title for directory names
# max_articles_export: int = 1000 # TODO reimplement
# max_sections_export: int = 500 # TODO reimplement
def __init__(self, config_file: Optional[str] = None):
if config_file is not None:

View File

@ -1,6 +1,6 @@
# SPIP website to plain Markdown files converter, Copyright (C) 2023 Guilhem Fauré
import logging
from os import listdir, makedirs
from os import listdir, mkdir
from os.path import basename, isfile, splitext
from re import I, Match, Pattern, finditer, match, search
from shutil import copyfile
@ -50,7 +50,7 @@ DeepDict = dict[str, "list[DeepDict] | list[str] | str"]
LOG = logging.getLogger(CFG.logname + ".models")
class SpipInterface:
class WritableObject:
# From SPIP database
texte: str
lang: str
@ -59,34 +59,16 @@ class SpipInterface:
statut: str
profondeur: int
# Converted fields
_title: str
_storage_title: str # Title with which directories names are built
_draft: bool
# Additional fields
_id: BigAutoField | int = 0 # same ID attribute name for all objects
# _id: BigIntegerField | int = 0 # same ID attribute name for all objects
# _depth: IntegerField | int # Equals `profondeur` for sections
_depth: int # Equals `profondeur` for sections
_fileprefix: str # String to prepend to written files
_parentdir: str # Path from output dir to direct parent
_storage_parentdir: Optional[str] = None
_storage_title: Optional[str] = None
_url: Optional[str] = None # In case URL in frontmatter different of dest dir
_storage_parentdir: str # Path from output dir to direct parent
_style: tuple[int, ...] # _styles to apply to some elements of printed output
# memo: dict[str, str] = {} # Memoïze values
_storage_title_append: int = 0 # Append a number to storage title if > 0
def dest_directory(self) -> str:
raise NotImplementedError("Subclasses need to implement directory()")
def dest_filename(self, prepend: str = "", append: str = "") -> str:
raise NotImplementedError(
f"Subclasses need to implement dest_filename(), params:{prepend}{append}"
)
def dest_path(self) -> str:
return self.dest_directory() + self.dest_filename()
class WritableObject(SpipInterface):
# Apply a mapping from regex maps
@staticmethod
def apply_mapping(text: str, mapping: tuple) -> str:
@ -162,10 +144,21 @@ class WritableObject(SpipInterface):
# Apply post-init conversions and cancel the export if self not of the right lang
def convert(self) -> None:
self._title = self.convert_field(self.titre)
self._storage_title = self.convert_field(self.titre)
if not CFG.export_drafts and self._draft:
raise DontExportDraftError(f"{self.titre} is a draft, cancelling export")
def dest_directory(self) -> str:
raise NotImplementedError("Subclasses need to implement directory()")
def dest_filename(self, prepend: str = "", append: str = "") -> str:
raise NotImplementedError(
f"Subclasses need to implement dest_filename(), params:{prepend}{append}"
)
def dest_path(self) -> str:
return self.dest_directory() + self.dest_filename()
# Print one or more line(s) in which special elements are stylized
def style_print(
self, string: str, indent: Optional[str] = " ", end: str = "\n"
@ -194,10 +187,10 @@ class WritableObject(SpipInterface):
self.style_print(counter)
# Output the counter & title of the object being exported
msg: str = f"{index + 1}. "
if len(self._title) == 0:
if len(self._storage_title) == 0:
msg += "EMPTY NAME"
else:
msg += self._title
msg += self._storage_title
# Print the output as the program goes
# LOG.debug(f"Begin exporting {type(self).__name__} {output[-1]}")
self.style_print(msg, end="")
@ -225,20 +218,24 @@ class WritableObject(SpipInterface):
def write_all(
self,
parentdepth: int,
parentdir: str,
storage_parentdir: str,
index: int,
total: int,
storage_parentdir: Optional[str] = None,
parenturl: str,
) -> str:
LOG.debug(f"Writing {type(self).__name__} `{self._title}`")
LOG.debug(f"Writing {type(self).__name__} `{self._storage_title}`")
self._depth = parentdepth + 1
self._parentdir = parentdir
if storage_parentdir:
self._storage_parentdir = storage_parentdir
self._storage_parentdir = storage_parentdir
self._parenturl = parenturl
output: str = self.begin_message(index, total)
try:
output += self.end_message(self.write())
except Exception as err:
except (
LangNotFoundError,
DontExportDraftError,
IgnoredPatternError,
FileNotFoundError,
) as err:
output += self.end_message(err)
return output
@ -264,9 +261,9 @@ class Document(WritableObject, SpipDocuments):
def dest_directory(self, prepend: str = "", append: str = "") -> str:
_id: str = str(self._id) + "-" if CFG.prepend_id else ""
return (
self._parentdir
self._storage_parentdir
+ prepend
+ slugify(_id + self._title, max_length=100)
+ slugify(_id + self._storage_title, max_length=100)
+ append
)
@ -284,15 +281,21 @@ class Document(WritableObject, SpipDocuments):
def write_all(
self,
parentdepth: int,
parentdir: str,
storage_parentdir: str,
index: int,
total: int,
forcedlang: str,
storage_parentdir: Optional[str],
forcedlang: Optional[str] = None,
parenturl: str = "",
) -> str:
self.convert() # Apply post-init conversions
LOG.debug(
f"Document {self._storage_title} doesnt care about forcedlang {forcedlang}"
)
LOG.debug(
f"Document {self._storage_title} doesnt care about parenturl {parenturl}"
)
return super().write_all(
parentdepth, parentdir, index, total, storage_parentdir
parentdepth, storage_parentdir, index, total, parenturl
)
@ -319,10 +322,14 @@ class RedactionalObject(WritableObject):
langue_choisie: str
# Converted
_text: str
_url_title: str # Title in metadata of articles
_parenturl: str # URL relative to lang to direct parent
# Get rid of other lang than forced in text and modify lang to forced if found
def translate_multi(self, forced_lang: str, text: str) -> str:
LOG.debug(f"Translating <multi> blocks of `{self._title}`")
def translate_multi(
self, forced_lang: str, text: str, change_lang: bool = True
) -> str:
# LOG.debug(f"Translating <multi> blocks of `{self._url_title}`")
# for each <multi> blocks, keep only forced lang
lang: Optional[Match[str]] = None
for block in MULTILANG_BLOCK.finditer(text):
@ -331,16 +338,17 @@ class RedactionalObject(WritableObject):
# Log the translation
trans: str = lang.group(1)[:50].strip()
LOG.debug(
f"Keeping {forced_lang} translation of `{self._title}`: "
+ f"`{trans}`, becoming its new lang"
f"Keeping {forced_lang} translation of `{self._url_title}`: "
+ f"`{trans}`"
)
self.lang = forced_lang # So write-all will not be cancelled
if self.id_trad == 0: # Assign translation key to id so hugo can link
self.id_trad = self._id
if change_lang:
self.lang = forced_lang # So write-all will not be cancelled
if self.id_trad == 0: # Assign translation key to id for Hugo
self.id_trad = self._id
# Replace the mutli blocks with the text in the proper lang
text = text.replace(block.group(), lang.group(1))
if lang is None:
LOG.debug(f"{forced_lang} not found in `{self._title}`")
LOG.debug(f"{forced_lang} not found in `{self._url_title}`")
return text
def replace_links(self, text: str) -> str:
@ -391,7 +399,7 @@ class RedactionalObject(WritableObject):
for link, getobj, repl in LinkMappings():
# LOG.debug(f"Looking for {link} in {text}")
for m in link.finditer(text):
LOG.debug(f"Found internal link {m.group()} in {self._title}")
LOG.debug(f"Found internal link {m.group()} in {self._url_title}")
try:
LOG.debug(f"Searching for object of id {m.group(2)} with {getobj}")
o: "Document | Article | Section" = getobj(int(m.group(2)))
@ -406,54 +414,45 @@ class RedactionalObject(WritableObject):
print(repl, m.group(1), o.dest_filename())
raise err
else:
repl = repl.format(o._title, o.dest_filename())
LOG.debug(f"Translate link {m.group()} to {repl} in {self._title}")
repl = repl.format(o._storage_title, o.dest_filename())
LOG.debug(
f"Translate link {m.group()} to {repl} in {self._url_title}"
)
text = text.replace(m.group(), repl)
except DoesNotExist:
LOG.warn(f"No object for link {m.group()} in {self._title}")
LOG.warn(f"No object for link {m.group()} in {self._url_title}")
text = text.replace(m.group(), repl.format("", "NOT FOUND"), 1)
return text
# Modify this objects title to prevent filename conflicts
def conflict_title(self, conflict: str) -> None:
if CFG.conflict_strategy == "prepend id":
title: str = str(self._id) + "_" + self._title
elif CFG.conflict_strategy == "append id":
title: str = self._title + "_" + str(self._id)
elif CFG.conflict_strategy == "prepend counter":
m = match(r"([0-9]+)_" + self._title, conflict)
if m is not None:
title: str = str(int(m.group(1)) + 1) + "_" + self._title
else:
title: str = "1_" + self._title
else: # Defaults to append counter
m = match(self._title + r"_([0-9]+)$", conflict)
if m is not None:
title: str = self._title + "_" + str(int(m.group(1)) + 1)
else:
title: str = self._title + "_1"
LOG.debug(f"Rewriting {self._title} title to {title}")
self._title = title
# Get this object url, or none if its the same as directory
def url(self) -> str:
_id: str = str(self._id) + "-" if CFG.prepend_id else ""
counter: str = (
"_" + str(self._storage_title_append)
if self._storage_title_append > 0
else ""
)
# Return none if url will be the same as directory
return (
self._parenturl
+ slugify(_id + self._url_title, max_length=CFG.title_max_length)
+ counter
+ r"/"
)
# Get slugified directory of this object
def dest_directory(self) -> str:
_id: str = str(self._id) + "-" if CFG.prepend_id else ""
slug: str = slugify(_id + self._title, max_length=100)
directory: str = self._parentdir + slug
if self._storage_title is not None or self._storage_parentdir is not None:
self._url = directory
directory: str = (
self._storage_parentdir
if self._storage_parentdir is not None
else self._parentdir
+ slugify(
_id + self._storage_title
if self._storage_title is not None
else self._title,
max_length=100,
)
)
return directory + r"/"
counter: str = (
"_" + str(self._storage_title_append)
if self._storage_title_append > 0
else ""
)
directory: str = self._storage_parentdir + slugify(
_id + self._storage_title,
max_length=CFG.title_max_length,
)
return directory + counter + r"/"
# Get filename of this object
def dest_filename(self) -> str:
@ -462,64 +461,84 @@ class RedactionalObject(WritableObject):
def convert_title(self, forced_lang: str) -> None:
LOG.debug(f"Convert title of currently untitled {type(self).__name__}")
if hasattr(self, "_title"):
LOG.debug(f"{type(self).__name__} {self._title} _title is already set")
LOG.debug(f"{type(self).__name__} {self._url_title} _title is already set")
return
if self.titre is None:
LOG.debug(f"{type(self).__name__} title is None")
self._title = ""
self._url_title = ""
return
if len(self.titre) == 0:
LOG.debug(f"{type(self).__name__} title is empty")
self._title = ""
self._url_title = ""
return
self._title = self.titre.strip()
# Keep storage language title to store it
if CFG.storage_language is not None and CFG.storage_language != forced_lang:
self._storage_title = self.translate_multi(
CFG.storage_language, self._title
)
self._storage_title = self.convert_field(self._storage_title)
self._title = self.translate_multi(forced_lang, self._title)
LOG.debug(f"Convert internal links of {self.lang} `{self._title}` title")
self._title = self.replace_links(self._title)
LOG.debug(f"Apply conversions to {self.lang} `{self._title}` title")
self._title = self.convert_field(self._title)
self._url_title = self.titre.strip()
# Set storage title to language of storage lang if different
storage_lang: str = (
CFG.storage_language if CFG.storage_language is not None else forced_lang
)
LOG.debug(
f"Searching for {storage_lang} in <multi> blocks of `{self._url_title}`"
+ " storage title"
)
self._storage_title = self.translate_multi(
storage_lang,
self._url_title,
False,
)
LOG.debug(
f"Searching for {forced_lang} in <multi> blocks of `{self._url_title}`"
+ " URL title"
)
self._url_title = self.translate_multi(forced_lang, self._url_title)
LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` title")
self._storage_title = self.replace_links(self._storage_title)
self._url_title = self.replace_links(self._url_title)
LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` title")
self._storage_title = self.convert_field(self._storage_title)
self._url_title = self.convert_field(self._url_title)
for p in CFG.ignore_pattern:
for title in (self._storage_title, self._url_title):
m = match(p, title, I)
if m is not None:
raise IgnoredPatternError(
f"{self._url_title} matches with ignore pattern {p}, ignoring"
)
def convert_text(self, forced_lang: str) -> None:
LOG.debug(f"Convert text of `{self._title}`")
LOG.debug(f"Convert text of `{self._url_title}`")
if hasattr(self, "_text"):
LOG.debug(f"{type(self).__name__} {self._title} _text is already set")
LOG.debug(f"{type(self).__name__} {self._url_title} _text is already set")
return
if self.texte is None:
LOG.debug(f"{type(self).__name__} {self._title} text is None")
LOG.debug(f"{type(self).__name__} {self._url_title} text is None")
self._text = ""
return
if len(self.texte) == 0:
LOG.debug(f"{type(self).__name__} {self._title} text is empty")
LOG.debug(f"{type(self).__name__} {self._url_title} text is empty")
self._text = ""
return
self._text = self.translate_multi(forced_lang, self.texte.strip())
LOG.debug(f"Convert internal links of {self.lang} `{self._title}` text")
LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` text")
self._text = self.replace_links(self._text)
LOG.debug(f"Apply conversions to {self.lang} `{self._title}` text")
LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` text")
self._text = self.convert_field(self._text)
def convert_extra(self) -> None:
LOG.debug(f"Convert extra of `{self._title}`")
LOG.debug(f"Convert extra of `{self._url_title}`")
if hasattr(self, "_extra"):
LOG.debug(f"{type(self).__name__} {self._title} _extra is already set")
LOG.debug(f"{type(self).__name__} {self._url_title} _extra is already set")
return
if self.extra is None:
LOG.debug(f"{type(self).__name__} {self._title} extra is None")
LOG.debug(f"{type(self).__name__} {self._url_title} extra is None")
self._extra = ""
return
if len(self.extra) == 0:
LOG.debug(f"{type(self).__name__} {self._title} extra is empty")
LOG.debug(f"{type(self).__name__} {self._url_title} extra is empty")
self._extra = ""
return
LOG.debug(f"Convert internal links of {self.lang} `{self._title}` extra")
LOG.debug(f"Convert internal links of {self.lang} `{self._url_title}` extra")
self._extra = self.replace_links(self._extra)
LOG.debug(f"Apply conversions to {self.lang} `{self._title}` extra")
LOG.debug(f"Apply conversions to {self.lang} `{self._url_title}` extra")
self._extra = self.convert_field(self._extra)
def __init__(self, *args, **kwargs):
@ -529,7 +548,7 @@ class RedactionalObject(WritableObject):
# Get related documents
def documents(self) -> tuple[Document]:
LOG.debug(f"Initialize documents of `{self._title}`")
LOG.debug(f"Initialize documents of `{self._url_title}`")
documents = (
Document.select()
.join(
@ -546,7 +565,7 @@ class RedactionalObject(WritableObject):
meta: dict[str, Any] = {
"lang": self.lang,
"translationKey": self.id_trad,
"title": self._title,
"title": self._url_title,
"publishDate": self.date,
"lastmod": self.maj,
"draft": self._draft,
@ -555,8 +574,9 @@ class RedactionalObject(WritableObject):
"spip_id_secteur": self.id_secteur,
"spip_id": self._id,
}
if self._url is not None:
meta = meta | {"url": self._url}
# Add url if different of directory
if self.url() not in self.dest_directory():
meta = meta | {"url": self.url()}
if append is not None:
return dump(meta | append, allow_unicode=True)
else:
@ -568,8 +588,8 @@ class RedactionalObject(WritableObject):
# Start the content with frontmatter
body: str = "---\n" + self.frontmatter() + "---"
# Add the title as a Markdown h1
if len(self._title) > 0 and CFG.prepend_h1:
body += "\n\n# " + self._title
if self._url_title is not None and len(self._url_title) > 0 and CFG.prepend_h1:
body += "\n\n# " + self._url_title
# If there is a text, add the text preceded by two line breaks
if len(self._text) > 0:
# Remove remaining HTML after & append to body
@ -584,9 +604,8 @@ class RedactionalObject(WritableObject):
self,
children: tuple[Document] | tuple[Any],
forcedlang: str,
storage_parentdir: Optional[str] = None,
) -> list[str]:
LOG.debug(f"Writing documents of {type(self).__name__} `{self._title}`")
LOG.debug(f"Writing documents of {type(self).__name__} `{self._url_title}`")
output: list[str] = []
total = len(children)
i = 0
@ -599,40 +618,54 @@ class RedactionalObject(WritableObject):
i,
total,
forcedlang,
storage_parentdir,
self.url(),
)
)
i += 1
except LangNotFoundError as err:
LOG.debug(err)
except DontExportDraftError as err:
LOG.debug(err)
except IgnoredPatternError as err:
except (
LangNotFoundError,
DontExportDraftError,
IgnoredPatternError,
) as err:
LOG.debug(err)
return output
# Write object to output destination
def write(self) -> str:
# Make a directory for this object if there isnt
directory: str = self.dest_directory()
try:
makedirs(directory)
except FileExistsError:
# Create a new directory if write is about to overwrite an existing file
# or to write into a directory without the same fileprefix
for file in listdir(directory):
LOG.debug(
f"Testing if {type(self).__name__} `{self.dest_path()}` of prefix "
+ f"{self._fileprefix} can be written along with `{file}` "
+ f"of prefix `{file.split('.')[0]}` in `{self.dest_directory()}`"
)
if isfile(directory + file) and (
directory + file == self.dest_path()
or file.split(".")[0] != self._fileprefix
):
self.conflict_title(directory.split("/")[-1])
makedirs(self.dest_directory())
break
# If it cannot for incompatibility, try until it can
incompatible: bool = True
while incompatible:
directory: str = self.dest_directory()
try:
mkdir(directory)
break
except FileExistsError:
# If not stated incompatible with the following, will write in this dir
incompatible = False
# Create a new directory if write is about to overwrite an existing file
# or to write into a directory without the same fileprefix
for file in listdir(directory):
if isfile(directory + file):
LOG.debug(
f"Can {type(self).__name__} `{self.dest_path()}` of prefix "
+ f"{self._fileprefix} and suffix {CFG.export_filetype}"
+ f" be written along with `{file}` of prefix "
+ f"`{file.split('.')[0]}` and suffix {file.split('.')[-1]}"
+ f"` in {self.dest_directory()}` ?"
)
# Resolve conflict at first incompatible file encountered
if directory + file == self.dest_path() or (
file.split(".")[-1] == CFG.export_filetype
and file.split(".")[0] != self._fileprefix
):
LOG.debug(
f"No, incrementing counter of {self.dest_directory()}"
)
self._storage_title_append += 1
incompatible = True
break
# Write the content of this object into a file named as self.filename()
with open(self.dest_path(), "w") as f:
f.write(self.content())
@ -641,17 +674,11 @@ class RedactionalObject(WritableObject):
# Apply post-init conversions and cancel the export if self not of the right lang
def convert(self, forced_lang: str) -> None:
self.convert_title(forced_lang)
for p in CFG.ignore_pattern:
m = match(p, self._title, I)
if m is not None:
raise IgnoredPatternError(
f"{self._title} is matching with ignore pattern {p}, ignoring"
)
self.convert_text(forced_lang)
self.convert_extra()
if self.lang != forced_lang:
raise LangNotFoundError(
f"`{self._title}` lang is {self.lang} instead of the wanted"
f"`{self._url_title}` lang is {self.lang} instead of the wanted"
+ f" {forced_lang} and it dont contains"
+ f" {forced_lang} translation in Markup either"
)
@ -705,7 +732,7 @@ class Article(RedactionalObject, SpipArticles):
return body
def authors(self) -> list[SpipAuteurs]:
LOG.debug(f"Initialize authors of `{self._title}`")
LOG.debug(f"Initialize authors of `{self._url_title}`")
return (
SpipAuteurs.select()
.join(
@ -719,20 +746,18 @@ class Article(RedactionalObject, SpipArticles):
def write_all(
self,
parentdepth: int,
parentdir: str,
storage_parentdir: str,
index: int,
total: int,
forced_lang: str,
storage_parentdir: Optional[str] = None,
parenturl: str,
) -> DeepDict:
self.convert(forced_lang)
return {
"msg": super().write_all(
parentdepth, parentdir, index, total, storage_parentdir
),
"documents": self.write_children(
self.documents(), forced_lang, storage_parentdir
parentdepth, storage_parentdir, index, total, parenturl
),
"documents": self.write_children(self.documents(), forced_lang),
}
@ -756,7 +781,7 @@ class Section(RedactionalObject, SpipRubriques):
# Get articles of this section
def articles(self, limit: int = 10**6) -> tuple[Article]:
LOG.debug(f"Initialize articles of `{self._title}`")
LOG.debug(f"Initialize articles of `{self._url_title}`")
return (
Article.select()
.where(Article.id_rubrique == self._id)
@ -766,7 +791,7 @@ class Section(RedactionalObject, SpipRubriques):
# Get subsections of this section
def sections(self, limit: int = 10**6) -> tuple[Self]:
LOG.debug(f"Initialize subsections of `{self._title}`")
LOG.debug(f"Initialize subsections of `{self._url_title}`")
return (
Section.select()
.where(Section.id_parent == self._id)
@ -783,24 +808,18 @@ class Section(RedactionalObject, SpipRubriques):
def write_all(
self,
parentdepth: int,
parentdir: str,
storage_parentdir: str,
index: int,
total: int,
forced_lang: str,
storage_parentdir: Optional[str] = None,
parenturl: str = "",
) -> DeepDict:
self.convert(forced_lang)
return {
"msg": super().write_all(
parentdepth, parentdir, index, total, storage_parentdir
),
"documents": self.write_children(
self.documents(), forced_lang, storage_parentdir
),
"articles": self.write_children(
self.articles(), forced_lang, storage_parentdir
),
"sections": self.write_children(
self.sections(), forced_lang, storage_parentdir
parentdepth, storage_parentdir, index, total, parenturl
),
"documents": self.write_children(self.documents(), forced_lang),
"articles": self.write_children(self.articles(), forced_lang),
"sections": self.write_children(self.sections(), forced_lang),
}

View File

@ -58,7 +58,9 @@ as database user {esc(BOLD)}{CFG.db_user}{esc()}
except IgnoredPatternError as err:
ROOTLOG.debug(err) # Log the message
print() # Break line between level 0 sections in output
ROOTLOG.debug(f"Finished exporting {lang} root section {i}/{nb} {s._title}")
ROOTLOG.debug(
f"Finished exporting {lang} root section {i}/{nb} {s._url_title}"
)
return {"sections": buffer}