Эх сурвалжийг харах

完成提交所有任务。同时前端提示框改为正确的提示信息

mrh 9 сар өмнө
parent
commit
7f972f1222

Файлын зөрүү хэтэрхий том тул дарагдсан байна
+ 4 - 826
poetry.lock


+ 3 - 2
pyproject.toml

@@ -17,8 +17,9 @@ dependencies = [
     "celery (>=5.4.0,<6.0.0)",
     "flower (>=2.0.1,<3.0.0)",
     "docling (>=2.21.0,<3.0.0)",
-    "pywebview (>=5.4,<6.0)",
-    "prefect (>=3.2.7,<4.0.0)",
+    "fastapi (>=0.115.10,<0.116.0)",
+    "cachetools (>=5.5.2,<6.0.0)",
+    "uvicorn (>=0.34.0,<0.35.0)",
 ]
 
 

+ 8 - 2
ui/backend/routers/worker.py

@@ -173,10 +173,16 @@ async def ctrl_worker(request: StartupRequest):
             select_proxy=request.select_proxy
         )
         return {"status": "success", "res": res}
+    elif request.action == "submit_all":
+        res = await celery_worker.submit_all_tasks(
+            request.worker_name,
+            request.data,
+            select_proxy=request.select_proxy
+        )
+        return {"status": "success", "res": res}
     else:
+        logger.error(f"Invalid action: {request.action}")
         raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")
-    
-    logger.info(f"{request}")
     return {"status": "success", "worker_model": worker_model}
 
 async def health_check():

+ 38 - 7
ui/backend/src/services/celery_worker.py

@@ -238,7 +238,16 @@ class CeleryWorker:
             result = await asyncio.to_thread(pipe.execute)
             queue_lengths[worker_name] = result[0] if result else 0
         return queue_lengths
-
+    def _prepare_search_task(self, data: Dict, select_proxy: Optional[str] = None):
+            task_model = SearchTaskInput(**data)
+            if select_proxy == 'pool':
+                task_model.config.proxy_pool_url = f"http://{self.config.backend.host}:{self.config.backend.port}/api/proxy/proxies-pool"
+            return task_model
+    def _prepare_crawl_task(self, data: Dict, select_proxy: Optional[str] = None):
+            task_model = CrawlTaskParams(**data)
+            if select_proxy == 'pool':
+                task_model.proxy_pool_url = f"http://{self.config.backend.host}:{self.config.backend.port}/api/proxy/proxies-pool"
+            return task_model                           
     async def submit_tasks(self, worker_name: str, data: Dict, select_proxy: Optional[str] = None):
         """提交任务到指定队列"""
         
@@ -254,13 +263,9 @@ class CeleryWorker:
         
         # 根据worker类型验证任务数据
         if worker_name == 'search':
-            task_model = SearchTaskInput(**data)
-            if select_proxy == 'pool':
-                task_model.config.proxy_pool_url = f"http://{self.config.backend.host}:{self.config.backend.port}/api/proxy/proxies-pool"
+            task_model = self._prepare_search_task(data, select_proxy)
         elif worker_name == 'crawl':
-            task_model = CrawlTaskParams(**data)
-            if select_proxy == 'pool':
-                task_model.proxy_pool_url = f"http://{self.config.backend.host}:{self.config.backend.port}/api/proxy/proxies-pool"
+            task_model = self._prepare_crawl_task(data, select_proxy)
         elif worker_name == 'convert':
             task_model = ConvertTaskParams(**data)
         
@@ -273,5 +278,31 @@ class CeleryWorker:
         
         logger.info(f"成功通过Celery提交 {task_model} 个任务到 {TASK_MAP[worker_name]}")
         return task_model
+    async def submit_all_tasks(self, worker_name: str, data: List[Dict], select_proxy: Optional[str] = None):
+        """提交任务到指定队列"""
+        # 定义任务名称映射
+        TASK_MAP = {
+            'search': 'search_worker.search_all_uncompleted_keywords',
+            'crawl': 'crawl_worker.crawl_all_unprocessed_urls',
+            'convert': 'html_convert_tasks.convert_all_unprocessed_results'
+        }
 
+        if worker_name not in TASK_MAP:
+            raise ValueError(f"未知的worker类型: {worker_name}")
+        if worker_name == 'search':
+            pre_task_model = self._prepare_search_task(data, select_proxy)
+            task_model = pre_task_model.config
+        elif worker_name == 'crawl':
+            task_model = self._prepare_crawl_task(data, select_proxy)
+        logger.info(f"{task_model}")
+        self.celery_app.send_task(
+                name=TASK_MAP[worker_name],
+                args=(task_model.model_dump(),),
+                queue=self.workers_model[worker_name].queue_name,
+                serializer='json'
+            )
+        
+        logger.info(f"成功通过Celery提交 {task_model} 个任务到 {TASK_MAP[worker_name]}")
+        return task_model 
+            
 celery_worker = CeleryWorker()

+ 1 - 0
ui/fontend/src/components.d.ts

