import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

from worker.celery.async_tasks import async_browser_task, async_search_task
from mylib.logu import logger


def read_keywords_from_file(file_path: Path) -> List[str]:
    """Read keywords from column 0 of a spreadsheet or delimited text file.

    The first row is treated as a header and skipped (``header=0``). Cells are
    read as text, surrounding whitespace is stripped and empty cells are
    dropped, so blank rows never turn into the keyword ``"nan"``.

    Args:
        file_path: Path to a ``.xlsx``/``.xls``/``.csv``/``.tsv`` file
            (extension matched case-insensitively).

    Returns:
        Non-empty keyword strings in file order.

    Raises:
        ValueError: If the file extension is not supported.
        Exception: Any pandas read error is logged and re-raised unchanged.
    """
    suffix = file_path.suffix.lower()
    try:
        # dtype=str + keep_default_na=False keep keywords such as "NA"/"null"
        # literally and prevent numeric coercion ("123" -> "123.0").
        if suffix in ('.xlsx', '.xls'):
            # openpyxl cannot read legacy .xls; let pandas pick the engine there.
            engine = 'openpyxl' if suffix == '.xlsx' else None
            df = pd.read_excel(
                file_path, header=0, engine=engine, dtype=str, keep_default_na=False
            )
        elif suffix in ('.csv', '.tsv'):
            df = pd.read_csv(
                file_path,
                header=0,
                sep='\t' if suffix == '.tsv' else ',',
                dtype=str,
                keep_default_na=False,
            )
        else:
            raise ValueError(f"不支持的格式: {file_path.suffix}")
        # dropna() is a safety net in case an engine still yields missing cells.
        column = df.iloc[:, 0].dropna().astype(str).str.strip()
        return [keyword for keyword in column if keyword]
    except Exception as e:
        logger.error(f"文件读取失败: {str(e)}")
        raise


def submit_keyword_tasks(
    keywords: List[str],
    *,
    max_result_items: int = 200,
    skip_existing: bool = True,
) -> List[Dict[str, Any]]:
    """Submit one asynchronous search task per keyword.

    Args:
        keywords: Keywords to search; each is stripped before use.
        max_result_items: Forwarded to ``async_search_task``.
        skip_existing: Forwarded to ``async_search_task``.

    Returns:
        One record per input keyword, in input order, with ``status`` set to
        ``"submitted"`` (plus ``task_id``), ``"failed"`` (plus ``error``) or
        ``"skipped"`` for blank keywords. Submission errors are recorded, not raised.
    """
    results: List[Dict[str, Any]] = []
    for raw_keyword in keywords:
        keyword = raw_keyword.strip()
        if not keyword:
            # Never dispatch an empty search; keep a record so counts line up.
            results.append({"keyword": keyword, "status": "skipped"})
            continue
        try:
            task = async_search_task.delay(
                keyword=keyword,
                max_result_items=max_result_items,
                skip_existing=skip_existing,
            )
        except Exception as e:
            results.append({
                "keyword": keyword,
                "error": str(e),
                "status": "failed"
            })
            logger.error(f"任务提交失败 [{keyword}]: {str(e)}")
            continue
        results.append({
            "keyword": keyword,
            "task_id": task.id,
            "status": "submitted"
        })
        logger.info(f"已提交搜索任务: {keyword}")
    return results


def main(file_path: Path, start: Optional[int] = None, end: Optional[int] = None) -> Dict[str, Any]:
    """Read keywords from *file_path*, slice them, and submit search tasks.

    Args:
        file_path: Keyword file accepted by ``read_keywords_from_file``.
        start: Optional slice start index (Python slice semantics).
        end: Optional slice end index (Python slice semantics).

    Returns:
        ``{"total_keywords", "processed", "tasks"}`` on success, or
        ``{"error", "status": "failed"}`` if anything raised.
    """
    try:
        keywords = read_keywords_from_file(file_path)
        if not keywords:
            raise ValueError("文件未包含有效关键词")

        # None means "open-ended" in a slice, so either bound may be omitted.
        processed_keywords = keywords[start:end]

        return {
            "total_keywords": len(keywords),
            "processed": len(processed_keywords),
            "tasks": submit_keyword_tasks(processed_keywords)
        }
    except Exception as e:
        return {"error": str(e), "status": "failed"}


def parse_args() -> argparse.Namespace:
    """Build the CLI with ``main``, ``read`` and ``submit`` sub-commands and parse argv."""
    parser = argparse.ArgumentParser(description="关键词处理工具")
    subparsers = parser.add_subparsers(dest='command', required=True)

    # "main": read + slice + submit.
    main_parser = subparsers.add_parser('main', help='完整处理流程')
    main_parser.add_argument('file', type=Path, help='关键词文件路径')
    main_parser.add_argument('--start', type=int, help='起始索引')
    main_parser.add_argument('--end', type=int, help='结束索引')
    main_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    # "read": only read and print keywords.
    read_parser = subparsers.add_parser('read', help='仅读取关键词')
    read_parser.add_argument('file', type=Path, help='关键词文件路径')
    read_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    # "submit": submit keywords from a file or given directly on the command line.
    submit_parser = subparsers.add_parser('submit', help='仅提交任务')
    submit_parser.add_argument('-f', '--file', type=Path, help='关键词文件路径')
    submit_parser.add_argument('-k', '--keywords', nargs='+', help='直接输入关键词列表')
    submit_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    return parser.parse_args()


def handle_output(result: Any, use_json: bool) -> None:
    """Print *result* as indented JSON or as human-readable text.

    In text mode, a dict containing ``"error"`` prints only the error message,
    and a list prints one item per line (dict items rendered as compact JSON).
    """
    if use_json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
    elif isinstance(result, dict) and "error" in result:
        print(f"错误: {result['error']}")
    elif isinstance(result, list):
        # Lists hold either keywords (str) or task records (dict); joining dicts
        # directly raised TypeError, so render non-strings explicitly.
        print("\n".join(
            item if isinstance(item, str) else json.dumps(item, ensure_ascii=False)
            for item in result
        ))
    else:
        print(result)


if __name__ == '__main__':
    args = parse_args()
    use_json = getattr(args, 'json', False)
    try:
        if args.command == 'main':
            result = main(args.file, args.start, args.end)
            handle_output(result, use_json)
            # main() reports failures as a dict instead of raising; surface them
            # through the exit status too.
            if "error" in result:
                sys.exit(1)

        elif args.command == 'read':
            keywords = read_keywords_from_file(args.file)
            handle_output(keywords, use_json)

        elif args.command == 'submit':
            if args.file:
                keywords = read_keywords_from_file(args.file)
            elif args.keywords:
                keywords = args.keywords
            else:
                raise ValueError("必须指定文件或关键词列表")

            tasks = submit_keyword_tasks(keywords)
            handle_output(tasks, use_json)

    except Exception as e:
        error_result = {"error": str(e), "status": "failed"}
        handle_output(error_result, use_json)
        sys.exit(1)