Browse Source

删除 API 代码,准备改为 celery 的方式

mrh 10 months ago
parent
commit
2b701092ff

+ 2 - 35
worker/ARCHITECTURE.md

@@ -58,40 +58,6 @@ graph TB
     T --> C[关闭浏览器]
 ```
 
-### 任务处理流程
-1. 任务分配机制
-```python
-# api_server.py
-system_state.task_queue.append(TaskAssignment(...))
-```
-2. 任务执行逻辑
-```python
-# worker_server.py
-result = await search_handler.process_keyword(keyword)
-await manager.save_page_results(...)
-```
-
-## 核心组件接口
-
-### Master服务端点
-| 端点                      | 方法 | 功能描述                 |
-|--------------------------|------|--------------------------|
-| /api/workers/register    | POST | Worker注册               |
-| /api/workers/{id}/heartbeat | POST | 心跳状态上报             |
-| /api/tasks/assign        | POST | 任务分配                 |
-| /api/tasks/{id}/status   | GET  | 任务状态查询             |
-
-### Worker核心能力
-```python
-# worker_server.py 关键功能
-class GoogleSearchHandler:
-    async def process_keyword(self, keyword: str) -> SearchResult
-
-class BrowserCore:
-    async def get_instance(config: BrowserConfig) -> "BrowserCore"
-    async def close()
-```
-
 ## 数据流向
 ```mermaid
 graph LR
