|
|
@@ -1,144 +1,97 @@
|
|
|
from pathlib import Path
|
|
|
from docling.document_converter import DocumentConverter
|
|
|
+from worker.html_convert.converter_base import ConverterBase
|
|
|
from worker.html_convert.models import HtmlConvertResult
|
|
|
-from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem
|
|
|
-from sqlmodel import Session, select
|
|
|
+from worker.search_engine.search_result_db import SearchResultItem
|
|
|
+from sqlmodel import Session
|
|
|
from mylib.logu import get_logger
|
|
|
-import re
|
|
|
-from urllib.parse import urlparse, urljoin
|
|
|
|
|
|
logger = get_logger('docling_converter')
|
|
|
|
|
|
-def extract_content_after_first_h1(content: str) -> str:
|
|
|
- """Extract content starting from the first H1 heading"""
|
|
|
- h1_pattern = r'^# .+$'
|
|
|
- match = re.search(h1_pattern, content, re.MULTILINE)
|
|
|
- if match:
|
|
|
- return content[match.start():]
|
|
|
- return content
|
|
|
-
|
|
|
-def fix_inline_links(content: str) -> str:
|
|
|
- """Fix inline links by handling the special URL patterns"""
|
|
|
- link_pattern = r'\[([^\]]+)\]\(([^<]*)<([^>]*)>\)'
|
|
|
-
|
|
|
- def replace_link(match):
|
|
|
- text = match.group(1)
|
|
|
- domain = match.group(2)
|
|
|
- url = match.group(3)
|
|
|
-
|
|
|
- if url.startswith('/'):
|
|
|
- if domain:
|
|
|
- parsed_domain = urlparse(domain)
|
|
|
- base_url = f"{parsed_domain.scheme}://{parsed_domain.netloc}"
|
|
|
- return f'[{text}]({urljoin(base_url, url)})'
|
|
|
- return f'[{text}]({url})'
|
|
|
-
|
|
|
- return f'[{text}]({url})'
|
|
|
-
|
|
|
- return re.sub(link_pattern, replace_link, content)
|
|
|
-
|
|
|
-def add_url_header(content: str, url: str) -> str:
|
|
|
- """Add URL as a header at the top of the content"""
|
|
|
- return f"[{url}]({url})\n\n{content}"
|
|
|
-
|
|
|
-def filter_markdown(content: str) -> str:
|
|
|
- """Filter markdown content according to specified rules"""
|
|
|
- content = extract_content_after_first_h1(content)
|
|
|
- logger.info(f"extract_content_after_first_h1: {content[:300]}")
|
|
|
- content = fix_inline_links(content)
|
|
|
- logger.info(f"fix_inline_links: {content[:300]}")
|
|
|
- return content
|
|
|
-
|
|
|
-def convert_html_to_markdown(html_convert: HtmlConvertResult, skip_existing: bool = True) -> HtmlConvertResult:
|
|
|
- """Convert HTML to markdown using docling"""
|
|
|
- if not html_convert.search_result_item:
|
|
|
- logger.warning(f"html_convert id {html_convert.id} has no search_result_item")
|
|
|
- return html_convert
|
|
|
-
|
|
|
- if skip_existing and html_convert.is_docling_converted:
|
|
|
- logger.info(f"Skipping already converted content for {html_convert.id}")
|
|
|
- return html_convert
|
|
|
-
|
|
|
- html_path = Path(html_convert.search_result_item.html_path)
|
|
|
- if not html_path.exists():
|
|
|
- logger.warning(f"html_path {html_path} not exists")
|
|
|
- return html_convert
|
|
|
+class DoclingConverter(ConverterBase):
|
|
|
+ """Class for handling Docling conversions"""
|
|
|
|
|
|
- convert_dir = html_path.parent.parent / "html_convert"
|
|
|
- convert_dir.mkdir(exist_ok=True)
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__()
|
|
|
|
|
|
- try:
|
|
|
- converter = DocumentConverter()
|
|
|
- result = converter.convert(html_path)
|
|
|
- markdown_content = result.document.export_to_markdown()
|
|
|
+ def process_conversion(self, html_convert: HtmlConvertResult, skip_existing: bool = True) -> HtmlConvertResult:
|
|
|
+ """Process HTML to markdown conversion using docling"""
|
|
|
+ if not html_convert.search_result_item:
|
|
|
+ logger.warning(f"html_convert id {html_convert.id} has no search_result_item")
|
|
|
+ return html_convert
|
|
|
+
|
|
|
+ html_path = Path(html_convert.search_result_item.html_path)
|
|
|
+ if not html_path.exists():
|
|
|
+ logger.warning(f"html_path {html_path} not exists")
|
|
|
+ return html_convert
|
|
|
|
|
|
- # Apply filtering and add URL header
|
|
|
- markdown_content = filter_markdown(markdown_content)
|
|
|
- if html_convert.search_result_item.url:
|
|
|
- markdown_content = add_url_header(markdown_content, html_convert.search_result_item.url)
|
|
|
+ # Skip if already converted
|
|
|
+ if skip_existing and html_convert.is_docling_converted:
|
|
|
+ logger.info(f"Skipping already converted content for {html_convert.id}")
|
|
|
+ return html_convert
|
|
|
|
|
|
- docling_md_path = convert_dir / f"{html_path.stem}_docling.md"
|
|
|
- with open(docling_md_path, 'w', encoding='utf-8') as f:
|
|
|
- f.write(markdown_content)
|
|
|
+ convert_dir = self.ensure_convert_dir(html_path)
|
|
|
|
|
|
- html_convert.docling_md_path = str(docling_md_path)
|
|
|
- html_convert.is_docling_converted = True
|
|
|
-
|
|
|
- logger.info(f"Successfully converted HTML to markdown: {docling_md_path}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error converting HTML to markdown: {e}")
|
|
|
- html_convert.is_docling_converted = False
|
|
|
-
|
|
|
- return html_convert
|
|
|
+ try:
|
|
|
+ # Perform the conversion
|
|
|
+ converter = DocumentConverter()
|
|
|
+ result = converter.convert(html_path)
|
|
|
+ markdown_content = result.document.export_to_markdown()
|
|
|
+
|
|
|
+ # Apply filtering and add URL header
|
|
|
+ markdown_content = self.filter_markdown(markdown_content)
|
|
|
+ if html_convert.search_result_item.url:
|
|
|
+ markdown_content = self.add_url_header(markdown_content, html_convert.search_result_item.url)
|
|
|
+
|
|
|
+ # Save the converted markdown
|
|
|
+ docling_md_path = convert_dir / f"{html_path.stem}_docling.md"
|
|
|
+ with open(docling_md_path, 'w', encoding='utf-8') as f:
|
|
|
+ f.write(markdown_content)
|
|
|
+
|
|
|
+ # Update the conversion result
|
|
|
+ html_convert.docling_md_path = str(docling_md_path)
|
|
|
+ html_convert.is_docling_converted = True
|
|
|
+
|
|
|
+ logger.info(f"Successfully converted HTML to markdown: {docling_md_path}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error converting HTML to markdown: {e}")
|
|
|
+ html_convert.is_docling_converted = False
|
|
|
+
|
|
|
+ return html_convert
|
|
|
|
|
|
-def convert_single_result(search_result_item: SearchResultItem, skip_existing: bool = True) -> HtmlConvertResult:
|
|
|
- """Convert a single SearchResultItem and store results in HtmlConvertResult"""
|
|
|
- db_manager = SearchResultManager()
|
|
|
-
|
|
|
- with Session(db_manager.engine) as session:
|
|
|
- # Check if conversion already exists
|
|
|
- existing = session.exec(
|
|
|
- select(HtmlConvertResult)
|
|
|
- .where(HtmlConvertResult.search_result_item_id == search_result_item.id)
|
|
|
- ).first()
|
|
|
+ def process_conversion_by_id(self, result_id: int, skip_existing: bool = True) -> HtmlConvertResult:
|
|
|
+ """Process conversion for a specific result ID"""
|
|
|
+ existing_html_convert = self.get_html_convert_result(result_id)
|
|
|
+ result = None
|
|
|
|
|
|
- if existing and existing.is_docling_converted:
|
|
|
- if skip_existing:
|
|
|
- logger.info(f"Found existing conversion for result {search_result_item.id}")
|
|
|
- return existing
|
|
|
+ if existing_html_convert:
|
|
|
+ if existing_html_convert.is_docling_converted and skip_existing:
|
|
|
+ logger.info(f"Skipping already converted content for {result_id}")
|
|
|
+ return existing_html_convert
|
|
|
else:
|
|
|
- # Update existing record
|
|
|
- result = convert_html_to_markdown(existing, skip_existing)
|
|
|
+ result = self.process_conversion(existing_html_convert, skip_existing)
|
|
|
+ else:
|
|
|
+ result_item_model = self.get_search_result_item(result_id)
|
|
|
+ html_convert = HtmlConvertResult(
|
|
|
+ search_result_item_id=result_item_model.id,
|
|
|
+ search_result_item=result_item_model
|
|
|
+ )
|
|
|
+ result = self.process_conversion(html_convert, skip_existing)
|
|
|
+
|
|
|
+ if result:
|
|
|
+ with Session(self.db_manager.engine) as session:
|
|
|
session.add(result)
|
|
|
session.commit()
|
|
|
session.refresh(result)
|
|
|
return result
|
|
|
- else:
|
|
|
- # Create new record using the session's search_result_item
|
|
|
- session_search_item = session.merge(search_result_item)
|
|
|
- html_convert = HtmlConvertResult(
|
|
|
- search_result_item_id=session_search_item.id,
|
|
|
- search_result_item=session_search_item
|
|
|
- )
|
|
|
- result = convert_html_to_markdown(html_convert, skip_existing)
|
|
|
- session.add(result)
|
|
|
- session.commit()
|
|
|
- session.refresh(result)
|
|
|
- return result
|
|
|
-
|
|
|
-def convert_single_result_by_id(result_id: int, skip_existing: bool = True) -> HtmlConvertResult:
|
|
|
- """Convert a single SearchResultItem by ID"""
|
|
|
- db_manager = SearchResultManager()
|
|
|
- with Session(db_manager.engine) as session:
|
|
|
- result_item = session.get(SearchResultItem, result_id)
|
|
|
- if not result_item:
|
|
|
- raise ValueError(f"SearchResultItem with ID {result_id} not found")
|
|
|
- return convert_single_result(result_item, skip_existing)
|
|
|
|
|
|
def main():
|
|
|
- result_id = 21566
|
|
|
- result = convert_single_result_by_id(result_id)
|
|
|
+ # Example: Process a single result with ID 21567
|
|
|
+ result_id = 21567
|
|
|
+ converter = DoclingConverter()
|
|
|
+ converter.process_conversion_by_id(result_id)
|
|
|
+ logger.info(f"Successfully processed result {result_id}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|