search_keyward.py

import asyncio
from crawl4ai import *
from pathlib import Path
import json
from lxml import html  # use the lxml.html module
from sqlmodel import Session, select
from mylib.base import (replace_space, save_to_file, save_all_result, OUTPUT_DIR,
                        save_to_pickle, ensure_output_dir, save_base64_to_file, browser_config)
from mylib.drission_page import load_chrome_from_ini
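
# NOTE (assumption): mylib.base is not included in this file. Judging from how the helpers
# are used below, they are expected to behave roughly as follows:
#   replace_space(s)          -> s with whitespace replaced so it is safe as a directory name
#   ensure_output_dir(path)   -> create `path` (and parents) if it does not exist
#   save_to_pickle(obj, path) -> pickle `obj` to `path`
#   OUTPUT_DIR                -> a pathlib.Path pointing at the output root
#   browser_config            -> a crawl4ai BrowserConfig instance (e.g. a headless browser)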

async def google_search(search_key: str, start: int = 0, config=None) -> CrawlResult:
    async with AsyncWebCrawler(config=browser_config) as crawler:
        url = f"https://www.google.com/search?q={search_key}&start={start}"
        print(f"search url: {url}")
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.DISABLED,
            config=config,
        )
        # save_to_pickle(result, OUTPUT_DIR / f"{search_key}.pickle")
        return result
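
# Sketch (assumption): the `config` parameter is passed straight through to crawler.arun(),
# so a caller could supply an explicit crawl4ai CrawlerRunConfig, for example:
#
#   run_config = CrawlerRunConfig(cache_mode=CacheMode.DISABLED)
#   result = await google_search("Acalypha malabarica essential oil", config=run_config)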

def filter_links(links):
    '''
    input: {
        'internal': [{}],
        'external': [
            {
                "href": "xx",
                "text": "xxm",
                "title": "",
                "base_domain": "benlcollins.com"
            }
        ],
    }
    '''
    external_links = links["external"]
    filtered_links = [link for link in external_links if "google" not in link["base_domain"]]
    return filtered_links

def is_search_result_empty(html_content: str) -> bool:
    '''An empty results page contains a <div class="card-section"> element.'''
    tree = html.fromstring(html_content)
    # XPath: find every div whose class attribute contains "card-section"
    card_sections = tree.xpath("//div[contains(@class, 'card-section')]")
    print(f"card_sections {card_sections}")
    return len(card_sections) != 0

async def get_keywords_from_db():
    """Fetch every keyword from the database that has not been processed yet."""
    from database.sql_model import Keyword, Session, engine
    with Session(engine) as session:
        statement = select(Keyword).where(Keyword.done == False)
        keywords = session.exec(statement).all()
        return [keyword.key_word for keyword in keywords]
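
# Sketch (assumption): database.sql_model is not shown here; the Keyword table is assumed to
# be a SQLModel roughly like this:
#
#   class Keyword(SQLModel, table=True):
#       id: int | None = Field(default=None, primary_key=True)
#       key_word: str
#       done: bool = False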

def is_already_processed(keyword: str) -> bool:
    """Check whether a keyword has already been processed (a pickle already exists for it)."""
    save_dir = OUTPUT_DIR / replace_space(keyword) / 'pkl'
    return save_dir.exists() and any(save_dir.glob("*.pickle"))

async def process_keyword(keyword: str, start=0, pages_num=250):
    """Process a single keyword: page through Google results and pickle each result page."""
    keyword = replace_space(keyword)
    save_dir = OUTPUT_DIR / keyword / 'pkl'
    ensure_output_dir(save_dir)
    # Already processed: just return the save directory
    if is_already_processed(keyword):
        print(f"Keyword {keyword} already processed, skipping search")
        return save_dir
    # Not processed yet: run the search, 10 results per page
    for i in range(start, pages_num, 10):
        result: CrawlResult = await google_search(keyword, i)
        if is_search_result_empty(result.html):
            print(f"No more result pages at {result.url}, stopping")
            break
        links = filter_links(result.links)
        print(f"start: {i}, links: {links} \n len: {len(links)}")
        save_to_pickle(result, save_dir / f"result-{i}.pickle")
    return save_dir

async def search_all():
    """Process every keyword that has not been completed yet."""
    keywords = await get_keywords_from_db()
    for keyword in keywords:
        if is_already_processed(keyword):
            print(f"Keyword {keyword} already processed, skipping")
            continue
        await process_keyword(keyword)

async def test_single_search():
    result = await google_search("Acalypha malabarica essential oil", start=50)
    print(f"result clean html:\n {result.cleaned_html}")
    print(f"result.links\n {result.links['external']}")
    # res = filter_links(result.links)
    # print(res)
    # pretty-print the filtered links
    # print(json.dumps(res, indent=4))

async def main():
    await search_all()
    # await test_single_search()


if __name__ == "__main__":
    asyncio.run(main())