crawl_tasks.py

import asyncio
import sys

from worker.celery.app import app
from worker.crawl_pages.crawl_urls import URLCrawler
from mylib.logu import get_logger
from config import settings  # import the module so the proxy pool can be overridden per task
from crawl4ai import BrowserConfig

logger = get_logger('crawl_worker')
@app.task(name='crawl_worker.crawl_keyword_urls')
def crawl_keyword_urls_task(keyword: str, browser_config: dict = None, overwrite: bool = False, proxy_pool: list[str] = None):
    """Celery task to crawl all URLs for a specific keyword."""
    if sys.platform == 'win32':
        # Browser automation needs the Proactor event loop on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    if proxy_pool:
        # Override the configured proxy pool for this task run. The original
        # assignment (PROXIES = proxy_pool) only created a local variable and
        # had no effect; rebinding the settings attribute makes the override
        # visible to code that reads config.settings.PROXIES.
        settings.PROXIES = proxy_pool

    async def _execute_crawl():
        try:
            # Convert the dict config to a BrowserConfig; fall back to defaults
            # so config is never None when the proxy is attached below.
            config = BrowserConfig(**browser_config) if browser_config else BrowserConfig()
            logger.info(f"Starting URL crawl for keyword: {keyword}")
            crawler = URLCrawler()
            config.proxy = await crawler.get_random_proxy()
            logger.info(f"BrowserConfig config.proxy: {config.proxy}")
            await crawler.crawl_keyword_urls(keyword, config, overwrite)
            logger.info(f"{'*' * 20} URL crawl completed for keyword: {keyword}")
            return {"keyword": keyword, "status": "completed"}
        except Exception as e:
            logger.exception(f"URL crawl task failed for keyword: {keyword}: {str(e)}")
            raise

    # Celery worker threads may not have a running event loop, so create a
    # dedicated loop for this task and tear it down afterwards.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)
@app.task(name='crawl_worker.crawl_page_urls')
def crawl_page_urls_task(page_id: int, browser_config: dict = None, overwrite: bool = False, proxy_pool: list[str] = None):
    """Celery task to crawl all URLs for a specific search page."""
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    if proxy_pool:
        # Same override as above: rebind the settings attribute instead of a
        # local variable so the supplied proxy pool can actually take effect.
        settings.PROXIES = proxy_pool

    async def _execute_crawl():
        try:
            # Convert the dict config to a BrowserConfig; fall back to defaults.
            config = BrowserConfig(**browser_config) if browser_config else BrowserConfig()
            logger.info(f"Starting URL crawl for page ID: {page_id}")
            crawler = URLCrawler()
            config.proxy = await crawler.get_random_proxy()
            logger.info(f"BrowserConfig config.proxy: {config.proxy}")
            await crawler.crawl_page_urls(page_id, config, overwrite)
            logger.info(f"{'*' * 20} URL crawl completed for page ID: {page_id}")
            return {"page_id": page_id, "status": "completed"}
        except Exception as e:
            # Use logger.exception (as in the keyword task) so the traceback is recorded.
            logger.exception(f"URL crawl task failed for page ID {page_id}: {str(e)}")
            raise

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_execute_crawl())
    finally:
        loop.close()
        asyncio.set_event_loop_policy(None)
def main():
    # Example usage
    pass


if __name__ == "__main__":
    main()
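
For context, a minimal client-side sketch of how these tasks might be enqueued; the module path `worker.crawl_tasks`, the keyword, the page ID, and the browser options are illustrative assumptions, not values taken from the project.

# Hypothetical usage sketch (not part of the original file): enqueue the two
# Celery tasks defined above from a client process. Assumes this module is
# importable as worker.crawl_tasks and that the broker configured in
# worker.celery.app is running; all argument values are placeholders.
from worker.crawl_tasks import crawl_keyword_urls_task, crawl_page_urls_task

# Queue a crawl of every URL collected for a keyword, with a headless browser.
result = crawl_keyword_urls_task.delay(
    "example keyword",
    browser_config={"headless": True},
    overwrite=False,
)
print("queued task id:", result.id)

# Queue a crawl of a single search page, supplying an explicit proxy pool.
crawl_page_urls_task.apply_async(
    args=[42],
    kwargs={"proxy_pool": ["http://127.0.0.1:8080"]},
)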