import asyncio
import json
from pathlib import Path

from crawl4ai import *
from DrissionPage import Chromium, ChromiumOptions, ChromiumPage
from lxml import html  # use the lxml.html module for parsing
from mylib.base import (
    save_to_file,
    save_all_result,
    OUTPUT_DIR,
    save_to_pickle,
    ensure_output_dir,
    save_base64_to_file,
    browser_config,
    replace_space,
)
from mylib.drission_page import load_chrome_from_ini

# Reuse a single browser tab configured from the ini file.
page = load_chrome_from_ini()


async def aprocess_html(url: str, html: str = ''):
    """Crawl the given URL (or local file:// URL) with crawl4ai and return the result."""
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.ENABLED,
            user_agent='random'
        )
        return result


def get_if_cache_exist(page: ChromiumPage, save_to_html_path: Path, url: str):
    """Open the cached HTML file if it exists, otherwise fetch the live URL."""
    if save_to_html_path.exists():
        page.get(save_to_html_path)
    else:
        page.get(url)
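
# NOTE: filter_links() is called in search_all() below but is neither imported
# nor defined in this file. The helper below is a hypothetical sketch of its
# assumed intent (flatten crawl4ai's links dict and drop Google's own
# navigation URLs); replace it with the project's real implementation if one
# exists elsewhere.
def filter_links(links: dict) -> list[str]:
    """Hypothetical sketch: collect external-looking hrefs from a CrawlResult.links dict."""
    # crawl4ai's CrawlResult.links is typically a dict with "internal" and
    # "external" lists, each item carrying an "href" key.
    hrefs = []
    for group in ('internal', 'external'):
        for item in links.get(group, []):
            href = item.get('href', '') if isinstance(item, dict) else str(item)
            if href.startswith('http') and 'google.com' not in href:
                hrefs.append(href)
    return hrefs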
def is_search_result_empty(html_content: str):
    '''
    When the search result page is empty, a div element whose class contains
    "card-section" is present.
    '''
    tree = html.fromstring(html_content)
    # Use XPath to find all div elements whose class contains "card-section".
    card_sections = tree.xpath("//div[contains(@class, 'card-section')]")
    print(f"card_sections {card_sections}")
    return len(card_sections) != 0


async def search_all(cache=True):
    search_key = 'Acalypha malabarica essential oil'
    start = 30
    search_key = replace_space(search_key)
    url = f"https://www.google.com/search?q={search_key}&start={start}"
    print(f"search url: {url}")
    sav_search_key_dir = OUTPUT_DIR / search_key
    save_to_html_path = sav_search_key_dir / f"{start}.html"
    if not cache or not save_to_html_path.exists():
        # Fetch the live page and cache its HTML to disk.
        page.get(url)
        print(f"save to {save_to_html_path}")
        save_to_file(page.html, save_to_html_path)
        html_content = page.html
    else:
        # Reuse the cached HTML file.
        with open(save_to_html_path, 'r', encoding='utf-8') as f:
            html_content = f.read()
    if is_search_result_empty(html_content):
        save_path = page.get_screenshot(sav_search_key_dir / f"{start}.png")
        print(f"no further result pages found for {url}, stopping")
        print(f"screenshot saved to {save_path}")
    else:
        # Feed the cached file to crawl4ai instead of hitting Google again.
        file_html = f"file://{save_to_html_path}"
        result: CrawlResult = await aprocess_html(file_html)
        # result = await aprocess_html(url)
        # print(f"result.cleaned_html \n{result.cleaned_html}")
        # print(f"result.links: {len(result.links)}\n {result.links}")
        links = filter_links(result.links)
        # print(f"links: {len(links)}\n {links}")


async def main():
    await search_all()


if __name__ == "__main__":
    asyncio.run(main())