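"""
Google search scraping helpers.

Keywords come from the Excel-backed database (database.excel_import); each
keyword's Google result pages are fetched with crawl4ai, the extracted links
are saved as JSON under GOOGLE_SEARCH_DIR, and the link lists are filtered
with mylib.crawl_lib_func (filter_links / filter_local_domain).
"""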
import asyncio
import re
import json
from pathlib import Path

from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, CacheMode
from lxml import html  # use the lxml.html module for parsing
from sqlmodel import Session, select

from mylib.base import (replace_space, save_to_file, save_all_result,
                        save_to_pickle, ensure_output_dir,
                        save_base64_to_file, browser_config)
from mylib.drission_page import load_chrome_from_ini
from database.excel_import import ExcelDatabaseManager, KeywordModel
from config.settings import GOOGLE_SEARCH_DIR
from mylib.crawl_lib_func import filter_links, filter_local_domain

page = load_chrome_from_ini()
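# NOTE: `page` is a browser session from mylib.drission_page (DrissionPage); it is
# only used by the cached-HTML branch in process_keyword(), which is currently
# commented out.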
async def google_search(url: str, config=None) -> CrawlResult:
    """Fetch a single URL with crawl4ai and return the CrawlResult."""
    run_config = CrawlerRunConfig(
        magic=True,
        simulate_user=True,
        override_navigator=True,
    )
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.ENABLED,
            user_agent='random',
            config=run_config,
        )
    # save_to_pickle(result, GOOGLE_SEARCH_DIR / f"{search_key}.pickle")
    return result
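# Example (hypothetical query; result.links is crawl4ai's {"internal": [...], "external": [...]} dict):
#   result = await google_search("https://www.google.com/search?q=rosemary+oil&start=0")
#   print(len(result.links.get("external", [])))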

def is_search_result_empty(html_content: str) -> bool:
    '''
    Check whether the page contains an element with id="search"
    and whether that element has any children.
    Returns True (search results are empty) when no children are found.
    '''
    tree = html.fromstring(html_content)
    search_elements = tree.xpath('//*[@id="search"]/*')
    return len(search_elements) == 0
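# Example:
#   is_search_result_empty('<div id="search"></div>')                   # -> True
#   is_search_result_empty('<div id="search"><a href="#">r</a></div>')  # -> False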
def is_already_processed(keyword: str) -> bool:
    """Check whether a keyword has already been processed."""
    save_dir = GOOGLE_SEARCH_DIR / replace_space(keyword) / 'pkl'
    return save_dir.exists() and any(save_dir.glob("*.pickle"))
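# NOTE: this checks for pickles under "<keyword>/pkl", but process_keyword() below
# currently only writes "links-<start>.json" files (its pickle save is commented out),
# so as written the check will not match what process_keyword() produces.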
async def process_keyword(keyword: str, start=0, pages_num=250, cache=True, skip_exist=True):
    """Process a single keyword: fetch its Google result pages and save the extracted links."""
    global page
    # keyword = replace_space(keyword)
    save_dir = GOOGLE_SEARCH_DIR / keyword
    ensure_output_dir(save_dir)

    # # If the keyword was already processed, return its save directory directly
    # if is_already_processed(keyword):
    #     print(f"Keyword {keyword} already processed, skipping search")
    #     return save_dir

    # Not processed yet: walk the result pages, 10 results per page
    for i in range(start, pages_num, 10):
        save_html_path = GOOGLE_SEARCH_DIR / keyword / f"{i}.html"
        url = f"https://www.google.com/search?q={keyword}&start={i}"
        print(f"search url: {url}")

        # If a cached HTML file exists, read it instead of fetching again
        # if skip_exist and save_html_path.exists():
        #     print(f"Skipping cached file {save_html_path}")
        #     continue
        #     print(f"Reading cached file {save_html_path}")
        # else:
        #     page.get(url)
        #     save_to_file(page.html, save_html_path)
        #     # result: CrawlResult = await google_search(url)
        #     # Save the HTML file
        #     # save_to_file(result.html, save_html_path)
        #     print(f"Saved HTML file {save_html_path}")
        # url = f"file://{save_html_path}"
        result: CrawlResult = await google_search(url)

        # Pretty-print result.links
        # print(json.dumps(result.links, indent=4))
        save_json_path = save_to_file(json.dumps(result.links, indent=4), save_dir / f"links-{i}.json")
        print(f"Saved links file {save_json_path}")
        # if is_search_result_empty(result.html):
        search_res_links = filter_links(result.links)
        if not search_res_links:
            print(f"No additional result pages found at {result.url}, stopping")
            break

        # links = filter_links(result.links)
        # print(f"start: {i}, links: {links} \n len: {len(links)}")
        # save_to_pickle(result, save_dir / f"result-{i}.pickle")
    return save_dir
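# Example (mirrors the commented-out call in test_single_search below):
#   save_dir = await process_keyword("Acalypha malabarica essential oil", start=0, pages_num=250)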
async def search_all():
    """Process every keyword that has not been completed yet."""
    excel_db_manager = ExcelDatabaseManager()
    key_model_list = excel_db_manager.get_keywords_by_status()
    for keyword_model in key_model_list:
        # if is_already_processed(keyword):
        #     print(f"Keyword {keyword} already processed, skipping")
        #     continue
        await process_keyword(keyword_model.key_word)
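# search_all() is the batch entry point; main() at the bottom currently runs
# test_dir_links_not_local() instead and leaves search_all() commented out.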
async def test_single_search():
    # await process_keyword("Acalypha malabarica essential oil", start=0, pages_num=250)
    # Debug fixtures: normal results / no results found / unusual-traffic warning / last page / single page
    # save_html_path = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug\正常的搜索结果.html')
    # save_html_path = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug\查询不到内容.html')
    # save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\流量异常.html")
    # save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\最后一页.html")
    save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\只有一页.html")
    save_dir = save_html_path.parent
    file_name = save_html_path.name
    url = f"file://{save_html_path}"
    result: CrawlResult = await google_search(url)
    # Pretty-print result.links
    # print(json.dumps(result.links, indent=4))
    save_json_path = save_to_file(json.dumps(result.links, indent=4), save_dir / f"links-{file_name}.json")
    print(f"Saved result.links file {save_json_path}")
    links = filter_links(result.links)
    # print('\n -----------------links:')
    # print(json.dumps(links, indent=4))
    save_json_path = save_to_file(json.dumps(links, indent=4), save_dir / f"links-{file_name}-filter.json")
    links_not_local = filter_local_domain(links)
    print(f"Saved filtered links file {save_json_path}")
    # print('\n -----------------links_not_local:')
    # print(json.dumps(links_not_local, indent=4))
    # print(f"len links_not_local: {len(links_not_local)}")
    save_links_not_local_path = save_to_file(json.dumps(links_not_local, indent=4), save_dir / f"links-{file_name}-filter-not-local.json")
    print(f"Saved links-{file_name}-filter-not-local.json to {save_links_not_local_path}")
    # print(f"start: {i}, links: {links} \n len: {len(links)}")
    # result = await google_search("Acalypha malabarica essential oil", start=50)
    # print(f"result clean html:\n {result.cleaned_html}")
    # print(f"result.links\n {result.links['external']}")
    # res = filter_links(result.links)
    # print(res)
    # Pretty print
    # print(json.dumps(res, indent=4))
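# Run standalone (for debugging) with:
#   asyncio.run(test_single_search())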
async def test_html_to_doc():
    save_dir = Path(r"K:\code\upwork\zhang_crawl_bio\output\google_search")
    # Get all keyword folders
    folders = [f for f in save_dir.iterdir() if f.is_dir()]
    print(folders)
    for folder in folders:
        # Get all HTML files in the folder
        html_files = [f for f in folder.iterdir() if f.suffix == '.html']
        # TODO: the HTML-to-document conversion itself is not implemented yet
async def test_sigle_html_links(save_html_path=None):
    """Extract the filtered link list from a single saved Google results page."""
    if not save_html_path:
        save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\Acalypha malabarica essential oil\0.html")
    url = f"file://{save_html_path}"
    result: CrawlResult = await google_search(url)
    not_google_external_links = filter_links(result.links)
    links_not_local = filter_local_domain(not_google_external_links)
    # print(links_not_local)
    return links_not_local
async def test_dir_links_not_local(dir_path: Path):
    '''
    Parse every HTML file in the directory and collect the filtered link list;
    base_domain entries that point to local paths are excluded.
    '''
    html_files = [f for f in dir_path.iterdir() if f.suffix == '.html']
    all_links = []
    for html_file in html_files:
        print(f"Processing {html_file}")
        links = await test_sigle_html_links(html_file)
        print(f"Found {len(links)} links in {html_file}")
        all_links.extend(links)
    print(f"Found {len(all_links)} links in total")
    # Write the collected links to a file, line by line
    # (assumes mylib.base.save_to_file accepts a list; pass a joined string otherwise)
    save_to_file(all_links, dir_path / "links.json.txt")
    return all_links
async def main():
    # await search_all()
    # await test_single_search()
    await test_dir_links_not_local(Path(r"K:\code\upwork\zhang_crawl_bio\output\Acalypha malabarica essential oil"))


if __name__ == "__main__":
    asyncio.run(main())