from pathlib import Path
from typing import Dict, List, Optional
import sys

import redis
from celery import Celery

sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
from utils.logu import get_logger

logger = get_logger('mytests', file=True)

app = Celery('search_worker', broker='redis://127.0.0.1:6379/1')
redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)


def get_pending_task_count() -> int:
    """Return the number of pending tasks in Celery's default queue."""
    # The default queue name Celery uses when no explicit routing applies.
    queue_name = app.conf.task_default_queue
    logger.info(f"{queue_name}")
    # Celery's Redis broker stores queued messages in a list named after the queue.
    task_count = redis_client.llen(queue_name)
    logger.info(f"{task_count}")
    logger.info(f"{app.conf.task_routes}")
    return task_count


def submit_tasks(keywords: List[str], browser_config: Optional[Dict] = None):
    """Submit one search task per keyword."""
    for keyword in keywords:
        try:
            task_data = {
                'keyword': keyword.strip(),
                'max_result_items': 200,
                'skip_existing': True,
                'browser_config': browser_config or {}
            }
            result = app.send_task('search_worker.drission_search',
                                   kwargs=task_data, queue='search_queue')
            logger.info(f"Task submitted: {keyword} (task ID: {result.id})")
        except Exception as e:
            logger.error(f"Failed to submit task [{keyword}]: {str(e)}")


def submit_all_tasks(browser_config: Optional[Dict] = None):
    """Submit a single task that searches all uncompleted keywords.

    The worker-side task is expected to have the signature:
        search_all_uncompleted_keywords(max_result_items: int = 200,
                                        skip_existing: bool = True,
                                        browser_config: dict = {},
                                        proxy_pool: List[str] = None)
    """
    clear_specific_queue('search_queue')
    task_data = {
        'max_result_items': 1,
        'skip_existing': True,
        'browser_config': browser_config or {},
        'proxy_pool': None,
        'dry_run': True
    }
    result = app.send_task('search_worker.search_all_uncompleted_keywords',
                           kwargs=task_data, queue='search_queue')
    logger.info(f"Task submitted (task ID: {result.id})")


def get_queue():
    """Log the length of each known queue in the broker's Redis database."""
    keys = redis_client.keys('*')
    logger.info(f"{keys}")
    # Each queue is a Redis list; lrange returns the raw queued messages.
    queue_tasks = redis_client.lrange('search_queue', 0, -1)
    logger.info(f"len search_queue {len(queue_tasks)}")
    queue_tasks = redis_client.lrange('crawl_queue', 0, -1)
    logger.info(f"len crawl_queue {len(queue_tasks)}")
    default_queue_tasks = redis_client.lrange('default', 0, -1)
    logger.info(f"len default {len(default_queue_tasks)}")

# for task in queue_tasks:
#     print(task.decode('utf-8'))

# Clear a queue:
# redis_client.delete('search_queue')


def inspect_queue(queue_name: str = 'search_queue'):
    """Print tasks currently reserved by workers (prefetched but not yet executed)."""
    inspector = app.control.inspect()
    reserved_tasks = inspector.reserved()
    if reserved_tasks:
        print(f"Tasks in queue {queue_name}:")
        for worker, tasks in reserved_tasks.items():
            for task in tasks:
                print(f"Task ID: {task['id']}, name: {task['name']}, kwargs: {task['kwargs']}")
    else:
        print(f"No tasks in queue {queue_name}.")


def clear_all_queues():
    """Flush every queue in the current Redis database."""
    try:
        redis_client.flushdb()
        logger.info("All queues cleared")
    except Exception as e:
        logger.error(f"Failed to clear queues: {str(e)}")


def clear_specific_queue(queue_name: str):
    """Delete a single queue (Redis list) by name."""
    try:
        redis_client.delete(queue_name)
        logger.info(f"Queue {queue_name} cleared")
    except Exception as e:
        logger.error(f"Failed to clear queue {queue_name}: {str(e)}")


def main():
    # submit_tasks(['test', 'test2', 'test3', 'test4', 'test5'])
    # clear_all_queues()
    # clear_specific_queue('default')
    # submit_all_tasks()
    get_queue()
    # inspect_queue()
    # get_pending_task_count()


if __name__ == "__main__":
    main()
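
# Reference only: a minimal sketch of the worker-side task that submit_tasks() targets
# via app.send_task('search_worker.drission_search', ...). This is an assumption inferred
# from the kwargs built above, not the actual worker code, which lives in the worker project.
#
# @app.task(name='search_worker.drission_search')
# def drission_search(keyword: str, max_result_items: int = 200,
#                     skip_existing: bool = True, browser_config: dict = None):
#     ...  # perform the search and persist results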