t_worker.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from pathlib import Path
  2. import sys
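  3. # To avoid coupling (microservice style), the grandparent directory may really need to be handled as a separate process, with this directory kept as a standalone UI project.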
  4. sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
  5. from src.services.subscription_manager import SubscriptionManager
  6. from utils.config import config,APP_PATH
  7. from utils.process_mgr import process_manager
  8. import asyncio
  9. from utils.logu import get_logger
  10. from routers.worker import health_check
  11. import os
  12. logger = get_logger('mytests', file=True)
  13. async def main():
  14. python_exe = sys.executable
  15. WORKER_DIR_BASE = APP_PATH.parent.parent
  16. logger.info(f"{WORKER_DIR_BASE}")
  17. # return
  18. redis_cmd = [config.redis_exe]
  19. logger.info(f"{redis_cmd}")
  20. flower_db = WORKER_DIR_BASE / 'output' / 'flower_db'
  21. await process_manager.start_process("redis_cmd", redis_cmd, cwd=WORKER_DIR_BASE)
  22. # G:\code\upwork\zhang_crawl_bio\crawl_env\python.exe -m celery -A worker.celery.app flower --address=127.0.0.1 --persistent=True --db=".\output\flower_db"
  23. flower_cmd = [python_exe, '-m', 'celery', '-A', 'worker.celery.app', 'flower', '--address=127.0.0.1', '--persistent=True', f'--db={flower_db}']
  24. await process_manager.start_process("flower", flower_cmd, cwd=WORKER_DIR_BASE)
  25. proces = process_manager.processes.get("flower").get('process')
  26. search_worker_name = 'search'
  27. crawl_worker_name = 'crawl'
  28. convert_worker_name = 'convert'
  29. worker_list = [search_worker_name, crawl_worker_name, convert_worker_name]
  30. for worker_name in worker_list:
  31. await process_manager.start_process(f"{worker_name}_worker", [python_exe, '-m', 'celery', '-A', 'worker.celery.app', 'worker', '-Q',f'{worker_name}_queue', f'--hostname={worker_name}@%h'], cwd=WORKER_DIR_BASE)
  32. logger.info(f"{proces}")
  33. await proces.wait()
  34. return
  35. return
  36. res = await health_check()
  37. print(res)
  38. if __name__ == "__main__":
  39. asyncio.run(main())