import asyncio
import os
import pickle
import random
import ssl
import subprocess
import tempfile
from pathlib import Path
from typing import List

import httpx
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
from docx import Document
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from sqlmodel import Session, select

from config.settings import HTTP_PROXY, HTTPS_PROXY, PANDOC_EXE
from mylib.base import ensure_output_dir, load_from_pickle, save_to_file
from mylib.logu import get_logger
from worker.html_convert.crawl_filter import CrawlFilter
from worker.html_convert.docling_converter import DoclingConverter
from worker.html_convert.models import HtmlConvertResult
from worker.search_engine.search_result_db import (
    SearchPageResult,
    SearchResultItem,
    SearchResultManager,
    db_manager,
)

logger = get_logger('pandoc')


class PandocConverter:
    """Class for handling Pandoc conversions with customizable options"""

    def __init__(self, font_name: str = "宋体", include_toc: bool = False):
        """
        Initialize PandocConverter with optional parameters

        Args:
            font_name (str): The default font to use in DOCX output
            include_toc (bool): Whether to include table of contents in DOCX output
        """
        self.font_name = font_name
        self.include_toc = include_toc
        # Temporary directory for generated reference docs; removed when the
        # TemporaryDirectory object is finalized.
        self._temp_dir = tempfile.TemporaryDirectory()

    def convert_md_to_docx(self, md_path: Path, output_path: Path) -> bool:
        """Convert a markdown file to docx using pandoc with custom options.

        Args:
            md_path: Source markdown file.
            output_path: Destination .docx path.

        Returns:
            True on success, False on any pandoc or OS error (errors are logged).
        """
        try:
            # Pass proxy settings through so pandoc can fetch remote resources
            # (e.g. images) referenced from the markdown.
            env = os.environ.copy()
            if HTTP_PROXY:
                env['HTTP_PROXY'] = HTTP_PROXY
                env['http_proxy'] = HTTP_PROXY  # Some systems use lowercase
            if HTTPS_PROXY:
                env['HTTPS_PROXY'] = HTTPS_PROXY
                env['https_proxy'] = HTTPS_PROXY

            cmd = [
                PANDOC_EXE,
                '-f', 'markdown+pipe_tables+simple_tables+multiline_tables',
                '-t', 'docx',
                '--standalone=true',
                '--embed-resources=true',
                '--reference-doc', self._get_reference_doc(),
                '-o', str(output_path),
                str(md_path)
            ]

            if self.include_toc:
                # Inserted before the input-file argument; pandoc accepts
                # options anywhere on the command line.
                cmd.insert(-1, '--toc')
                cmd.insert(-1, '--toc-depth=2')  # Include up to level 2 headings

            # Verbose output lets us detect resource-fetching problems below.
            cmd.append('--verbose')

            result = subprocess.run(
                cmd,
                check=True,  # raises CalledProcessError on non-zero exit
                env=env,  # Pass the environment with proxies
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )

            # Surface (but do not fail on) remote-resource fetch warnings.
            if "Could not fetch resource" in result.stderr:
                logger.warning(f"Resource fetching issue detected for {md_path}: {result.stderr}")

            # check=True already guarantees returncode == 0 here.
            return result.returncode == 0
        except subprocess.CalledProcessError as e:
            logger.exception(f"Pandoc conversion error for {md_path}: {e.stderr}")
            return False
        except Exception as e:
            logger.exception(f"Error converting {md_path} to docx: {e}")
            return False

    def process_single_result(self, result_id: int, skip_existing: bool = True) -> bool:
        """Process a single SearchResultItem and convert both filtered and docling markdown files to docx"""
        # Local manager instance; named to avoid shadowing the module-level
        # `db_manager` import.
        manager = SearchResultManager()
        with Session(manager.engine) as session:
            # Get the search result item
            result_item = session.get(SearchResultItem, result_id)
            if not result_item:
                logger.error(f"SearchResultItem with ID {result_id} not found")
                return False

            # Get the HTML convert result
            html_convert: HtmlConvertResult = session.exec(
                select(HtmlConvertResult)
                .where(HtmlConvertResult.search_result_item_id == result_id)
            ).first()
            if not html_convert:
                logger.error(f"No HtmlConvertResult found for SearchResultItem {result_id}")
                return False

            logger.info(f"pandoc start html_convert id {html_convert.id} result_id {result_id}")

            # Initialize success flags
            docling_success = False
            filtered_success = False

            # Convert docling markdown if available
            if html_convert.docling_md_path:
                docling_md_path = Path(html_convert.docling_md_path)
                docling_docx_path = docling_md_path.with_suffix('.docx')

                # Skip if already converted
                if skip_existing and docling_docx_path.exists():
                    logger.info(f"Skipping already converted docling markdown: {docling_docx_path}")
                    docling_success = True
                else:
                    docling_success = self.convert_md_to_docx(docling_md_path, docling_docx_path)
                    if docling_success:
                        html_convert.pandoc_docx_path = str(docling_docx_path)
                        html_convert.is_pandoc_converted = True
                        logger.info(f"Successfully converted docling markdown to {docling_docx_path}")

            # Convert filtered markdown if available.
            # Deliberately disabled for now: pandoc has to fetch remote
            # resources for this input, which is very slow.
            if False and html_convert.filter_crawl_md_path:
                filtered_md_path = Path(html_convert.filter_crawl_md_path)
                filtered_docx_path = filtered_md_path.with_suffix('.docx')

                # Skip if already converted
                if skip_existing and filtered_docx_path.exists():
                    logger.info(f"Skipping already converted filtered markdown: {filtered_docx_path}")
                    filtered_success = True
                else:
                    filtered_success = self.convert_md_to_docx(filtered_md_path, filtered_docx_path)
                    if filtered_success:
                        html_convert.pandoc_docx_path = str(filtered_docx_path)
                        html_convert.is_pandoc_converted = True
                        logger.info(f"Successfully converted filtered markdown to {filtered_docx_path}")

            # Update database if either conversion succeeded
            if docling_success or filtered_success:
                session.add(html_convert)
                session.commit()
                return True

            return False

    def _get_reference_doc(self) -> str:
        """Get path to reference document with specified font.

        The reference doc is created lazily and cached in the instance's
        temporary directory, keyed by font name.
        """
        reference_doc = Path(self._temp_dir.name) / f"{self.font_name.replace(' ', '_')}.docx"
        if not reference_doc.exists():
            self._create_reference_doc(reference_doc)
        return str(reference_doc)

    def _create_reference_doc(self, reference_doc: Path):
        """Create reference document with specified font and heading styles"""
        from docx import Document
        from docx.shared import Pt

        doc = Document()
        # Set Normal style font
        normal_style = doc.styles['Normal']
        normal_style.font.name = self.font_name
        normal_style.font.size = Pt(12)
        doc.save(str(reference_doc))


