import asyncio
import pickle
import random
import ssl
from pathlib import Path
from typing import List

import httpx
from sqlmodel import select, Session
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult

from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy


class URLCrawler:
    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.db_manager = SearchResultManager()

        # Create custom SSL context that allows self-signed certificates
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def get_random_proxy(self, proxy_pool_url: str = None) -> str:
        # Run the blocking proxy-pool lookup in a worker thread so the event loop is not blocked.
        return await asyncio.to_thread(get_random_proxy, proxy_pool_url)
        # Alternative: query the proxy pool over HTTP directly (requires PROXY_POOL_BASE_URL):
        # url = f"{PROXY_POOL_BASE_URL}/get"
        # async with httpx.AsyncClient() as client:
        #     response = await client.get(url, timeout=30)
        #     response.raise_for_status()
        #     results = response.json()
        #     port = results["port"]
        #     return f'http://127.0.0.1:{port}'
-
- async def download_pdf(self, url: str, output_path: Path):
- """Download PDF file from URL"""
- async with httpx.AsyncClient(verify=self.ssl_context) as client:
- response = await client.get(url)
- if response.status_code == 200:
- content_type = response.headers.get('content-type', '').lower()
- if 'pdf' in content_type:
- with open(output_path, 'wb') as f:
- f.write(response.content)
- return True
- return False
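
    # A minimal sketch (not part of the original class; the method name is hypothetical):
    # for large files, the whole-body read in download_pdf above could be replaced with
    # httpx streaming so the PDF is written to disk in chunks instead of being buffered
    # entirely in memory.
    async def download_pdf_streaming(self, url: str, output_path: Path) -> bool:
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            async with client.stream("GET", url) as response:
                if response.status_code != 200:
                    return False
                if 'pdf' not in response.headers.get('content-type', '').lower():
                    return False
                with open(output_path, 'wb') as f:
                    # Write the response body chunk by chunk as it arrives.
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)
        return True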

    async def crawl_url(self, url: str, item_id: int, output_dir: Path, browser_config: BrowserConfig = None, overwrite: bool = False) -> dict:
        """Crawl a single URL and save the results, using item_id as the filename stem."""
        # Skip URLs that already have a stored html_path unless overwrite is requested
        with Session(self.db_manager.engine) as session:
            item = session.exec(
                select(SearchResultItem)
                .where(SearchResultItem.id == item_id)
            ).first()

            if item and item.html_path and not overwrite:
                logger.info(f"Skipping {url} (item_id: {item_id}) - already has html_path: {item.html_path}")
                return {"search_result_model": item, "crawl_result": None, 'message': 'already has html_path'}

        if not browser_config:
            browser_config = BrowserConfig(
                headless=True,
                verbose=False,
                extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
            )

        logger.info(f"Crawling id: {item_id} URL: {url}")
        content_type = ''
        # First try to detect if it's a PDF by checking headers
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            try:
                pdf_path = ''
                if url.endswith('.pdf'):
                    pdf_path = output_dir / f"{item_id}.pdf"
                else:
                    response = await client.head(url)
                    content_type = response.headers.get('content-type', '').lower()
                    if 'pdf' in content_type:
                        pdf_path = output_dir / f"{item_id}.pdf"
                logger.info(f"crawl id {item_id} content_type {content_type} {pdf_path}")
                if pdf_path:
                    if await self.download_pdf(url, pdf_path):
                        # Update database with PDF path
                        with Session(self.db_manager.engine) as session:
                            item = session.exec(
                                select(SearchResultItem)
                                .where(SearchResultItem.id == item_id)
                            ).first()
                            if item:
                                item.html_path = str(pdf_path)
                                session.add(item)
                                session.commit()
                        # content_type is empty when the URL was matched by its .pdf suffix
                        return {"search_result_model": item, "crawl_result": None, 'message': content_type or 'application/pdf'}
            except Exception as e:
                logger.warning(f"Failed to check headers for id: {item_id}, {url} {str(e)}")
                # return {"search_result_model": None, "crawl_result": None, 'message': str(e)}
        # if 'html' not in content_type:
        #     logger.info(f"Skipping {url} (item_id: {item_id}) - not html, content_type {content_type}")
        #     return {"search_result_model": None, "crawl_result": None, 'message': f'not html, content_type {content_type}'}

        logger.info(f"crawler.arun start {item_id} content-type: {content_type}, {url}")
        logger.info(f"browser_config use_managed_browser {browser_config.use_managed_browser}, cdp_url: {browser_config.cdp_url}, headless: {browser_config.headless}")
        # If not a PDF or the header check failed, try a regular crawl
        crawler = AsyncWebCrawler(config=browser_config)
        await crawler.start()

        try:
            crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
            result: CrawlResult = await crawler.arun(url=url, config=crawl_config)
            logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")
            # If crawl failed but URL contains 'download', try the PDF route again
            if not result.success and 'download' in url.lower():
                pdf_path = output_dir / f"{item_id}.pdf"
                if await self.download_pdf(url, pdf_path):
                    # Update database with PDF path
                    with Session(self.db_manager.engine) as session:
                        item = session.exec(
                            select(SearchResultItem)
                            .where(SearchResultItem.id == item_id)
                        ).first()
                        if item:
                            item.html_path = str(pdf_path)
                            session.add(item)
                            session.commit()
                            session.refresh(item)
                    return {"search_result_model": item, "crawl_result": result}

            # Save results
            ensure_output_dir(output_dir)

            # Save pickle
            pickle_path = output_dir / f"{item_id}.pkl"
            with open(pickle_path, "wb") as f:
                pickle.dump(result, f)

            # Save HTML and Markdown if available
            html_path = None
            if result.html:
                html_path = output_dir / f"{item_id}.html"
                save_to_file(result.html, html_path)

            if result.markdown:
                md_path = output_dir / f"{item_id}.md"
                save_to_file(result.markdown, md_path)

            # Update database with HTML path
            if html_path:
                with Session(self.db_manager.engine) as session:
                    item = session.exec(
                        select(SearchResultItem)
                        .where(SearchResultItem.id == item_id)
                    ).first()
                    if item:
                        item.html_path = str(html_path)
                        session.add(item)
                        session.commit()
                        session.refresh(item)
            logger.info(f"{item_id} crawl finished, item: {item}")
            return {"search_result_model": item, "crawl_result": result}
        except Exception as e:
            logger.error(f"Failed to crawl id: {item_id}, {url} {str(e)}")
            return {"search_result_model": item, "crawl_result": None, 'message': str(e)}
        finally:
            await crawler.close()

    async def crawl_page_urls(self, page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False, dry_run: bool = False):
        """Crawl all URLs for a specific search page"""
        with Session(self.db_manager.engine) as session:
            # Get page info
            page = session.exec(
                select(SearchPageResult)
                .where(SearchPageResult.id == page_id)
            ).first()

            if not page:
                raise ValueError(f"Page with id {page_id} not found")
            logger.info(f"Extracting from search result page: {page.html_path}")
            logger.info(f"Number of extracted results: {page.results_count}")

            # Create output directory
            html_dir = Path(page.html_path).parent if page.html_path else Path("output")
            urls_dir = html_dir / "crawled_urls"
            ensure_output_dir(urls_dir)

            # Crawl URLs in parallel
            tasks = []
            for item in page.items:
                url = item.url
                item_id = item.id
                if dry_run:
                    logger.info(f"crawl_url {item_id} {url}")

                    async def dry_run_task():
                        return {"search_result_model": '', "crawl_result": None, 'message': 'dry_run'}

                    task = asyncio.create_task(dry_run_task())
                else:
                    task = self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
                tasks.append(task)

            return urls_dir, await asyncio.gather(*tasks, return_exceptions=True)
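
    # A minimal sketch (assumption, not used anywhere in the original code):
    # self.max_concurrent is accepted by __init__ but never applied, so
    # crawl_page_urls starts every crawl_url coroutine at once. A helper like
    # this could bound the parallelism with an asyncio.Semaphore.
    async def _gather_with_limit(self, coros):
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def bounded(coro):
            # Each coroutine waits for a free slot before running.
            async with semaphore:
                return await coro

        return await asyncio.gather(*(bounded(c) for c in coros), return_exceptions=True)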

    async def crawl_keyword_urls(self, keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific keyword"""
        with Session(self.db_manager.engine) as session:
            # Get keyword task
            task = session.exec(
                select(KeywordTask)
                .where(KeywordTask.keyword == keyword)
                .where(KeywordTask.is_completed == True)
            ).first()

            if not task:
                raise ValueError(f"Completed keyword task for '{keyword}' not found")

            # Crawl URLs for each page
            for page in task.pages:
                await self.crawl_page_urls(page.id, browser_config, overwrite)


async def main():
    crawler = URLCrawler()

    # Example usage:
    # Crawl URLs for a specific keyword
    # await crawler.crawl_keyword_urls("example keyword", overwrite=True)

    # Or crawl URLs for a specific page
    await crawler.crawl_page_urls(594, overwrite=False)


if __name__ == "__main__":
    asyncio.run(main())
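
# A usage sketch (assumption): crawl_url logs browser_config.use_managed_browser and
# browser_config.cdp_url above, which suggests attaching to an already-running browser
# over CDP. The exact BrowserConfig constructor arguments are not confirmed by this
# file, so treat the kwargs below as hypothetical.
#
#   browser_config = BrowserConfig(headless=True, cdp_url="http://127.0.0.1:9222")
#   crawler = URLCrawler()
#   await crawler.crawl_keyword_urls("example keyword", browser_config=browser_config)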