Big refactor: code reuse, section file export, more compact output, simplifications

This commit is contained in:
Guilhem Fauré 2023-05-23 17:45:34 +02:00
parent bf6b8d4fe5
commit 0ce795dc08
3 changed files with 104 additions and 112 deletions

View File

@ -2,7 +2,6 @@
from os.path import isfile from os.path import isfile
from typing import Optional from typing import Optional
# from yaml import CLoader as Loader
from yaml import Loader, load from yaml import Loader, load
config_paths = ("spip2md.yml", "spip2md.yaml") config_paths = ("spip2md.yml", "spip2md.yaml")
@ -20,28 +19,17 @@ class Configuration:
db_user: str = "spip" db_user: str = "spip"
db_pass: str = "password" db_pass: str = "password"
output_dir: str = "output" output_dir: str = "output"
default_export_max: int = 1000 max_articles_export: int = 1000
max_sections_export: int = 500
data_dir: str = "data" data_dir: str = "data"
clear_output: bool = False clear_output: bool = False
def __init__(self, config_file: Optional[str] = None) -> None: def __init__(self, config_file: Optional[str] = None):
if config_file is not None: if config_file is not None:
with open(config_file) as f: with open(config_file) as f:
config = load(f.read(), Loader=Loader) config = load(f.read(), Loader=Loader)
if "db" in config: for attr in config:
self.db = config["db"] setattr(self, attr, config[attr])
if "db_user" in config:
self.db_user = config["db_user"]
if "db_pass" in config:
self.db_pass = config["db_pass"]
if "output_dir" in config:
self.output_dir = config["output_dir"]
if "default_export_nb" in config:
self.default_export_max = config["default_export_max"]
if "data_dir" in config:
self.data_dir = config["data_dir"]
if "clear_output" in config:
self.clear_output = config["clear_output"]
config = Configuration(config_file()) config = Configuration(config_file())

View File