@@ -28,6 +28,7 @@ declare module 'vue' {
     ElRadioGroup: typeof import('element-plus/es')['ElRadioGroup']
     ElRow: typeof import('element-plus/es')['ElRow']
     ElSpace: typeof import('element-plus/es')['ElSpace']
+    ElSwitch: typeof import('element-plus/es')['ElSwitch']
     ElTable: typeof import('element-plus/es')['ElTable']
     ElTableColumn: typeof import('element-plus/es')['ElTableColumn']
     ElTag: typeof import('element-plus/es')['ElTag']

+ 57 - 14
ui/fontend/src/components/WorkerCtrl.vue

@@ -1,6 +1,17 @@
 <template>
     <!-- 保持模板结构不变 -->
     <div>
+        <el-row>
+        <el-tooltip content="测试模式不会直接运行程序,只会输出日志信息">
+            <el-switch
+            v-model="dryRun"
+            size="large"
+            active-text="测试模式"
+            inactive-text="工作模式"
+            />
+        </el-tooltip>
+        </el-row>
+
         <el-row :gutter="16" justify="center">
         <el-col :span="8">
           <el-card shadow="hover">
@@ -38,12 +49,6 @@
                 >
                   提交单个搜索任务
                 </el-button>
-                <el-button 
-                :disabled="!workerStatus.search || loadingStates.search"
-                @click="sendRequest('search', 'submit', true)"
-                >
-                  仅测试
-                </el-button>
               </el-row>
             </el-space>
           </el-card>
@@ -89,7 +94,7 @@ import { ref, computed, onMounted, onUnmounted } from 'vue'
 import { ElMessage } from 'element-plus'
 import { useProxyStore } from '../stores/proxyStore'
 
-const backendBaseUrl = import.meta.env.VITE_API_BASE_URL || ''
+const backendBaseUrl = (import.meta as any).env.VITE_API_BASE_URL || ''
 const store = useProxyStore()
 const workers = ref<Array<any>>([])
 const keywordInput = ref('')
@@ -103,6 +108,7 @@ const loadingStates = ref({
   crawl: false,
   convert: false
 })
+const dryRun = ref(false)
 let pollTimer: number | null = null
 
 const workerStatus = computed(() => {
@@ -161,7 +167,7 @@ function stopPolling() {
 onMounted(() => startPolling())
 onUnmounted(() => stopPolling())
 
-const sendRequest = async (workerName: string, action: string, dryRun: boolean = false) => {
+const sendRequest = async (workerName: string, action: string) => {
   const loadingKey = workerName as keyof typeof loadingStates.value
   loadingStates.value[loadingKey] = true
 
@@ -176,15 +182,15 @@ const sendRequest = async (workerName: string, action: string, dryRun: boolean =
         action: action,
         select_proxy: store.selectedProxy,
         data: {
-          ...(workerName === 'search' && keywordInput.value ? {
-            keyword: keywordInput.value.trim(),
+          ...(workerName === 'search' ?  {
+            keyword: keywordInput.value.trim() || '',
             config: {
               max_result_items: 200,
               skip_existing: true,
               browser_config: {},
-              dry_run: dryRun
+              dry_run: dryRun.value
             }
-          } : {}),
+          }: {}),
           ...(workerName === 'crawl' ? {
             overwrite: false,
             proxy_pool_url: store.selectedProxy
@@ -204,9 +210,46 @@ const sendRequest = async (workerName: string, action: string, dryRun: boolean =
 
     // 直接刷新状态而不是尝试局部更新
     await fetchWorkerStatus()
-    ElMessage.success(`${action === 'start' ? '启动' : '停止'}${workerName}成功`)
+    // 操作类型中文映射
+    const actionMap: Record<string, string> = {
+      start: '启动',
+      stop: '停止',
+      submit_all: '提交所有任务',
+      clean: '清空任务',
+      submit: '提交搜索'
+    }
+    
+    // 模块名称中文映射
+    const workerNameMap: Record<string, string> = {
+      search: '浏览器搜索',
+      crawl: '提取结果页',
+      convert: '文档转换'
+    }
+
+    const actionName = actionMap[action] || '操作'
+    const moduleName = workerNameMap[workerName] || '任务'
+    
+    ElMessage.success(`${actionName}${moduleName}成功`)
   } catch (error) {
-    ElMessage.error(`操作失败: ${error instanceof Error ? error.message : '未知错误'}`)
+    // 错误提示中文映射
+    const errorActionMap: Record<string, string> = {
+      start: '启动',
+      stop: '停止',
+      submit_all: '提交所有',
+      clean: '清空',
+      submit: '提交'
+    }
+    
+    const errorModuleMap: Record<string, string> = {
+      search: '搜索任务',
+      crawl: '提取任务',
+      convert: '转换任务'
+    }
+    
+    const actionName = errorActionMap[action] || '操作'
+    const moduleName = errorModuleMap[workerName] || '任务'
+    
+    ElMessage.error(`${actionName}${moduleName}失败: ${error instanceof Error ? error.message : '未知错误'}`)
     console.error('API请求错误:', error)
   } finally {
     loadingStates.value[loadingKey] = false

+ 6 - 3
worker/celery/search_tasks.py

@@ -34,9 +34,9 @@ class SearchTaskInput(SearchTaskConfig):
     serializer='pickle',
     accept=['pickle', 'json']
 )
-def search_all_uncompleted_keywords_task(config: dict):
+def search_all_uncompleted_keywords_task(config: dict|SearchTaskConfig):
     """异步任务:搜索所有未完成的关键词"""
-    config = SearchTaskConfig(**(config if isinstance(config, dict) else config.dict()))
+    config = SearchTaskConfig(**config)
     try:
         manager = SearchResultManager()
         uncompleted_keywords = manager.get_uncompleted_keywords()
@@ -49,7 +49,10 @@ def search_all_uncompleted_keywords_task(config: dict):
         
         task_group = group([
             drission_search_task.s(
-                keyword, config
+                SearchTaskInput(
+                        keyword=keyword,
+                        config=config
+                    ).model_dump()
                 ).set(queue='search_queue')
             for keyword in uncompleted_keywords
         ])

Энэ ялгаанд хэт олон файл өөрчлөгдсөн тул зарим файлыг харуулаагүй болно