import asyncio
import re
import json
from pathlib import Path
from urllib.parse import quote_plus

from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CrawlResult, CacheMode
from lxml import html  # use the lxml.html module
from sqlmodel import Session, select

from mylib.base import (replace_space, save_to_file, save_all_result, save_to_pickle,
                        ensure_output_dir, save_base64_to_file, browser_config)
from mylib.drission_page import load_chrome_from_ini
from database.excel_import import ExcelDatabaseManager, KeywordModel
from config.settings import GOOGLE_SEARCH_DIR
from mylib.crawl_lib_func import filter_links, filter_local_domain

page = load_chrome_from_ini()


async def google_search(url: str, config=None) -> CrawlResult:
    """Fetch a single URL with crawl4ai and return the CrawlResult."""
    run_config = CrawlerRunConfig(
        magic=True,
        simulate_user=True,
        override_navigator=True
    )
    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            cache_mode=CacheMode.ENABLED,
            user_agent='random',
            config=run_config,
        )
        # save_to_pickle(result, GOOGLE_SEARCH_DIR / f"{search_key}.pickle")
        return result


def is_search_result_empty(html_content: str) -> bool:
    '''
    Check whether the page has an element with id="search" and whether that
    element has any children. Return True (search results are empty) when it
    has no child elements.
    '''
    tree = html.fromstring(html_content)
    search_elements = tree.xpath('//*[@id="search"]/*')
    return len(search_elements) == 0


def is_already_processed(keyword: str) -> bool:
    """Check whether the keyword has already been processed."""
    save_dir = GOOGLE_SEARCH_DIR / replace_space(keyword) / 'pkl'
    return save_dir.exists() and any(save_dir.glob("*.pickle"))


async def process_keyword(keyword: str, start=0, pages_num=250, cache=True, skip_exist=True):
    """Process a single keyword: page through its Google results and save the links."""
    global page
    # keyword = replace_space(keyword)
    save_dir = GOOGLE_SEARCH_DIR / keyword
    ensure_output_dir(save_dir)
    # # If already processed, return the save directory directly
    # if is_already_processed(keyword):
    #     print(f"Keyword {keyword} already processed, skipping search")
    #     return save_dir
    # Not processed yet, run the search (10 results per page)
    for i in range(start, pages_num, 10):
        save_html_path = GOOGLE_SEARCH_DIR / keyword / f"{i}.html"
        # URL-encode the keyword so spaces and special characters are safe in the query string
        url = f"https://www.google.com/search?q={quote_plus(keyword)}&start={i}"
        print(f"search url: {url}")
        # If the cached file exists, read it directly
        # if skip_exist and save_html_path.exists():
        #     print(f"Skipping cached file {save_html_path}")
        #     continue
        #     print(f"Reading cached file {save_html_path}")
        # else:
        #     page.get(url)
        #     save_to_file(page.html, save_html_path)
        #     # result: CrawlResult = await google_search(url)
        #     # Save the HTML file
        #     # save_to_file(result.html, save_html_path)
        #     print(f"Saved HTML file {save_html_path}")
        # url = f"file://{save_html_path}"
        result: CrawlResult = await google_search(url)
        # Pretty-print result.links
        # print(json.dumps(result.links, indent=4))
        save_json_path = save_to_file(json.dumps(result.links, indent=4), save_dir / f"links-{i}.json")
        print(f"Saved links file {save_json_path}")
        # if is_search_result_empty(result.html):
        search_res_links = filter_links(result.links)
        if not search_res_links:
            print(f"No further result pages found at {result.url}, stopping")
            break
        # links = filter_links(result.links)
        # print(f"start: {i}, links: {links} \n len: {len(links)}")
        # save_to_pickle(result, save_dir / f"result-{i}.pickle")
    return save_dir


async def search_all():
    """Process all keywords that have not been completed yet."""
    excel_db_manager = ExcelDatabaseManager()
    key_model_list = excel_db_manager.get_keywords_by_status()
    for keyword_model in key_model_list:
        # if is_already_processed(keyword):
        #     print(f"Keyword {keyword} already processed, skipping")
        #     continue
        await process_keyword(keyword_model.key_word)


async def test_single_search():
    # await process_keyword("Acalypha malabarica essential oil", start=0, pages_num=250)
    # save_html_path = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug\正常的搜索结果.html')
    # save_html_path = Path(r'K:\code\upwork\zhang_crawl_bio\output\debug\查询不到内容.html')
    # save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\流量异常.html")
    # save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\最后一页.html")
    save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\debug\只有一页.html")
    save_dir = save_html_path.parent
    file_name = save_html_path.name
    url = f"file://{save_html_path}"
    result: CrawlResult = await google_search(url)
    # Pretty-print result.links
    # print(json.dumps(result.links, indent=4))
    save_json_path = save_to_file(json.dumps(result.links, indent=4), save_dir / f"links-{file_name}.json")
    print(f"Saved result.links file {save_json_path}")
    links = filter_links(result.links)
    # print('\n -----------------links:')
    # print(json.dumps(links, indent=4))
    save_json_path = save_to_file(json.dumps(links, indent=4), save_dir / f"links-{file_name}-filter.json")
    print(f"Saved filtered links file {save_json_path}")
    links_not_local = filter_local_domain(links)
    # print('\n -----------------links_not_local:')
    # print(json.dumps(links_not_local, indent=4))
    # print(f"len links_not_local: {len(links_not_local)}")
    save_links_not_local_path = save_to_file(json.dumps(links_not_local, indent=4),
                                             save_dir / f"links-{file_name}-filter-not-local.json")
    print(f"Saved links-{file_name}-filter-not-local.json file {save_links_not_local_path}")
    # print(f"start: {i}, links: {links} \n len: {len(links)}")
    # result = await google_search("Acalypha malabarica essential oil", start=50)
    # print(f"result clean html:\n {result.cleaned_html}")
    # print(f"result.links\n {result.links['external']}")
    # res = filter_links(result.links)
    # print(res)
    # Pretty-print
    # print(json.dumps(res, indent=4))


async def test_html_to_doc():
    save_dir = Path(r"K:\code\upwork\zhang_crawl_bio\output\google_search")
    # Collect all sub-directories
    folders = [f for f in save_dir.iterdir() if f.is_dir()]
    print(folders)
    for folder in folders:
        # Collect all html files inside the folder (not processed further yet)
        html_files = [f for f in folder.iterdir() if f.suffix == '.html']


async def test_single_html_links(save_html_path=None):
    """Parse one saved result page and return its external, non-local links."""
    if not save_html_path:
        save_html_path = Path(r"K:\code\upwork\zhang_crawl_bio\output\Acalypha malabarica essential oil\0.html")
    url = f"file://{save_html_path}"
    result: CrawlResult = await google_search(url)
    not_google_external_links = filter_links(result.links)
    links_not_local = filter_local_domain(not_google_external_links)
    # print(links_not_local)
    return links_not_local


async def test_dir_links_not_local(dir_path: Path):
    '''
    Parse every html file in the directory into a filtered link list;
    base_domain does not include local paths.
    '''
    html_files = [f for f in dir_path.iterdir() if f.suffix == '.html']
    all_links = []
    for html_file in html_files:
        print(f"Processing {html_file}")
        links = await test_single_html_links(html_file)
        print(f"Found {len(links)} links in {html_file}")
        all_links.extend(links)
    print(f"Found {len(all_links)} links in total")
    # Write to file, one entry per line
    save_to_file(all_links, dir_path / "links.json.txt")
    return all_links


async def main():
    # await search_all()
    # await test_single_search()
    await test_dir_links_not_local(Path(r"K:\code\upwork\zhang_crawl_bio\output\Acalypha malabarica essential oil"))


if __name__ == "__main__":
    asyncio.run(main())
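

# Reference sketch, not part of the original flow: process_keyword() stops paging when
# filter_links() returns nothing, while the is_search_result_empty() check it defines is
# left commented out. The helper below shows one way that check could be wired into the
# pagination loop instead. The function name and the `max_pages` parameter are
# illustrative assumptions, not existing project API.
async def process_keyword_with_empty_check(keyword: str, start: int = 0, max_pages: int = 250):
    """Sketch: stop paging once the result page has no children under id="search"."""
    save_dir = GOOGLE_SEARCH_DIR / keyword
    ensure_output_dir(save_dir)
    for i in range(start, max_pages, 10):
        url = f"https://www.google.com/search?q={quote_plus(keyword)}&start={i}"
        result: CrawlResult = await google_search(url)
        # Stop on an empty results container instead of an empty filtered link list.
        if is_search_result_empty(result.html):
            print(f"Empty results container at start={i}, stopping")
            break
        save_to_file(json.dumps(result.links, indent=4), save_dir / f"links-{i}.json")
    return save_dir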