import asyncio
import sys

from crawl4ai import BrowserConfig

from config import settings
from mylib.logu import get_logger
from worker.celery.app import app
from worker.crawl_pages.crawl_urls import URLCrawler

logger = get_logger('crawl_worker')

@app.task(name='crawl_worker.crawl_keyword_urls')
def crawl_keyword_urls_task(keyword: str, browser_config: dict | None = None,
                            overwrite: bool = False, proxy_pool: list[str] | None = None):
    """Celery task to crawl all URLs for a specific keyword."""
    if sys.platform == 'win32':
        # Subprocess-based browser automation needs the proactor event loop on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    # Publish a caller-supplied proxy pool on the shared settings module so that
    # downstream code can pick from it (assumes consumers read settings.PROXIES
    # at call time rather than caching it at import).
    if proxy_pool:
        settings.PROXIES = proxy_pool

    async def _execute_crawl():
        try:
            # Convert the dict config to a BrowserConfig; fall back to defaults if none was given.
            config = BrowserConfig(**browser_config) if browser_config else BrowserConfig()

            logger.info(f"Starting URL crawl for keyword: {keyword}")
            crawler = URLCrawler()
            config.proxy = await crawler.get_random_proxy()
            logger.info(f"BrowserConfig config.proxy: {config.proxy}")
            await crawler.crawl_keyword_urls(keyword, config, overwrite)
            logger.info(f"{'*' * 20} URL crawl completed for keyword: {keyword}")
            return {"keyword": keyword, "status": "completed"}
        except Exception as e:
            logger.exception(f"URL crawl task failed for keyword: {keyword}: {e}")
            raise

    # Run the coroutine on a fresh event loop, then reset the loop policy so
    # other tasks in this worker process are unaffected.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)
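
# How a caller might enqueue the task above -- a minimal sketch only. It assumes
# a configured Celery broker and that this module is importable as
# `worker.crawl_worker` (hypothetical path; the registered task name is what
# Celery actually routes on):
#
#     from worker.crawl_worker import crawl_keyword_urls_task
#     crawl_keyword_urls_task.delay("example keyword", overwrite=True)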


@app.task(name='crawl_worker.crawl_page_urls')
def crawl_page_urls_task(page_id: int, browser_config: dict | None = None,
                         overwrite: bool = False, proxy_pool: list[str] | None = None):
    """Celery task to crawl all URLs for a specific search page."""
    if sys.platform == 'win32':
        # Subprocess-based browser automation needs the proactor event loop on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    # Same assumption as above: publish the caller-supplied proxy pool via the
    # shared settings module so downstream code can pick from it.
    if proxy_pool:
        settings.PROXIES = proxy_pool

    async def _execute_crawl():
        try:
            # Convert the dict config to a BrowserConfig; fall back to defaults if none was given.
            config = BrowserConfig(**browser_config) if browser_config else BrowserConfig()

            logger.info(f"Starting URL crawl for page ID: {page_id}")
            crawler = URLCrawler()
            config.proxy = await crawler.get_random_proxy()
            logger.info(f"BrowserConfig config.proxy: {config.proxy}")
            await crawler.crawl_page_urls(page_id, config, overwrite)
            logger.info(f"{'*' * 20} URL crawl completed for page ID: {page_id}")
            return {"page_id": page_id, "status": "completed"}
        except Exception as e:
            logger.exception(f"URL crawl task failed for page ID {page_id}: {e}")
            raise

    # Run the coroutine on a fresh event loop, then reset the loop policy so
    # other tasks in this worker process are unaffected.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)


def main():
    # Example usage
    pass


if __name__ == "__main__":
    main()
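
# To execute these tasks, a Celery worker must be started against the same app.
# A sketch, assuming the app lives in `worker.celery.app` (as imported above)
# and the broker is already configured:
#
#     celery -A worker.celery.app worker --loglevel=info --pool=solo
#
# `--pool=solo` is a common choice when tasks manage their own asyncio event
# loop, as these do; adjust the pool and concurrency to your deployment.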