# async_client.py
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

from worker.celery.async_tasks import async_browser_task, async_search_task
from mylib.logu import logger
  9. def read_keywords_from_file(file_path: Path) -> List[str]:
  10. """读取文件第0列关键词"""
  11. try:
  12. if file_path.suffix.lower() in ['.xlsx', '.xls']:
  13. df = pd.read_excel(file_path, header=0, engine='openpyxl')
  14. elif file_path.suffix.lower() in ['.csv', '.tsv']:
  15. df = pd.read_csv(file_path, header=0, sep='\t' if file_path.suffix == '.tsv' else ',')
  16. else:
  17. raise ValueError(f"不支持的格式: {file_path.suffix}")
  18. return df.iloc[:, 0].astype(str).tolist()
  19. except Exception as e:
  20. logger.error(f"文件读取失败: {str(e)}")
  21. raise
  22. def submit_keyword_tasks(keywords: List[str]) -> List[Dict[str, Any]]:
  23. """提交关键词搜索任务"""
  24. results = []
  25. for keyword in keywords:
  26. try:
  27. task = async_search_task.delay(
  28. keyword=keyword.strip(),
  29. max_result_items=200,
  30. skip_existing=True
  31. )
  32. results.append({
  33. "keyword": keyword,
  34. "task_id": task.id,
  35. "status": "submitted"
  36. })
  37. logger.info(f"已提交搜索任务: {keyword}")
  38. except Exception as e:
  39. results.append({
  40. "keyword": keyword,
  41. "error": str(e),
  42. "status": "failed"
  43. })
  44. logger.error(f"任务提交失败 [{keyword}]: {str(e)}")
  45. return results
  46. def main(file_path: Path, start: int = None, end: int = None) -> Dict[str, Any]:
  47. """主流程"""
  48. try:
  49. keywords = read_keywords_from_file(file_path)
  50. if not keywords:
  51. raise ValueError("文件未包含有效关键词")
  52. # 处理切片范围
  53. processed_keywords = keywords[start:end] if start is not None and end is not None else keywords
  54. return {
  55. "total_keywords": len(keywords),
  56. "processed": len(processed_keywords),
  57. "tasks": submit_keyword_tasks(processed_keywords)
  58. }
  59. except Exception as e:
  60. return {"error": str(e), "status": "failed"}
  61. def parse_args():
  62. parser = argparse.ArgumentParser(description="关键词处理工具")
  63. subparsers = parser.add_subparsers(dest='command', required=True)
  64. # main 命令
  65. main_parser = subparsers.add_parser('main', help='完整处理流程')
  66. main_parser.add_argument('file', type=Path, help='关键词文件路径')
  67. main_parser.add_argument('--start', type=int, help='起始索引')
  68. main_parser.add_argument('--end', type=int, help='结束索引')
  69. main_parser.add_argument('--json', action='store_true', help='输出JSON格式')
  70. # read 命令
  71. read_parser = subparsers.add_parser('read', help='仅读取关键词')
  72. read_parser.add_argument('file', type=Path, help='关键词文件路径')
  73. read_parser.add_argument('--json', action='store_true', help='输出JSON格式')
  74. # submit 命令
  75. submit_parser = subparsers.add_parser('submit', help='仅提交任务')
  76. submit_parser.add_argument('-f', '--file', type=Path, help='关键词文件路径')
  77. submit_parser.add_argument('-k', '--keywords', nargs='+', help='直接输入关键词列表')
  78. submit_parser.add_argument('--json', action='store_true', help='输出JSON格式')
  79. return parser.parse_args()
  80. def handle_output(result, use_json: bool):
  81. if use_json:
  82. print(json.dumps(result, indent=2, ensure_ascii=False))
  83. else:
  84. if isinstance(result, dict) and "error" in result:
  85. print(f"错误: {result['error']}")
  86. elif isinstance(result, list):
  87. print("\n".join(result))
  88. else:
  89. print(result)
  90. if __name__ == '__main__':
  91. args = parse_args()
  92. try:
  93. if args.command == 'main':
  94. result = main(args.file, args.start, args.end)
  95. handle_output(result, args.json)
  96. elif args.command == 'read':
  97. keywords = read_keywords_from_file(args.file)
  98. handle_output(keywords, args.json)
  99. elif args.command == 'submit':
  100. if args.file:
  101. keywords = read_keywords_from_file(args.file)
  102. elif args.keywords:
  103. keywords = args.keywords
  104. else:
  105. raise ValueError("必须指定文件或关键词列表")
  106. tasks = submit_keyword_tasks(keywords)
  107. handle_output(tasks, args.json)
  108. except Exception as e:
  109. error_result = {"error": str(e), "status": "failed"}
  110. handle_output(error_result, args.json if hasattr(args, 'json') else False)
  111. sys.exit(1)