drission_page_process.py

import asyncio
import json
from pathlib import Path

from crawl4ai import *
from DrissionPage import Chromium, ChromiumOptions, ChromiumPage
from lxml import html  # use the lxml.html module
from mylib.base import (save_to_file, save_all_result, OUTPUT_DIR, save_to_pickle,
                        ensure_output_dir, save_base64_to_file, browser_config, replace_space)
from mylib.drission_page import load_chrome_from_ini

# Attach to the Chrome instance configured in the project ini file.
page = load_chrome_from_ini()


async def aprocess_html(url: str, html: str = ''):
    """Crawl the given URL with crawl4ai (the html parameter is currently unused)."""
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.ENABLED,
            user_agent='random'
        )
        return result


def get_if_cache_exist(page: ChromiumPage, save_to_html_path: Path, url: str):
    """Load the cached HTML file if it exists, otherwise fetch the live URL."""
    if save_to_html_path.exists():
        page.get(save_to_html_path)
    else:
        page.get(url)


def is_search_result_empty(html_content: str):
    """When the result page is empty, it contains a <div class="card-section"> element."""
    tree = html.fromstring(html_content)
    # Use XPath to find every div whose class attribute contains "card-section".
    card_sections = tree.xpath("//div[contains(@class, 'card-section')]")
    print(f"card_sections {card_sections}")
    return len(card_sections) != 0


async def search_all(cache=True):
    search_key = 'Acalypha malabarica essential oil'
    start = 30
    search_key = replace_space(search_key)
    url = f"https://www.google.com/search?q={search_key}&start={start}"
    print(f"search url: {url}")
    sav_search_key_dir = OUTPUT_DIR / search_key
    save_to_html_path = sav_search_key_dir / f"{start}.html"
    if not cache or not save_to_html_path.exists():
        # No cached copy (or caching disabled): fetch the page and save it to disk.
        page.get(url)
        print(f"save to {save_to_html_path}")
        save_to_file(page.html, save_to_html_path)
        html_content = page.html
    else:
        with open(save_to_html_path, 'r', encoding='utf-8') as f:
            html_content = f.read()
    if is_search_result_empty(html_content):
        save_path = page.get_screenshot(sav_search_key_dir / f"{start}.png")
        print(f"No further result pages for {url}, stopping")
        print(f"screenshot saved to {save_path}")
    else:
        # Feed the cached file back to crawl4ai via a file:// URL.
        file_html = f"file://{save_to_html_path}"
        result: CrawlResult = await aprocess_html(file_html)
        # result = await aprocess_html(url)
        # print(f"result.cleaned_html \n{result.cleaned_html}")
        # print(f"result.links: {len(result.links)}\n {result.links}")
        # filter_links is assumed to be a project helper; it is not imported above.
        linkes = filter_links(result.links)
        # print(f"linkes: {len(linkes)}\n {linkes}")


async def main():
    await search_all()


if __name__ == "__main__":
    asyncio.run(main())