| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- from worker.celery.app import app
- from worker.celery.models import KeywordTaskModel
- import pandas as pd
- from pathlib import Path
- import logging
- from typing import List, Dict, Optional
- import sys
- from mylib.logu import logger
- import argparse
- from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
- from sqlmodel import select, Session, exists, distinct
def read_keywords_from_file(file_path: Path) -> List[str]:
    """Read keywords from the first column of a spreadsheet or delimited file.

    The first row is treated as a header (``header=0``), so keywords are taken
    from the second row to the end of column 0. Empty cells, and cells that are
    blank after stripping whitespace, are dropped.

    Args:
        file_path: Path to a ``.xlsx``, ``.xls``, ``.csv`` or ``.tsv`` file.
            The suffix is matched case-insensitively.

    Returns:
        The keywords as whitespace-stripped strings, in file order.

    Raises:
        ValueError: If the file suffix is not one of the supported formats.
        Exception: Any error raised while reading is logged and re-raised.
    """
    suffix = file_path.suffix.lower()
    try:
        if suffix in ('.xlsx', '.xls'):
            # openpyxl only understands the OOXML (.xlsx) format; legacy .xls
            # must go through pandas' default engine, so only pin it for .xlsx.
            engine = 'openpyxl' if suffix == '.xlsx' else None
            df = pd.read_excel(file_path, header=0, engine=engine, dtype=str)
        elif suffix in ('.csv', '.tsv'):
            # Use the lower-cased suffix so ".TSV" also gets a tab separator.
            sep = '\t' if suffix == '.tsv' else ','
            df = pd.read_csv(file_path, header=0, sep=sep, dtype=str)
        else:
            raise ValueError(f"不支持的格式: {file_path.suffix}")

        # dtype=str keeps numeric-looking keywords as written (no "123.0") and
        # leaves empty cells as NaN, which dropna() removes instead of letting
        # astype(str) turn them into the literal keyword "nan".
        column = df.iloc[:, 0].dropna().astype(str).str.strip()
        return column[column != ''].tolist()

    except Exception as e:
        logger.error(f"读取文件失败: {str(e)}")
        raise
def get_uncompleted_keywords() -> list[str]:
    """Return the distinct keywords whose ``KeywordTask`` is not marked completed.

    A row counts as not completed when ``is_completed`` is false or NULL.

    NOTE(review): which pipeline stage ``is_completed`` tracks (search vs.
    crawl) is defined by the ``KeywordTask`` model, not visible here — confirm.

    Returns:
        Distinct keyword strings, in whatever order the database returns them.
    """
    from sqlmodel import or_

    manager = SearchResultManager()
    with Session(manager.engine) as session:
        # A plain `is_completed != True` filter would silently skip NULL rows,
        # because in SQL `NULL <> TRUE` evaluates to NULL, not true.
        query = select(distinct(KeywordTask.keyword)).where(
            or_(
                KeywordTask.is_completed == False,  # noqa: E712 - builds SQL, not a Python bool test
                KeywordTask.is_completed.is_(None),
            )
        )
        # .all() materializes every result row; acceptable for a keyword list.
        return list(session.exec(query).all())
def submit_tasks(
    keywords: List[str],
    browser_config: Optional[Dict] = None,
    *,
    max_result_items: int = 200,
    skip_existing: bool = True,
):
    """Send one ``search_worker.drission_search`` Celery task per keyword.

    Tasks are sent by name to the ``search_queue`` queue. A failure to submit
    one keyword is logged and does not stop the remaining submissions.

    Args:
        keywords: Keywords to search. Each is whitespace-stripped; entries that
            are empty after stripping are skipped instead of being submitted.
        browser_config: Forwarded to the task as ``browser_config``; ``None``
            is sent as an empty dict.
        max_result_items: Forwarded as the task's ``max_result_items`` kwarg.
        skip_existing: Forwarded as the task's ``skip_existing`` kwarg.
    """
    config = browser_config or {}
    for keyword in keywords:
        try:
            cleaned = keyword.strip()
            if not cleaned:
                # Blank rows would only enqueue meaningless searches.
                continue
            task_data = {
                'keyword': cleaned,
                'max_result_items': max_result_items,
                'skip_existing': skip_existing,
                'browser_config': config,
            }
            result = app.send_task('search_worker.drission_search', kwargs=task_data, queue='search_queue')
            logger.info(f"任务已提交: {cleaned} (任务ID: {result.id})")
        except Exception as e:
            logger.error(f"提交任务失败 [{keyword}]: {str(e)}")
def clear_tasks() -> int:
    """Discard waiting task messages via ``app.control.purge()``.

    Only messages still waiting in the broker are discarded; this is not a
    general "cancel everything" operation.

    Returns:
        The number of messages purged, as reported by Celery.

    Raises:
        Exception: Any error from the purge is logged and re-raised.
    """
    try:
        # NOTE(review): purge() acts on the queues known to this app's
        # configuration. Tasks are submitted to 'search_queue'; confirm that
        # queue is declared in the app config, or its messages may survive.
        purged = app.control.purge()
    except Exception as e:
        logger.error(f"清除任务失败: {str(e)}")
        raise
    logger.info(f"所有待处理任务已清除 (共 {purged} 条)")
    return purged
def main(file_path: str, clear: bool = False):
    """CLI entry point: purge waiting tasks, or submit one task per keyword.

    Args:
        file_path: Keyword file to read; ignored when ``clear`` is true.
        clear: If true, purge waiting tasks and return without submitting.

    Any exception is logged and the process exits with status 1.
    """
    try:
        if clear:
            clear_tasks()
            return

        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"文件不存在: {path}")

        keywords = read_keywords_from_file(path)
        if not keywords:
            raise ValueError("文件未包含有效关键词")

        logger.info(f"共读取到 {len(keywords)} 个关键词")
        # Preview from the start of the list so short files still show
        # something (a fixed mid-list slice is empty for small inputs).
        logger.info(f"示例关键词: {keywords[:8]}...")

        # Browser settings forwarded to every submitted task.
        browser_config = {
            'headless': True
        }

        submit_tasks(keywords, browser_config)
        logger.info("所有任务已提交完成")

    except Exception as e:
        logger.error(f"程序异常终止: {str(e)}")
        sys.exit(1)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='谷歌搜索任务客户端',
        # Backslashes in the Windows example path are escaped explicitly:
        # "\d" and "\测" are invalid escape sequences (SyntaxWarning on 3.12+).
        # The resulting runtime text is unchanged.
        epilog="示例:\r\n"
               " python -m worker.celery.client -f G:\\download\\测试-精油-2000.xlsx # 提交任务\r\n"
               " python -m worker.celery.client -c # 清除任务\r\n"
               " python -m worker.celery.client -h # 显示帮助",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('-f', '--file_path', help='包含关键词的文件路径')
    parser.add_argument('-c', '--clear', action='store_true', help='清除所有待处理任务')

    args = parser.parse_args()

    # At least one action is required; with neither, show usage and fail.
    if not args.file_path and not args.clear:
        parser.print_help()
        sys.exit(1)

    main(args.file_path, args.clear)
|