Просмотр исходного кода

完成 docling 转换为 markdown 文件,还需要一点清洗数据,并且 URL 几乎不可用。表格是正常的

mrh 1 год назад
Родитель
Commit
60680e264e

+ 3 - 0
config/settings.py

@@ -1,9 +1,12 @@
 import os
 from pathlib import Path
+import shutil
+import subprocess
 WORK_DIR = Path(__file__).parent.parent.absolute()
 OUTPUT_DIR = WORK_DIR / "output"
 CONFIG_DIR = WORK_DIR / "config" / "conf"
 GOOGLE_SEARCH_DIR = OUTPUT_DIR / 'results'
+PANDOC_EXE = pandoc_path = shutil.which('pandoc')
 
 LOG_LEVEL='info'
 LOG_DIR = OUTPUT_DIR / "logs"

+ 21 - 2
tests/mytest/pandoc_t.py

@@ -23,7 +23,26 @@ def pandoc_html2docx(html_file_path: Path, output_file_path: Path):
         print("发生错误!")
         print(e)
 
-def main():
+def find_pandoc_exe():
+    import shutil
+    import subprocess
+
+    # 搜索 pandoc 可执行文件的位置
+    pandoc_path = shutil.which('pandoc')
+
+    if pandoc_path:
+        print(f"Found pandoc at: {pandoc_path}")
+        
+        # 调用 pandoc
+        try:
+            result = subprocess.run([pandoc_path, '--version'], capture_output=True, text=True)
+            print("Pandoc version:")
+            print(result.stdout)
+        except subprocess.CalledProcessError as e:
+            print(f"Error running pandoc: {e}")
+    else:
+        print("Pandoc not found in the current environment.")
+def main_convert():
     dir_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\Acalypha malabarica essential oil")    
     # '''
     # pandoc -f html -t docx -o 0.docx "K:\code\upwork\zhang_crawl_bio\output\Acalypha malabarica essential oil\all_paper\0.html"
@@ -41,4 +60,4 @@ def main():
             pandoc_html2docx(file, output_file_path)
 
 if __name__ == "__main__":
-    main()
+    find_pandoc_exe()

+ 50 - 45
worker/html_convert/crawl_filter.py

@@ -2,8 +2,8 @@ import re
 from pathlib import Path
 from urllib.parse import urlparse, urljoin
 
-from sqlmodel import Session
-from config.settings import GOOGLE_SEARCH_DIR
+from sqlmodel import Session, select
+from config.settings import GOOGLE_SEARCH_DIR,PANDOC_EXE
 from worker.html_convert.models import HtmlConvertResult
 from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
 from mylib.base import ensure_output_dir, save_to_file
@@ -22,7 +22,6 @@ def extract_content_after_first_h1(content: str) -> str:
 
 def fix_inline_links(content: str) -> str:
     """Fix inline links by handling the special URL patterns"""
-    # Pattern to match links with nested URLs
     link_pattern = r'\[([^\]]+)\]\(([^<]*)<([^>]*)>\)'
     
     def replace_link(match):
@@ -30,16 +29,13 @@ def fix_inline_links(content: str) -> str:
         domain = match.group(2)
         url = match.group(3)
         
-        # If URL is relative, prepend the domain
         if url.startswith('/'):
             if domain:
-                # Parse domain to get scheme and netloc
                 parsed_domain = urlparse(domain)
                 base_url = f"{parsed_domain.scheme}://{parsed_domain.netloc}"
                 return f'[{text}]({urljoin(base_url, url)})'
             return f'[{text}]({url})'
         
-        # If URL is absolute, use it directly
         return f'[{text}]({url})'
     
     return re.sub(link_pattern, replace_link, content)
@@ -50,93 +46,102 @@ def add_url_header(content: str, url: str) -> str:
 
 def filter_markdown(content: str) -> str:
     """Filter markdown content according to specified rules"""
-    # Step 1: Extract content after first H1
     content = extract_content_after_first_h1(content)
     logger.info(f"extract_content_after_first_h1: {content[:300]}")
-    
-    # Step 2: Fix inline links
     content = fix_inline_links(content)
     logger.info(f"fix_inline_links: {content[:300]}")
-    
     return content
 
