import json
import os
import time
import re
import logging
from pathlib import Path
from typing import Dict, Optional, List

from DrissionPage import ChromiumOptions, ChromiumPage
from DrissionPage.common import Keys
from pydantic import BaseModel
from scrapling import Adaptor
from sqlmodel import Session, select

from config.settings import GOOGLE_SEARCH_DIR
from mylib.base import ensure_output_dir, save_to_file
from mylib.drission_page import load_chrome_from_ini
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy
from worker.search_engine.search_result_db import (
    SearchResultManager,
    SearchResultItem,
    KeywordTask,
    VerificationItem,
)
from worker.search_engine.smart_selector import get_search_ele

class ValidSearchResult:
    def __init__(self):
        self.db_manager = SearchResultManager()

    def find_first_item_with_keyword(self, keyword: str = "真人") -> Optional[SearchResultItem]:
        """
        Fetch all SearchResultItem rows from the database, inspect the content of
        each html_path file, and return the first item whose HTML contains the
        given keyword.
        """
        # Fetch all SearchResultItem rows
        items = self.db_manager.get_all_search_result_items()

        for item in items:
            if item.html_path and Path(item.html_path).exists() and item.html_path.endswith(".html"):
                try:
                    # Read the HTML file content
                    with open(item.html_path, 'r', encoding='utf-8') as file:
                        content = file.read()
                    # Check whether it contains the keyword
                    if keyword in content:
                        logger.info(f"Found result containing keyword '{keyword}': {item}")
                        return item
                except Exception as e:
                    logger.error(f"Error reading file {item.html_path}: {e}")

        logger.info(f"No result containing keyword '{keyword}' was found")
        return None
    def populate_verification_table(self, keyword: str = "真人"):
        """
        Iterate over all SearchResultItem rows and add those whose HTML contains
        the keyword to the VerificationItem table.
        """
        items = self.db_manager.get_all_search_result_items()

        for item in items:
            if item.html_path and Path(item.html_path).exists() and item.html_path.endswith(".html"):
                if item.id % 100 == 0:
                    logger.info(f"Processing item {item.id}")
                try:
                    with open(item.html_path, 'r', encoding='utf-8') as file:
                        content = file.read()
                    page = Adaptor(content)
                    body = Adaptor(page.body)
                    if keyword in body.get_all_text():
                        logger.info(f"Adding result {item.id} containing keyword '{keyword}' to the VerificationItem table")
                        self.db_manager.add_to_verification(item.id)
                except Exception as e:
                    logger.error(f"Error processing file {item.html_path}: {e}")
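
    # Usage sketch (illustrative only; `vsr` and `clean_items` are hypothetical
    # names, the methods are defined in this class): flag keyword hits once after
    # crawling, then work only with the rows that were not flagged.
    #
    #   vsr = ValidSearchResult()
    #   vsr.populate_verification_table(keyword="真人")
    #   clean_items = vsr.get_valid_search_result_items()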
    def get_valid_search_result_items(self) -> List[SearchResultItem]:
        """
        Return all SearchResultItem rows that have not been flagged for verification.
        """
        all_items = self.db_manager.get_all_search_result_items()
        with Session(self.db_manager.engine) as session:
            valid_items = []
            for item in all_items:
                exists_in_verification = session.exec(
                    select(VerificationItem)
                    .where(VerificationItem.result_item_id == item.id)
                ).first()
                if not exists_in_verification:
                    valid_items.append(item)
            return valid_items
    def get_single_valid_search_result(self, result_item_id: int) -> Optional[SearchResultItem]:
        """
        Look up a single SearchResultItem by id and return it only if it has not
        been flagged for verification; otherwise return None.
        """
        with Session(self.db_manager.engine) as session:
            # Check whether the VerificationItem table already references this result_item_id
            exists_in_verification = session.exec(
                select(VerificationItem)
                .where(VerificationItem.result_item_id == result_item_id)
            ).first()
            if not exists_in_verification:
                # Fetch the corresponding SearchResultItem
                result_item = session.exec(
                    select(SearchResultItem)
                    .where(SearchResultItem.id == result_item_id)
                ).first()
                if result_item:
                    return result_item
        return None
    def try_get_url(self, browser_config: Optional[dict] = None):
        # Copy to avoid a mutable default argument and to leave the caller's dict untouched
        browser_config = dict(browser_config or {})
        browser_config.update({'proxy': get_random_proxy()})
        logger.info(f"browser_config: {browser_config}")
        page = load_chrome_from_ini(**browser_config)
        result_item = self.find_first_item_with_keyword()
        if result_item:
            page.get(result_item.url)
            logger.info(f"Visiting URL: {result_item.url}")
        else:
            logger.warning("No result containing the keyword was found")
        # page.quit()
        return result_item

    def load_dp_page(self, proxy=None, no_imgs=False):
        chrome_options = ChromiumOptions()
        if proxy:
            chrome_options.set_proxy(proxy)
        # Fall back to the HTTP_PROXY environment variable if it is set
        elif 'HTTP_PROXY' in os.environ:
            chrome_options.set_proxy(os.environ['HTTP_PROXY'])
        chrome_options.auto_port(True)
        chrome_options.no_imgs(no_imgs)

        logger.info(f"proxy {proxy}")
        page = ChromiumPage(chrome_options)
        # Cookies can be applied to the newest tab by the caller if needed,
        # e.g. page.latest_tab.set.cookies(...); see the sketch after load_cookies().
        return page
    def load_cookies(self):
        path = Path(r'G:\code\upwork\zhang_crawl_bio\CF-Clearance-Scraper\cookies.json')
        export_cookies = json.loads(path.read_text(encoding='utf-8'))
        return export_cookies.get('cookies')
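
    # Sketch (assumption): wiring the exported CF-Clearance-Scraper cookies into a
    # browser created by load_dp_page(), so the session carries the CF clearance.
    # Assumes load_cookies() returns the list stored under the "cookies" key; the
    # variable names below are illustrative only.
    #
    #   vsr = ValidSearchResult()
    #   page = vsr.load_dp_page(proxy=get_random_proxy())
    #   cookies = vsr.load_cookies()
    #   if cookies:
    #       page.latest_tab.set.cookies(cookies)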

def main():
    vsr = ValidSearchResult()
    res = vsr.get_single_valid_search_result(52)
    print(res)
    # items = vsr.get_valid_search_result_items()
    # # for item in items:
    # #     print(item.id)
    # ids = [item.id for item in items]
    # with open("valid_ids.txt", "w") as f:
    #     f.write("\n".join(str(id) for id in ids))
    # vsr.try_get_url()


if __name__ == "__main__":
    main()