def process_single_example(result_id: int, skip_existing: bool = True):
    """Process a single result example: docling convert -> crawl filter -> pandoc docx."""
    # NOTE: may be deleted in the future; kept for now.
    docling_converter = DoclingConverter()
    search_result_item = docling_converter.get_search_result_item(result_id)
    # Only HTML sources go through docling conversion and crawl filtering.
    if (search_result_item and search_result_item.save_path
            and search_result_item.save_path.endswith('.html')):
        docling_converter.process_conversion_by_id(result_id, skip_existing=skip_existing)
        crawl_filter = CrawlFilter()
        crawl_filter.process_filter_by_id(result_id, skip_existing=skip_existing)
    pandoc_converter = PandocConverter(font_name="宋体", include_toc=True)
    logger.info(f"skip_existing {skip_existing}")
    success = pandoc_converter.process_single_result(result_id, skip_existing=skip_existing)
    if success:
        logger.info(f"Successfully processed result {result_id}")
    else:
        logger.error(f"Failed to process result {result_id}")
    return success


def process_all_results():
    """Process all results in the database."""
    # Read-only access to the module-level db_manager; no `global` needed.
    with Session(db_manager.engine) as session:
        # Fetch all IDs with explicit ordering
        result_ids = session.exec(
            select(SearchResultItem.id, SearchResultItem.save_path)
            .order_by(SearchResultItem.id)
        ).all()
        logger.info(f"Total results: {len(result_ids)}")
        logger.info(f"First 5 result IDs: {result_ids[:5]}")
        for result_id, save_path in result_ids:
            try:
                # Only HTML pages are convertible.
                if save_path and save_path.endswith('.html'):
                    process_single_example(result_id)
            except Exception as e:
                logger.error(f"Error processing result {result_id}: {e}")


if __name__ == "__main__":
    # Measure total run time
    import time
    start_time = time.time()
    process_single_example(996, skip_existing=False)
    end_time = time.time()
    print(f"Total time: {end_time - start_time} seconds")
    # process_all_results()