# pandoc.py — converts crawled markdown (docling / filtered) output to DOCX via Pandoc
  1. import asyncio
  2. import pickle
  3. from pathlib import Path
  4. import random
  5. from typing import List
  6. import httpx
  7. import ssl
  8. import os
  9. from sqlmodel import select, Session
  10. from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
  11. from worker.search_engine.search_result_db import SearchResultManager, db_manager, SearchPageResult, SearchResultItem
  12. from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
  13. from mylib.logu import get_logger
  14. from config.settings import PANDOC_EXE, HTTP_PROXY, HTTPS_PROXY
  15. from worker.html_convert.models import HtmlConvertResult
  16. from worker.html_convert.docling_converter import DoclingConverter
  17. from worker.html_convert.crawl_filter import CrawlFilter
  18. import subprocess
  19. from docx import Document
  20. from docx.oxml.ns import qn
  21. from docx.oxml import OxmlElement
  22. import tempfile
  23. logger = get_logger('pandoc')
  24. class PandocConverter:
  25. """Class for handling Pandoc conversions with customizable options"""
  26. def __init__(self, font_name: str = "宋体", include_toc: bool = False):
  27. """
  28. Initialize PandocConverter with optional parameters
  29. Args:
  30. font_name (str): The default font to use in DOCX output
  31. include_toc (bool): Whether to include table of contents in DOCX output
  32. """
  33. self.font_name = font_name
  34. self.include_toc = include_toc
  35. self._temp_dir = tempfile.TemporaryDirectory() # Create a temporary directory for reference docs
  36. def convert_md_to_docx(self, md_path: Path, output_path: Path) -> bool:
  37. """Convert markdown file to docx using pandoc with custom options"""
  38. try:
  39. # Prepare environment variables with proxy settings
  40. env = os.environ.copy()
  41. if HTTP_PROXY:
  42. env['HTTP_PROXY'] = HTTP_PROXY
  43. env['http_proxy'] = HTTP_PROXY # Some systems use lowercase
  44. if HTTPS_PROXY:
  45. env['HTTPS_PROXY'] = HTTPS_PROXY
  46. env['https_proxy'] = HTTPS_PROXY
  47. cmd = [
  48. PANDOC_EXE,
  49. '-f', 'markdown+pipe_tables+simple_tables+multiline_tables',
  50. '-t', 'docx',
  51. '--standalone=true',
  52. '--embed-resources=true',
  53. '--reference-doc', self._get_reference_doc(),
  54. '-o', str(output_path),
  55. str(md_path)
  56. ]
  57. if self.include_toc:
  58. # Specify heading levels for TOC
  59. cmd.insert(-1, '--toc')
  60. cmd.insert(-1, '--toc-depth=2') # Include up to level 3 headings
  61. # Add verbose flag to capture more information about resource fetching
  62. cmd.append('--verbose')
  63. result = subprocess.run(
  64. cmd,
  65. check=True,
  66. env=env, # Pass the environment with proxies
  67. stdout=subprocess.PIPE,
  68. stderr=subprocess.PIPE,
  69. text=True
  70. )
  71. # Log any warnings about resources
  72. if "Could not fetch resource" in result.stderr:
  73. logger.warning(f"Resource fetching issue detected for {md_path}: {result.stderr}")
  74. return result.returncode == 0
  75. except subprocess.CalledProcessError as e:
  76. logger.exception(f"Pandoc conversion error for {md_path}: {e.stderr}")
  77. return False
  78. except Exception as e:
  79. logger.exception(f"Error converting {md_path} to docx: {e}")
  80. return False
  81. def process_single_result(self, result_id: int, skip_existing: bool = True) -> bool:
  82. """Process a single SearchResultItem and convert both filtered and docling markdown files to docx"""
  83. db_manager = SearchResultManager()
  84. with Session(db_manager.engine) as session:
  85. # Get the search result item
  86. result_item = session.get(SearchResultItem, result_id)
  87. if not result_item:
  88. logger.error(f"SearchResultItem with ID {result_id} not found")
  89. return False
  90. # Get the HTML convert result
  91. html_convert:HtmlConvertResult = session.exec(
  92. select(HtmlConvertResult)
  93. .where(HtmlConvertResult.search_result_item_id == result_id)
  94. ).first()
  95. if not html_convert:
  96. logger.error(f"No HtmlConvertResult found for SearchResultItem {result_id}")
  97. return False
  98. logger.info(f"pandoc start html_convert id {html_convert.id} result_id {result_id}")
  99. # Initialize success flags
  100. docling_success = False
  101. filtered_success = False
  102. # Convert docling markdown if available
  103. if html_convert.docling_md_path:
  104. docling_md_path = Path(html_convert.docling_md_path)
  105. docling_docx_path = docling_md_path.with_suffix('.docx')
  106. # Skip if already converted
  107. if skip_existing and docling_docx_path.exists():
  108. logger.info(f"Skipping already converted docling markdown: {docling_docx_path}")
  109. docling_success = True
  110. else:
  111. docling_success = self.convert_md_to_docx(docling_md_path, docling_docx_path)
  112. if docling_success:
  113. html_convert.pandoc_docx_path = str(docling_docx_path)
  114. html_convert.is_pandoc_converted = True
  115. logger.info(f"Successfully converted docling markdown to {docling_docx_path}")
  116. # Convert filtered markdown if available
  117. # 暂时不处理filtered markdown,因为 pandoc 要加载资源,很慢
  118. if False and html_convert.filter_crawl_md_path:
  119. filtered_md_path = Path(html_convert.filter_crawl_md_path)
  120. filtered_docx_path = filtered_md_path.with_suffix('.docx')
  121. # Skip if already converted
  122. if skip_existing and filtered_docx_path.exists():
  123. logger.info(f"Skipping already converted filtered markdown: {filtered_docx_path}")
  124. filtered_success = True
  125. else:
  126. filtered_success = self.convert_md_to_docx(filtered_md_path, filtered_docx_path)
  127. if filtered_success:
  128. html_convert.pandoc_docx_path = str(filtered_docx_path)
  129. html_convert.is_pandoc_converted = True
  130. logger.info(f"Successfully converted filtered markdown to {filtered_docx_path}")
  131. # Update database if either conversion succeeded
  132. if docling_success or filtered_success:
  133. session.add(html_convert)
  134. session.commit()
  135. return True
  136. return False
  137. def _get_reference_doc(self) -> str:
  138. """Get path to reference document with specified font"""
  139. reference_doc = Path(self._temp_dir.name) / f"{self.font_name.replace(' ', '_')}.docx"
  140. if not reference_doc.exists():
  141. self._create_reference_doc(reference_doc)
  142. return str(reference_doc)
  143. def _create_reference_doc(self, reference_doc: Path):
  144. """Create reference document with specified font and heading styles"""
  145. from docx import Document
  146. from docx.shared import Pt
  147. doc = Document()
  148. # Set Normal style font
  149. normal_style = doc.styles['Normal']
  150. normal_style.font.name = self.font_name
  151. normal_style.font.size = Pt(12)
  152. doc.save(str(reference_doc))
  153. def process_single_example(result_id: int, skip_existing=True):
  154. # 未来可能删除, 现在先不删 Process a single result example
  155. docling_converter = DoclingConverter()
  156. search_result_item = docling_converter.get_search_result_item(result_id)
  157. if (search_result_item and
  158. search_result_item.save_path and
  159. search_result_item.save_path.endswith('.html')):
  160. docling_converter.process_conversion_by_id(result_id, skip_existing=skip_existing)
  161. crawl_filter = CrawlFilter()
  162. crawl_filter.process_filter_by_id(result_id, skip_existing=skip_existing)
  163. pandoc_converter = PandocConverter(font_name="宋体", include_toc=True)
  164. logger.info(f"skip_existing {skip_existing}")
  165. success = pandoc_converter.process_single_result(result_id, skip_existing=skip_existing)
  166. if success:
  167. logger.info(f"Successfully processed result {result_id}")
  168. else:
  169. logger.error(f"Failed to process result {result_id}")
  170. return success
  171. def process_all_results():
  172. # Process all results in the database
  173. global db_manager
  174. with Session(db_manager.engine) as session:
  175. # Fetch all IDs with explicit ordering
  176. result_ids = session.exec(select(SearchResultItem.id, SearchResultItem.save_path).order_by(SearchResultItem.id)).all()
  177. logger.info(f"Total results: {len(result_ids)}")
  178. logger.info(f"First 5 result IDs: {result_ids[:5]}")
  179. for result_id, save_path in result_ids:
  180. try:
  181. if save_path and save_path.endswith('.html'):
  182. process_single_example(result_id)
  183. except Exception as e:
  184. logger.error(f"Error processing result {result_id}: {e}")
  185. if __name__ == "__main__":
  186. # 计算运行时间
  187. import time
  188. start_time = time.time()
  189. process_single_example(996, skip_existing=False)
  190. end_time = time.time()
  191. print(f"Total time: {end_time - start_time} seconds")
  192. # process_all_results()