import asyncio
import json
from pathlib import Path

from lxml import html  # use the lxml.html module for parsing
from sqlmodel import Session, select
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult

from mylib.base import (
    replace_space,
    save_to_file,
    save_all_result,
    OUTPUT_DIR,
    save_to_pickle,
    ensure_output_dir,
    save_base64_to_file,
    browser_config,
)
from mylib.drission_page import load_chrome_from_ini


async def google_search(search_key: str, start: int = 0, config=None) -> CrawlResult:
    async with AsyncWebCrawler(config=browser_config) as crawler:
        url = f"https://www.google.com/search?q={search_key}&start={start}"
        print(f"search url: {url}")
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.DISABLED,
            config=config,
        )
        # save_to_pickle(result, OUTPUT_DIR / f"{search_key}.pickle")
        return result
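

# Note (assumption, not in the original code): if search_key can contain spaces
# or special characters, it is safer to URL-encode it when building the query
# string, e.g.:
#
#     from urllib.parse import quote_plus
#     url = f"https://www.google.com/search?q={quote_plus(search_key)}&start={start}"
#
# The function above interpolates search_key directly, which only works for
# keywords that are already URL-safe.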


def filter_links(links):
    """Keep only external links whose base domain is not Google.

    Input shape:
        {
            "internal": [{...}],
            "external": [
                {
                    "href": "xx",
                    "text": "xxm",
                    "title": "",
                    "base_domain": "benlcollins.com"
                }
            ],
        }
    """
    external_links = links["external"]
    filtered_links = [link for link in external_links if "google" not in link["base_domain"]]
    return filtered_links
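

# Worked example with hypothetical data: the Google-owned domain is dropped,
# the non-Google one is kept.
#
#     links = {
#         "internal": [],
#         "external": [
#             {"href": "https://support.google.com/a", "text": "help",
#              "title": "", "base_domain": "support.google.com"},
#             {"href": "https://benlcollins.com/post", "text": "post",
#              "title": "", "base_domain": "benlcollins.com"},
#         ],
#     }
#     filter_links(links)  # -> [the benlcollins.com entry only]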


def is_search_result_empty(html_content: str) -> bool:
    """An empty results page contains a <div class="card-section"> element."""
    tree = html.fromstring(html_content)
    # Find all div elements whose class attribute contains "card-section"
    card_sections = tree.xpath("//div[contains(@class, 'card-section')]")
    print(f"card_sections {card_sections}")
    return len(card_sections) != 0
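

# Quick sanity check (sketch): a page containing the card-section div is
# treated as empty, an ordinary results page is not.
#
#     empty_page = "<html><body><div class='card-section'>No results</div></body></html>"
#     normal_page = "<html><body><div id='search'>results...</div></body></html>"
#     is_search_result_empty(empty_page)   # True
#     is_search_result_empty(normal_page)  # False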


async def get_keywords_from_db():
    """Fetch all keywords from the database that are not yet done."""
    from database.sql_model import Keyword, Session, engine

    with Session(engine) as session:
        # "== False" is needed so SQLModel builds a SQL comparison expression
        statement = select(Keyword).where(Keyword.done == False)
        keywords = session.exec(statement).all()
        return [keyword.key_word for keyword in keywords]


def is_already_processed(keyword: str) -> bool:
    """Check whether a keyword has already been processed (pickles exist on disk)."""
    save_dir = OUTPUT_DIR / replace_space(keyword) / 'pkl'
    return save_dir.exists() and any(save_dir.glob("*.pickle"))


async def process_keyword(keyword: str, start=0, pages_num=250):
    """Process a single keyword: crawl its Google result pages and pickle each one."""
    keyword = replace_space(keyword)
    save_dir = OUTPUT_DIR / keyword / 'pkl'
    ensure_output_dir(save_dir)

    # Already processed: return the save directory directly
    if is_already_processed(keyword):
        print(f"Keyword {keyword} already processed, skipping search")
        return save_dir

    # Not processed yet: run the search, 10 results per page
    for i in range(start, pages_num, 10):
        result: CrawlResult = await google_search(keyword, i)
        if is_search_result_empty(result.html):
            print(f"No more result pages at {result.url}, stopping")
            break

        links = filter_links(result.links)
        print(f"start: {i}, links: {links} \n len: {len(links)}")
        save_to_pickle(result, save_dir / f"result-{i}.pickle")
    return save_dir
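

# Reading results back later (sketch; assumes save_to_pickle from mylib.base
# writes with the standard pickle module, which is not shown here):
#
#     import pickle
#     for pkl in sorted(save_dir.glob("result-*.pickle")):
#         with open(pkl, "rb") as f:
#             result = pickle.load(f)
#         print(pkl.name, len(filter_links(result.links)))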


async def search_all():
    """Process all keywords that have not been completed yet."""
    keywords = await get_keywords_from_db()
    for keyword in keywords:
        if is_already_processed(keyword):
            print(f"Keyword {keyword} already processed, skipping")
            continue
        await process_keyword(keyword)


async def test_single_search():
    result = await google_search("Acalypha malabarica essential oil", start=50)
    print(f"result clean html:\n {result.cleaned_html}")
    print(f"result.links\n {result.links['external']}")
    # res = filter_links(result.links)
    # print(res)
    # pretty-print the filtered links
    # print(json.dumps(res, indent=4))


async def main():
    await search_all()
    # await test_single_search()


if __name__ == "__main__":
    asyncio.run(main())