# valid_google_search.py
  1. import json
  2. import os
  3. import time
  4. import re
  5. import logging
  6. from pathlib import Path
  7. from typing import Dict, Optional, List
  8. from DrissionPage import ChromiumOptions, ChromiumPage
  9. from pydantic import BaseModel
  10. from scrapling import Adaptor
  11. from sqlmodel import Session, select
  12. from mylib.logu import logger
  13. from mylib.base import save_to_file
  14. from config.settings import GOOGLE_SEARCH_DIR
  15. from mylib.drission_page import load_chrome_from_ini
  16. from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask, VerificationItem
  17. from worker.search_engine.smart_selector import get_search_ele
  18. from DrissionPage.common import Keys
  19. from utils.proxy_pool import get_random_proxy
  20. from mylib.base import ensure_output_dir, save_to_file
  21. from scrapling import Adaptor
  22. class ValidSearchResult:
  23. def __init__(self):
  24. self.db_manager = SearchResultManager()
  25. def find_first_item_with_keyword(self, keyword: str = "真人") -> Optional[SearchResultItem]:
  26. """
  27. 获取数据库中所有 SearchResultItem,检查每个 html_path 文件内容,
  28. 如果包含指定关键词,则返回第一个匹配的 SearchResultItem。
  29. """
  30. # 获取所有 SearchResultItem
  31. items = self.db_manager.get_all_search_result_items()
  32. for item in items:
  33. if item.html_path and Path(item.html_path).exists() and item.html_path.endswith(".html"):
  34. try:
  35. # 读取 HTML 文件内容
  36. with open(item.html_path, 'r', encoding='utf-8') as file:
  37. content = file.read()
  38. # 检查是否包含关键词
  39. if keyword in content:
  40. logger.info(f"找到包含关键词 '{keyword}' 的结果: {item}")
  41. return item
  42. except Exception as e:
  43. logger.error(f"读取文件 {item.html_path} 时出错: {e}")
  44. logger.info(f"未找到包含关键词 '{keyword}' 的结果")
  45. return None
  46. def populate_verification_table(self, keyword: str = "真人"):
  47. """
  48. 遍历所有 SearchResultItem,将包含关键词的结果存入 VerificationItem 表。
  49. """
  50. items = self.db_manager.get_all_search_result_items()
  51. for item in items:
  52. if item.html_path and Path(item.html_path).exists() and item.html_path.endswith(".html"):
  53. if item.id % 100 == 0:
  54. logger.info(f"处理第 {item.id} 个结果")
  55. try:
  56. with open(item.html_path, 'r', encoding='utf-8') as file:
  57. content = file.read()
  58. page = Adaptor(content)
  59. body = Adaptor(page.body)
  60. if keyword in body.get_all_text():
  61. logger.info(f"将包含关键词 '{keyword}' 的结果 {item.id} 添加到 VerificationItem 表")
  62. self.db_manager.add_to_verification(item.id)
  63. except Exception as e:
  64. logger.error(f"处理文件 {item.html_path} 时出错: {e}")
  65. def get_valid_search_result_items(self) -> List[SearchResultItem]:
  66. """
  67. 获取所有未被标记为需要验证的 SearchResultItem。
  68. """
  69. all_items = self.db_manager.get_all_search_result_items()
  70. with Session(self.db_manager.engine) as session:
  71. valid_items = []
  72. for item in all_items:
  73. exists_in_verification = session.exec(
  74. select(VerificationItem)
  75. .where(VerificationItem.result_item_id == item.id)
  76. ).first()
  77. if not exists_in_verification:
  78. valid_items.append(item)
  79. return valid_items
  80. def get_single_valid_search_result(self, result_item_id: int) -> Optional[SearchResultItem]:
  81. """
  82. 根据 SearchResultItem.id 获取单个有效的 SearchResultItem。
  83. 如果该结果未被标记为需要验证,则返回该结果。
  84. """
  85. with Session(self.db_manager.engine) as session:
  86. # 检查 VerificationItem 表中是否存在对应的 result_item_id
  87. exists_in_verification = session.exec(
  88. select(VerificationItem)
  89. .where(VerificationItem.result_item_id == result_item_id)
  90. ).first()
  91. if not exists_in_verification:
  92. # 获取对应的 SearchResultItem
  93. result_item = session.exec(
  94. select(SearchResultItem)
  95. .where(SearchResultItem.id == result_item_id)
  96. ).first()
  97. if result_item:
  98. return result_item
  99. return None
  100. def try_get_url(self, browser_config: dict = {}):
  101. browser_config.update({'proxy': get_random_proxy()})
  102. logger.info(f"browser_config: {browser_config}")
  103. page = load_chrome_from_ini(**browser_config) if browser_config else load_chrome_from_ini()
  104. result_item = self.find_first_item_with_keyword()
  105. if result_item:
  106. page.get(result_item.url)
  107. logger.info(f"访问 URL: {result_item.url}")
  108. else:
  109. logger.warning("未找到包含关键词的结果")
  110. # page.quit()
  111. return result_item
  112. def load_dp_page(self, proxy=None, no_imgs=False):
  113. chrome_options = ChromiumOptions()
  114. if proxy:
  115. chrome_options.set_proxy(proxy)
  116. # 如果存在代理环境变量
  117. elif 'HTTP_PROXY' in os.environ:
  118. chrome_options.set_proxy(os.environ['HTTP_PROXY'])
  119. chrome_options.auto_port(True)
  120. chrome_options.no_imgs(no_imgs)
  121. logger.info(f"proxy {proxy}")
  122. page = ChromiumPage(chrome_options)
  123. tab = page.latest_tab
  124. tab.set.cookies
  125. return page
  126. def load_cookies(self):
  127. path = Path(r'G:\code\upwork\zhang_crawl_bio\CF-Clearance-Scraper\cookies.json')
  128. export_cookies = json.loads(path.read_text(encoding='utf-8'))
  129. export_cookies.get('cookies')
  130. def main():
  131. vsr = ValidSearchResult()
  132. res = vsr.get_single_valid_search_result(52)
  133. print(res)
  134. # items = vsr.get_valid_search_result_items()
  135. # # for item in items:
  136. # # print(item.id)
  137. # ids = [item.id for item in items]
  138. # with open("valid_ids.txt", "w") as f:
  139. # f.write("\n".join(str(id) for id in ids))
  140. # vsr.try_get_url()
  141. if __name__ == "__main__":
  142. main()