
Frontend and backend controls for starting and stopping workers

mrh, 9 months ago
parent commit ec7d9a838b

+ 25 - 0
tests/mytest/redis_celery_t.py

@@ -0,0 +1,25 @@
+from mylib.logu import logger
+from worker.celery.app import app as celery_app
+from worker.celery.crawl_client import submit_page_crawl_tasks
+from worker.celery.client import get_uncompleted_keywords, submit_tasks
+
+
+# Submit tasks to their dedicated queues
+def main():
+    # Submit to search_queue
+    keywords = ['123']
+    # browser_config = {"proxy_pool": ["http://127.0.0.1:7890"]}
+    search_result = celery_app.send_task('search_worker.add', kwargs={"keywords": keywords})
+    print(f"Search task submitted. Task ID: {search_result.id}")
+    return search_result
+
+    # Unreachable scratch code below: crawl_task/convert_task are never imported above
+    # Submit to crawl_queue
+    crawl_result = crawl_task.apply_async(args=["example crawl data"], queue='crawl_queue')
+    print(f"Crawl task submitted. Task ID: {crawl_result.id}")
+
+    # Submit to convert_queue
+    convert_result = convert_task.apply_async(args=["example convert data"], queue='convert_queue')
+    print(f"Convert task submitted. Task ID: {convert_result.id}")
+
+if __name__ == "__main__":
+    main()

+ 10 - 15
tests/mytest/t.py

@@ -7,8 +7,11 @@ import requests
 from bs4 import BeautifulSoup
 import json
 from dotenv import load_dotenv
-from scrapegraphai.graphs import ScriptCreatorGraph
-from scrapegraphai.utils import prettify_exec_info
+import sys
+from pathlib import Path
+sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio')))
+os.environ['DB_URL'] = 'sqlite:///' + str(Path(r'G:\code\upwork\zhang_crawl_bio\output\search_results copy.db'))
+from worker.celery.client import get_uncompleted_keywords
 import yaml
 import socket
 
@@ -18,20 +21,12 @@ from mylib.logu import logger
 # load_dotenv()
 config_path = Path(f"config/pc_configs/{'pc1'}.yaml")
 def t_main():
-    
-    if config_path.exists():
-        with open(config_path) as f:
-            config_data = yaml.safe_load(f)
-            logger.info(f"Loaded browser config from {config_path}")
-            logger.info(f"{config_data}")
-    # 获取主机名
-    hostname = socket.gethostname() 
-    ip_address = socket.gethostbyname(hostname)
-
-    print("主机名:", hostname)
-    print("IP 地址:", ip_address)
+    res = get_uncompleted_keywords()
+    print(res)
+    print(len(res))
 async def main():
-    # t_main()
+    t_main()
+    return
     print(len(None))
     # s = '''python\nimport requests\nfrom bs4 import BeautifulSoup\nimport json\n\ndef main():\n    url = \"https://perinim.github.io/projects\"\n    response = requests.get(url)\n    soup = BeautifulSoup(response.content, 'html.parser')\n    \n    news_list = []\n    \n    for news in soup.find_all('div', class_='news-item'):\n        title = news.find('h2').text.strip()\n        description = news.find('p').text.strip()\n        news_list.append({\n            \"title\": title,\n            \"description\": description\n        })\n    \n    print(json.dumps(news_list, indent=4))\n\nif __name__ == \"__main__\":\n    main()\n'''
     # print(s)

+ 1 - 1
ui/backend/config.yaml

@@ -12,7 +12,7 @@ sub:
   proxies:
     9660:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9660.yaml
-      name: "\U0001F1ED\U0001F1F0\u9999\u6E2F2\u53F7"
+      name: "\U0001F1E9\U0001F1EA\u5FB7\u56FD\u6CD5\u5170\u514B\u798F"
       port: 9660
       startup: true
     9662:

+ 1 - 1
ui/backend/routers/proxy.py

@@ -132,7 +132,7 @@ async def get_proxies(port: int = None):
         return await get_proxy_response(port)
     else:
         ret = await get_all_proxy_response()
-        logger.info(f"{ret}")
+        logger.debug(f"{ret}")
         return ret
 
 class ProxyPost(BaseModel):

+ 60 - 7
ui/backend/routers/worker.py

@@ -1,3 +1,4 @@
+import subprocess
 from fastapi import APIRouter, HTTPException, Request, Depends
 from fastapi.responses import JSONResponse, StreamingResponse
 import httpx
@@ -9,7 +10,7 @@ from typing import Optional
 import pathlib
 import os
 from utils.config import WORKER_SERVICE_URL,config,Browser
-from src.services.celery_worker import CeleryWorker
+from src.services.celery_worker import celery_worker
 from utils.logu import logger
 
 router = APIRouter()
@@ -34,6 +35,20 @@ class ResponseStatus(BaseModel):
     browser_config: Optional[Browser] = None
     celery_status: Optional[Dict] = {}
     
+def transform_workers_data(workers_data: list) -> list:
+    """转换worker数据为前端需要的格式"""
+    return [
+        {
+            "hostname": worker.get("hostname"),
+            "worker-online": worker.get("worker-online"),
+            "pid": worker.get("pid"),
+            "active": worker.get("active"),
+            "processed": worker.get("processed"),
+            "status": worker.get("status")
+        }
+        for worker in workers_data
+    ]
+
 @router.post("/browser_config", tags=["worker"])
 async def update_browser_config(new_config: Browser):
     """更新浏览器配置(包含自动路径校验)"""
@@ -92,18 +107,56 @@ async def reset_browser_config():
     return {"status": "success", "browser_config": config.browser}
 
 @router.get("/status", tags=["worker"])
-async def status():
-    global config
+async def status() -> ResponseStatus:
+    global config,celery_worker
     try:
         health = await health_check()
     except Exception as e:
         logger.error(e)
         health = {"err": 1, "msg": str(e)}
-    celery_worker = CeleryWorker()
     celery_status = await celery_worker.check_worker_status()
-    """获取当前请求的端点"""
-    return ResponseStatus(endpoint=Endpoint(health=health), browser_config=config.browser, celery_status=celery_status)
+    
+    # Normalize worker data for the frontend
+    if celery_status.get("workers") and celery_status["workers"].get("data"):
+        celery_status["workers"]["data"] = transform_workers_data(
+            celery_status["workers"]["data"]
+        )
+    
+    logger.info(f"Filtered celery_status: {celery_status}")
+    return ResponseStatus(
+        endpoint=Endpoint(health=health),
+        browser_config=config.browser,
+        celery_status=celery_status
+    )
 
+class StartupRequest(BaseModel):
+    worker_name: str
+    action: str
+    data: Optional[Dict] = {}
+
+@router.post("/ctrl", tags=["worker"])
+async def ctrl_worker(request: StartupRequest):
+    """Dispatch a start/stop/clean action for the named worker"""
+    global celery_worker
+    if request.action == "start":
+        await celery_worker.start_worker(request.worker_name)
+    elif request.action == "stop":
+        await celery_worker.stop_worker(request.worker_name)
+    elif request.action == "clean":
+        logger.info(f"clean {request.data}")
+    else:
+        raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")
+    logger.info(f"{request.action} {request.worker_name}")
+    flower_workers = await celery_worker.check_worker_status()
+    
+    # Apply the same worker-data transformation as /status
+    if flower_workers.get("workers") and flower_workers["workers"].get("data"):
+        flower_workers["workers"]["data"] = transform_workers_data(
+            flower_workers["workers"]["data"]
+        )
+    
+    if flower_workers.get("err") == 1:
+        raise HTTPException(status_code=500, detail=f"Flower workers error: {flower_workers.get('msg')}")
+    return {"status": "success", "flower_workers": flower_workers}
 
 async def health_check():
     """健康检查"""
@@ -112,4 +165,4 @@ async def health_check():
         response.raise_for_status()
         ret = response.json()
         ret.update({"err": 0})
-        return ret
+        return ret

+ 66 - 8
ui/backend/src/services/celery_worker.py

@@ -1,8 +1,10 @@
 from pathlib import Path
+import subprocess
 import sys
-from typing import Dict,Any
+from typing import Dict,Any, List, Optional
 
 import httpx
+from pydantic import BaseModel,field_validator,Field
 import redis
 from src.services.subscription_manager import SubscriptionManager
 from utils.config import config,APP_PATH
@@ -11,12 +13,39 @@ from utils.process_mgr import process_manager
 import asyncio
 from utils.logu import get_logger,logger
 import os
+WORKER_DIR_BASE = APP_PATH.parent.parent
+# Map each worker name to its client entry script
+py_client: Dict[str, Path] = {
+    'search': WORKER_DIR_BASE / 'worker/celery/client.py',
+    'crawl': WORKER_DIR_BASE / 'worker/celery/crawl_client.py',
+    'convert': WORKER_DIR_BASE / 'worker/celery/html_convert_tasks.py'
+}
 
 
+class WorkerModel(BaseModel):
+    name: str
+    queue_name: Optional[str] = Field(default=None, validate_default=True)
+    cmd: Optional[List[str]] = None
+    pid: Optional[int] = None
+
+    @field_validator("queue_name", mode="after")
+    @classmethod
+    def set_queue_name(cls, v: Optional[str], values) -> str:
+        if v is None:
+            if "name" not in values.data:
+                raise ValueError("name field is missing; cannot derive queue_name")
+            return f"{values.data['name']}_queue"
+        return v
+
+
 class CeleryWorker:
+    def __init__(self, python_exe: str=sys.executable):
+        self.workers_model: Dict[str, WorkerModel] = {}
+        for worker_name in py_client.keys():
+            model = WorkerModel(name=worker_name)
+            model.cmd = [python_exe, '-m', 'celery', '-A', 'worker.celery.app', 'worker', '-Q',model.queue_name, f'--hostname={worker_name}@%h']
+            self.workers_model[worker_name] = model
     async def run(self):
         python_exe = sys.executable
-        WORKER_DIR_BASE = APP_PATH.parent.parent
         logger.info(f"{WORKER_DIR_BASE}")
         # return
         redis_cmd = [config.redis_exe]
@@ -33,12 +62,40 @@ class CeleryWorker:
         await process_manager.start_process("flower", flower_cmd, cwd=WORKER_DIR_BASE)
         proces = process_manager.processes.get("flower").get('process')
 
-        search_worker_name = 'search'
-        crawl_worker_name = 'crawl'
-        convert_worker_name = 'convert'
-        worker_list = [search_worker_name, crawl_worker_name, convert_worker_name]
-        for worker_name in worker_list:
-            await process_manager.start_process(f"{worker_name}_worker", [python_exe, '-m', 'celery', '-A', 'worker.celery.app', 'worker', '-Q',f'{worker_name}_queue', f'--hostname={worker_name}@%h'], cwd=WORKER_DIR_BASE)
+    async def start_all_workers(self):
+        for worker_name,worker_model in self.workers_model.items():
+            pid = await process_manager.start_process(worker_model.name, worker_model.cmd, cwd=WORKER_DIR_BASE)
+            worker_model.pid = pid
+    async def start_worker(self, name: str, in_cmd_windows: bool = False) -> WorkerModel:
+        if name in process_manager.processes:
+            # Already running: return the tracked model to honor the return type
+            return self.workers_model[name]
+        worker_model = self.workers_model.get(name)
+        if not worker_model:
+            raise ValueError(f"Invalid worker name: {name}")
+        if in_cmd_windows:
+            cmd = ['start','cmd', '/k' ]
+            cmd.extend(worker_model.cmd)
+            logger.info(f"run {' '.join(cmd)}")
+            process = subprocess.Popen(cmd, shell=True)
+            worker_model.pid = process.pid
+        else:
+            worker_model.pid = await process_manager.start_process(name, worker_model.cmd, cwd=WORKER_DIR_BASE)
+        return worker_model
+    
+    async def stop_worker(self, name: str):
+        worker_model = self.workers_model.get(name)
+        if not worker_model:
+            raise ValueError(f"Invalid worker name: {name}")
+        await process_manager.stop_process(worker_model.name)
+        worker_model.pid = None
+        return worker_model
+
+    async def clean_worker_queue(self, name: str):
+        worker_model = self.workers_model.get(name)
+        if not worker_model:
+            raise ValueError(f"Invalid worker name: {name}")
+        queue_name = worker_model.queue_name
+        # celery purge prompts for confirmation; -f forces it when run non-interactively
+        return subprocess.run([sys.executable, "-m", "celery", "-A", "worker.celery.app", "purge", "-f", "-Q", queue_name])
 
     async def check_worker_status(self) -> Dict[str, Any]:
         flower_url = "http://127.0.0.1:5555/workers?json=1"
@@ -69,3 +126,4 @@ class CeleryWorker:
         except Exception as e:
             return {"err": 1, "msg": f"Failed to connect to Redis: {e}"}
         
+celery_worker = CeleryWorker()

+ 31 - 0
ui/backend/tests/mytests/t.py

@@ -0,0 +1,31 @@
+from typing import List, Optional
+from pydantic import BaseModel, Field, field_validator
+
+class WorkerModel(BaseModel):
+    name: str
+    queue_name: Optional[str] = Field(
+        default=None,
+        validate_default=True  # key setting: run the validator on the default value
+    )
+    cmd: List[str]
+    pid: Optional[int] = None
+
+    @field_validator("queue_name", mode="after")
+    @classmethod
+    def set_queue_name(cls, v: Optional[str], values) -> str:
+        if v is None:
+            # the validated name field is safely accessible here
+            return f"{values.data['name']}_queue"
+        return v
+
+# Test cases
+if __name__ == "__main__":
+    worker1 = WorkerModel(name="worker1", cmd=["python", "app.py"])
+    print(worker1.queue_name)  # prints: worker1_queue
+
+    worker2 = WorkerModel(
+        name="worker2", 
+        queue_name="custom_queue", 
+        cmd=["python", "app.py"]
+    )
+    print(worker2.queue_name)  # prints: custom_queue

+ 37 - 0
ui/backend/tests/mytests/t_cmd.py

@@ -0,0 +1,37 @@
+import asyncio
+import sys
+import time
+import os
+from pathlib import Path
+sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
+from src.services.celery_worker import process_manager,celery_worker,WorkerModel
+import subprocess
+async def main():
+    # Inspect the system PATH
+    system_path = os.environ.get("PATH", "")
+    pyexe = sys.executable
+    file = r'G:\code\upwork\zhang_crawl_bio\worker\celery\client.py'
+    command = f"{pyexe} {file}"
+    # queue_name is derived automatically from name
+    worker1 = WorkerModel(name="worker1", cmd=["python", "app.py"])
+    print(worker1.queue_name)  # prints: worker1_queue
+
+    # queue_name given explicitly
+    worker2 = WorkerModel(name="worker2", queue_name="custom_queue", cmd=["python", "app.py"])
+    print(worker2.queue_name)  # prints: custom_queue
+    return
+    # startupinfo = subprocess.STARTUPINFO()
+    # startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+    # process = subprocess.Popen(['cmd', '/k', command ], shell=True,startupinfo=startupinfo, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
+    # process = subprocess.Popen(['start','cmd', '/k', command ], shell=True)
+    # res = process.pid
+    res = await celery_worker.start_worker('search')
+    res = await celery_worker.start_worker('crawl')
+    res = await celery_worker.start_worker('convert')
+    print(res)  
+    input('Press Enter to exit...')
+    await celery_worker.start_worker('crawl')
+    # `/k` keeps the cmd window open after the command runs; `/c` closes it
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 135 - 0
ui/docs/gpt/architecture.md

@@ -0,0 +1,135 @@
+# System Architecture
+
+## Component Overview
+
+![System architecture diagram](diagrams/architecture.drawio.png)
+
+```mermaid
+graph TD
+    A[Frontend UI] -->|HTTP API| B(FastAPI backend)
+    B -->|Redis| C[Proxy pool manager]
+    B -->|Celery| D[Worker cluster]
+    C --> E[Local proxy instances]
+    D --> F[Distributed tasks]
+```
+
+## Core Components
+
+### 1. Frontend UI (Vue 3 + Element Plus)
+- Built with the Composition API and `<script setup>` syntax
+- Served as static files mounted by FastAPI (see the sketch below)
+- Main features:
+  - Proxy pool status monitoring
+  - Worker task management
+  - Browser configuration management
+  - System proxy settings
+
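+A minimal sketch of the static-file mount, assuming the built frontend is emitted to `ui/fontend/dist` (the path and app wiring here are illustrative assumptions, not the project's exact code):
+
+```python
+from fastapi import FastAPI
+from fastapi.staticfiles import StaticFiles
+
+app = FastAPI()
+# html=True serves index.html for bare directory requests,
+# which is what a Vue single-page app expects.
+app.mount("/", StaticFiles(directory="ui/fontend/dist", html=True), name="frontend")
+```
+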
+### 2. FastAPI Backend
+```mermaid
+classDiagram
+    class SubscriptionManager{
+        +redis_client: Redis
+        +list_proxies_mgr: Dict[int, ProxyManager]
+        +download_subscription()
+        +create_custom_config()
+        +start_proxy()
+        +ping_proxies()
+    }
+    class ProxyManager{
+        +mimo_exe: str
+        +config_path: str
+        +start_proxy()
+        +stop_proxy()
+        +ping_proxies()
+    }
+    class ProcessManager{
+        +processes: dict
+        +start_process()
+        +stop_process()
+    }
+```
+
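+A compact sketch of the ProcessManager shape from the diagram, assuming asyncio subprocesses; the method names match the diagram, the bodies are illustrative:
+
+```python
+import asyncio
+from typing import Dict
+
+class ProcessManager:
+    def __init__(self) -> None:
+        # name -> {"process": Process, "cmd": [...]}
+        self.processes: Dict[str, dict] = {}
+
+    async def start_process(self, name: str, cmd: list[str], cwd=None) -> int:
+        proc = await asyncio.create_subprocess_exec(*cmd, cwd=cwd)
+        self.processes[name] = {"process": proc, "cmd": cmd}
+        return proc.pid
+
+    async def stop_process(self, name: str) -> None:
+        entry = self.processes.pop(name, None)
+        if entry:
+            entry["process"].terminate()
+            await entry["process"].wait()
+```
+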
+### 3. Proxy Pool Management
+- Redis storage layout:
+  ```python
+  # Proxy pool set (deduplicated automatically)
+  redis_client.sadd('proxy_pool', *healthy_proxies)
+  # Auto-expiry so a stale pool clears itself
+  redis_client.expire('proxy_pool', interval*2)
+  ```
+- Health-check mechanism (sketched below):
+  - Scheduled task (every 80 seconds by default)
+  - Port reachability probe
+  - Proxy quality scoring (latency test)
+
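+A rough sketch of one health-check pass under those rules; the Redis key and expiry follow the snippet above, while `check_port` and the candidate list are illustrative assumptions rather than names from this repo:
+
+```python
+import socket
+
+import redis
+
+r = redis.Redis()
+INTERVAL = 80  # seconds, the default check period
+
+def check_port(host: str, port: int, timeout: float = 3.0) -> bool:
+    """Port reachability probe: can we open a TCP connection in time?"""
+    try:
+        with socket.create_connection((host, port), timeout=timeout):
+            return True
+    except OSError:
+        return False
+
+def health_check_pass(candidates: list[tuple[str, int]]) -> None:
+    # Keep only proxies whose ports answer; latency scoring would slot in here
+    healthy = [f"http://{h}:{p}" for h, p in candidates if check_port(h, p)]
+    if healthy:
+        r.sadd('proxy_pool', *healthy)
+    # Expire at twice the interval so one missed pass doesn't empty the pool
+    r.expire('proxy_pool', INTERVAL * 2)
+```
+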
+### 4. Worker Management
+- Distributed task system built on Celery
+- Task types:
+  - Web crawling
+  - Data processing
+  - Browser automation
+- Messaging (a worker-side sketch follows):
+  ```python
+  # Example task invocation
+  celery_app.send_task('search_worker.add', args=[2, 2])
+  ```
+
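+On the worker side, the task invoked above could be declared roughly like this; the broker URL and queue routing are illustrative assumptions, only the task name comes from the snippet:
+
+```python
+from celery import Celery
+
+app = Celery('worker', broker='redis://127.0.0.1:6379/0')
+# Route this task name onto the queue the search worker consumes
+app.conf.task_routes = {'search_worker.add': {'queue': 'search_queue'}}
+
+@app.task(name='search_worker.add')
+def add(x: int, y: int) -> int:
+    # send_task('search_worker.add', args=[2, 2]) resolves here by name
+    return x + y
+```
+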
+## Data Flow
+
+```mermaid
+sequenceDiagram
+    Frontend->>+Backend: /api/proxy/proxies (GET)
+    Backend->>+Redis: SMEMBERS proxy_pool
+    Redis-->>-Backend: proxy list
+    Backend-->>-Frontend: proxy status
+
+    Frontend->>Backend: /api/worker/browser_config (POST)
+    Backend->>Celery: send config-update task
+    Celery->>Worker: apply new config
+    Worker-->>Celery: acknowledge
+    Celery-->>Backend: task status
+    Backend-->>Frontend: operation result
+```
+
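+The first exchange maps onto a handler shaped roughly like this; the route path and Redis key follow the diagram, everything else is an illustrative sketch rather than the repo's actual router code:
+
+```python
+import redis
+from fastapi import APIRouter
+
+router = APIRouter()
+r = redis.Redis(decode_responses=True)
+
+@router.get("/api/proxy/proxies")
+async def get_proxies():
+    # SMEMBERS proxy_pool, as in the sequence diagram
+    proxies = sorted(r.smembers("proxy_pool"))
+    return {"err": 0, "data": proxies}
+```
+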
+## Key Features
+
+1. **Proxy management**
+- Automatic subscription refresh
+- Multi-instance isolation (per-port allocation)
+- Smart health checks
+- Traffic-control policies
+
+2. **Worker management**
+- Distributed task scheduling
+- Hot reload of browser configuration
+- Task status monitoring
+- Automatic recovery from failures
+
+3. **System integration**
+- Proxy pool shared through Redis
+- Cross-process management (ProcessManager)
+- Windows system-proxy integration
+- Unified logging
+
+## Deployment
+
+```
+Single-machine deployment (current):
+frontend + backend + Redis + Celery workers
+
+Future scale-out:
+           [Redis]
+            /   \
+[control node]   [worker node 1]
+ frontend+backend [worker node 2]
+           [worker node ...]
+```
+
+## Performance Targets
+
+| Metric | Single node | Scale-out target |
+|------|---------|---------|
+| Proxy instances | 5-10 | no hard limit |
+| Task throughput | 100 tasks/min | 500+ tasks/min |
+| Latency | <200 ms | <100 ms |
+| Availability | 99% | 99.9% |

+ 37 - 0
ui/fontend/src/api-types.ts

@@ -20,3 +20,40 @@ export interface ProxyPostResponse {
     msg: string
     data: ProxyResponse
 }
+// Data models
+export interface WorkerEndpoint {
+    serviceUrl: string
+    health: {
+        err: number
+        msg: string
+        db_url?: string
+        google_search_dir?: string
+    }
+    browserConfig?: {
+        exe_path: string
+        no_imgs: boolean
+    }
+}
+
+export interface TaskStats {
+    total_tasks: number | null
+    completed_tasks: number | null
+    pending_tasks: number | null
+}
+
+export interface UploadResult {
+    name: string
+    size: number
+    uploadTime: string
+    total_keywords?: number
+    inserted_count?: number
+    success?: boolean
+    error?: string
+}
+
+export interface CeleryStatusResponse {
+    err: number | null
+    msg: string | null
+    // the backend wraps worker rows in { data: [...] }; the JSON global is not a data type
+    workers: { data: Record<string, any>[] } | null
+    redis: number | null
+}

+ 3 - 0
ui/fontend/src/components.d.ts

@@ -10,6 +10,8 @@ declare module 'vue' {
   export interface GlobalComponents {
     ElAside: typeof import('element-plus/es')['ElAside']
     ElButton: typeof import('element-plus/es')['ElButton']
+    ElCard: typeof import('element-plus/es')['ElCard']
+    ElCol: typeof import('element-plus/es')['ElCol']
     ElContainer: typeof import('element-plus/es')['ElContainer']
     ElDescriptions: typeof import('element-plus/es')['ElDescriptions']
     ElDescriptionsItem: typeof import('element-plus/es')['ElDescriptionsItem']
@@ -36,6 +38,7 @@ declare module 'vue' {
     ProxyPool: typeof import('./components/ProxyPool.vue')['default']
     RouterLink: typeof import('vue-router')['RouterLink']
     RouterView: typeof import('vue-router')['RouterView']
+    WorkerCtrl: typeof import('./components/WorkerCtrl.vue')['default']
   }
   export interface ComponentCustomProperties {
     vLoading: typeof import('element-plus/es')['ElLoadingDirective']

+ 14 - 35
ui/fontend/src/components/Home.vue

@@ -16,9 +16,6 @@
           {{ isWorkerAvailable ? '导入关键词' : '服务不可用' }}
         </el-button>
       </el-upload>
-      <el-button
-        type="primary"
-      >开始运行</el-button>
     </el-row>
 
     <!-- 显示上传结果 -->
@@ -126,44 +123,19 @@
         </span>
       </template>
     </el-dialog>
+    <WorkerCtrl />
+    
   </div>
+
 </template>
 
 <script setup lang="ts">
-import { ref, onMounted } from 'vue'
+import { provide, ref, computed, onMounted } from 'vue' // computed is used below but was never imported
 import { ElMessage } from 'element-plus'
 import { useProxyStore } from '../stores/proxyStore'
 
-// 数据模型
-interface WorkerEndpoint {
-  serviceUrl: string
-  health: {
-    err: number
-    msg: string
-    db_url?: string
-    google_search_dir?: string
-  }
-  browserConfig?: {
-    exe_path: string
-    no_imgs: boolean
-  }
-}
-
-interface TaskStats {
-  total_tasks: number | null
-  completed_tasks: number | null 
-  pending_tasks: number | null
-}
-
-interface UploadResult {
-  name: string
-  size: number
-  uploadTime: string
-  total_keywords?: number
-  inserted_count?: number
-  success?: boolean
-  error?: string
-}
+import WorkerCtrl from './WorkerCtrl.vue'
+import type { CeleryStatusResponse, WorkerEndpoint } from '../api-types'
 
 // 响应式状态
 const backendBaseUrl = import.meta.env.VITE_API_BASE_URL || ''
@@ -172,7 +144,12 @@ const workerEndpoint = ref<WorkerEndpoint>({
   serviceUrl: '',
   health: { err: 1, msg: '正在初始化...' }
 })
-
+const celeryStatus = ref<CeleryStatusResponse>({
+  err: null,
+  msg: null,
+  workers: null,
+  redis: null
+})
 const isWorkerAvailable = computed(() => workerEndpoint.value.health?.err === 0)
 const taskStats = ref<{
   total_tasks: number | null;
@@ -220,6 +197,7 @@ const fetchWorkerEndpoint = async () => {
     if (!response.ok) throw new Error('获取 worker 地址失败')
     
     const data = await response.json()
+    celeryStatus.value = data.celery_status
     workerEndpoint.value = {
       serviceUrl: data.endpoint.service_url,
       health: data.endpoint.health,
@@ -361,6 +339,7 @@ const handleUpload = async (options: any) => {
     }
   } 
 }
+provide('celeryStatus', celeryStatus)
 </script>
 
 <style scoped>

+ 2 - 2
ui/fontend/src/components/ProxyPool.vue

@@ -69,9 +69,9 @@
   </el-descriptions>
 </el-dialog>
     </div>
-  </template>
+</template>
   
-<script setup lang="ts">
+<script lang="ts" setup>
 import { ref } from 'vue'
 import type { ProxyResponse, ProxyPostResponse } from '../api-types'
 import { useProxyStore } from '../stores/proxyStore'

+ 172 - 0
ui/fontend/src/components/WorkerCtrl.vue

@@ -0,0 +1,172 @@
+<template>
+    <div>
+        <el-row :gutter="16" justify="center">
+        <el-col :span="8">
+          <el-card shadow="hover">
+            <div class="task-container">
+              <span class="task-label">浏览器搜索任务:</span>
+              <el-button type="primary" class="task-button"
+                :disabled="workerStatus.search || loadingStates.search"
+                :loading="loadingStates.search"
+                @click="sendRequest('search', 'start')">开始运行</el-button>
+              <el-button type="primary" class="task-button"
+                :disabled="!workerStatus.search || loadingStates.search"
+                :loading="loadingStates.search"
+                @click="sendRequest('search', 'stop')">停止任务</el-button>
+            </div>
+          </el-card>
+        </el-col>
+    
+        <el-col :span="8">
+          <el-card shadow="hover">
+            <div class="task-container">
+              <span class="task-label">提取搜索结果页:</span>
+              <el-button type="primary" class="task-button"
+                :disabled="workerStatus.crawl || loadingStates.crawl"
+                :loading="loadingStates.crawl"
+                @click="sendRequest('crawl', 'start')">开始运行</el-button>
+              <el-button type="primary" class="task-button"
+                :disabled="!workerStatus.crawl || loadingStates.crawl"
+                :loading="loadingStates.crawl"
+                @click="sendRequest('crawl', 'stop')">停止任务</el-button>
+            </div>
+          </el-card>
+        </el-col>
+    
+        <el-col :span="8">
+          <el-card shadow="hover">
+            <div class="task-container">
+              <span class="task-label">将结果页转换成文档:</span>
+              <el-button type="primary" class="task-button"
+                :disabled="workerStatus.convert || loadingStates.convert"
+                :loading="loadingStates.convert"
+                @click="sendRequest('convert', 'start')">开始运行</el-button>
+              <el-button type="primary" class="task-button"
+                :disabled="!workerStatus.convert || loadingStates.convert"
+                :loading="loadingStates.convert"
+                @click="sendRequest('convert', 'stop')">停止任务</el-button>
+            </div>
+          </el-card>
+        </el-col>
+      </el-row>
+    </div>
+</template>
+
+<script lang="ts" setup>
+import { ref, computed, onMounted, onUnmounted } from 'vue'
+import { ElMessage } from 'element-plus'
+
+const backendBaseUrl = import.meta.env.VITE_API_BASE_URL || ''
+const workers = ref<Array<any>>([])
+const loadingStates = ref({
+  search: false,
+  crawl: false,
+  convert: false
+})
+let pollTimer: number | null = null
+
+const workerStatus = computed(() => {
+  const status: Record<string, boolean> = {
+    search: false,
+    crawl: false,
+    convert: false
+  }
+
+  workers.value.forEach((worker: any) => {
+    if (!worker?.hostname) return
+    
+    // Extract the worker type (search, crawl, convert) from the hostname
+    const [hostnamePrefix] = worker.hostname.split('@')
+    const workerType = hostnamePrefix.split('_')[0] // hostname looks like "search_worker@..." or "crawl@..."
+    
+    if (workerType in status) {
+      status[workerType] = worker['status'] === true
+    }
+    console.log('Worker状态:', workerType, status[workerType])
+  })
+
+  return status
+})
+
+async function fetchWorkerStatus() {
+  try {
+    const response = await fetch(`${backendBaseUrl}/worker/status`)
+    if (!response.ok) throw new Error('请求失败')
+    const data = await response.json()
+    workers.value = data.celery_status?.workers?.data || []
+    console.log('Worker 状态信息:', workers.value)
+  } catch (error) {
+    ElMessage.error('获取Worker状态失败')
+    console.error('Fetch worker status error:', error)
+  }
+}
+
+function startPolling(interval = 5000) {
+  if (!pollTimer) {
+    pollTimer = setInterval(fetchWorkerStatus, interval)
+    fetchWorkerStatus()
+  }
+}
+
+function stopPolling() {
+  if (pollTimer) {
+    clearInterval(pollTimer)
+    pollTimer = null
+  }
+}
+
+onMounted(() => startPolling())
+onUnmounted(() => stopPolling())
+
+const sendRequest = async (workerName: string, action: string) => {
+  const loadingKey = workerName as keyof typeof loadingStates.value
+  loadingStates.value[loadingKey] = true
+
+  try {
+    const response = await fetch(`${backendBaseUrl}/worker/ctrl`, {
+      method: 'POST',
+      headers: {
+        'Content-Type': 'application/json',
+      },
+      body: JSON.stringify({
+        worker_name: workerName,
+        action: action,
+        data: {}
+      })
+    })
+
+    if (!response.ok) {
+      const errorData = await response.json()
+      throw new Error(errorData.detail || '请求失败')
+    }
+
+    await fetchWorkerStatus()
+    ElMessage.success(`${action === 'start' ? '启动' : '停止'}${workerName}成功`)
+  } catch (error) {
+    ElMessage.error(`操作失败: ${error instanceof Error ? error.message : '未知错误'}`)
+    console.error('API请求错误:', error)
+  } finally {
+    loadingStates.value[loadingKey] = false
+    await fetchWorkerStatus()
+  }
+}
+</script>
+
+<style lang="css" scoped>
+.task-container {
+    display: flex;
+    flex-direction: column;
+    gap: 12px;
+    padding: 16px;
+}
+
+.task-label {
+    margin-bottom: 8px;
+    font-weight: 500;
+}
+
+.task-button {
+    margin-top: 8px;
+    width: 100%;
+}
+</style>

+ 8 - 2
worker/api/search_cli.py

@@ -9,6 +9,7 @@ from mylib.drission_page import load_chrome_from_ini
 from mylib.logu import logger
 from worker.celery.app import app as celery_app
 from worker.celery.crawl_client import submit_page_crawl_tasks
+from worker.celery.client import get_uncompleted_keywords
 
 
 app = APIRouter()
@@ -24,7 +25,7 @@ class SearchRequest(BaseModel):
     skip_existing: Optional[bool] = True
     browser_config: Optional[Dict] = {}
     proxy_pool: Optional[List[str]] = None
-
+    from_db: Optional[bool] = True
 
 class CrawlKeywordsRequest(BaseModel):
     keywords: List[str]
@@ -39,11 +40,16 @@ class BrowserTestRequest(BaseModel):
     browser_config: Dict = {}
     init_url: str = "https://www.google.com"
 
+
 @app.post("/search", summary="执行Google搜索")
 def search(request: SearchRequest) -> List[TaskResponse]:
     """提交所有关键词任务"""
     responses = []
-    for keyword in request.keywords:
+    if request.from_db:
+        keywords = get_uncompleted_keywords()
+    else:
+        keywords = request.keywords
+    for keyword in keywords:
         try:
-            result = celery_app.send_task('search_worker.drission_search', kwargs=request.model_dump())
+            # Submit one task per keyword; a bare model_dump() would ignore the
+            # DB-sourced keyword and resubmit the full request every iteration
+            result = celery_app.send_task(
+                'search_worker.drission_search',
+                kwargs={**request.model_dump(exclude={'from_db'}), 'keywords': [keyword]},
+            )
             logger.info(f"任务已提交: {keyword} (任务ID: {result.id})")

+ 17 - 1
worker/celery/client.py

@@ -7,6 +7,8 @@ from typing import List, Dict, Optional
 import sys
 from mylib.logu import logger
 import argparse
+from worker.search_engine.search_result_db import SearchResultManager, KeywordTask
+from sqlmodel import select, Session, distinct
 
 def read_keywords_from_file(file_path: Path) -> List[str]:
     """读取文件第0列第一行到末尾的内容"""
@@ -25,6 +27,20 @@ def read_keywords_from_file(file_path: Path) -> List[str]:
         logger.error(f"读取文件失败: {str(e)}")
         raise
 
+def get_uncompleted_keywords() -> list[str]:
+    """Fetch keywords whose tasks have not yet completed from the database"""
+    manager = SearchResultManager()
+    with Session(manager.engine) as session:
+        # distinct() deduplicates keywords that appear in multiple task rows
+        query = (
+            select(distinct(KeywordTask.keyword))
+            .where(KeywordTask.is_completed != True)
+        )
+        keywords = session.exec(query).all()
+        return keywords
+
 def submit_tasks(keywords: List[str], browser_config: Optional[Dict] = None):
     """提交所有关键词任务"""
     for keyword in keywords:
@@ -35,7 +51,7 @@ def submit_tasks(keywords: List[str], browser_config: Optional[Dict] = None):
                 'skip_existing': True,
                 'browser_config': browser_config or {}
             }
-            result = app.send_task('search_worker.drission_search', kwargs=task_data)
+            result = app.send_task('search_worker.drission_search', kwargs=task_data, queue='search_queue')
             logger.info(f"任务已提交: {keyword} (任务ID: {result.id})")
         except Exception as e:
             logger.error(f"提交任务失败 [{keyword}]: {str(e)}")