import asyncio
import json
from pathlib import Path

from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from DrissionPage import Chromium, ChromiumOptions, ChromiumPage
from lxml import html  # lxml.html module, used for XPath parsing below
from mylib.base import (save_to_file, save_all_result, save_to_pickle,
                        ensure_output_dir, save_base64_to_file,
                        browser_config, replace_space)
from mylib.drission_page import load_chrome_from_ini
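# NOTE: OUTPUT_DIR is used in search_all() but was never defined in the
# original snippet. A minimal stand-in (hypothetical; the real value likely
# comes from mylib.base's ensure_output_dir helper, whose API is not shown):
OUTPUT_DIR = Path('output')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)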
# shared browser instance, loaded from the project's ini config
page = load_chrome_from_ini()
async def aprocess_html(url: str, raw_html: str = ''):
    """Crawl url (a live or file:// URL) with crawl4ai and return the result.

    The unused second parameter was renamed from `html` to `raw_html` so it
    no longer shadows the lxml.html module imported above.
    """
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.ENABLED,
            user_agent='random'
        )
        return result

def get_if_cache_exist(page: ChromiumPage, save_to_html_path: Path, url: str):
    """Open the cached HTML file if it exists, otherwise fetch the live URL."""
    if save_to_html_path.exists():
        # DrissionPage navigates by URL, so convert the local path to file://
        page.get(save_to_html_path.resolve().as_uri())
    else:
        page.get(url)
def is_search_result_empty(html_content: str) -> bool:
    '''When the result page is empty, it contains a <div class="card-section"> element.'''
    tree = html.fromstring(html_content)
    # XPath: find every div whose class attribute contains "card-section"
    card_sections = tree.xpath("//div[contains(@class, 'card-section')]")
    print(f"card_sections {card_sections}")
    return len(card_sections) != 0
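
# NOTE: filter_links() is called in search_all() but was not defined in the
# original snippet. A minimal sketch under an assumption about crawl4ai's
# result.links shape ({'internal': [...], 'external': [...]}, where each
# entry is a dict with an 'href' key): keep external result links and drop
# Google's own navigation domains.
def filter_links(links: dict) -> list:
    external = links.get('external', [])
    return [link['href'] for link in external
            if 'google.' not in link.get('href', '')]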
async def search_all(cache=True):
    search_key = 'Acalypha malabarica essential oil'
    start = 30
    search_key = replace_space(search_key)
    url = f"https://www.google.com/search?q={search_key}&start={start}"
    print(f"search url: {url}")

    save_search_key_dir = OUTPUT_DIR / search_key
    save_to_html_path = save_search_key_dir / f"{start}.html"
    if not cache or not save_to_html_path.exists():
        page.get(url)
        print(f"save to {save_to_html_path}")
        save_to_file(page.html, save_to_html_path)
        html_content = page.html
    else:
        with open(save_to_html_path, 'r', encoding='utf-8') as f:
            html_content = f.read()
    if is_search_result_empty(html_content):
        save_path = page.get_screenshot(save_search_key_dir / f"{start}.png")
        print(f"no more result pages at {url}, exiting")
        print(f"screenshot saved to {save_path}")
    else:
        # crawl the locally cached copy; call aprocess_html(url) to hit the live page instead
        file_html = save_to_html_path.resolve().as_uri()
        result: CrawlResult = await aprocess_html(file_html)
        links = filter_links(result.links)
        print(f"links: {len(links)}\n{links}")

async def main():
    await search_all()

if __name__ == "__main__":
    asyncio.run(main())
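
# Usage note (assumption): Google paginates results via the `start` query
# parameter in steps of 10; to collect every page, loop start over
# 0, 10, 20, ... and stop once is_search_result_empty() returns True.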