client.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from worker.celery.app import app
  2. from worker.celery.models import KeywordTaskModel
  3. import pandas as pd
  4. from pathlib import Path
  5. import logging
  6. from typing import List, Dict, Optional
  7. import sys
  8. from mylib.logu import logger
  9. import argparse
  10. from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
  11. from sqlmodel import select, Session, exists, distinct
  12. def read_keywords_from_file(file_path: Path) -> List[str]:
  13. """读取文件第0列第一行到末尾的内容"""
  14. try:
  15. # 根据文件类型选择读取方式
  16. if file_path.suffix.lower() in ['.xlsx', '.xls']:
  17. df = pd.read_excel(file_path, header=0, engine='openpyxl')
  18. elif file_path.suffix.lower() in ['.csv', '.tsv']:
  19. df = pd.read_csv(file_path, header=0, sep='\t' if file_path.suffix == '.tsv' else ',')
  20. else:
  21. raise ValueError(f"不支持的格式: {file_path.suffix}")
  22. return df.iloc[:, 0].astype(str).tolist() # 强制转换为字符串类型
  23. except Exception as e:
  24. logger.error(f"读取文件失败: {str(e)}")
  25. raise
  26. def get_uncompleted_keywords() -> list[str]:
  27. """从数据库获取已完成搜索但未完成爬取的关键词"""
  28. manager = SearchResultManager()
  29. with Session(manager.engine) as session:
  30. # 使用JOIN优化查询,避免子查询
  31. query = (
  32. select(distinct(KeywordTask.keyword))
  33. .where(KeywordTask.is_completed != True)
  34. )
  35. # 使用stream_results=True来优化内存使用
  36. keywords = session.exec(query).all()
  37. return keywords
  38. def submit_tasks(keywords: List[str], browser_config: Optional[Dict] = None):
  39. """提交所有关键词任务"""
  40. for keyword in keywords:
  41. try:
  42. task_data = {
  43. 'keyword': keyword.strip(),
  44. 'max_result_items': 200,
  45. 'skip_existing': True,
  46. 'browser_config': browser_config or {}
  47. }
  48. result = app.send_task('search_worker.drission_search', kwargs=task_data, queue='search_queue')
  49. logger.info(f"任务已提交: {keyword} (任务ID: {result.id})")
  50. except Exception as e:
  51. logger.error(f"提交任务失败 [{keyword}]: {str(e)}")
  52. def clear_tasks():
  53. """清除所有任务"""
  54. try:
  55. app.control.purge() # 删除所有待处理的任务
  56. logger.info("所有待处理任务已清除")
  57. except Exception as e:
  58. logger.error(f"清除任务失败: {str(e)}")
  59. raise
  60. def main(file_path: str, clear: bool = False):
  61. try:
  62. if clear:
  63. clear_tasks()
  64. return
  65. path = Path(file_path)
  66. if not path.exists():
  67. raise FileNotFoundError(f"文件不存在: {path}")
  68. keywords = read_keywords_from_file(path)
  69. if not keywords:
  70. raise ValueError("文件未包含有效关键词")
  71. logger.info(f"共读取到 {len(keywords)} 个关键词")
  72. logger.info(f"示例关键词: {keywords[25:33]}...")
  73. # 浏览器配置
  74. browser_config = {
  75. 'headless': True
  76. }
  77. submit_tasks(keywords, browser_config) # 测试前5个关键词
  78. logger.info("所有任务已提交完成")
  79. except Exception as e:
  80. logger.error(f"程序异常终止: {str(e)}")
  81. sys.exit(1)
  82. if __name__ == '__main__':
  83. parser = argparse.ArgumentParser(
  84. description='谷歌搜索任务客户端',
  85. epilog="示例:\r\n"
  86. " python -m worker.celery.client -f G:\download\测试-精油-2000.xlsx # 提交任务\r\n"
  87. " python -m worker.celery.client -c # 清除任务\r\n"
  88. " python -m worker.celery.client -h # 显示帮助",
  89. formatter_class=argparse.RawTextHelpFormatter
  90. )
  91. parser.add_argument('-f', '--file_path', help='包含关键词的文件路径')
  92. parser.add_argument('-c', '--clear', action='store_true', help='清除所有待处理任务')
  93. args = parser.parse_args()
  94. if not args.file_path and not args.clear:
  95. parser.print_help()
  96. sys.exit(1)
  97. main(args.file_path, args.clear)