t_celery.py

from celery import Celery
from typing import List, Dict, Optional
from pathlib import Path
import sys

import redis

sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
from utils.logu import get_logger

logger = get_logger('mytests', file=True)
# The broker URL and the raw Redis client must point at the same database (db=1)
app = Celery('search_worker', broker='redis://127.0.0.1:6379/1')
redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)

def get_pending_task_count() -> int:
    """Return the number of pending tasks in Celery's default queue."""
    # Name of the default queue ('celery' unless configured otherwise)
    queue_name = app.conf.task_default_queue
    logger.info(f"default queue: {queue_name}")
    # With the Redis transport, each queue is a Redis list keyed by its name
    task_count = redis_client.llen(queue_name)
    logger.info(f"pending tasks: {task_count}")
    logger.info(f"task_routes: {app.conf.task_routes}")
    return task_count

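# A per-queue variant (sketch): with Celery's Redis transport, a queue's
# waiting messages live in a Redis list keyed by the queue name, so llen()
# on that key gives the backlog. Defaulting to 'search_queue' is an
# assumption based on the routing used in submit_tasks below.
def get_queue_length(queue_name: str = 'search_queue') -> int:
    """Return the number of waiting messages in a named queue."""
    return redis_client.llen(queue_name)
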
def submit_tasks(keywords: List[str], browser_config: Optional[Dict] = None):
    """Submit one search task per keyword."""
    for keyword in keywords:
        try:
            task_data = {
                'keyword': keyword.strip(),
                'max_result_items': 200,
                'skip_existing': True,
                'browser_config': browser_config or {}
            }
            result = app.send_task('search_worker.drission_search', kwargs=task_data, queue='search_queue')
            logger.info(f"Task submitted: {keyword} (task ID: {result.id})")
        except Exception as e:
            logger.error(f"Failed to submit task [{keyword}]: {e}")

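# Example usage (sketch): read keywords from a hypothetical keywords.txt,
# one keyword per line, skip blank lines, and submit them all:
#   submit_tasks([k for k in Path('keywords.txt').read_text(encoding='utf-8').splitlines() if k.strip()])
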
def submit_all_tasks(browser_config: Optional[Dict] = None):
    """Submit a single task that searches all uncompleted keywords.

    Remote signature:
        search_all_uncompleted_keywords_task(max_result_items: int = 200,
            skip_existing: bool = True, browser_config: dict = {},
            proxy_pool: List[str] = None)
    """
    clear_specific_queue('search_queue')
    task_data = {
        'max_result_items': 1,
        'skip_existing': True,
        'browser_config': browser_config or {},
        'proxy_pool': None,
        'dry_run': True
    }
    result = app.send_task('search_worker.search_all_uncompleted_keywords', kwargs=task_data, queue='search_queue')
    logger.info(f"Task submitted (task ID: {result.id})")

def get_queue():
    """Log all Redis keys and the length of each known queue."""
    keys = redis_client.keys('*')
    logger.info(f"{keys}")
    # Each queue is a Redis list holding raw task messages
    queue_tasks = redis_client.lrange('search_queue', 0, -1)
    logger.info(f"len search_queue {len(queue_tasks)}")
    queue_tasks = redis_client.lrange('crawl_queue', 0, -1)
    logger.info(f"len crawl_queue {len(queue_tasks)}")
    default_queue_tasks = redis_client.lrange('default', 0, -1)
    logger.info(f"len default {len(default_queue_tasks)}")
    return
    # for task in queue_tasks:
    #     print(task.decode('utf-8'))
    # Clear the queue:
    # redis_client.delete('search_queue')

# Inspect tasks through Celery's remote-control API
def inspect_queue(queue_name: str = 'search_queue'):
    # Get Celery's inspect object
    inspector = app.control.inspect()
    # reserved() lists tasks prefetched by workers but not yet running;
    # note it reports per worker, not per queue
    reserved_tasks = inspector.reserved()
    if reserved_tasks:
        print(f"Tasks reserved for queue {queue_name}:")
        for worker, tasks in reserved_tasks.items():
            for task in tasks:
                print(f"task ID: {task['id']}, name: {task['name']}, kwargs: {task['kwargs']}")
    else:
        print(f"No tasks in queue {queue_name}.")

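# A broader look at worker state (sketch): besides reserved(), Celery's
# inspect API exposes active() (currently executing) and scheduled()
# (ETA/countdown) tasks; all of these report per worker, not per queue.
def inspect_workers():
    """Print a per-worker summary of active and scheduled tasks."""
    inspector = app.control.inspect()
    for label, result in (('active', inspector.active()), ('scheduled', inspector.scheduled())):
        if not result:
            print(f"No {label} tasks (or no worker replied).")
            continue
        for worker, tasks in result.items():
            print(f"{worker}: {len(tasks)} {label} task(s)")
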
def clear_all_queues():
    """Wipe everything in the current Redis database, queues included."""
    try:
        # flushdb() removes all keys in the current database, not just queues
        redis_client.flushdb()
        logger.info("All queue data cleared")
    except Exception as e:
        logger.error(f"Failed to clear queues: {e}")

def clear_specific_queue(queue_name: str):
    """Clear a single queue by deleting its Redis list."""
    try:
        # Delete the named queue's list
        redis_client.delete(queue_name)
        logger.info(f"Queue {queue_name} cleared")
    except Exception as e:
        logger.error(f"Failed to clear queue {queue_name}: {e}")

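# A Celery-native alternative (sketch): app.control.purge() discards every
# waiting message in the queues this app consumes from, and unlike
# flushdb() it leaves unrelated keys in the Redis database alone.
def purge_waiting_tasks() -> int:
    """Purge waiting messages via Celery and return how many were discarded."""
    discarded = app.control.purge()
    logger.info(f"Discarded {discarded} waiting message(s)")
    return discarded
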
def main():
    # submit_tasks(['test', 'test2', 'test3', 'test4', 'test5'])
    # clear_all_queues()
    # clear_specific_queue('default')
    # submit_all_tasks()
    # Connect to Redis and report queue lengths
    get_queue()
    # inspect_queue()
    # get_pending_task_count()


if __name__ == "__main__":
    main()