# converter_base.py
  1. from pathlib import Path
  2. from typing import Optional
  3. from sqlmodel import Session, select
  4. from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
  5. from worker.html_convert.models import HtmlConvertResult
  6. from mylib.logu import get_logger
  7. import re
  8. from urllib.parse import urlparse, urljoin
  9. from config.settings import GOOGLE_SEARCH_DIR
  10. logger = get_logger('html_convert')
  11. class ConverterBase:
  12. """Base class for all conversion operations"""
  13. def __init__(self):
  14. self.db_manager = SearchResultManager()
  15. def get_search_result_item(self, result_id: int) -> Optional[SearchResultItem]:
  16. """Get the search result item by ID"""
  17. with Session(self.db_manager.engine) as session:
  18. return session.get(SearchResultItem, result_id)
  19. def get_html_convert_result(self, result_id: int) -> Optional[HtmlConvertResult]:
  20. """Get HtmlConvertResult by SearchResultItem ID"""
  21. with Session(self.db_manager.engine) as session:
  22. return session.exec(
  23. select(HtmlConvertResult)
  24. .where(HtmlConvertResult.search_result_item_id == result_id)
  25. ).first()
  26. def ensure_convert_dir(self, html_path: Path) -> Path:
  27. """Ensure conversion directory exists"""
  28. convert_dir = html_path.parent.parent / "html_convert"
  29. convert_dir.mkdir(exist_ok=True)
  30. return convert_dir
  31. def extract_content_after_first_h1(self, content: str) -> str:
  32. """
  33. Extract content starting from the first H1 heading.
  34. This removes any content before the first H1 tag.
  35. Example:
  36. Input:
  37. ```
  38. Some header content
  39. ## Subtitle
  40. More content
  41. # First Main Title
  42. Actual content starts here
  43. ```
  44. Output:
  45. ```
  46. # First Main Title
  47. Actual content starts here
  48. ```
  49. """
  50. h1_pattern = r'^# .+$'
  51. match = re.search(h1_pattern, content, re.MULTILINE)
  52. if match:
  53. return content[match.start():]
  54. return content
  55. def fix_inline_links(self, content: str) -> str:
  56. """
  57. Fix inline links by handling special URL patterns.
  58. This method processes markdown links in the format:
  59. [text](domain<url>) and converts them to [text](url).
  60. Handles three cases:
  61. 1. If URL is relative, it combines with domain
  62. 2. If URL is absolute, it uses the URL directly
  63. 3. If link has empty text but contains <> pattern
  64. Examples:
  65. 1. [Author](https://example.com/<https://actual.com/path>)
  66. => [Author](https://actual.com/path)
  67. 2. [Link](https://domain.com/<relative/path>)
  68. => [Link](https://domain.com/relative/path)
  69. 3. ![](image.png) [](https://domain.com/<#anchor>)
  70. => ![](image.png) [](https://domain.com/#anchor)
  71. """
  72. link_pattern = r'\[([^\]]*)\]\(([^<]*)<([^>]*)>\)'
  73. def replace_link(match):
  74. text = match.group(1)
  75. domain = match.group(2)
  76. url = match.group(3)
  77. if not text and url.startswith('#'):
  78. # Handle empty text with anchor links
  79. if domain:
  80. parsed_domain = urlparse(domain)
  81. base_url = f"{parsed_domain.scheme}://{parsed_domain.netloc}{parsed_domain.path}"
  82. return f'[]({base_url}{url})'
  83. return f'[]({url})'
  84. if url.startswith('/'):
  85. if domain:
  86. parsed_domain = urlparse(domain)
  87. base_url = f"{parsed_domain.scheme}://{parsed_domain.netloc}"
  88. return f'[{text}]({urljoin(base_url, url)})'
  89. return f'[{text}]({url})'
  90. return f'[{text}]({url})'
  91. return re.sub(link_pattern, replace_link, content)
  92. def add_url_header(self, content: str, url: str) -> str:
  93. """
  94. Add URL as a header at the top of the content.
  95. The URL is added in markdown link format:
  96. [URL](URL)
  97. Example:
  98. Input:
  99. ```
  100. Some content
  101. ```
  102. With URL: https://example.com
  103. Output:
  104. ```
  105. [https://example.com](https://example.com)
  106. Some content
  107. ```
  108. """
  109. return f"[{url}]({url})\n\n{content}"
  110. def filter_markdown(self, content: str) -> str:
  111. """
  112. Filter markdown content according to specified rules:
  113. 1. Remove content before first H1
  114. 2. Fix inline links
  115. 3. (URL header is added separately)
  116. Example:
  117. Input:
  118. ```
  119. [ Skip to main content ](https://pmc.ncbi.nlm.nih.gov/articles/PMC9919988/<#main-content>)
  120. ![](https://pmc.ncbi.nlm.nih.gov/static/img/us_flag.svg)
  121. ## PERMALINK
  122. Copy
  123. # Main Title Here
  124. ### Author Name
  125. [Author](https://example.com/<https://actual.com/path>)
  126. ```
  127. Output:
  128. ```
  129. # Main Title Here
  130. ### Author Name
  131. [Author](https://actual.com/path)
  132. ```
  133. """
  134. content = self.extract_content_after_first_h1(content)
  135. logger.info(f"extract_content_after_first_h1: {content[:300]}")
  136. content = self.fix_inline_links(content)
  137. logger.info(f"fix_inline_links: {content[:300]}")
  138. return content
  139. def save_html_convert_result(self, html_convert: HtmlConvertResult):
  140. """Save HtmlConvertResult to database"""
  141. with Session(self.db_manager.engine) as session:
  142. session.add(html_convert)
  143. session.commit()
  144. session.refresh(html_convert)