# pandoc.py — markdown-to-docx conversion via the pandoc CLI.
  1. import asyncio
  2. import pickle
  3. from pathlib import Path
  4. import random
  5. from typing import List
  6. import httpx
  7. import ssl
  8. import os
  9. from sqlmodel import select, Session
  10. from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
  11. from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
  12. from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
  13. from mylib.logu import get_logger
  14. from config.settings import PANDOC_EXE, HTTP_PROXY, HTTPS_PROXY
  15. from worker.html_convert.models import HtmlConvertResult
  16. from worker.html_convert.docling_converter import DoclingConverter
  17. from worker.html_convert.crawl_filter import CrawlFilter
  18. import subprocess
  19. from docx import Document
  20. from docx.oxml.ns import qn
  21. from docx.oxml import OxmlElement
  22. import tempfile
# Module-level logger tagged 'pandoc' for all conversion messages.
logger = get_logger('pandoc')
  24. class PandocConverter:
  25. """Class for handling Pandoc conversions with customizable options"""
  26. def __init__(self, font_name: str = "宋体", include_toc: bool = False):
  27. """
  28. Initialize PandocConverter with optional parameters
  29. Args:
  30. font_name (str): The default font to use in DOCX output
  31. include_toc (bool): Whether to include table of contents in DOCX output
  32. """
  33. self.font_name = font_name
  34. self.include_toc = include_toc
  35. self._temp_dir = tempfile.TemporaryDirectory() # Create a temporary directory for reference docs
  36. def convert_md_to_docx(self, md_path: Path, output_path: Path) -> bool:
  37. """Convert markdown file to docx using pandoc with custom options"""
  38. try:
  39. # Prepare environment variables with proxy settings
  40. env = os.environ.copy()
  41. if HTTP_PROXY:
  42. env['HTTP_PROXY'] = HTTP_PROXY
  43. env['http_proxy'] = HTTP_PROXY # Some systems use lowercase
  44. if HTTPS_PROXY:
  45. env['HTTPS_PROXY'] = HTTPS_PROXY
  46. env['https_proxy'] = HTTPS_PROXY
  47. cmd = [
  48. PANDOC_EXE,
  49. '-f', 'markdown+pipe_tables+simple_tables+multiline_tables',
  50. '-t', 'docx',
  51. '--reference-doc', self._get_reference_doc(),
  52. '-o', str(output_path),
  53. str(md_path)
  54. ]
  55. if self.include_toc:
  56. # Specify heading levels for TOC
  57. cmd.insert(-1, '--toc')
  58. cmd.insert(-1, '--toc-depth=2') # Include up to level 3 headings
  59. # Add verbose flag to capture more information about resource fetching
  60. cmd.append('--verbose')
  61. result = subprocess.run(
  62. cmd,
  63. check=True,
  64. env=env, # Pass the environment with proxies
  65. stdout=subprocess.PIPE,
  66. stderr=subprocess.PIPE,
  67. text=True
  68. )
  69. # Log any warnings about resources
  70. if "Could not fetch resource" in result.stderr:
  71. logger.warning(f"Resource fetching issue detected for {md_path}: {result.stderr}")
  72. return result.returncode == 0
  73. except subprocess.CalledProcessError as e:
  74. logger.error(f"Pandoc conversion error for {md_path}: {e.stderr}")
  75. return False
  76. except Exception as e:
  77. logger.error(f"Error converting {md_path} to docx: {e}")
  78. return False
  79. def process_single_result(self, result_id: int, skip_existing: bool = True) -> bool:
  80. """Process a single SearchResultItem and convert both filtered and docling markdown files to docx"""
  81. db_manager = SearchResultManager()
  82. with Session(db_manager.engine) as session:
  83. # Get the search result item
  84. result_item = session.get(SearchResultItem, result_id)
  85. if not result_item:
  86. logger.error(f"SearchResultItem with ID {result_id} not found")
  87. return False
  88. # Get the HTML convert result
  89. html_convert:HtmlConvertResult = session.exec(
  90. select(HtmlConvertResult)
  91. .where(HtmlConvertResult.search_result_item_id == result_id)
  92. ).first()
  93. if not html_convert:
  94. logger.error(f"No HtmlConvertResult found for SearchResultItem {result_id}")
  95. return False
  96. logger.info(f"pandoc start html_convert id {html_convert.id} result_id {result_id}")
  97. # Initialize success flags
  98. docling_success = False
  99. filtered_success = False
  100. # Convert docling markdown if available
  101. if html_convert.docling_md_path:
  102. docling_md_path = Path(html_convert.docling_md_path)
  103. docling_docx_path = docling_md_path.with_suffix('.docx')
  104. # Skip if already converted
  105. if skip_existing and docling_docx_path.exists():
  106. logger.info(f"Skipping already converted docling markdown: {docling_docx_path}")
  107. docling_success = True
  108. else:
  109. docling_success = self.convert_md_to_docx(docling_md_path, docling_docx_path)
  110. if docling_success:
  111. html_convert.pandoc_docx_path = str(docling_docx_path)
  112. html_convert.is_pandoc_converted = True
  113. logger.info(f"Successfully converted docling markdown to {docling_docx_path}")
  114. # Convert filtered markdown if available
  115. if html_convert.filter_crawl_md_path:
  116. filtered_md_path = Path(html_convert.filter_crawl_md_path)
  117. filtered_docx_path = filtered_md_path.with_suffix('.docx')
  118. # Skip if already converted
  119. if skip_existing and filtered_docx_path.exists():
  120. logger.info(f"Skipping already converted filtered markdown: {filtered_docx_path}")
  121. filtered_success = True
  122. else:
  123. # filtered_success = self.convert_md_to_docx(filtered_md_path, filtered_docx_path)
  124. if filtered_success:
  125. html_convert.pandoc_docx_path = str(filtered_docx_path)
  126. html_convert.is_pandoc_converted = True
  127. logger.info(f"Successfully converted filtered markdown to {filtered_docx_path}")
  128. # Update database if either conversion succeeded
  129. if docling_success or filtered_success:
  130. session.add(html_convert)
  131. session.commit()
  132. return True
  133. return False
  134. def _get_reference_doc(self) -> str:
  135. """Get path to reference document with specified font"""
  136. reference_doc = Path(self._temp_dir.name) / f"{self.font_name.replace(' ', '_')}.docx"
  137. if not reference_doc.exists():
  138. self._create_reference_doc(reference_doc)
  139. return str(reference_doc)
  140. def _create_reference_doc(self, reference_doc: Path):
  141. """Create reference document with specified font and heading styles"""
  142. from docx import Document
  143. from docx.shared import Pt
  144. doc = Document()
  145. # Set Normal style font
  146. normal_style = doc.styles['Normal']
  147. normal_style.font.name = self.font_name
  148. normal_style.font.size = Pt(12)
  149. doc.save(str(reference_doc))
  150. def process_single_example(result_id: int, skip_existing=True):
  151. # Process a single result example
  152. docling_converter = DoclingConverter()
  153. search_result_item = docling_converter.get_search_result_item(result_id)
  154. if search_result_item.html_path.endswith('.html'):
  155. docling_converter.process_conversion_by_id(result_id, skip_existing=skip_existing)
  156. crawl_filter = CrawlFilter()
  157. crawl_filter.process_filter_by_id(result_id, skip_existing=skip_existing)
  158. pandoc_converter = PandocConverter(font_name="宋体", include_toc=True)
  159. logger.info(f"skip_existing {skip_existing}")
  160. success = pandoc_converter.process_single_result(result_id, skip_existing=skip_existing)
  161. if success:
  162. logger.info(f"Successfully processed result {result_id}")
  163. else:
  164. logger.error(f"Failed to process result {result_id}")
  165. def process_all_results():
  166. # Process all results in the database
  167. db_manager = SearchResultManager()
  168. with Session(db_manager.engine) as session:
  169. # Fetch all IDs with explicit ordering
  170. result_ids = session.exec(select(SearchResultItem.id, SearchResultItem.html_path).order_by(SearchResultItem.id)).all()
  171. logger.info(f"Total results: {len(result_ids)}")
  172. logger.info(f"First 5 result IDs: {result_ids[:5]}")
  173. for result_id, html_path in result_ids:
  174. try:
  175. if html_path.endswith('.html'):
  176. process_single_example(result_id)
  177. except Exception as e:
  178. logger.error(f"Error processing result {result_id}: {e}")
  179. if __name__ == "__main__":
  180. # 计算运行时间
  181. import time
  182. start_time = time.time()
  183. process_single_example(996, skip_existing=False)
  184. end_time = time.time()
  185. print(f"Total time: {end_time - start_time} seconds")
  186. # process_all_results()