-def process_html_conversion(html_convert: HtmlConvertResult):
+def process_html_conversion(html_convert: HtmlConvertResult, skip_existing: bool = True) -> HtmlConvertResult:
     """Process HTML conversion result"""
     if not html_convert.search_result_item:
         logger.warning(f"html_convert id {html_convert.id} has no search_result_item")
-        return
+        return html_convert
     
-    # Get HTML file path
     html_path = Path(html_convert.search_result_item.html_path)
     if not html_path.exists():
         logger.warning(f"html_path {html_path} not exists")
-        return
+        return html_convert
     
-    # Create parent directory's html_convert path
     convert_dir = html_path.parent.parent / "html_convert"
     convert_dir.mkdir(exist_ok=True)
     
-    # 在源目录下查找同名的md文件
     md_path = html_path.parent / f"{html_path.stem}.md"
     logger.info(f"md_path {md_path}")
-    # If markdown file exists, process it
+    
     if md_path.exists():
         html_convert.source_crawl_md_path = str(md_path)
         
-        # Read and filter markdown content
+        # Skip if already filtered
+        if skip_existing and html_convert.is_filtered:
+            logger.info(f"Skipping already filtered content for {html_convert.id}")
+            return html_convert
+            
         with open(md_path, 'r', encoding='utf-8') as f:
             content = f.read()
-            
-            # Step 1: Filter the content
             filtered_content = filter_markdown(content)
             
-            # Step 2: Add URL header
             if html_convert.search_result_item.url:
                 filtered_content = add_url_header(filtered_content, html_convert.search_result_item.url)
             
-            # Save filtered content
             filtered_md_path = convert_dir / f"{html_path.stem}_filtered.md"
             with open(filtered_md_path, 'w', encoding='utf-8') as f_out:
                 f_out.write(filtered_content)
+            
             logger.info(f"filtered_md_path {filtered_md_path}")
             html_convert.filter_crawl_md_path = str(filtered_md_path)
             html_convert.is_filtered = True
+            
             logger.info(f"{html_convert.model_dump_json(indent=2)}")
+    
+    return html_convert
 
-def convert_single_result(search_result_item: SearchResultItem) -> HtmlConvertResult:
+def convert_single_result(search_result_item: SearchResultItem, skip_existing: bool = True) -> HtmlConvertResult:
     """Convert a single SearchResultItem and store results in HtmlConvertResult"""
-    # Create new HtmlConvertResult instance
-    html_convert = HtmlConvertResult(
-        search_result_item_id=search_result_item.id,
-        search_result_item=search_result_item
-    )
-    logger.info(f"html_convert {html_convert}")
-    logger.info(f"search_result_item {html_convert.search_result_item}")
-    # Get HTML file path
-    html_path = Path(search_result_item.html_path)
-    if not html_path.exists():
-        return html_convert
+    db_manager = SearchResultManager()
     
-    # Create conversion directory
-    convert_dir = html_path.parent.parent / "html_convert"
-    ensure_output_dir(convert_dir)
-    # Step 3: Process the conversion results
-    process_html_conversion(html_convert)
-    logger.info(f"result {html_convert.model_dump_json(indent=2)}")
-    return html_convert    
+    with Session(db_manager.engine) as session:
+        # Check if conversion already exists
+        existing = session.exec(
+            select(HtmlConvertResult)
+            .where(HtmlConvertResult.search_result_item_id == search_result_item.id)
+        ).first()
+        
+        if existing and existing.is_filtered:
+            if skip_existing:
+                logger.info(f"Found existing conversion for result {search_result_item.id}")
+                return existing
+            else:
+                # Update existing record
+                result = process_html_conversion(existing, skip_existing)
+                session.add(result)
+                session.commit()
+                session.refresh(result)
+                return result
+        else:
+            # Create new record using the session's search_result_item
+            session_search_item = session.merge(search_result_item)
+            html_convert = HtmlConvertResult(
+                search_result_item_id=session_search_item.id,
+                search_result_item=session_search_item
+            )
+            result = process_html_conversion(html_convert, skip_existing)
+            session.add(result)
+            session.commit()
+            session.refresh(result)
+            return result
 
-def convert_single_result_by_id(result_id: int) -> HtmlConvertResult:
+def convert_single_result_by_id(result_id: int, skip_existing: bool = True) -> HtmlConvertResult:
     """Convert a single SearchResultItem by ID"""
     db_manager = SearchResultManager()
     with Session(db_manager.engine) as session:
         result_item = session.get(SearchResultItem, result_id)
         if not result_item:
             raise ValueError(f"SearchResultItem with ID {result_id} not found")
