from pathlib import Path
import asyncio
from DrissionPage import ChromiumPage
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from mylib.base import (
    save_to_file,
    ensure_output_dir,
    replace_space,
    browser_config
)
from database.search_model import SearchDatabaseManager, SearchResult
from database.excel_import import ExcelDatabaseManager, KeywordModel
from database.sqlite_engine import create_db_and_tables, drop_table
from mylib.drission_page import load_chrome_from_ini
from lxml import html
from mylib.settings import GOOGLE_SEARCH_DIR

page = load_chrome_from_ini()


class SearchManager:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.search_db_manager = SearchDatabaseManager()
        self.excel_db_manager = ExcelDatabaseManager()

    def search_keyword(self, keyword: str, start: int = 0, cache: bool = True) -> SearchResult:
        """Search for a keyword and persist the result.

        Args:
            keyword: the keyword to search for
            start: result offset to start from
            cache: whether to reuse a cached result

        Returns:
            The SearchResult record saved in the database, including the
            path of the saved HTML file.
        """
        # Check the cache first
        if cache:
            existing_result = self.search_db_manager.get_existing_result(keyword, start)
            if existing_result:
                print(f"Using existing result for {keyword} {start}")
                return existing_result

        # Run the search
        url = f"https://www.google.com/search?q={keyword}&start={start}"
        self.page.get(url)

        # Save the HTML page
        html_path = self.save_page(keyword, start)

        # Check whether this is the last page
        is_last_page = self.check_last_page()

        # Persist to the database
        return self.save_search_result(keyword, start, url, html_path, is_last_page)
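
    # Usage sketch (illustrative, not part of the original module): assumes
    # the Chromium profile loaded by load_chrome_from_ini() can reach Google
    # and that the database tables already exist; the keyword is hypothetical.
    #
    #   manager = SearchManager(page)
    #   result = manager.search_keyword("python sqlmodel", start=0, cache=True)
    #   print(result.url, result.html_path, result.is_last_page)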

    def check_last_page(self) -> bool:
        """Return True if the current results page is the last one."""
        tree = html.fromstring(self.page.html)

        # Does the results container (id="search") exist?
        search_element = tree.xpath('//*[@id="search"]')

        # Does the "next page" link (id="pnnext") exist?
        pnnext_element = tree.xpath('//*[@id="pnnext"]')

        # If id="search" exists but id="pnnext" does not, this is the last page.
        if pnnext_element:
            return False
        elif search_element and not pnnext_element:
            return True
        else:
            raise ValueError("Unexpected page structure: cannot determine whether this is the last page.")

    def walk_search_one_keywords(self, keyword_model: KeywordModel, start: int = 0, pages_num: int = 250, cache=True, skip_exist=True):
        """Walk the result pages for one keyword, 10 results per page, until
        the last page is reached or the offset hits pages_num, then mark the
        keyword as done."""
        keyword = keyword_model.key_word
        for current_start in range(start, pages_num, 10):
            search_result: SearchResult = self.search_keyword(keyword, current_start, cache)
            if search_result.is_last_page:
                print(f"Reached last page for {keyword} at start={current_start}")
                self.excel_db_manager.mark_keyword_done(keyword)
                break
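
    # Batch sketch (illustrative; assumes get_keywords_by_status() returns
    # the still-pending KeywordModel rows, as test_one() below suggests;
    # pages_num=100 is a hypothetical value meaning 10 pages per keyword):
    #
    #   manager = SearchManager(page)
    #   for key_model in manager.excel_db_manager.get_keywords_by_status():
    #       manager.walk_search_one_keywords(key_model, pages_num=100)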

    def save_search_result(self, keyword: str, start: int, url: str, html_path: str, is_last_page: bool = False) -> SearchResult:
        """Save a search result to the database.

        Args:
            keyword: the search keyword
            start: the result offset
            url: the search URL
            html_path: path of the saved HTML file
            is_last_page: whether this is the last results page

        Returns:
            The SearchResult record stored in the database.
        """
        return self.search_db_manager.save_search_result(
            keyword=keyword,
            start=start,
            url=url,
            html_path=str(html_path),
            is_last_page=is_last_page
        )

    def next_page(self, keyword: str, current_start: int, cache: bool = True) -> SearchResult | None:
        """Advance to the next results page.

        Args:
            keyword: the keyword being searched
            current_start: the current result offset
            cache: whether to reuse cached results

        Returns:
            The SearchResult for the next page, or None if the current page
            is already the last one.
        """
        # Stop if the current page is already known to be the last one
        existing = self.search_db_manager.get_existing_result(keyword, current_start)
        if existing and existing.is_last_page:
            print(f"Reached last page for {keyword} at start={current_start}")
            return None

        return self.search_keyword(keyword, current_start + 10, cache)

    def save_page(self, keyword: str, start: int) -> Path:
        """Save the current page's HTML under GOOGLE_SEARCH_DIR/<keyword>."""
        save_dir = GOOGLE_SEARCH_DIR / keyword
        ensure_output_dir(save_dir)

        save_path = save_dir / f"{start}.html"
        save_to_file(self.page.html, save_path)
        return save_path

    async def _process_page(self, url: str) -> CrawlResult:
        """Fetch and process a page with crawl4ai."""
        async with AsyncWebCrawler(config=browser_config) as crawler:
            return await crawler.arun(
                url=url,
                cache_mode=CacheMode.ENABLED,
                user_agent='random'
            )
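
    # Usage sketch (illustrative): crawl4ai returns a CrawlResult; reading
    # its success/markdown attributes is an assumption based on that
    # library's documented result object.
    #
    #   result = await manager._process_page("https://example.com")
    #   if result.success:
    #       print(result.markdown)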

    def is_search_result_empty(self, html_content: str) -> bool:
        """Return True if the search results container has no children."""
        tree = html.fromstring(html_content)
        search_elements = tree.xpath('//*[@id="search"]/*')
        return len(search_elements) == 0

    def go_to_next_page(self) -> bool:
        """Click through to the next results page; False if there is none."""
        next_button = self.page.ele('#pnnext', timeout=1)
        if not next_button:
            return False
        next_button.click()
        return True

    def go_to_prev_page(self) -> bool:
        """Click back to the previous results page; False if there is none."""
        prev_button = self.page.ele('#pnprev', timeout=1)
        if not prev_button:
            return False
        prev_button.click()
        return True
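
    # Pagination sketch (illustrative): combine the navigation helper with
    # is_search_result_empty to click through pages until results run out.
    #
    #   while manager.go_to_next_page():
    #       if manager.is_search_result_empty(manager.page.html):
    #           break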

    def extract_search_results(self, html_content: str) -> list[str]:
        """Extract all result links from a search results page.

        Args:
            html_content: the page's HTML content

        Returns:
            A list of all result links found on the page.
        """
        tree = html.fromstring(html_content)
        rso_nodes = tree.xpath('//*[@id="search"]//*[@id="rso"]')
        if not rso_nodes:
            # No results container: treat as an empty page rather than crash
            return []
        links = []
        for element in rso_nodes[0].xpath('.//*[@href]'):
            href = element.get('href')
            if href and not href.startswith('#'):
                links.append(href)
        return links
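

def example_extract_links():
    """Illustrative helper (not part of the original module): exercises
    extract_search_results against a minimal fragment shaped like a Google
    results page, using the same ids the XPaths above expect."""
    sample = (
        '<div id="search"><div id="rso">'
        '<a href="https://example.com/a">A</a>'
        '<a href="#fragment">skipped</a>'
        '</div></div>'
    )
    manager = SearchManager(page)  # assumes the DB managers construct cleanly
    print(manager.extract_search_results(sample))  # -> ['https://example.com/a']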


def test_one():
    global page
    # Initialize the manager with the shared browser page
    manager = SearchManager(page)
    key_model_list = manager.excel_db_manager.get_keywords_by_status()
    key_model = key_model_list.pop(0)
    print(key_model)
    manager.walk_search_one_keywords(key_model)


async def main():
    create_db_and_tables()
    # test_one()
    # global page
    # self = SearchManager(page)
    # self.search_db_manager.get_search_results("python", 0)
    # Search a keyword
    # res = manager.search_keyword(keyword_model.key_word, cache=True)
    # print(f"Found results: {res.model_dump_json(indent=4)} ")


if __name__ == "__main__":
    asyncio.run(main())