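"""Google search crawler built on DrissionPage.

Pulls keywords from an Excel-backed database, saves every result page to
disk under GOOGLE_SEARCH_DIR, records progress in a search-result database,
and restarts the browser with a fresh user agent when a page error occurs.
"""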
import asyncio
from pathlib import Path
from urllib.parse import quote_plus

from DrissionPage import ChromiumPage
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from lxml import html

from config.settings import GOOGLE_SEARCH_DIR
from database.excel_import import ExcelDatabaseManager, KeywordModel
from database.search_model import SearchDatabaseManager, SearchResult
from database.sqlite_engine import create_db_and_tables, drop_table
from mylib.base import (
    save_to_file,
    ensure_output_dir,
    replace_space,
    browser_config,
)
from mylib.drission_page import load_chrome_from_ini, load_random_ua_chrome


class PageAccessError(Exception):
    """Custom exception for page access errors; optionally carries the paths
    of the HTML dump and screenshot saved when the error occurred."""

    def __init__(self, message, html_path=None, screenshot_path=None):
        super().__init__(message)
        self.html_path = html_path
        self.screenshot_path = screenshot_path


class SearchManager:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.tab = page.latest_tab
        self.search_db_manager = SearchDatabaseManager()
        self.excel_db_manager = ExcelDatabaseManager()

    def search_keyword(self, keyword: str, start: int = 0, cache: bool = True) -> SearchResult:
        """Search a keyword and persist the result page.

        Args:
            keyword: Keyword to search for.
            start: Offset of the first result (Google pages step by 10).
            cache: Whether to reuse a previously saved result.

        Returns:
            The SearchResult record saved to (or loaded from) the database.
        """
        # Check the cache first so repeated runs skip live requests.
        if cache:
            existing_result = self.search_db_manager.get_existing_result(keyword, start)
            if existing_result:
                print(f"Using existing result for {keyword} {start}")
                return existing_result
        # URL-encode the keyword so spaces and punctuation survive.
        url = f"https://www.google.com/search?q={quote_plus(keyword)}&start={start}"
        if start == 0:
            # First page: navigate directly.
            self.tab.get(url)
        else:
            # Later pages: click through so the session looks organic.
            self.go_to_next_page()
        # Save the HTML of the current page.
        html_path = self.save_page(keyword, start)
        # Determine whether this is the last result page.
        is_last_page = self.check_last_page()
        # Persist to the database.
        return self.save_search_result(keyword, start, url, html_path, is_last_page)

    def check_last_page(self):
        """Return True when the current tab shows the final page of results."""
        tree = html.fromstring(self.tab.html)

        # Does the results container (id="search") exist?
        search_element = tree.xpath('//*[@id="search"]')

        # Does the "next page" link (id="pnnext") exist?
        pnnext_element = tree.xpath('//*[@id="pnnext"]')

        # If id="search" exists but id="pnnext" does not, this is the last page.
        if pnnext_element:
            return False
        elif search_element and not pnnext_element:
            return True
        else:
            raise PageAccessError("Page error: cannot determine whether this is the last page")

    def walk_search_one_keywords(self, keyword_model: KeywordModel, start: int = 0, pages_num: int = 250, cache=True, skip_exist=True):
        """Walk every result page for one keyword, 10 results at a time.

        search_keyword() itself clicks through to the next page whenever
        start > 0, so the loop only needs to advance the offset.
        """
        keyword = keyword_model.key_word
        for current_start in range(start, pages_num, 10):
            search_result: SearchResult = self.search_keyword(keyword, current_start, cache)
            if search_result.is_last_page:
                print(f"Reached last page for {keyword} at start={current_start}")
                self.excel_db_manager.mark_keyword_done(keyword)
                break

    def save_search_result(self, keyword: str, start: int, url: str, html_path: str, is_last_page: bool = False) -> SearchResult:
        """Persist one search result to the database.

        Args:
            keyword: Search keyword.
            start: Result offset.
            url: Search URL.
            html_path: Path of the saved HTML file.
            is_last_page: Whether this is the last result page.

        Returns:
            The SearchResult record stored in the database.
        """
        return self.search_db_manager.save_search_result(
            keyword=keyword,
            start=start,
            url=url,
            html_path=str(html_path),
            is_last_page=is_last_page
        )

    def next_page(self, keyword: str, current_start: int, cache: bool = True) -> SearchResult | list[str]:
        """Advance to the next result page.

        Args:
            keyword: Keyword being searched.
            current_start: Offset of the current page.
            cache: Whether to reuse cached results.

        Returns:
            The SearchResult for the next page, or an empty list when the
            current page is already known to be the last one.
        """
        # Stop if the database already marks this page as the last one.
        existing = self.search_db_manager.get_existing_result(keyword, current_start)
        if existing and existing.is_last_page:
            print(f"Reached last page for {keyword} at start={current_start}")
            return []

        return self.search_keyword(keyword, current_start + 10, cache)

    def save_page(self, keyword: str, start: int | str) -> Path:
        """Save the current tab's HTML to GOOGLE_SEARCH_DIR/<keyword>/<start>.html."""
        save_dir = GOOGLE_SEARCH_DIR / keyword
        ensure_output_dir(save_dir)

        save_path = save_dir / f"{start}.html"
        save_to_file(self.tab.html, save_path)
        return save_path

    async def _process_page(self, url: str) -> CrawlResult:
        """Fetch and process one page with crawl4ai."""
        async with AsyncWebCrawler(config=browser_config) as crawler:
            return await crawler.arun(
                url=url,
                cache_mode=CacheMode.ENABLED,
                user_agent='random'
            )

    def is_search_result_empty(self, html_content: str) -> bool:
        """Return True when the results container (id="search") has no children."""
        tree = html.fromstring(html_content)
        search_elements = tree.xpath('//*[@id="search"]/*')
        return len(search_elements) == 0

    def go_to_next_page(self) -> bool:
        """Click the "next page" link; return False if it is not present."""
        next_button = self.tab.ele('#pnnext', timeout=1)
        if not next_button:
            return False
        next_button.click()
        return True

    def go_to_prev_page(self) -> bool:
        """Click the "previous page" link; return False if it is not present."""
        prev_button = self.tab.ele('#pnprev', timeout=1)
        if not prev_button:
            return False
        prev_button.click()
        return True

    def extract_search_results(self, html_content: str) -> list[str]:
        """Extract all result links from a search result page.

        Args:
            html_content: HTML of the page.

        Returns:
            Every href found inside the #rso results block (empty if absent).
        """
        tree = html.fromstring(html_content)
        rso_nodes = tree.xpath('//*[@id="search"]//*[@id="rso"]')
        if not rso_nodes:
            return []
        links = []
        for element in rso_nodes[0].xpath('.//*[@href]'):
            href = element.get('href')
            if href and not href.startswith('#'):
                links.append(href)
        return links

    def restart_browser(self):
        """Quit the current browser and start a fresh one with a random user agent."""
        self.page.quit()
        self.page = load_random_ua_chrome()
        self.tab = self.page.latest_tab
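
# A minimal single-query sketch, separate from the batch flow above. It
# assumes load_random_ua_chrome() can start a Chrome instance and that Google
# serves the result page without a CAPTCHA:
#
#   manager = SearchManager(load_random_ua_chrome())
#   result = manager.search_keyword("python", start=0, cache=True)
#   html_text = Path(result.html_path).read_text(encoding="utf-8")
#   print(len(manager.extract_search_results(html_text)), "links extracted")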


def test_one():
    # Start a browser and walk a single pending keyword.
    manager = SearchManager(load_random_ua_chrome())
    key_model_list = manager.excel_db_manager.get_keywords_by_status()
    key_model = key_model_list.pop(0)
    print(key_model)
    manager.walk_search_one_keywords(key_model)


def test_all():
    # Start a browser and walk every pending keyword.
    manager = SearchManager(load_random_ua_chrome())
    key_model_list = manager.excel_db_manager.get_keywords_by_status()
    all_count = manager.excel_db_manager.get_keywords_count()
    print("Walking all keywords, len =", len(key_model_list))
    for key_model in key_model_list:
        print('---------------------------')
        print(f"Processing keyword: {key_model.key_word}")
        print(f"Total: {all_count} / current id: {key_model.id}")
        try:
            manager.walk_search_one_keywords(key_model)
        except PageAccessError as e:
            print(f"{str(e)}")
            # Save the failing page's HTML and a screenshot for debugging,
            # then restart the browser with a fresh user agent.
            start = 0
            error_html_path = manager.save_page(key_model.key_word, f"error_{start}")
            screenshot_path = str(error_html_path.with_suffix('.png'))
            manager.tab.get_screenshot(path=screenshot_path)
            print(f"screenshot saved to {screenshot_path}")
            manager.restart_browser()


async def main():
    create_db_and_tables()
    test_all()

    # test_one()
    # global page
    # manager = SearchManager(page)
    # manager.search_db_manager.get_search_results("python", 0)
    # Search one keyword:
    # res = manager.search_keyword(keyword_model.key_word, cache=True)
    # print(f"Found results: {res.model_dump_json(indent=4)} ")


if __name__ == "__main__":
    asyncio.run(main())
|