from typing import List, Dict, Optional
from pathlib import Path
import sys
import time

import redis
from celery import Celery

sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
from utils.process_mgr import ProcessManager
from utils.logu import get_logger

logger = get_logger('mytests', file=True)

app = Celery('search_worker', broker='redis://127.0.0.1:6379/1')
redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)

def get_pending_task_count() -> int:
    """Return the number of pending tasks in Celery's default queue."""
    # Celery backs each queue with a Redis list, so LLEN on the default
    # queue name gives the number of messages not yet fetched by a worker.
    queue_name = app.conf.task_default_queue
    logger.info(f"default queue: {queue_name}")
    task_count = redis_client.llen(queue_name)
    logger.info(f"pending tasks: {task_count}")
    logger.info(f"task routes: {app.conf.task_routes}")
    return task_count

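# Usage sketch for the counter above (assumption: the Redis list length is
# an acceptable proxy for queue depth; messages already prefetched by
# workers are not counted). Blocks until the default queue drains.
def wait_for_queue_drain(poll_seconds: float = 5.0):
    while get_pending_task_count() > 0:
        time.sleep(poll_seconds)
    logger.info("default queue drained")
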
def submit_tasks(keywords: List[str], browser_config: Optional[Dict] = None):
    """Submit one search task per keyword."""
    for keyword in keywords:
        try:
            task_data = {
                'keyword': keyword.strip(),
                'max_result_items': 200,
                'skip_existing': True,
                'browser_config': browser_config or {}
            }
            result = app.send_task('search_worker.drission_search',
                                   kwargs=task_data, queue='search_queue')
            logger.info(f"Task submitted: {keyword} (task ID: {result.id})")
        except Exception as e:
            logger.error(f"Failed to submit task [{keyword}]: {str(e)}")

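# Usage sketch (assumption: the worker-side drission_search task reads a
# 'headless' flag from browser_config; the real keys are defined by the
# worker and are not shown in this script):
# submit_tasks(['test', 'test2'], browser_config={'headless': True})
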
def submit_all_tasks(browser_config: Optional[Dict] = None):
    """Submit a single batch task that searches every uncompleted keyword.

    The worker-side signature is roughly:
    search_all_uncompleted_keywords_task(max_result_items=200,
        skip_existing=True, browser_config={}, proxy_pool=None)
    """
    # Drop any stale messages before enqueuing the batch task.
    clear_specific_queue('search_queue')
    task_data = {
        'max_result_items': 1,
        'skip_existing': True,
        'browser_config': browser_config or {},
        'proxy_pool': None,
        'dry_run': True
    }
    result = app.send_task('search_worker.search_all_uncompleted_keywords',
                           kwargs=task_data, queue='search_queue')
    logger.info(f"Task submitted (task ID: {result.id})")

def get_queue():
    """Log all Redis keys and the length of each task queue."""
    keys = redis_client.keys('*')
    logger.info(f"redis keys: {keys}")
    # Each Celery queue is a Redis list; LRANGE returns its raw messages.
    queue_tasks = redis_client.lrange('search_queue', 0, -1)
    logger.info(f"len search_queue {len(queue_tasks)}")
    queue_tasks = redis_client.lrange('crawl_queue', 0, -1)
    logger.info(f"len crawl_queue {len(queue_tasks)}")
    default_queue_tasks = redis_client.lrange('default', 0, -1)
    logger.info(f"len default {len(default_queue_tasks)}")
    # To dump the raw messages:
    # for task in queue_tasks:
    #     print(task.decode('utf-8'))
    # To empty a queue:
    # redis_client.delete('search_queue')

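# A minimal sketch for decoding one queued message (assumption: the default
# Celery JSON task protocol, where the envelope's 'body' field is
# base64-encoded JSON of the form [args, kwargs, embed]):
import base64
import json

def peek_task(queue_name: str = 'search_queue'):
    raw = redis_client.lindex(queue_name, 0)
    if raw is None:
        logger.info(f"{queue_name} is empty")
        return None
    envelope = json.loads(raw)
    body = json.loads(base64.b64decode(envelope['body']))
    logger.info(f"first message in {queue_name}: {body}")
    return body
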
def inspect_queue(queue_name: str = 'search_queue'):
    """Print tasks that workers have already reserved (prefetched)."""
    inspector = app.control.inspect()
    # reserved() reports per worker, not per queue; queue_name is only
    # used in the log output.
    reserved_tasks = inspector.reserved()
    if reserved_tasks:
        print(f"Tasks reserved for queue {queue_name}:")
        for worker, tasks in reserved_tasks.items():
            for task in tasks:
                print(f"task ID: {task['id']}, name: {task['name']}, kwargs: {task['kwargs']}")
    else:
        print(f"Queue {queue_name} has no reserved tasks.")

def clear_all_queues():
    """Flush every key in the current Redis database, queues included."""
    try:
        redis_client.flushdb()
        logger.info("All queue data cleared")
    except Exception as e:
        logger.error(f"Failed to clear queues: {str(e)}")

def clear_specific_queue(queue_name: str):
    """Delete a single queue by removing its backing Redis list."""
    try:
        redis_client.delete(queue_name)
        logger.info(f"Queue {queue_name} cleared")
    except Exception as e:
        logger.error(f"Failed to clear queue {queue_name}: {str(e)}")

def main():
    # Uncomment whichever helper you need:
    # submit_tasks(['test', 'test2', 'test3', 'test4', 'test5'])
    # clear_all_queues()
    # clear_specific_queue('default')
    # submit_all_tasks()
    get_queue()
    # inspect_queue()
    # get_pending_task_count()


if __name__ == "__main__":
    main()