|
|
@@ -52,8 +52,8 @@ class CrawlTaskConfig(BaseModel):
|
|
|
queue_name: str = 'crawl_queue' # 新增队列名称参数
|
|
|
|
|
|
class ConvertTaskParams(BaseModel):
|
|
|
- result_ids: List[str] = Field(..., min_length=1)
|
|
|
- batch_size: Optional[int] = Field(10, gt=0)
|
|
|
+ result_ids: List[str] = Field(..., min_length=0)
|
|
|
+ batch_size: Optional[int] = Field(0, gt=-1)
|
|
|
|
|
|
class WorkerModel(BaseModel):
|
|
|
name: str
|
|
|
@@ -115,6 +115,7 @@ class CeleryWorker:
|
|
|
continue
|
|
|
|
|
|
async def run(self):
|
|
|
+ logger.info(f"config {config.model_dump_json(indent=2)}")
|
|
|
python_exe = self.python_exe
|
|
|
logger.info(f"WORKER_DIR_BASE {WORKER_DIR_BASE}")
|
|
|
# return
|
|
|
@@ -169,7 +170,7 @@ class CeleryWorker:
|
|
|
if not worker_model:
|
|
|
raise ValueError(f"Invalid worker name: {name}")
|
|
|
# 检查进程是否真实存在
|
|
|
- if psutil.pid_exists(worker_model.pid):
|
|
|
+ if worker_model.pid and psutil.pid_exists(worker_model.pid):
|
|
|
proc = await asyncio.to_thread(psutil.Process, worker_model.pid) # 异步获取进程对象
|
|
|
await asyncio.to_thread(proc.terminate) # 异步发送终止信号
|
|
|
try:
|
|
|
@@ -303,7 +304,7 @@ class CeleryWorker:
|
|
|
TASK_MAP = {
|
|
|
'search': 'search_worker.search_all_uncompleted_keywords',
|
|
|
'crawl': 'crawl_worker.crawl_all_unprocessed_pages',
|
|
|
- 'convert': 'html_convert_tasks.convert_all_unprocessed_results'
|
|
|
+ 'convert': 'html_convert_tasks.convert_all_results'
|
|
|
}
|
|
|
|
|
|
if worker_name not in TASK_MAP:
|
|
|
@@ -313,6 +314,8 @@ class CeleryWorker:
|
|
|
task_model = pre_task_model.config
|
|
|
elif worker_name == 'crawl':
|
|
|
task_model = self._prepare_crawl_task(data, select_proxy)
|
|
|
+ elif worker_name == 'convert':
|
|
|
+ task_model = ConvertTaskParams(**data)
|
|
|
logger.info(f"{task_model}")
|
|
|
self.celery_app.send_task(
|
|
|
name=TASK_MAP[worker_name],
|