import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from pathlib import Path
import json
from DrissionPage import Chromium, ChromiumOptions, ChromiumPage
from mylib.base import (save_to_file, save_all_result, OUTPUT_DIR, save_to_pickle,
                        ensure_output_dir, save_base64_to_file, browser_config, replace_space)
from mylib.drission_page import load_chrome_from_ini
from lxml import html  # lxml.html module, used to parse the saved result pages
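# Shared DrissionPage browser tab, configured from an ini file by the mylib helper.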
page = load_chrome_from_ini()
async def aprocess_html(url: str, html: str = ''):
    """Crawl the given URL with crawl4ai and return the CrawlResult.

    The ``html`` argument is accepted for future use but is not used here.
    """
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.ENABLED,
            user_agent='random'
        )
        return result

def get_if_cache_exist(page: ChromiumPage, save_to_html_path: Path, url: str):
    """Open the cached HTML file if it exists, otherwise fetch the live URL."""
    if save_to_html_path.exists():
        page.get(str(save_to_html_path))
    else:
        page.get(url)

def is_search_result_empty(html_content: str):
    """An empty result page contains a div whose class includes 'card-section';
    its presence means there are no more results."""
    tree = html.fromstring(html_content)
    # Find all div elements whose class attribute contains "card-section".
    card_sections = tree.xpath("//div[contains(@class, 'card-section')]")
    print(f"card_sections {card_sections}")
    return len(card_sections) != 0

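# `filter_links` is called in search_all() below but is neither defined nor imported
# in this script. A minimal sketch is given here, assuming crawl4ai's result.links
# dict of {"internal": [...], "external": [...]} link records and that only
# non-Google external result links are wanted; replace it with the real helper
# if one exists elsewhere (e.g. in mylib).
def filter_links(links: dict) -> list[str]:
    """Hypothetical helper: keep external result links, dropping Google-owned hosts."""
    from urllib.parse import urlparse

    kept = []
    for item in links.get('external', []):
        href = item.get('href', '') if isinstance(item, dict) else str(item)
        host = urlparse(href).netloc
        # Skip Google's own domains (search UI, accounts, support pages, etc.).
        if host and 'google.' not in host:
            kept.append(href)
    return kept
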
async def search_all(cache=True):
    search_key = 'Acalypha malabarica essential oil'
    start = 30
    search_key = replace_space(search_key)
    url = f"https://www.google.com/search?q={search_key}&start={start}"
    print(f"search url: {url}")
    save_search_key_dir = OUTPUT_DIR / search_key
    save_to_html_path = save_search_key_dir / f"{start}.html"
    if not cache or not save_to_html_path.exists():
        page.get(url)
        print(f"save to {save_to_html_path}")
        save_to_file(page.html, save_to_html_path)
        html_content = page.html
    else:
        with open(save_to_html_path, 'r', encoding='utf-8') as f:
            html_content = f.read()
    if is_search_result_empty(html_content):
        save_path = page.get_screenshot(save_search_key_dir / f"{start}.png")
        print(f"No further result pages for {url}, stopping")
        print(f"screenshot saved to {save_path}")
    else:
        file_html = f"file://{save_to_html_path}"
        result: CrawlResult = await aprocess_html(file_html)
        # result = await aprocess_html(url)
        # print(f"result.cleaned_html \n{result.cleaned_html}")
        # print(f"result.links: {len(result.links)}\n {result.links}")
        links = filter_links(result.links)
        # print(f"links: {len(links)}\n {links}")

async def main():
    await search_all()


if __name__ == "__main__":
    asyncio.run(main())