from pathlib import Path
import asyncio

from DrissionPage import ChromiumPage
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from lxml import html

from mylib.base import (
    save_to_file,
    ensure_output_dir,
    replace_space,
    browser_config
)
from mylib.drission_page import load_chrome_from_ini, load_random_ua_chrome
from database.search_model import SearchDatabaseManager, SearchResult
from database.excel_import import ExcelDatabaseManager, KeywordModel
from database.sqlite_engine import create_db_and_tables, drop_table
from config.settings import GOOGLE_SEARCH_DIR


class PageAccessError(Exception):
    """Raised when a results page cannot be interpreted (e.g. blocked/captcha).

    Attributes:
        html_path: optional path to the HTML saved for debugging.
        screenshot_path: optional path to the screenshot saved for debugging.
    """

    def __init__(self, message, html_path=None, screenshot_path=None):
        super().__init__(message)
        self.html_path = html_path
        self.screenshot_path = screenshot_path


class SearchManager:
    """Drives Google searches inside a Chromium tab and persists each
    results page (HTML file + database record)."""

    def __init__(self, page: ChromiumPage):
        self.page = page
        self.tab = page.latest_tab
        self.search_db_manager = SearchDatabaseManager()
        self.excel_db_manager = ExcelDatabaseManager()

    def search_keyword(self, keyword: str, start: int = 0,
                       cache: bool = True) -> SearchResult:
        """Fetch one results page for *keyword* and persist it.

        For ``start == 0`` the search URL is loaded directly; for later
        offsets the page is reached by clicking the "next" button (the tab
        is assumed to already show the previous page).

        Args:
            keyword: search term.
            start: result offset of the page to fetch (multiples of 10).
            cache: when True, return the stored record if one exists
                instead of hitting the network.

        Returns:
            The SearchResult database record for this (keyword, start).
        """
        if cache:
            existing_result = self.search_db_manager.get_existing_result(keyword, start)
            if existing_result:
                # BUGFIX: only announce a cache hit when a record actually
                # exists (previously printed unconditionally).
                print(f"Using existing result for {keyword} {start}")
                return existing_result

        url = f"https://www.google.com/search?q={keyword}"
        if start == 0:
            self.tab.get(url)
        else:
            # Later pages are reached via the "next" link rather than a
            # start= URL, mimicking normal browsing.
            self.go_to_next_page()

        # Persist the raw HTML, detect pagination state, record in DB.
        html_path = self.save_page(keyword, start)
        is_last_page = self.check_last_page()
        return self.save_search_result(keyword, start, url, html_path, is_last_page)

    def check_last_page(self) -> bool:
        """Return True when the current tab shows the last results page.

        A page with a "next" link (#pnnext) is not the last page; a page
        with results (#search) but no "next" link is the last page.

        Raises:
            PageAccessError: neither marker is present, so the page is
                probably an error/captcha page.
        """
        tree = html.fromstring(self.tab.html)
        has_results = tree.xpath('//*[@id="search"]')
        has_next = tree.xpath('//*[@id="pnnext"]')
        if has_next:
            return False
        if has_results:
            # No "next" link at this point, so this is the final page.
            return True
        raise PageAccessError("网页错误,无法确定是否是最后一页")

    def walk_search_one_keywords(self, keyword_model: KeywordModel, start: int = 0,
                                 pages_num: int = 250, cache=True, skip_exist=True):
        """Walk every results page of one keyword until the last page.

        Args:
            keyword_model: keyword row from the Excel-import database.
            start: offset to resume from.
            pages_num: upper bound (exclusive) on the offset to visit.
            cache: forwarded to search_keyword().
            skip_exist: accepted for backward compatibility; currently
                unused (caching is governed by *cache*).
        """
        keyword = keyword_model.key_word
        for current_start in range(start, pages_num, 10):
            search_result: SearchResult = self.search_keyword(keyword, current_start, cache)
            if search_result.is_last_page:
                print(f"Reached last page for {keyword} at start={current_start}")
                self.excel_db_manager.mark_keyword_done(keyword)
                break
            # BUGFIX: search_keyword() already clicks "next" when start > 0.
            # The previous extra go_to_next_page() here advanced two pages
            # per iteration, saving pages under the wrong start offset.

    def save_search_result(self, keyword: str, start: int, url: str,
                           html_path: str, is_last_page: bool = False) -> SearchResult:
        """Persist one search-page record to the database.

        Args:
            keyword: search term.
            start: result offset of the page.
            url: URL that was searched.
            html_path: path of the saved HTML file.
            is_last_page: whether this was the final results page.

        Returns:
            The stored SearchResult record.
        """
        return self.search_db_manager.save_search_result(
            keyword=keyword,
            start=start,
            url=url,
            html_path=str(html_path),
            is_last_page=is_last_page
        )

    async def next_page(self, keyword: str, current_start: int,
                        cache: bool = True) -> "SearchResult | list":
        """Advance to the page after *current_start* and persist it.

        Returns an empty list when the stored record says *current_start*
        was already the last page; otherwise the SearchResult for the
        next page.
        """
        existing = self.search_db_manager.get_existing_result(keyword, current_start)
        if existing and existing.is_last_page:
            print(f"Reached last page for {keyword} at start={current_start}")
            return []
        # BUGFIX: search_keyword() is synchronous — awaiting it raised
        # TypeError. Call it directly; the coroutine interface is kept so
        # existing `await manager.next_page(...)` call sites still work.
        return self.search_keyword(keyword, current_start + 10, cache)

    def save_page(self, keyword: str, start: int) -> Path:
        """Save the current tab's HTML as <GOOGLE_SEARCH_DIR>/<keyword>/<start>.html."""
        save_dir = GOOGLE_SEARCH_DIR / keyword
        ensure_output_dir(save_dir)
        save_path = save_dir / f"{start}.html"
        save_to_file(self.tab.html, save_path)
        return save_path

    async def _process_page(self, url: str) -> CrawlResult:
        """Crawl *url* with crawl4ai (cached, random user agent)."""
        async with AsyncWebCrawler(config=browser_config) as crawler:
            return await crawler.arun(
                url=url,
                cache_mode=CacheMode.ENABLED,
                user_agent='random'
            )

    def is_search_result_empty(self, html_content: str) -> bool:
        """Return True when the #search container has no child elements."""
        tree = html.fromstring(html_content)
        return len(tree.xpath('//*[@id="search"]/*')) == 0

    def go_to_next_page(self) -> bool:
        """Click the "next" pagination link; False when it is absent."""
        next_button = self.tab.ele('#pnnext', timeout=1)
        if not next_button:
            return False
        next_button.click()
        return True

    def go_to_prev_page(self) -> bool:
        """Click the "previous" pagination link; False when it is absent."""
        prev_button = self.tab.ele('#pnprev', timeout=1)
        if not prev_button:
            return False
        prev_button.click()
        return True

    def extract_search_results(self, html_content: str) -> list[str]:
        """Extract all result hrefs from a results page.

        Args:
            html_content: full HTML of a Google results page.

        Returns:
            Every non-fragment href found under #search > #rso.

        Raises:
            IndexError: when the #rso container is missing.
        """
        tree = html.fromstring(html_content)
        rso = tree.xpath('//*[@id="search"]//*[@id="rso"]')[0]
        return [
            href
            for element in rso.xpath('.//*[@href]')
            if (href := element.get('href')) and not href.startswith('#')
        ]

    def restart_browser(self):
        """Quit the current browser and start a fresh one with a random UA."""
        self.page.quit()
        self.page = load_random_ua_chrome()
        self.tab = self.page.latest_tab


