search_keyward.py 5.0 KB

import asyncio
from crawl4ai import *
from pathlib import Path
import json
from lxml import html  # use the lxml.html module
from sqlmodel import Session, select
from mylib.base import (replace_space, save_to_file, save_all_result,
                        OUTPUT_DIR, save_to_pickle, ensure_output_dir,
                        save_base64_to_file, browser_config)
from mylib.drission_page import load_chrome_from_ini

page = load_chrome_from_ini()


async def google_search(url: str, config=None) -> CrawlResult:
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.ENABLED,
            user_agent='random',
        )
        # save_to_pickle(result, OUTPUT_DIR / f"{search_key}.pickle")
        return result


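# Note (illustrative): besides live URLs, google_search() is reused further
# down to re-parse locally cached result pages through file:// URLs, e.g.
#     result = await google_search(f"file://{OUTPUT_DIR / keyword / '0.html'}")

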
def filter_links(links):
    '''
    input: {
        'internal': [{}],
        'external': [
            {
                "href": "xx",
                "text": "xxm",
                "title": "",
                "base_domain": "benlcollins.com"
            }
        ],
    }
    '''
    external_links = links["external"]
    filtered_links = [link for link in external_links if "google" not in link["base_domain"]]
    return filtered_links


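# Illustrative sketch (not part of the original pipeline): shows how
# filter_links() drops Google-owned domains and keeps everything else.
# The sample dict mirrors the structure documented in the docstring above.
def _demo_filter_links():
    sample = {
        "internal": [],
        "external": [
            {"href": "https://support.google.com/", "text": "Help",
             "title": "", "base_domain": "google.com"},
            {"href": "https://benlcollins.com/post", "text": "Post",
             "title": "", "base_domain": "benlcollins.com"},
        ],
    }
    assert [link["base_domain"] for link in filter_links(sample)] == ["benlcollins.com"]

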
def is_search_result_empty(html_content: str) -> bool:
    '''
    Check whether the page contains an element with id="search"
    and whether that element has any children.
    Returns True (empty search results) when it has no children.
    '''
    tree = html.fromstring(html_content)
    search_elements = tree.xpath('//*[@id="search"]/*')
    return len(search_elements) == 0


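# Illustrative sketch (not part of the original pipeline): minimal HTML
# fragments showing when is_search_result_empty() reports an empty page.
def _demo_is_search_result_empty():
    assert is_search_result_empty('<div id="search"></div>') is True
    assert is_search_result_empty('<div id="search"><a href="#">hit</a></div>') is False

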
def is_search_result_links_empty(result: CrawlResult) -> bool:
    """Return True when the crawl result has no non-Google external links."""
    return not filter_links(result.links)


async def get_keywords_from_db():
    """Fetch all unfinished keywords from the database."""
    from database.sql_model import Keyword, Session, engine
    with Session(engine) as session:
        statement = select(Keyword).where(Keyword.done == False)
        keywords = session.exec(statement).all()
        return [keyword.key_word for keyword in keywords]


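# Assumed shape of database.sql_model.Keyword (that module is not shown here);
# get_keywords_from_db() only relies on the `key_word` and `done` columns:
#
#     class Keyword(SQLModel, table=True):
#         id: int | None = Field(default=None, primary_key=True)
#         key_word: str
#         done: bool = False

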
def is_already_processed(keyword: str) -> bool:
    """Check whether a keyword has already been processed."""
    save_dir = OUTPUT_DIR / replace_space(keyword) / 'pkl'
    return save_dir.exists() and any(save_dir.glob("*.pickle"))


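# Expected on-disk layout, inferred from the paths used in this file:
#     OUTPUT_DIR/<keyword>/<start>.html        raw Google result page
#     OUTPUT_DIR/<keyword>/links-<start>.json  extracted link metadata
#     OUTPUT_DIR/<keyword>/pkl/*.pickle        pickled results (checked by is_already_processed)

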
async def process_keyword(keyword: str, start=0, pages_num=250, cache=True, skip_exist=True):
    """Process a single keyword: fetch, cache and parse its Google result pages."""
    global page
    keyword = replace_space(keyword)
    save_dir = OUTPUT_DIR / keyword
    ensure_output_dir(save_dir)
    # # If the keyword was already processed, return its save directory directly.
    # if is_already_processed(keyword):
    #     print(f"keyword {keyword} already processed, skipping search")
    #     return save_dir
    # Not processed yet: run the search. Google paginates results via the
    # `start` query parameter in steps of 10.
    for i in range(start, pages_num, 10):
        save_html_path = OUTPUT_DIR / keyword / f"{i}.html"
        url = f"https://www.google.com/search?q={keyword}&start={i}"
        print(f"search url: {url}")
        # If a cached HTML file already exists, skip fetching this page.
        if skip_exist and save_html_path.exists():
            print(f"skipping cached file {save_html_path}")
            continue
        else:
            page.get(url)
            save_to_file(page.html, save_html_path)
            # result: CrawlResult = await google_search(url)
            # save the HTML file
            # save_to_file(result.html, save_html_path)
            print(f"saved HTML file {save_html_path}")
        # Re-parse the locally saved page to extract its links.
        url = f"file://{save_html_path}"
        result: CrawlResult = await google_search(url)
        # pretty-print result.links
        # print(json.dumps(result.links, indent=4))
        save_json_path = save_to_file(json.dumps(result.links, indent=4), save_dir / f"links-{i}.json")
        print(f"saved links file {save_json_path}")
        # if is_search_result_empty(result.html):
        search_res_links = filter_links(result.links)
        if not search_res_links:
            print(f"no further result pages at {result.url}, stopping")
            break
        # links = filter_links(result.links)
        # print(f"start: {i}, links: {links} \n len: {len(links)}")
        # save_to_pickle(result, save_dir / f"result-{i}.pickle")
    return save_dir


async def search_all():
    """Process all unfinished keywords."""
    keywords = await get_keywords_from_db()
    for keyword in keywords:
        # if is_already_processed(keyword):
        #     print(f"keyword {keyword} already processed, skipping")
        #     continue
        await process_keyword(keyword)


async def test_single_search():
    await process_keyword("Acalypha malabarica essential oil", start=0, pages_num=250)
    # result = await google_search("Acalypha malabarica essential oil", start=50)
    # print(f"result clean html:\n {result.cleaned_html}")
    # print(f"result.links\n {result.links['external']}")
    # res = filter_links(result.links)
    # print(res)
    # pretty-print
    # print(json.dumps(res, indent=4))


async def main():
    await search_all()
    # await test_single_search()


if __name__ == "__main__":
    asyncio.run(main())