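"""Google search helper built on DrissionPage.

Searches Google for a keyword, saves each results page to disk, records the
search in the database, and provides helpers for pagination, caching,
screenshots, and link extraction.
"""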
import asyncio
from pathlib import Path

from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from DrissionPage import ChromiumPage
from lxml import html

from database.sql_model import DatabaseManager, SearchResult
from mylib.base import (
    OUTPUT_DIR,
    browser_config,
    ensure_output_dir,
    replace_space,
    save_to_file,
)


class SearchManager:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = DatabaseManager()

    def search_keyword(self, keyword: str, start: int = 0) -> SearchResult:
        """Search Google for a keyword and persist the result.

        Args:
            keyword: Keyword to search for.
            start: Offset of the first result.

        Returns:
            The saved SearchResult record, including the result page URL and
            the path of the saved HTML file.
        """
        url = f"https://www.google.com/search?q={keyword}&start={start}"
        self.page.get(url)

        # Save the raw HTML to disk
        html_path = self.save_page(keyword, start)

        # Record the search in the database
        return self.db_manager.save_search_result(
            keyword=keyword,
            start=start,
            url=url,
            html_path=str(html_path),
        )

    def next_page(self, keyword: str, current_start: int) -> SearchResult:
        """Run the search for the next results page and persist it.

        Args:
            keyword: Keyword to search for.
            current_start: Offset of the current page's first result.

        Returns:
            The saved SearchResult record for the next page.
        """
        # Google pages step in increments of 10 results
        return self.search_keyword(keyword, current_start + 10)

    def save_page(self, keyword: str, start: int) -> Path:
        """Save the current page's HTML and return the file path."""
        keyword = replace_space(keyword)
        save_dir = OUTPUT_DIR / keyword
        ensure_output_dir(save_dir)

        save_path = save_dir / f"{start}.html"
        save_to_file(self.page.html, save_path)
        return save_path

    async def _process_page(self, url: str) -> CrawlResult:
        """Crawl a result URL with crawl4ai and return the crawl result."""
        async with AsyncWebCrawler(config=browser_config) as crawler:
            return await crawler.arun(
                url=url,
                cache_mode=CacheMode.ENABLED,
                user_agent='random',
            )

    def is_search_result_empty(self, html_content: str) -> bool:
        """Return True when the #search container holds no results."""
        tree = html.fromstring(html_content)
        search_elements = tree.xpath('//*[@id="search"]/*')
        return len(search_elements) == 0

    def take_screenshot(self, save_path: Path) -> Path:
        """Take a screenshot of the current page and return its path."""
        # get_screenshot returns the saved file's path as a string
        return Path(self.page.get_screenshot(path=save_path))

    def check_cache(self, file_path: Path) -> bool:
        """Return True if a cached HTML file exists."""
        return file_path.exists()

    def load_from_cache(self, file_path: Path):
        """Load a previously saved page from the local cache."""
        # as_uri() yields a well-formed file:// URL (needs an absolute path)
        self.page.get(file_path.absolute().as_uri())

    def go_to_next_page(self) -> bool:
        """Click through to the next results page; return False if absent."""
        next_button = self.page.ele('#pnnext', timeout=1)
        if not next_button:
            return False
        next_button.click()
        return True

    def go_to_prev_page(self) -> bool:
        """Click back to the previous results page; return False if absent."""
        prev_button = self.page.ele('#pnprev', timeout=1)
        if not prev_button:
            return False
        prev_button.click()
        return True

    def extract_search_results(self, html_content: str) -> list[str]:
        """Extract all result links from a search results page.

        Args:
            html_content: HTML content of the page.

        Returns:
            A list of all result links found on the page.
        """
        tree = html.fromstring(html_content)
        rso_nodes = tree.xpath('//*[@id="search"]//*[@id="rso"]')
        if not rso_nodes:
            return []  # results container missing (empty or blocked page)
        links = []
        for element in rso_nodes[0].xpath('.//*[@href]'):
            href = element.get('href')
            if href and not href.startswith('#'):
                links.append(href)
        return links
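

# A minimal usage sketch, not part of the original module: it composes
# search_keyword, is_search_result_empty, and extract_search_results to
# collect links across several result pages. The collect_links name, the
# max_pages limit, and reusing manager.page.html are illustrative assumptions.
def collect_links(manager: SearchManager, keyword: str, max_pages: int = 3) -> list[str]:
    links: list[str] = []
    for page_index in range(max_pages):
        # Each results page starts 10 entries after the previous one
        manager.search_keyword(keyword, start=page_index * 10)
        page_html = manager.page.html
        if manager.is_search_result_empty(page_html):
            break  # no results container; stop paginating
        links.extend(manager.extract_search_results(page_html))
    return links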


async def main():
    from mylib.drission_page import load_chrome_from_ini

    # Initialize the browser
    page = load_chrome_from_ini()
    manager = SearchManager(page)

    # Example usage
    keyword = "Acalypha matsudae essential oil"

    # Search for the keyword
    res = manager.search_keyword(keyword)
    print(f"Found results: {res.model_dump_json(indent=4)}")
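
    # Illustrative follow-up (assumption: self.page still shows the page just
    # fetched): pull the result links out of the rendered HTML.
    links = manager.extract_search_results(page.html)
    print(f"Extracted {len(links)} links")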


if __name__ == "__main__":
    asyncio.run(main())