def test_one():
    """Process a single pending keyword (manual smoke test)."""
    manager = SearchManager(load_random_ua_chrome())
    key_model_list = manager.excel_db_manager.get_keywords_by_status()
    key_model = key_model_list.pop(0)
    print(key_model)
    manager.walk_search_one_keywords(key_model)


def test_all():
    """Process every pending keyword, restarting the browser on page errors."""
    manager = SearchManager(load_random_ua_chrome())
    key_model_list = manager.excel_db_manager.get_keywords_by_status()
    all_count = manager.excel_db_manager.get_keywords_count()
    print("遍历所有搜索词, len = ", len(key_model_list))
    for key_model in key_model_list:
        print('---------------------------')
        print(f"Processing keyword: {key_model.key_word}")
        print(f"总计: len {all_count} / 当前 {key_model.id}")
        try:
            manager.walk_search_one_keywords(key_model)
        except PageAccessError as e:
            print(f"{str(e)}")
            # Keep evidence of the failing page (HTML + screenshot), then
            # restart the browser with a fresh user agent and carry on.
            start = 0
            error_html_path = manager.save_page(key_model.key_word, "error_" + str(start))
            screenshot_path = str(error_html_path.with_suffix('.png'))
            manager.tab.get_screenshot(path=screenshot_path)
            print(f"screenshot saved to {screenshot_path}")
            manager.restart_browser()


async def main():
    """Entry point: ensure tables exist, then crawl all pending keywords."""
    create_db_and_tables()
    test_all()


if __name__ == "__main__":
    asyncio.run(main())