mirrored 3 minutes ago
0
yuanmengqifix: improve EPUB processing by checking for file existence before reading - Added checks for the presence of "toc.ncx" and "content.opf" in the EPUB file before attempting to process them. - Introduced debug logging to notify when these files are not found, enhancing error handling and traceability. - Maintained existing logic while improving robustness of the EPUB processing function. 122b167
import logging
import os
import os.path
import zipfile
from typing import List, Dict
from typing import Union, TypeVar

import lxml.html
from lxml.html import HtmlElement
from mutagen.easyid3 import EasyID3

from .general import diff_text_file
from .utils import _match_value_to_rule

logger = logging.getLogger("desktopenv.metric.others")


def process_epub(filename: str) -> List[str]:
    file_list: List[str] = []

    base_dir: str = filename + ".dir"
    os.makedirs(base_dir, exist_ok=True)

    try:
        with zipfile.ZipFile(filename, "r") as z_f:
            # Get list of all files in the zip archive
            zip_file_list = z_f.namelist()
            
            # Process toc.ncx if it exists
            if "toc.ncx" in zip_file_list:
                with z_f.open("toc.ncx") as in_f \
                        , open(os.path.join(base_dir, "toc.ncx"), "w") as out_f:
                    contents: str = in_f.read().decode()
                    contents = contents.splitlines()
                    for l in contents:
                        if "navPoint" not in l:
                            out_f.write(l + "\n")
                file_list.append(os.path.join(base_dir, "toc.ncx"))
            else:
                logger.debug("toc.ncx not found in epub file: %s", filename)
            
            # Process content.opf if it exists
            if "content.opf" in zip_file_list:
                with z_f.open("content.opf") as in_f \
                        , open(os.path.join(base_dir, "content.opf"), "w") as out_f:
                    contents: str = in_f.read().decode()
                    contents = contents.splitlines()
                    for l in contents:
                        if "dc:identifier" not in l:
                            out_f.write(l + "\n")
                file_list.append(os.path.join(base_dir, "content.opf"))
            else:
                logger.debug("content.opf not found in epub file: %s", filename)
            for f_n in z_f.namelist():
                if f_n.endswith(".html"):
                    with z_f.open(f_n) as in_f \
                            , open(os.path.join(base_dir, f_n), "w") as out_f:
                        html: HtmlElement = lxml.html.fromstring(
                            ''.join(filter(lambda ch: ch != "\n" and ch != "\r"
                                           , in_f.read().decode()
                                           )
                                    ).encode()
                        )
                        out_f.write(lxml.html.tostring(html, pretty_print=True, encoding="unicode"))
                    file_list.append(os.path.join(base_dir, f_n))
        logger.debug("%s: %s", filename, file_list)
        return list(sorted(file_list))
    except zipfile.BadZipFile:
        return []


def compare_epub(result: str, expected: str) -> float:
    if result is None:
        return 0.
    result_files: List[str] = process_epub(result)
    expected_files: List[str] = process_epub(expected)

    metric: float = 0.
    for f1, f2 in zip(result_files, expected_files):
        current_metric: float = diff_text_file(f1, f2)
        logger.debug("%s vs %s: %f", f1, f2, current_metric)
        metric += current_metric
    if len(result_files) > 0:
        metric /= len(result_files)
    return metric


V = TypeVar("Value")


def check_mp3_meta(result: str, meta: Dict[str, Dict[str, Union[str, V]]]) -> bool:
    # checks using _match_value_to_rule
    if result is None:
        return 0.

    id3_dict = EasyID3(result)
    metric: bool = True
    for k, r in meta.items():
        value = id3_dict.get(k, "")
        if isinstance(value, list):
            value: str = ",".join(value)
        logger.debug("%s.%s: %s", result, k, value)
        metric = metric and _match_value_to_rule(value, r)
    return float(metric)