from worker.celery.app import app
from worker.celery.models import KeywordTaskModel
import pandas as pd
from pathlib import Path
import logging
from typing import List, Dict, Optional
import sys
from mylib.logu import logger
import argparse
from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
from sqlmodel import select, Session, exists, distinct


def read_keywords_from_file(file_path: Path) -> List[str]:
    """Return the values of the first column of a keyword file as strings.

    The first row of the file is consumed as the header (``header=0``), so
    it is not part of the returned list. Empty cells are skipped.

    Args:
        file_path: Path to an ``.xlsx``/``.xls`` workbook or a
            ``.csv``/``.tsv`` text file. The extension is matched
            case-insensitively.

    Returns:
        The non-empty cells of column 0, each converted to ``str``.

    Raises:
        ValueError: If the extension is not one of the supported formats.
        Exception: Any error raised while reading the file is logged and
            re-raised unchanged.
    """
    try:
        # Normalise once so that the format check and the separator choice
        # below agree for upper-case extensions such as ".TSV".
        suffix = file_path.suffix.lower()
        if suffix in ('.xlsx', '.xls'):
            # openpyxl cannot read the legacy .xls format; for those files
            # let pandas choose the engine itself.
            engine = 'openpyxl' if suffix == '.xlsx' else None
            df = pd.read_excel(file_path, header=0, engine=engine)
        elif suffix in ('.csv', '.tsv'):
            separator = '\t' if suffix == '.tsv' else ','
            df = pd.read_csv(file_path, header=0, sep=separator)
        else:
            raise ValueError(f"不支持的格式: {file_path.suffix}")

        # Drop empty cells first: astype(str) would otherwise turn them
        # into the literal keyword "nan".
        return df.iloc[:, 0].dropna().astype(str).tolist()
    except Exception as e:
        logger.error(f"读取文件失败: {str(e)}")
        raise


def get_uncompleted_keywords() -> list[str]:
    """Return the distinct keywords of tasks that are not marked completed.

    Opens a session on the engine of a fresh ``SearchResultManager`` and
    selects the distinct ``KeywordTask.keyword`` values whose
    ``is_completed`` flag is not true.
    """
    manager = SearchResultManager()
    with Session(manager.engine) as session:
        query = (
            select(distinct(KeywordTask.keyword))
            # "!= True" builds a SQL expression on the column; it must not
            # be rewritten as a Python identity check ("is not True").
            .where(KeywordTask.is_completed != True)
        )
        keywords = session.exec(query).all()
        return keywords


def submit_tasks(keywords: List[str], browser_config: Optional[Dict] = None):
    """Send one search task per keyword to the ``search_queue`` queue.

    A failure to submit one keyword is logged and does not stop the
    remaining keywords from being submitted.

    Args:
        keywords: Keywords to search for; each one is stripped of
            surrounding whitespace before being sent.
        browser_config: Optional browser settings forwarded to the task.
            An empty dict is sent when omitted.
    """
    for keyword in keywords:
        try:
            task_data = {
                'keyword': keyword.strip(),
                'max_result_items': 200,
                'skip_existing': True,
                'browser_config': browser_config or {},
            }
            result = app.send_task(
                'search_worker.drission_search',
                kwargs=task_data,
                queue='search_queue',
            )
            logger.info(f"任务已提交: {keyword} (任务ID: {result.id})")
        except Exception as e:
            logger.error(f"提交任务失败 [{keyword}]: {str(e)}")


def clear_tasks():
    """Purge the pending tasks through the Celery control interface.

    Raises:
        Exception: Any error raised by the purge is logged and re-raised.
    """
    try:
        app.control.purge()
        logger.info("所有待处理任务已清除")
    except Exception as e:
        logger.error(f"清除任务失败: {str(e)}")
        raise


def main(file_path: str, clear: bool = False):
    """Run the client: either purge pending tasks or submit a keyword file.

    Args:
        file_path: Path of the keyword file to submit. Not used when
            ``clear`` is true.
        clear: When true, only purge the pending tasks and return.

    Any exception is logged and terminates the process with exit code 1.
    """
    try:
        if clear:
            clear_tasks()
            return

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"文件不存在: {path}")

        keywords = read_keywords_from_file(path)
        if not keywords:
            raise ValueError("文件未包含有效关键词")

        logger.info(f"共读取到 {len(keywords)} 个关键词")
        # Show the first few keywords as a sample; a slice starting deeper
        # in the list would be empty for short files.
        logger.info(f"示例关键词: {keywords[:8]}...")

        # Browser configuration forwarded to every task.
        browser_config = {
            'headless': True
        }

        # Every keyword read from the file is submitted.
        submit_tasks(keywords, browser_config)
        logger.info("所有任务已提交完成")
    except Exception as e:
        logger.error(f"程序异常终止: {str(e)}")
        sys.exit(1)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='谷歌搜索任务客户端',
        # Backslashes in the example Windows path are escaped so the literal
        # contains no invalid escape sequences; the displayed text is the
        # same as before.
        epilog="示例:\r\n"
               " python -m worker.celery.client -f G:\\download\\测试-精油-2000.xlsx # 提交任务\r\n"
               " python -m worker.celery.client -c # 清除任务\r\n"
               " python -m worker.celery.client -h # 显示帮助",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('-f', '--file_path', help='包含关键词的文件路径')
    parser.add_argument('-c', '--clear', action='store_true', help='清除所有待处理任务')

    args = parser.parse_args()

    # Without a file to submit or the clear flag there is nothing to do.
    if not args.file_path and not args.clear:
        parser.print_help()
        sys.exit(1)

    main(args.file_path, args.clear)