search_result_db.py 8.8 KB


  1. from datetime import datetime
  2. from typing import Optional, List
  3. from sqlmodel import SQLModel, Field, Relationship, create_engine, Session, select, delete, func
  4. from sqlalchemy.orm import relationship
  5. from sqlalchemy import UniqueConstraint
  6. from sqlalchemy.sql import text
  7. from pathlib import Path
  8. from config.settings import DB_URL
  9. class KeywordTask(SQLModel, table=True):
  10. __table_args__ = (UniqueConstraint("keyword", name="uq_keyword"),)
  11. id: Optional[int] = Field(default=None, primary_key=True)
  12. keyword: str = Field(index=True, unique=True)
  13. total_results: Optional[int] = None
  14. is_completed: bool = Field(default=False)
  15. created_at: datetime = Field(default_factory=datetime.now)
  16. pages: List["SearchPageResult"] = Relationship(back_populates="keyword_task")
  17. items: List["SearchResultItem"] = Relationship(back_populates="keyword_task")
  18. class SearchPageResult(SQLModel, table=True):
  19. __table_args__ = (UniqueConstraint("keyword_id", "page_number", name="uq_keyword_page"),)
  20. id: Optional[int] = Field(default=None, primary_key=True)
  21. keyword_id: int = Field(foreign_key="keywordtask.id")
  22. keyword: str = Field(index=True)
  23. page_number: int
  24. results_count: int
  25. has_next_page: bool
  26. html_path: Optional[str] = None
  27. created_at: datetime = Field(default_factory=datetime.now)
  28. keyword_task: Optional[KeywordTask] = Relationship(back_populates="pages")
  29. items: List["SearchResultItem"] = Relationship(back_populates="search_page")
  30. class SearchResultItem(SQLModel, table=True):
  31. __table_args__ = (UniqueConstraint("url", "page_id", name="uq_url_page"),)
  32. id: Optional[int] = Field(default=None, primary_key=True)
  33. url: str
  34. title: Optional[str] = None
  35. content: Optional[str] = None
  36. html_path: Optional[str] = None
  37. keyword_id: int = Field(foreign_key="keywordtask.id")
  38. keyword: str = Field(index=True)
  39. page_id: int = Field(foreign_key="searchpageresult.id")
  40. created_at: datetime = Field(default_factory=datetime.now)
  41. keyword_task: Optional[KeywordTask] = Relationship(back_populates="items")
  42. search_page: Optional[SearchPageResult] = Relationship(back_populates="items")
  43. class VerificationItem(SQLModel, table=True):
  44. __table_args__ = (UniqueConstraint("result_item_id", name="uq_verification_item"),)
  45. id: Optional[int] = Field(default=None, primary_key=True)
  46. result_item_id: int = Field(foreign_key="searchresultitem.id")
  47. search_result_item: Optional[SearchResultItem] = Relationship(
  48. sa_relationship=relationship("SearchResultItem", lazy="joined")
  49. )
  50. verified: bool = Field(default=False)
  51. created_at: datetime = Field(default_factory=datetime.now)
  52. class SearchResultManager:
  53. def __init__(self, db_url: str = DB_URL):
  54. self.engine = create_engine(db_url)
  55. SQLModel.metadata.create_all(self.engine)
  56. def get_keyword_task(self, keyword: str) -> KeywordTask | None:
  57. with Session(self.engine) as session:
  58. return session.exec(
  59. select(KeywordTask)
  60. .where(KeywordTask.keyword == keyword)
  61. ).first()
  62. def delete_keyword_task(self, keyword: str):
  63. """删除关键词及其所有关联数据"""
  64. with Session(self.engine) as session:
  65. # 先获取关键词任务
  66. keyword_task = session.exec(
  67. select(KeywordTask)
  68. .where(KeywordTask.keyword == keyword)
  69. ).first()
  70. if keyword_task:
  71. # 删除关联的SearchResultItem
  72. session.execute(
  73. delete(SearchResultItem)
  74. .where(SearchResultItem.keyword_id == keyword_task.id)
  75. )
  76. # 删除关联的SearchPageResult
  77. session.execute(
  78. delete(SearchPageResult)
  79. .where(SearchPageResult.keyword_id == keyword_task.id)
  80. )
  81. # 删除KeywordTask
  82. session.delete(keyword_task)
  83. session.commit()
  84. def create_keyword_task(self, keyword: str) -> KeywordTask:
  85. with Session(self.engine) as session:
  86. # 先删除可能存在的旧数据(在process_keyword中已处理)
  87. task = KeywordTask(keyword=keyword)
  88. session.add(task)
  89. session.commit()
  90. session.refresh(task)
  91. return task
  92. def save_page_results(
  93. self,
  94. keyword: str,
  95. page_number: int,
  96. results_count: int,
  97. has_next_page: bool,
  98. html_path: Optional[Path] = None
  99. ) -> SearchPageResult | None:
  100. with Session(self.engine) as session:
  101. existing = session.exec(
  102. select(SearchPageResult)
  103. .where(SearchPageResult.keyword == keyword)
  104. .where(SearchPageResult.page_number == page_number)
  105. ).first()
  106. if existing:
  107. return existing
  108. keyword_task = self.get_keyword_task(keyword)
  109. if not keyword_task:
  110. raise ValueError("Keyword task must exist before saving page results")
  111. page_result = SearchPageResult(
  112. keyword_id=keyword_task.id,
  113. keyword=keyword,
  114. page_number=page_number,
  115. results_count=results_count,
  116. has_next_page=has_next_page,
  117. html_path=str(html_path) if html_path else None
  118. )
  119. session.add(page_result)
  120. session.commit()
  121. session.refresh(page_result)
  122. return page_result
  123. def save_result_items(
  124. self,
  125. keyword: str,
  126. page_id: int,
  127. items: List[SearchResultItem],
  128. html_path: Optional[Path] = None
  129. ) -> int:
  130. with Session(self.engine) as session:
  131. keyword_task = session.exec(
  132. select(KeywordTask)
  133. .where(KeywordTask.keyword == keyword)
  134. ).first()
  135. if not keyword_task:
  136. raise ValueError(f"Keyword task not found for keyword: {keyword}")
  137. new_items = []
  138. for item in items:
  139. exists = session.exec(
  140. select(SearchResultItem)
  141. .where(SearchResultItem.url == item.url)
  142. .where(SearchResultItem.page_id == page_id)
  143. ).first()
  144. if not exists:
  145. new_item = SearchResultItem(
  146. url=item.url,
  147. title=item.title,
  148. content=item.content,
  149. html_path=str(html_path) if html_path else None,
  150. keyword_id=keyword_task.id,
  151. keyword=keyword,
  152. page_id=page_id
  153. )
  154. new_items.append(new_item)
  155. session.add_all(new_items)
  156. session.commit()
  157. return new_items
  158. def mark_task_completed(self, keyword: str):
  159. with Session(self.engine) as session:
  160. task = self.get_keyword_task(keyword)
  161. if not task:
  162. raise ValueError(f"Keyword task {keyword} not found")
  163. # 使用窗口函数确保统计准确性
  164. total_results = session.scalar(
  165. select(func.sum(SearchPageResult.results_count))
  166. .where(SearchPageResult.keyword_id == task.id)
  167. .execution_options(compile_kwargs={"literal_binds": True})
  168. ) or 0
  169. task.is_completed = True
  170. task.total_results = total_results
  171. session.add(task)
  172. session.commit()
  173. session.refresh(task)
  174. return task
  175. def is_task_completed(self, keyword: str) -> bool:
  176. task = self.get_keyword_task(keyword)
  177. return task.is_completed if task else False
  178. def get_all_search_result_items(self) -> List[SearchResultItem]:
  179. """
  180. 获取数据库中所有的 SearchResultItem。
  181. """
  182. with Session(self.engine) as session:
  183. return session.exec(select(SearchResultItem)).all()
  184. def add_to_verification(self, result_item_id: int):
  185. """
  186. 将 SearchResultItem 添加到 VerificationItem 表中,避免重复添加。
  187. """
  188. with Session(self.engine) as session:
  189. exists = session.exec(
  190. select(VerificationItem)
  191. .where(VerificationItem.result_item_id == result_item_id)
  192. ).first()
  193. if not exists:
  194. verification_item = VerificationItem(result_item_id=result_item_id)
  195. session.add(verification_item)
  196. session.commit()
  197. session.refresh(verification_item)
  198. return verification_item
  199. return exists