import os
import subprocess
import tempfile
from pathlib import Path

from sqlmodel import select, Session

from worker.search_engine.search_result_db import SearchResultManager, db_manager, SearchResultItem
from mylib.logu import get_logger
from config.settings import PANDOC_EXE, HTTP_PROXY, HTTPS_PROXY
from worker.html_convert.models import HtmlConvertResult
from worker.html_convert.docling_converter import DoclingConverter
from worker.html_convert.crawl_filter import CrawlFilter

logger = get_logger('pandoc')


class PandocConverter:
    """Class for handling Pandoc conversions with customizable options"""

    def __init__(self, font_name: str = "宋体", include_toc: bool = False):
        """
        Initialize PandocConverter with optional parameters

        Args:
            font_name (str): The default font to use in DOCX output
            include_toc (bool): Whether to include a table of contents in DOCX output
        """
        self.font_name = font_name
        self.include_toc = include_toc
        # Temporary directory that holds generated reference documents
        self._temp_dir = tempfile.TemporaryDirectory()

    def convert_md_to_docx(self, md_path: Path, output_path: Path) -> bool:
        """Convert a markdown file to docx using pandoc with custom options"""
        try:
            # Prepare environment variables with proxy settings
            env = os.environ.copy()
            if HTTP_PROXY:
                env['HTTP_PROXY'] = HTTP_PROXY
                env['http_proxy'] = HTTP_PROXY  # Some systems use the lowercase variant
            if HTTPS_PROXY:
                env['HTTPS_PROXY'] = HTTPS_PROXY
                env['https_proxy'] = HTTPS_PROXY

            cmd = [
                PANDOC_EXE,
                '-f', 'markdown+pipe_tables+simple_tables+multiline_tables',
                '-t', 'docx',
                '--standalone=true',
                '--embed-resources=true',
                '--reference-doc', self._get_reference_doc(),
                '-o', str(output_path),
                str(md_path)
            ]

            if self.include_toc:
                # Insert the TOC flags just before the input path argument
                cmd.insert(-1, '--toc')
                cmd.insert(-1, '--toc-depth=2')  # Include headings up to level 2

            # Add the verbose flag to capture more information about resource fetching
            cmd.append('--verbose')

            result = subprocess.run(
                cmd,
                check=True,
                env=env,  # Pass the environment with the proxy settings
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )

            # Log any warnings about resources
            if "Could not fetch resource" in result.stderr:
                logger.warning(f"Resource fetching issue detected for {md_path}: {result.stderr}")

            return result.returncode == 0
        except subprocess.CalledProcessError as e:
            logger.exception(f"Pandoc conversion error for {md_path}: {e.stderr}")
            return False
        except Exception as e:
            logger.exception(f"Error converting {md_path} to docx: {e}")
            return False
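
    # For reference, with include_toc=True the command assembled above is roughly
    # equivalent to the following CLI invocation; the input/output paths shown here
    # are illustrative placeholders, not values from the real configuration:
    #
    #   pandoc -f markdown+pipe_tables+simple_tables+multiline_tables -t docx \
    #       --standalone=true --embed-resources=true \
    #       --reference-doc <temp_dir>/宋体.docx \
    #       -o output.docx --toc --toc-depth=2 input.md --verbose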

    def process_single_result(self, result_id: int, skip_existing: bool = True) -> bool:
        """Process a single SearchResultItem and convert both filtered and docling markdown files to docx"""
        db_manager = SearchResultManager()

        with Session(db_manager.engine) as session:
            # Get the search result item
            result_item = session.get(SearchResultItem, result_id)
            if not result_item:
                logger.error(f"SearchResultItem with ID {result_id} not found")
                return False

            # Get the HTML convert result
            html_convert: HtmlConvertResult = session.exec(
                select(HtmlConvertResult)
                .where(HtmlConvertResult.search_result_item_id == result_id)
            ).first()

            if not html_convert:
                logger.error(f"No HtmlConvertResult found for SearchResultItem {result_id}")
                return False
            logger.info(f"pandoc start: html_convert id {html_convert.id}, result_id {result_id}")
            # Initialize success flags
            docling_success = False
            filtered_success = False

            # Convert docling markdown if available
            if html_convert.docling_md_path:
                docling_md_path = Path(html_convert.docling_md_path)
                docling_docx_path = docling_md_path.with_suffix('.docx')

                # Skip if already converted
                if skip_existing and docling_docx_path.exists():
                    logger.info(f"Skipping already converted docling markdown: {docling_docx_path}")
                    docling_success = True
                else:
                    docling_success = self.convert_md_to_docx(docling_md_path, docling_docx_path)
                    if docling_success:
                        html_convert.pandoc_docx_path = str(docling_docx_path)
                        html_convert.is_pandoc_converted = True
                        logger.info(f"Successfully converted docling markdown to {docling_docx_path}")

            # Convert filtered markdown if available.
            # Disabled for now: pandoc has to fetch remote resources for the filtered markdown, which is slow.
            if False and html_convert.filter_crawl_md_path:
                filtered_md_path = Path(html_convert.filter_crawl_md_path)
                filtered_docx_path = filtered_md_path.with_suffix('.docx')

                # Skip if already converted
                if skip_existing and filtered_docx_path.exists():
                    logger.info(f"Skipping already converted filtered markdown: {filtered_docx_path}")
                    filtered_success = True
                else:
                    filtered_success = self.convert_md_to_docx(filtered_md_path, filtered_docx_path)
                    if filtered_success:
                        html_convert.pandoc_docx_path = str(filtered_docx_path)
                        html_convert.is_pandoc_converted = True
                        logger.info(f"Successfully converted filtered markdown to {filtered_docx_path}")

            # Update the database if either conversion succeeded
            if docling_success or filtered_success:
                session.add(html_convert)
                session.commit()
                return True

            return False

    def _get_reference_doc(self) -> str:
        """Get the path to a reference document that uses the configured font"""
        reference_doc = Path(self._temp_dir.name) / f"{self.font_name.replace(' ', '_')}.docx"

        if not reference_doc.exists():
            self._create_reference_doc(reference_doc)

        return str(reference_doc)

    def _create_reference_doc(self, reference_doc: Path):
        """Create a reference document whose Normal style uses the configured default font"""
        from docx import Document
        from docx.shared import Pt

        doc = Document()

        # Set the Normal style font
        normal_style = doc.styles['Normal']
        normal_style.font.name = self.font_name
        normal_style.font.size = Pt(12)
        doc.save(str(reference_doc))
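

# Illustrative sketch, not used by PandocConverter above: Word resolves East Asian
# text (e.g. 宋体) through the w:eastAsia font attribute, which python-docx does not
# set via `style.font.name` alone. A reference document could additionally patch the
# heading styles as shown here; the helper name and the style/size list are
# assumptions made for this example, not part of the original module.
def _build_reference_doc_with_headings(reference_doc: Path, font_name: str = "宋体"):
    from docx import Document
    from docx.oxml.ns import qn
    from docx.shared import Pt

    doc = Document()
    for style_name, size in [("Normal", 12), ("Heading 1", 18), ("Heading 2", 16), ("Heading 3", 14)]:
        style = doc.styles[style_name]
        style.font.name = font_name  # sets the ASCII/hAnsi fonts and creates rPr/rFonts
        style.font.size = Pt(size)
        # Patch the East Asian font on the underlying XML so CJK text uses it too.
        style.element.rPr.rFonts.set(qn("w:eastAsia"), font_name)
    doc.save(str(reference_doc))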


def process_single_example(result_id: int, skip_existing=True):
    # Process a single result example. May be removed in the future; keeping it for now.
    docling_converter = DoclingConverter()
    search_result_item = docling_converter.get_search_result_item(result_id)
    if (search_result_item and
            search_result_item.save_path and
            search_result_item.save_path.endswith('.html')):
        docling_converter.process_conversion_by_id(result_id, skip_existing=skip_existing)
        crawl_filter = CrawlFilter()
        crawl_filter.process_filter_by_id(result_id, skip_existing=skip_existing)

    pandoc_converter = PandocConverter(font_name="宋体", include_toc=True)
    logger.info(f"skip_existing {skip_existing}")
    success = pandoc_converter.process_single_result(result_id, skip_existing=skip_existing)
    if success:
        logger.info(f"Successfully processed result {result_id}")
    else:
        logger.error(f"Failed to process result {result_id}")
    return success


def process_all_results():
    # Process all results in the database
    global db_manager
    with Session(db_manager.engine) as session:
        # Fetch all IDs with explicit ordering
        result_ids = session.exec(
            select(SearchResultItem.id, SearchResultItem.save_path).order_by(SearchResultItem.id)
        ).all()
        logger.info(f"Total results: {len(result_ids)}")
        logger.info(f"First 5 result IDs: {result_ids[:5]}")

        for result_id, save_path in result_ids:
            try:
                if save_path and save_path.endswith('.html'):
                    process_single_example(result_id)
            except Exception as e:
                logger.error(f"Error processing result {result_id}: {e}")


if __name__ == "__main__":
    # Measure the total run time
    import time
    start_time = time.time()
    process_single_example(996, skip_existing=False)
    end_time = time.time()
    print(f"Total time: {end_time - start_time:.2f} seconds")
    # process_all_results()