@ -18,33 +18,29 @@ from database import (
EXPORTTYPE: str = "md" EXPORTTYPE: str = "md"
class LimitCounter:
def __init__(self, limit: int) -> None:
self.count: int = -1
self.LIMIT: int = limit
def remaining(self) -> int:
return self.LIMIT - self.count
def step(self) -> int:
self.count += 1
if self.remaining() <= 0:
raise StopIteration
return self.count
class Iterator: class Iterator:
items: list[Any] items: list[Any]
def __init__(self) -> None: def __init__(self) -> None:
# Set a counter caped at the number of retrieved items # Set the limit at the number of retrieved items
self.count = LimitCounter(len(self.items)) self.LIMIT: int = len(self.items)
# Start before the first element
self.count: int = -1
def __iter__(self): def __iter__(self):
return self return self
def __len__(self) -> int: def __len__(self) -> int:
return self.count.LIMIT return self.LIMIT
def remaining(self) -> int:
return self.LIMIT - self.count
def __next__(self) -> Any:
self.count += 1
if self.remaining() <= 0:
raise StopIteration
return self.items[self.count]
class Document: class Document:
@ -72,7 +68,7 @@ class Document:
class Documents(Iterator): class Documents(Iterator):
def __init__(self, object_id: int) -> None: def __init__(self, object_id: int) -> None:
# Query the DB to retrieve all documents related to object of id object_id # Query the DB to retrieve all documents related to object of id object_id
self.items = ( items = (
SpipDocuments.select() SpipDocuments.select()
.join( .join(
SpipDocumentsLiens, SpipDocumentsLiens,
@ -80,16 +76,14 @@ class Documents(Iterator):
) )
.where(SpipDocumentsLiens.id_objet == object_id) .where(SpipDocumentsLiens.id_objet == object_id)
) )
self.items: list[Document] = [Document(i) for i in items]
super().__init__() super().__init__()
def __next__(self):
return (Document(self.items[self.count.step()]), self.count)
class Item: class Item:
id: int id: int
def __init__(self, item: SpipArticles | SpipRubriques) -> None: def __init__(self, item: SpipArticles | SpipRubriques):
self.title: str = convert_meta(item.titre) self.title: str = convert_meta(item.titre)
self.section_id: int = item.id_rubrique self.section_id: int = item.id_rubrique
self.description: str = convert_meta(item.descriptif) self.description: str = convert_meta(item.descriptif)
@ -139,7 +133,7 @@ class Item:
# Convert images & files links # Convert images & files links
text: str = convert_documents( text: str = convert_documents(
self.text, self.text,
[(d.id, d.title, d.get_slug()) for d, _ in self.get_documents()], [(d.id, d.title, d.get_slug()) for d in self.get_documents()],
) )
# Remove remaining HTML after & append to body # Remove remaining HTML after & append to body
body += "\n\n" + remove_tags(text) body += "\n\n" + remove_tags(text)
@ -157,7 +151,7 @@ class Item:
class Article(Item): class Article(Item):
def __init__(self, article: SpipArticles) -> None: def __init__(self, article: SpipArticles):
super().__init__(article) super().__init__(article)
self.id: int = article.id_article self.id: int = article.id_article
self.surtitle: str = convert_meta(article.surtitre) # Probably unused self.surtitle: str = convert_meta(article.surtitre) # Probably unused
@ -218,7 +212,7 @@ class Article(Item):
class Section(Item): class Section(Item):
def __init__(self, section: SpipRubriques) -> None: def __init__(self, section: SpipRubriques):
super().__init__(section) super().__init__(section)
self.id: int = section.id_rubrique self.id: int = section.id_rubrique
self.parent_id: int = section.id_parent self.parent_id: int = section.id_parent
@ -233,37 +227,33 @@ class Section(Item):
class Articles(Iterator): class Articles(Iterator):
def __init__(self, section_id: int, limit: int = 0) -> None: def __init__(self, section_id: int, limit: int = 0):
# Query the DB to retrieve all articles sorted by publication date # Query the DB to retrieve all articles sorted by publication date
if limit > 0: if limit > 0:
self.items = ( items = (
SpipArticles.select() SpipArticles.select()
.where(SpipArticles.id_rubrique == section_id) .where(SpipArticles.id_rubrique == section_id)
.order_by(SpipArticles.date.desc()) .order_by(SpipArticles.date.desc())
.limit(limit) .limit(limit)
) )
else: else:
self.items = ( items = (
SpipArticles.select() SpipArticles.select()
.where(SpipArticles.id_rubrique == section_id) .where(SpipArticles.id_rubrique == section_id)
.order_by(SpipArticles.date.desc()) .order_by(SpipArticles.date.desc())
) )
self.items: list[Article] = [Article(i) for i in items]
super().__init__() super().__init__()
def __next__(self):
return (Article(self.items[self.count.step()]), self.count)
class Sections(Iterator): class Sections(Iterator):
def __init__(self, limit: int = 0) -> None: def __init__(self, limit: int = 0):
# Query the DB to retrieve all sections sorted by publication date # Query the DB to retrieve all sections sorted by publication date
if limit > 0: if limit > 0:
self.items = ( items = (
SpipRubriques.select().order_by(SpipRubriques.date.desc()).limit(limit) SpipRubriques.select().order_by(SpipRubriques.date.desc()).limit(limit)
) )
else: else:
self.items = SpipRubriques.select().order_by(SpipRubriques.date.desc()) items = SpipRubriques.select().order_by(SpipRubriques.date.desc())
self.items: list[Section] = [Section(i) for i in items]
super().__init__() super().__init__()
def __next__(self):
return (Section(self.items[self.count.step()]), self.count)

View File

@ -10,10 +10,7 @@ from converter import get_unknown_chars, unknown_chars
from database import db from database import db
from items import ( from items import (
Article, Article,
Articles,
Document, Document,
Documents,
LimitCounter,
Section, Section,
Sections, Sections,
) )
@ -55,27 +52,30 @@ def highlight(string: str, *start_stop: tuple[int, int]) -> None:
print(string[previous_stop:], end="") print(string[previous_stop:], end="")
# Plural ?
def s(nb: int) -> str:
return "s" if nb > 1 else ""
# Indent with spaces
def indent(nb: int = 1) -> None:
for _ in range(nb):
print(" ", end="")
# Connect to the MySQL database with Peewee ORM # Connect to the MySQL database with Peewee ORM
db.init(config.db, host=config.db_host, user=config.db_user, password=config.db_pass) db.init(config.db, host=config.db_host, user=config.db_user, password=config.db_pass)
db.connect() db.connect()
# Output information about ongoing export & write section to output destination # Output information about ongoing export & write section to output destination
def write_section( def write_section(index: int, total: int, section: Section) -> str:
section: Section, counter: LimitCounter
) -> tuple[Articles, Documents, str]:
# Print the name of the exported section & number of remaining sections # Print the name of the exported section & number of remaining sections
style(f"{counter.count + 1}. ", BO) style(f"{index + 1}. ", BO)
highlight(section.title, *unknown_chars(section.title)) highlight(section.title, *unknown_chars(section.title))
if counter.remaining() > 2: style(f" {total-index-1}", BO, G)
style(f" {counter.remaining()-1}", BO, G) style(f" section{s(total-index)}")
style(" sections") print(" left to export")
print(" left to export", end="")
if toexport > 1:
style(f" {toexport}", BO, Y)
style(" articles")
print(" left before export limit", end="")
print()
# Define the sections path (directory) & create directory(ies) if needed # Define the sections path (directory) & create directory(ies) if needed
sectiondir: str = config.output_dir + "/" + section.get_slug() sectiondir: str = config.output_dir + "/" + section.get_slug()
makedirs(sectiondir, exist_ok=True) makedirs(sectiondir, exist_ok=True)
@ -84,29 +84,26 @@ def write_section(
with open(sectionpath, "w") as f: with open(sectionpath, "w") as f:
f.write(section.get_content()) f.write(section.get_content())
# Return the first "limit" articles of section # Return the first "limit" articles of section
return (section.get_articles(), section.get_documents(), sectiondir) return sectiondir
# Output information about ongoing export & write article to output destination # Output information about ongoing export & write article to output destination
def write_article( def write_article(index: int, total: int, article: Article, sectiondir: str) -> str:
article: Article, counter: LimitCounter, sectiondir: str
) -> tuple[Documents, str]:
# Print the remaining number of articles to export every 100 articles # Print the remaining number of articles to export every 100 articles
if counter.count % 100 == 0: if index % 100 == 0:
s: str = "s" if counter.remaining() > 1 else "" indent()
print(" Exporting", end="") print("Exporting", end="")
style(f" {counter.remaining()}", BO, Y) style(f" {total-index}", BO, Y)
print(" SPIP", end="") print(" SPIP", end="")
style(f" article{s}") style(f" article{s(total-index)}")
print(" to Markdown & YAML files") print(" to Markdown & YAML files")
# Print the title of the article being exported # Print the title of the article being exported
style( style(
f" {counter.count + 1}. " f" {index + 1}. "
+ ("EMPTY " if len(article.text) < 1 else "") + ("EMPTY " if len(article.text) < 1 else "")
+ f"{article.lang} " + f"{article.lang} "
) )
highlight(article.title, *unknown_chars(article.title)) highlight(article.title, *unknown_chars(article.title))
print()
# Define the full article path & create directory(ies) if needed # Define the full article path & create directory(ies) if needed
articledir: str = sectiondir + "/" + article.get_slug() articledir: str = sectiondir + "/" + article.get_slug()
makedirs(articledir, exist_ok=True) makedirs(articledir, exist_ok=True)
@ -114,43 +111,46 @@ def write_article(
articlepath: str = articledir + "/" + article.get_filename() articlepath: str = articledir + "/" + article.get_filename()
with open(articlepath, "w") as f: with open(articlepath, "w") as f:
f.write(article.get_content()) f.write(article.get_content())
# Store articles with unknown characters # Print export location when finished exporting
if len(get_unknown_chars(article.text)) > 0: style(" -> ", BO, G)
unknown_chars_articles.append(article) print(articlepath)
return (article.get_documents(), articledir) return articledir
# Output information about ongoing export & copy document to output destination # Output information about ongoing export & copy document to output destination
def write_document(document: Document, counter: LimitCounter, objectdir: str) -> None: def write_document(
if counter.count % 100 == 0: index: int, total: int, document: Document, objectdir: str, indent_depth: int = 1
s: str = "s" if counter.remaining() > 1 else "" ) -> None:
print(" Exporting", end="") if index % 100 == 0:
style(f" {counter.remaining()}", BO, B) indent(indent_depth)
style(f" document{s}") print("Exporting", end="")
print(" in this article") style(f" {total-index}", BO, B)
style(f" document{s(total-index)}\n")
# Print the name of the file with a counter # Print the name of the file with a counter
style(f" {counter.count + 1}. {document.media} ") indent(indent_depth)
style(f"{index + 1}. {document.media} ")
if len(document.title) > 0: if len(document.title) > 0:
highlight(document.title + " ", *unknown_chars(document.title)) highlight(document.title + " ", *unknown_chars(document.title))
style("at ") style("at ")
print(document.file) print(document.file, end="")
# Define document path # Define document path
documentpath: str = expanduser(config.data_dir + "/" + document.file) documentpath: str = expanduser(config.data_dir + "/" + document.file)
# Copy the document from its SPIP location to the new location # Copy the document from its SPIP location to the new location
try: try:
copyfile(documentpath, objectdir + "/" + document.get_slug()) copyfile(documentpath, objectdir + "/" + document.get_slug())
except FileNotFoundError: except FileNotFoundError:
style(" NOT FOUND: ", BO, R) style(" -> NOT FOUND!\n", BO, R)
print(documentpath)
else: else:
# Print the outputted files path when copied the file # Print the outputted files path when copied the file
style(" -->", BO, B) style(" ->", BO, B)
print(f" {objectdir}/{document.get_slug()}") print(f" {objectdir}/{document.get_slug()}")
# Return true if an article field contains an unknown character # Return true if an article field contains an unknown character
def has_unknown_chars(article: Article) -> bool: def has_unknown_chars(article: Article) -> bool:
if len(get_unknown_chars(article.text)) > 0:
return True return True
return False
# Print the detected unknown chars in article in their context but highlighted # Print the detected unknown chars in article in their context but highlighted
@ -177,9 +177,14 @@ def warn_unknown_chars(article: Article) -> None:
if __name__ == "__main__": if __name__ == "__main__":
# Define max nb of articles to export based on first CLI argument # Define max nb of articles to export based on first CLI argument
if len(argv) >= 2: if len(argv) >= 2:
toexport = int(argv[1]) max_articles_export = int(argv[1])
else: else:
toexport = config.default_export_max max_articles_export = config.max_articles_export
# Define max nb of sections to export based on second CLI argument
if len(argv) >= 3:
max_sections_export = int(argv[2])
else:
max_sections_export = config.max_sections_export
# Clear the output dir & create a new # Clear the output dir & create a new
if config.clear_output: if config.clear_output:
@ -189,24 +194,33 @@ if __name__ == "__main__":
# Make a list containing articles where unknown characters are detected # Make a list containing articles where unknown characters are detected
unknown_chars_articles: list[Article] = [] unknown_chars_articles: list[Article] = []
# Loop among first maxexport articles & export them # Get sections with an eventual maximum
for section, counter in Sections(toexport): sections = Sections(max_sections_export)
nb_sections_export: int = len(sections)
# Loop among sections & export them
for i, section in enumerate(sections):
# Write the section & store its articles # Write the section & store its articles
articles, documents, sectiondir = write_section(section, counter) sectiondir = write_section(i, nb_sections_export, section)
# Loop over sections related files (images …) # Loop over sections related files (images …)
for document, counter in documents: documents = section.get_documents()
write_document(document, counter, sectiondir) for i, document in enumerate(documents):
write_document(i, len(documents), document, sectiondir)
# Loop over sections articles # Loop over sections articles
for article, counter in articles: articles = section.get_articles(max_articles_export)
documents, articledir = write_article(article, counter, sectiondir) for i, article in enumerate(articles):
articledir = write_article(i, len(articles), article, sectiondir)
# Add article to unknown_chars_articles if needed # Add article to unknown_chars_articles if needed
if has_unknown_chars(article): if has_unknown_chars(article):
unknown_chars_articles.append(article) unknown_chars_articles.append(article)
# Decrement export limit
max_articles_export -= 1
# Loop over articles related files (images …) # Loop over articles related files (images …)
for document, counter in documents: documents = section.get_documents()
write_document(document, counter, articledir) for i, document in enumerate(documents):
# Break 2 lines when finished exporting the section write_document(i, len(documents), document, sectiondir, 2)
print("\n") # Break line when finished exporting the section
print()
# Loop through each article that contains an unknown character # Loop through each article that contains an unknown character
for article in unknown_chars_articles: for article in unknown_chars_articles: