| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- import argparse
- import json
- from pathlib import Path
- import pandas as pd
- import sys
- from typing import List, Dict, Any
- from worker.celery.async_tasks import async_browser_task, async_search_task
- from mylib.logu import logger
def read_keywords_from_file(file_path: Path) -> List[str]:
    """Read the keywords stored in the first column of a tabular file.

    The first row is treated as a header and is not returned. Supported
    formats are Excel (``.xlsx`` / ``.xls``), CSV and TSV; the extension is
    matched case-insensitively.

    Args:
        file_path: Path of the keyword file.

    Returns:
        The non-empty cells of the first column, converted to strings.

    Raises:
        ValueError: If the file extension is not supported.
        Exception: Any error raised while reading is logged and re-raised
            unchanged.
    """
    # Normalise once so that "KW.TSV" is treated exactly like "kw.tsv" both
    # for the format check and for the separator choice.
    suffix = file_path.suffix.lower()
    try:
        if suffix in ('.xlsx', '.xls'):
            # openpyxl cannot open legacy .xls workbooks, so only force it
            # for .xlsx and let pandas pick the engine otherwise.
            engine = 'openpyxl' if suffix == '.xlsx' else None
            df = pd.read_excel(file_path, header=0, engine=engine)
        elif suffix in ('.csv', '.tsv'):
            separator = '\t' if suffix == '.tsv' else ','
            df = pd.read_csv(file_path, header=0, sep=separator)
        else:
            raise ValueError(f"不支持的格式: {file_path.suffix}")

        # Empty cells are read as NaN; drop them so they do not turn into
        # the literal keyword "nan".
        return df.iloc[:, 0].dropna().astype(str).tolist()

    except Exception as e:
        logger.error(f"文件读取失败: {str(e)}")
        raise
def submit_keyword_tasks(
    keywords: List[str],
    *,
    max_result_items: int = 200,
    skip_existing: bool = True,
) -> List[Dict[str, Any]]:
    """Submit one search task per keyword and report the outcome of each.

    Each keyword is stripped of surrounding whitespace before being handed to
    ``async_search_task.delay``. A failure for one keyword never aborts the
    loop; it is recorded in the returned list instead.

    Args:
        keywords: Keywords to submit.
        max_result_items: Forwarded to the search task (default 200).
        skip_existing: Forwarded to the search task (default True).

    Returns:
        One dict per input keyword, in input order. Successful submissions
        carry ``keyword``, ``task_id`` and ``status == "submitted"``; failures
        carry ``keyword``, ``error`` and ``status == "failed"``.
    """
    results = []
    for keyword in keywords:
        try:
            cleaned = keyword.strip()
            if not cleaned:
                # A blank keyword would dispatch an empty search; record it
                # as failed without creating a task.
                results.append({
                    "keyword": keyword,
                    "error": "关键词为空",
                    "status": "failed"
                })
                continue

            task = async_search_task.delay(
                keyword=cleaned,
                max_result_items=max_result_items,
                skip_existing=skip_existing
            )
            results.append({
                "keyword": keyword,
                "task_id": task.id,
                "status": "submitted"
            })
            logger.info(f"已提交搜索任务: {keyword}")
        except Exception as e:
            # Best effort: keep going so one bad keyword does not block the rest.
            results.append({
                "keyword": keyword,
                "error": str(e),
                "status": "failed"
            })
            logger.error(f"任务提交失败 [{keyword}]: {str(e)}")
    return results
def main(file_path: Path, start: int = None, end: int = None) -> Dict[str, Any]:
    """Read keywords from a file and submit a search task for each of them.

    Args:
        file_path: Path of the keyword file.
        start: Optional index of the first keyword to process (inclusive).
        end: Optional index at which to stop (exclusive).

    Returns:
        On success, a dict with ``total_keywords`` (keywords found in the
        file), ``processed`` (keywords selected by ``start``/``end``) and
        ``tasks`` (per-keyword submission results). On any error, a dict with
        ``error`` and ``status == "failed"``; no exception propagates.
    """
    try:
        keywords = read_keywords_from_file(file_path)
        if not keywords:
            raise ValueError("文件未包含有效关键词")

        # A slice bound of None means "open-ended", so this honours start or
        # end when only one of them is supplied, and selects everything when
        # both are omitted.
        processed_keywords = keywords[start:end]

        return {
            "total_keywords": len(keywords),
            "processed": len(processed_keywords),
            "tasks": submit_keyword_tasks(processed_keywords)
        }

    except Exception as e:
        return {"error": str(e), "status": "failed"}
def parse_args(argv: List[str] = None):
    """Parse the command line of the keyword tool.

    Three sub-commands are defined: ``main`` (read a file and submit tasks,
    with optional ``--start``/``--end``), ``read`` (only read keywords) and
    ``submit`` (submit keywords from ``-f/--file`` or ``-k/--keywords``).
    Every sub-command accepts ``--json``.

    Args:
        argv: Argument list to parse. When ``None`` (the default) argparse
            reads ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        The parsed ``argparse.Namespace``; ``command`` holds the sub-command
        name.
    """
    parser = argparse.ArgumentParser(description="关键词处理工具")
    subparsers = parser.add_subparsers(dest='command', required=True)

    # "main" command: full pipeline.
    main_parser = subparsers.add_parser('main', help='完整处理流程')
    main_parser.add_argument('file', type=Path, help='关键词文件路径')
    main_parser.add_argument('--start', type=int, help='起始索引')
    main_parser.add_argument('--end', type=int, help='结束索引')
    main_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    # "read" command: only read keywords.
    read_parser = subparsers.add_parser('read', help='仅读取关键词')
    read_parser.add_argument('file', type=Path, help='关键词文件路径')
    read_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    # "submit" command: only submit tasks.
    submit_parser = subparsers.add_parser('submit', help='仅提交任务')
    submit_parser.add_argument('-f', '--file', type=Path, help='关键词文件路径')
    submit_parser.add_argument('-k', '--keywords', nargs='+', help='直接输入关键词列表')
    submit_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    return parser.parse_args(argv)
def handle_output(result, use_json: bool):
    """Print a command result to stdout.

    Args:
        result: The value to print: an error dict, a list, or any other
            object.
        use_json: When true, print ``result`` as indented JSON without
            escaping non-ASCII characters. Otherwise print an error dict as
            ``错误: <message>``, a list as one item per line, and anything
            else with ``print``.
    """
    if use_json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return

    if isinstance(result, dict) and "error" in result:
        print(f"错误: {result['error']}")
    elif isinstance(result, list):
        # Items may be dicts (task results), not only strings; str.join
        # rejects non-string items, so convert each one first.
        print("\n".join(str(item) for item in result))
    else:
        print(result)
if __name__ == '__main__':
    # Script entry point: dispatch on the chosen sub-command, print the result
    # and exit with status 1 on any failure.
    args = parse_args()

    try:
        if args.command == 'main':
            result = main(args.file, args.start, args.end)
            handle_output(result, args.json)
            # main() reports failures as an error dict instead of raising, so
            # translate that into a non-zero exit status here.
            if "error" in result:
                sys.exit(1)

        elif args.command == 'read':
            keywords = read_keywords_from_file(args.file)
            handle_output(keywords, args.json)

        elif args.command == 'submit':
            # A file takes precedence over keywords given on the command line.
            if args.file:
                keywords = read_keywords_from_file(args.file)
            elif args.keywords:
                keywords = args.keywords
            else:
                raise ValueError("必须指定文件或关键词列表")

            tasks = submit_keyword_tasks(keywords)
            handle_output(tasks, args.json)

    except Exception as e:
        error_result = {"error": str(e), "status": "failed"}
        handle_output(error_result, getattr(args, 'json', False))
        sys.exit(1)
|