-        return convert_single_result(result_item)
+        return convert_single_result(result_item, skip_existing)
 
 def main():
-    # Example usage
-    result_id = 21566  # Replace with the actual ID
+    result_id = 21567
     result = convert_single_result_by_id(result_id)
 
 if __name__ == "__main__":

+ 100 - 0
worker/html_convert/docling_converter.py

@@ -0,0 +1,100 @@
+from pathlib import Path
+from docling.document_converter import DocumentConverter
+from worker.html_convert.models import HtmlConvertResult
+from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem
+from sqlmodel import Session, select
+from mylib.logu import get_logger
+
+logger = get_logger('docling_converter')
+
+def convert_html_to_markdown(html_convert: HtmlConvertResult, skip_existing: bool = True) -> HtmlConvertResult:
+    """Convert HTML to markdown using docling"""
+    if not html_convert.search_result_item:
+        logger.warning(f"html_convert id {html_convert.id} has no search_result_item")
+        return html_convert
+    
+    if skip_existing and html_convert.is_docling_converted:
+        logger.info(f"Skipping already converted content for {html_convert.id}")
+        return html_convert
+        
+    html_path = Path(html_convert.search_result_item.html_path)
+    if not html_path.exists():
+        logger.warning(f"html_path {html_path} not exists")
+        return html_convert
+    
+    convert_dir = html_path.parent.parent / "html_convert"
+    convert_dir.mkdir(exist_ok=True)
+    
+    try:
+        converter = DocumentConverter()
+        result = converter.convert(html_path)
+        markdown_content = result.document.export_to_markdown()
+        
+        if html_convert.search_result_item.url:
+            markdown_content = f"[{html_convert.search_result_item.url}]({html_convert.search_result_item.url})\n\n{markdown_content}"
+        
+        docling_md_path = convert_dir / f"{html_path.stem}_docling.md"
+        with open(docling_md_path, 'w', encoding='utf-8') as f:
+            f.write(markdown_content)
+        
+        html_convert.docling_md_path = str(docling_md_path)
+        html_convert.is_docling_converted = True
+        
+        logger.info(f"Successfully converted HTML to markdown: {docling_md_path}")
+        
+    except Exception as e:
+        logger.error(f"Error converting HTML to markdown: {e}")
+        html_convert.is_docling_converted = False
+        
+    return html_convert
+
+def convert_single_result(search_result_item: SearchResultItem, skip_existing: bool = True) -> HtmlConvertResult:
+    """Convert a single SearchResultItem and store results in HtmlConvertResult"""
+    db_manager = SearchResultManager()
+    
+    with Session(db_manager.engine) as session:
+        # Check if conversion already exists
+        existing = session.exec(
+            select(HtmlConvertResult)
+            .where(HtmlConvertResult.search_result_item_id == search_result_item.id)
+        ).first()
+        
+        if existing and existing.is_docling_converted:
+            if skip_existing:
+                logger.info(f"Found existing conversion for result {search_result_item.id}")
+                return existing
+            else:
+                # Update existing record
+                result = convert_html_to_markdown(existing, skip_existing)
+                session.add(result)
+                session.commit()
+                session.refresh(result)
+                return result
+        else:
+            # Create new record using the session's search_result_item
+            session_search_item = session.merge(search_result_item)
+            html_convert = HtmlConvertResult(
+                search_result_item_id=session_search_item.id,
+                search_result_item=session_search_item
+            )
+            result = convert_html_to_markdown(html_convert, skip_existing)
+            session.add(result)
+            session.commit()
+            session.refresh(result)
+            return result
+
+def convert_single_result_by_id(result_id: int, skip_existing: bool = True) -> HtmlConvertResult:
+    """Convert a single SearchResultItem by ID"""
+    db_manager = SearchResultManager()
+    with Session(db_manager.engine) as session:
+        result_item = session.get(SearchResultItem, result_id)
+        if not result_item:
+            raise ValueError(f"SearchResultItem with ID {result_id} not found")
+        return convert_single_result(result_item, skip_existing)
+
+def main():
+    result_id = 21566
+    result = convert_single_result_by_id(result_id)
+
+if __name__ == "__main__":
+    main()