@@ -106,7 +72,8 @@ graph LR
 ```
                     ┌──────────────┐
                     │   Master     │
-                    │ (API Server) │
+                │               │
+ │
                     └──────┬───────┘
          ┌─────────────────┼─────────────────┐

+ 0 - 47
worker/api_schemas.py

@@ -1,47 +0,0 @@
-from pydantic import BaseModel
-from typing import Optional
-from datetime import datetime
-from enum import Enum
-
-class TaskStatus(str, Enum):
-    QUEUED = "queued"
-    IN_PROGRESS = "in_progress"
-    COMPLETED = "completed"
-    FAILED = "failed"
-
-class TaskStatusResponse(BaseModel):
-    task_id: str
-    keyword: str
-    status: TaskStatus
-    assigned_worker: Optional[str] = None
-    completed_pages: Optional[int] = 0
-    total_results: Optional[int] = 0
-    error_message: Optional[str] = None
-
-class HealthCheckResponse(BaseModel):
-    status: str
-    browser_connected: bool
-    queue_size: int
-    workers_online: int
-    active_tasks: int
-
-class WorkerRegistration(BaseModel):
-    host: str
-    port: int
-    capacity: int
-    capabilities: list[str] = ["web_search"]
-
-class WorkerHeartbeat(BaseModel):
-    current_load: int
-    cpu_usage: Optional[float] = None
-    memory_usage: Optional[float] = None
-    last_ping: datetime
-
-class TaskAssignment(BaseModel):
-    task_id: str
-    keyword: str
-    priority: int = 1
-
-class TaskRequest(BaseModel):
-    keyword: str
-    priority: Optional[int] = 1

+ 0 - 103
worker/api_server.py

@@ -1,103 +0,0 @@
-from contextlib import asynccontextmanager
-import os
-from fastapi import FastAPI, BackgroundTasks, HTTPException
-from datetime import datetime, timedelta
-import asyncio
-from typing import Deque, Dict, Optional
-from collections import deque
-from pydantic import BaseModel
-import uvicorn
-import httpx
-from sqlmodel import SQLModel
-from worker.api_schemas import *
-from database.sqlite_engine import create_db_and_tables
-from mylib.logu import get_logger
-logger = get_logger("master_server")
-logger.info(f"http_proxy: {os.environ.get('HTTP_PROXY')}")
-logger.info(f"HTTPS_PROXY: {os.environ.get('HTTPS_PROXY')}")
-
-class SystemState:
-    def __init__(self):
-        self.registered_workers: Dict[str, dict] = {}
-        self.task_queue: Deque[TaskAssignment] = deque()
-        self.active_tasks: Dict[str, dict] = {}
-        self.task_registry: Dict[str, dict] = {}
-
-system_state = SystemState()
-
-@asynccontextmanager
-async def lifespan(app: FastAPI):
-    """生命周期管理:仅初始化数据库"""
-    try:
-        create_db_and_tables()
-        asyncio.create_task(monitor_workers())
-        logger.info("Master service initialized")
-        yield
-        
-    finally:
-        logger.info("Master service shutdown")
-
-app = FastAPI(lifespan=lifespan)
-
-@app.post("/api/workers/register")
-async def register_worker(worker: WorkerRegistration):
-    logger.info(f"Worker registered: {worker.model_dump_json(indent=4)}")
-    worker_id = f"worker-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(system_state.registered_workers)+1}"
-    system_state.registered_workers[worker_id] = {
-        "last_heartbeat": datetime.now(),
-        "metadata": worker.model_dump(),
-        "current_load": 0
-    }
-    return {"worker_id": worker_id}
-
-@app.post("/api/workers/{worker_id}/heartbeat")
-async def receive_heartbeat(worker_id: str, heartbeat: WorkerHeartbeat):
-    if worker_id not in system_state.registered_workers:
-        raise HTTPException(status_code=404, detail="Worker not found")
-    
-    # 更新最后心跳时间和负载数据
-    system_state.registered_workers[worker_id]["last_heartbeat"] = heartbeat.last_ping
-    system_state.registered_workers[worker_id]["current_load"] = heartbeat.current_load
-    return {"status": "acknowledged"}
-
-@app.get("/api/workers/health")
-async def health_check():
-    online_workers = [
-        wid for wid, w in system_state.registered_workers.items()
-        if datetime.now() - w["last_heartbeat"] < timedelta(minutes=3)
-    ]
-    return {"online_workers": len(online_workers)}
-
-@app.post("/api/tasks/assign")
-async def assign_task(task: TaskRequest):
-    new_task = TaskAssignment(
-        task_id=f"task-{datetime.now().timestamp()}",
-        keyword=task.keyword,
-        priority=task.priority
-    )
-    system_state.task_queue.append(new_task)
-    system_state.task_registry[new_task.task_id] = new_task.dict()
-    return {"task_id": new_task.task_id}
-
-@app.get("/api/tasks/{task_id}/status")
-async def get_task_status(task_id: str):
-    task = system_state.task_registry.get(task_id)
-    if not task:
-        raise HTTPException(status_code=404, detail="Task not found")
-    return task
-
-async def monitor_workers():
-    while True:
-        await asyncio.sleep(60)
-        now = datetime.now()
-        expired_workers = [
-            wid for wid, w in system_state.registered_workers.items()
-            if now - w["last_heartbeat"] > timedelta(minutes=3)
-        ]
-        for wid in expired_workers:
-            del system_state.registered_workers[wid]
-            logger.warning(f"Removed expired worker: {wid}")
-
-
-if __name__ == "__main__":
-    uvicorn.run(app, host="localhost", port=9300, proxy_headers=False)

+ 1 - 1
worker/search_engine/google_search.py

@@ -11,7 +11,7 @@ from playwright.sync_api import sync_playwright
 from mylib.logu import logger
 from mylib.base import save_to_file
 from config.settings import OUTPUT_DIR
-from database.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
+from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
 
 class SearchResult(BaseModel):
     total_count: int = 0

+ 0 - 0
database/search_result_db.py → worker/search_engine/search_result_db.py


+ 0 - 137
worker/worker_server.py

@@ -1,137 +0,0 @@
-import os
-import httpx
-import asyncio
-from fastapi import FastAPI
-from contextlib import asynccontextmanager
-from datetime import datetime
-from typing import Optional
-from pydantic import BaseModel
-from dotenv import load_dotenv
-from worker.search_engine.camoufox_broswer import BrowserCore, BrowserConfig
-from worker.search_engine.google_search import GoogleSearchHandler
-from database.search_result_db import SearchResultManager
-from mylib.logu import get_logger
-from worker.api_schemas import WorkerRegistration, WorkerHeartbeat
-
-logger = get_logger("worker_server")
-load_dotenv()
-logger.info(f"http_proxy: {os.environ.get('HTTP_PROXY')}")
-logger.info(f"HTTPS_PROXY: {os.environ.get('HTTPS_PROXY')}")
-
-class WorkerConfig(BaseModel):
-    master_url: str = os.getenv("MASTER_URL", "http://localhost:9300")
-    heartbeat_interval: int = 30
-    max_retries: int = 5
-    browser_headless: bool = True
-    host: str = os.getenv("WORKER_HOST", "0.0.0.0")
-    port: int = int(os.getenv("WORKER_PORT", 8001))
-    capacity: int = int(os.getenv("WORKER_CAPACITY", 1))
-
-@asynccontextmanager
-async def lifespan(app: FastAPI):
-    """生命周期管理:仅初始化数据库"""
-    try:
-        global browser_core, search_handler
-        browser_config = BrowserConfig(
-            headless=config.browser_headless,
-            proxy=None
-        )
-        browser_core = await BrowserCore.get_instance(browser_config)
-        search_handler = GoogleSearchHandler(browser_core.page)
-        
-        await register_worker()
-        asyncio.create_task(heartbeat_loop())
-        
-        yield
-        
-        if browser_core:
-            await browser_core.close()
-        logger.info("Worker shutdown completed")
-        
-    finally:
-        logger.info("Master service shutdown")
-
-app = FastAPI(lifespan=lifespan)
-config = WorkerConfig()
-worker_id: Optional[str] = None
-current_load: int = 0
-browser_core: Optional[BrowserCore] = None
-search_handler: Optional[GoogleSearchHandler] = None
-
-async def register_worker():
-    global worker_id
-    async with httpx.AsyncClient() as client:
-        try:
-            registration_data = WorkerRegistration(
-                host=config.host,
-                port=config.port,
-                capacity=config.capacity,
-                capabilities=["web_search"]
-            ).model_dump()
-            
-            logger.info(f"Registering to {config.master_url}")
-            resp = await client.post(
-                f"{config.master_url}/api/workers/register",
-                json=registration_data
-            )
-            logger.info(f"Registration response: {resp.json()}")
-            worker_id = resp.json()["worker_id"]
-            logger.success(f"Registered worker ID: {worker_id}")
-        except Exception as e:
-            logger.error(f"Registration failed: {str(e)}")
-            raise
-
-async def send_heartbeat():
-    async with httpx.AsyncClient() as client:
-        try:
-            heartbeat_data = WorkerHeartbeat(
-                current_load=current_load,
-                cpu_usage=None,  # 暂时保留为None,后续可以添加监控数据
-                memory_usage=None,
-                last_ping=datetime.now()
-            ).model_dump(mode="json")  # 使用mode="json"来序列化datetime
-            
-            await client.post(
-                f"{config.master_url}/api/workers/{worker_id}/heartbeat",
-                json=heartbeat_data
-            )
-        except Exception as e:
-            logger.error(f"Heartbeat failed: {str(e)}")
-
-async def heartbeat_loop():
-    while True:
-        await send_heartbeat()
-        await asyncio.sleep(config.heartbeat_interval)
-
-@app.get("/health")
-async def health_check():
-    return {
-        "status": "healthy",
-        "browser_connected": browser_core is not None,
-        "current_load": current_load
-    }
-
-@app.post("/tasks")
-async def handle_task(task: dict):
-    global current_load
-    current_load += 1
-    try:
-        logger.info(f"Processing task: {task['keyword']}")
-        
-        result = await search_handler.process_keyword(task['keyword'])
-        
-        manager = SearchResultManager()
-        await manager.save_page_results(
-            keyword=task['keyword'],
-            page_number=result.current_page,
-            results_count=len(result.items),
-            has_next_page=result.has_next_page
-        )
-        
-        return {"status": "success", "processed_pages": result.current_page}
-    finally:
-        current_load -= 1
-
-if __name__ == "__main__":
-    import uvicorn
-    uvicorn.run(app, host=config.host, port=config.port)