Explorar el Código

完成 worker 微服务的路由、服务发现、健康检查;完成前后端文件上传和结果展示;

mrh hace 9 meses
padre
commit
5b963cdf68

+ 0 - 2
.gitignore

@@ -14,5 +14,3 @@ crawl_env
 *.zip
 *.tar.gz
 .env
-.env
-.env

+ 1 - 1
config/settings.py

@@ -12,7 +12,7 @@ LOG_LEVEL='info'
 LOG_DIR = OUTPUT_DIR / "logs"
 
 # DB_URL = f"sqlite:///{OUTPUT_DIR}/search_results.db"
-DB_URL = f"sqlite:///{OUTPUT_DIR}/temp.db"
+DB_URL = os.environ.get('DB_URL') or f"sqlite:///{OUTPUT_DIR}/temp.db"
 
 HTTP_PROXY='http://127.0.0.1:1881'
 HTTPS_PROXY='http://127.0.0.1:1881'

+ 176 - 1
poetry.lock

@@ -264,6 +264,18 @@ files = [
     {file = "billiard-4.2.1.tar.gz", hash = "sha256:12b641b0c539073fc8d3f5b8b7be998956665c4233c7c1fcd66a7e677c4fb36f"},
 ]
 
+[[package]]
+name = "bottle"
+version = "0.13.2"
+description = "Fast and simple WSGI-framework for small web-applications."
+optional = false
+python-versions = "*"
+groups = ["main"]
+files = [
+    {file = "bottle-0.13.2-py2.py3-none-any.whl", hash = "sha256:27569ab8d1332fbba3e400b3baab2227ab4efb4882ff147af05a7c00ed73409c"},
+    {file = "bottle-0.13.2.tar.gz", hash = "sha256:e53803b9d298c7d343d00ba7d27b0059415f04b9f6f40b8d58b5bf914ba9d348"},
+]
+
 [[package]]
 name = "brotli"
 version = "1.1.0"
@@ -810,6 +822,22 @@ prompt-toolkit = ">=3.0.36"
 [package.extras]
 testing = ["pytest (>=7.2.1)", "pytest-cov (>=4.0.0)", "tox (>=4.4.3)"]
 
+[[package]]
+name = "clr-loader"
+version = "0.2.7.post0"
+description = "Generic pure Python loader for .NET runtimes"
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+markers = "sys_platform == \"win32\""
+files = [
+    {file = "clr_loader-0.2.7.post0-py3-none-any.whl", hash = "sha256:e0b9fcc107d48347a4311a28ffe3ae78c4968edb216ffb6564cb03f7ace0bb47"},
+    {file = "clr_loader-0.2.7.post0.tar.gz", hash = "sha256:b7a8b3f8fbb1bcbbb6382d887e21d1742d4f10b5ea209e4ad95568fe97e1c7c6"},
+]
+
+[package.dependencies]
+cffi = {version = ">=1.17", markers = "python_version >= \"3.8\""}
+
 [[package]]
 name = "colorama"
 version = "0.4.6"
@@ -3537,6 +3565,17 @@ files = [
     {file = "propcache-0.2.1.tar.gz", hash = "sha256:3f77ce728b19cb537714499928fe800c3dda29e8d9428778fc7c186da4c09a64"},
 ]
 
+[[package]]
+name = "proxy-tools"
+version = "0.1.0"
+description = "Proxy Implementation"
+optional = false
+python-versions = "*"
+groups = ["main"]
+files = [
+    {file = "proxy_tools-0.1.0.tar.gz", hash = "sha256:ccb3751f529c047e2d8a58440d86b205303cf0fe8146f784d1cbcd94f0a28010"},
+]
+
 [[package]]
 name = "psutil"
 version = "6.1.1"
@@ -3867,6 +3906,72 @@ files = [
 [package.dependencies]
 pyobjc-core = ">=11.0"
 
+[[package]]
+name = "pyobjc-framework-quartz"
+version = "11.0"
+description = "Wrappers for the Quartz frameworks on macOS"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+markers = "sys_platform == \"darwin\""
+files = [
+    {file = "pyobjc_framework_Quartz-11.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:da3ab13c9f92361959b41b0ad4cdd41ae872f90a6d8c58a9ed699bc08ab1c45c"},
+    {file = "pyobjc_framework_Quartz-11.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:d251696bfd8e8ef72fbc90eb29fec95cb9d1cc409008a183d5cc3246130ae8c2"},
+    {file = "pyobjc_framework_Quartz-11.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:cb4a9f2d9d580ea15e25e6b270f47681afb5689cafc9e25712445ce715bcd18e"},
+    {file = "pyobjc_framework_Quartz-11.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:973b4f9b8ab844574461a038bd5269f425a7368d6e677e3cc81fcc9b27b65498"},
+    {file = "pyobjc_framework_Quartz-11.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:66ab58d65348863b8707e63b2ec5cdc54569ee8189d1af90d52f29f5fdf6272c"},
+    {file = "pyobjc_framework_Quartz-11.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:1032f63f2a4ee98366764e69c249f1d93813821e17d224cf626cf11fb1801fc4"},
+    {file = "pyobjc_framework_quartz-11.0.tar.gz", hash = "sha256:3205bf7795fb9ae34747f701486b3db6dfac71924894d1f372977c4d70c3c619"},
+]
+
+[package.dependencies]
+pyobjc-core = ">=11.0"
+pyobjc-framework-Cocoa = ">=11.0"
+
+[[package]]
+name = "pyobjc-framework-security"
+version = "11.0"
+description = "Wrappers for the framework Security on macOS"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+markers = "sys_platform == \"darwin\""
+files = [
+    {file = "pyobjc_framework_Security-11.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:a2db348ba43aff24ae71d239ed585bf061e61f84a50226677049ed220737ffd0"},
+    {file = "pyobjc_framework_Security-11.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:93bc23630563de2551ac49048af010ac9cb40f927cc25c898b7cc48550ccd526"},
+    {file = "pyobjc_framework_Security-11.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:421e03b8560ed296a7f5ee67f42f5f978f8c7959d65c8fec99cd77dc65786355"},
+    {file = "pyobjc_framework_Security-11.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:dda83260c5638dd0470c01ca9d37eccedbce15d0642d9c28b357329e4145528f"},
+    {file = "pyobjc_framework_Security-11.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:51dd6fb24235f4623d68a02bda4dabd85f48bce00f9b0b306016cf2c891392c4"},
+    {file = "pyobjc_framework_Security-11.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:d550ea972da4a4a4a50b8dc190a3da90095acfffd1a894012d64fb4729e53974"},
+    {file = "pyobjc_framework_security-11.0.tar.gz", hash = "sha256:ac078bb9cc6762d6f0f25f68325dcd7fe77acdd8c364bf4378868493f06a0758"},
+]
+
+[package.dependencies]
+pyobjc-core = ">=11.0"
+pyobjc-framework-Cocoa = ">=11.0"
+
+[[package]]
+name = "pyobjc-framework-webkit"
+version = "11.0"
+description = "Wrappers for the framework WebKit on macOS"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+markers = "sys_platform == \"darwin\""
+files = [
+    {file = "pyobjc_framework_WebKit-11.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:e1881d7443f916e49199fd9d4b109c19e3e063ab3a60e6d4d76299a3b9143ef6"},
+    {file = "pyobjc_framework_WebKit-11.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:163abaa5a665b59626ef20cdc3dcc5e2e3fcd9830d5fc328507e13f663acd0ed"},
+    {file = "pyobjc_framework_WebKit-11.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:2e4911519e94822011d99fdb9addf4a176f45a79808dab18dc303293f4590f7c"},
+    {file = "pyobjc_framework_WebKit-11.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:22d09bb22c3c48d9243f300f8264a68ecc0bdfe09d25794ee86ab2239eae7da2"},
+    {file = "pyobjc_framework_WebKit-11.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:6141a416f1eb33ded2c6685931d1b4d5f17c83814f2d17b7e2febff03c6f6bee"},
+    {file = "pyobjc_framework_WebKit-11.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:c7b56baf21d4b4b8fba42e5d62b08bb018be2f00595c3c7e0265b925ba81e505"},
+    {file = "pyobjc_framework_webkit-11.0.tar.gz", hash = "sha256:fa6bedf9873786b3376a74ce2ea9dcd311f2a80f61e33dcbd931cc956aa29644"},
+]
+
+[package.dependencies]
+pyobjc-core = ">=11.0"
+pyobjc-framework-Cocoa = ">=11.0"
+
 [[package]]
 name = "pyopenssl"
 version = "25.0.0"
@@ -4121,6 +4226,22 @@ Pillow = ">=3.3.2"
 typing-extensions = ">=4.9.0"
 XlsxWriter = ">=0.5.7"
 
+[[package]]
+name = "pythonnet"
+version = "3.0.5"
+description = ".NET and Mono integration for Python"
+optional = false
+python-versions = "<3.14,>=3.7"
+groups = ["main"]
+markers = "sys_platform == \"win32\""
+files = [
+    {file = "pythonnet-3.0.5-py3-none-any.whl", hash = "sha256:f6702d694d5d5b163c9f3f5cc34e0bed8d6857150237fae411fefb883a656d20"},
+    {file = "pythonnet-3.0.5.tar.gz", hash = "sha256:48e43ca463941b3608b32b4e236db92d8d40db4c58a75ace902985f76dac21cf"},
+]
+
+[package.dependencies]
+clr_loader = ">=0.2.7,<0.3.0"
+
 [[package]]
 name = "pytz"
 version = "2025.1"
@@ -4133,6 +4254,41 @@ files = [
     {file = "pytz-2025.1.tar.gz", hash = "sha256:c2db42be2a2518b28e65f9207c4d05e6ff547d1efa4086469ef855e4ab70178e"},
 ]
 
+[[package]]
+name = "pywebview"
+version = "5.4"
+description = "Build GUI for your Python program with JavaScript, HTML, and CSS"
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+files = [
+    {file = "pywebview-5.4-py3-none-any.whl", hash = "sha256:0559c47db543556498dd38604a2a0479896c320f86c9b23499b8e580b58b699d"},
+    {file = "pywebview-5.4.tar.gz", hash = "sha256:b5e2c6c7502aaf72a9ae6034daf83785f5fad874fac7fa82bf4fcf854f1f083a"},
+]
+
+[package.dependencies]
+bottle = "*"
+proxy_tools = "*"
+pyobjc-core = {version = "*", markers = "sys_platform == \"darwin\""}
+pyobjc-framework-Cocoa = {version = "*", markers = "sys_platform == \"darwin\""}
+pyobjc-framework-Quartz = {version = "*", markers = "sys_platform == \"darwin\""}
+pyobjc-framework-security = {version = "*", markers = "sys_platform == \"darwin\""}
+pyobjc-framework-WebKit = {version = "*", markers = "sys_platform == \"darwin\""}
+pythonnet = {version = "*", markers = "sys_platform == \"win32\""}
+QtPy = {version = "*", markers = "sys_platform == \"openbsd6\""}
+typing_extensions = "*"
+
+[package.extras]
+android = ["jnius", "kivy"]
+cef = ["cefpython3"]
+gtk = ["PyGObject", "PyGObject-stubs"]
+pyside2 = ["PySide2", "QtPy"]
+pyside6 = ["PySide6", "QtPy"]
+qt = ["PyQt6", "PyQt6-WebEngine", "QtPy"]
+qt5 = ["PyQt5", "QtPy", "pyqtwebengine"]
+qt6 = ["PyQt6", "PyQt6-WebEngine", "QtPy"]
+ssl = ["crpytography"]
+
 [[package]]
 name = "pywin32"
 version = "307"
@@ -4225,6 +4381,25 @@ files = [
     {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"},
 ]
 
+[[package]]
+name = "qtpy"
+version = "2.4.3"
+description = "Provides an abstraction layer on top of the various Qt bindings (PyQt5/6 and PySide2/6)."
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+markers = "sys_platform == \"openbsd6\""
+files = [
+    {file = "QtPy-2.4.3-py3-none-any.whl", hash = "sha256:72095afe13673e017946cc258b8d5da43314197b741ed2890e563cf384b51aa1"},
+    {file = "qtpy-2.4.3.tar.gz", hash = "sha256:db744f7832e6d3da90568ba6ccbca3ee2b3b4a890c3d6fbbc63142f6e4cdf5bb"},
+]
+
+[package.dependencies]
+packaging = "*"
+
+[package.extras]
+test = ["pytest (>=6,!=7.0.0,!=7.0.1)", "pytest-cov (>=3.0.0)", "pytest-qt"]
+
 [[package]]
 name = "rank-bm25"
 version = "0.2.2"
@@ -6177,4 +6352,4 @@ cffi = ["cffi (>=1.11)"]
 [metadata]
 lock-version = "2.1"
 python-versions = "3.12"
-content-hash = "eac513fbe3d43d14dbe8f091b25431a99e6a81d1676b0918cf802ad4146bc7ed"
+content-hash = "7112d8285d4d2cece0fc012f571cea4e2bf973d8e1a096e7854ba387b9c71dcc"

+ 2 - 1
pyproject.toml

@@ -16,7 +16,8 @@ dependencies = [
     "redis (>=5.2.1,<6.0.0)",
     "celery (>=5.4.0,<6.0.0)",
     "flower (>=2.0.1,<3.0.0)",
-    "docling (>=2.21.0,<3.0.0)"
+    "docling (>=2.21.0,<3.0.0)",
+    "pywebview (>=5.4,<6.0)",
 ]
 
 

+ 1 - 1
ui/backend/config.yaml

@@ -7,7 +7,7 @@ sub:
   proxies:
     9660:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9660.yaml
-      name: "\U0001F1EB\U0001F1F7\u6CD5\u56FD\u9A6C\u8D5B"
+      name: "\U0001F1F8\U0001F1EC\u4E9A\u9A6C\u900A\u65B0\u52A0\u57612"
       port: 9660
     9662:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9662.yaml

+ 6 - 7
ui/backend/main.py

@@ -4,7 +4,9 @@ import sys
 # 为了避免耦合,微服务,可能确实要将上级的上级目录作为一个单独的进程来处理,此目录作为一个单独的UI项目
 sys.path.append(str(Path(__file__).parent))
 from fastapi import FastAPI
-from routers.proxy import router,health_check_proxy_task
+from routers.proxy import router, health_check_proxy_task 
+from routers.file import router as file_router
+from routers.worker import router as worker_router
 from fastapi.middleware.cors import CORSMiddleware
 from contextlib import asynccontextmanager
 from utils.process_mgr import process_manager
@@ -34,13 +36,10 @@ app.add_middleware(
 )
 # 将 gpt_router 挂载到应用中
 app.include_router(router, prefix="/api/proxy", tags=["chat"])
-
-# 根路由,用于健康检查或欢迎信息
-@app.get("/", tags=["root"])
-def read_root():
-    return {"message": "Welcome to the ChatGPT API!"}
+app.include_router(worker_router, prefix="/api/worker", tags=["worker"])
 
 # 如果你需要运行这个应用,可以使用以下代码
 if __name__ == "__main__":
     import uvicorn
-    uvicorn.run(app, host="localhost", port=5835)
+    # uvicorn main:app --port 5835 --reload
+    uvicorn.run(app, host="localhost", port=5835)

+ 45 - 0
ui/backend/routers/file.py

@@ -0,0 +1,45 @@
+from fastapi import APIRouter, HTTPException, UploadFile, File
+from fastapi.responses import JSONResponse
+import aiofiles
+from pathlib import Path
+import aiofiles.os as aios
+from typing import Optional
+import platform
+from utils.logu import get_logger
+from utils.config import OUTPUT_DIR
+import os
+import uuid
+
+router = APIRouter(prefix="/file")
+logger = get_logger("FileRouter")
+
+from fastapi.concurrency import run_in_threadpool
+
+# 确保上传目录存在
+UPLOAD_DIR = OUTPUT_DIR / "upload"
+UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
+
+@router.post("/upload", tags=["file"])
+async def upload_file(file: UploadFile = File(...)):
+    """处理文件上传"""
+    try:
+        file_path = UPLOAD_DIR / file.filename
+        
+        # 异步写入文件
+        async with aiofiles.open(file_path, "wb") as f:
+            while chunk := await file.read(1024 * 1024):  # 1MB chunks
+                await f.write(chunk)
+        
+        return {
+            "filename": file.filename,
+            "saved_path": str(file_path),
+            "size": file_path.stat().st_size
+        }
+    except Exception as e:
+        logger.error(f"文件上传失败: {str(e)}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+@router.get("/list", tags=["file"])
+async def list_files(path: str):
+    """列出目录下的文件"""
+    

+ 3 - 1
ui/backend/routers/proxy.py

@@ -27,8 +27,10 @@ class SysProxyResponse(BaseModel):
 class SubUrlPost(BaseModel):
     sub_url: str
 
+from fastapi.requests import Request
+
 @router.get("/sys")
-def read_root():
+async def read_root(request: Request):
     proxy_enable, proxy_server = get_proxy_settings()
     return SysProxyResponse(sys_open=proxy_enable, proxy_server=proxy_server)
 

+ 80 - 0
ui/backend/routers/worker.py

@@ -0,0 +1,80 @@
+from fastapi import APIRouter, HTTPException, Request, Depends
+from fastapi.responses import JSONResponse, StreamingResponse
+import httpx
+import os
+from typing import Dict
+
+from pydantic import BaseModel
+from typing import Optional 
+from utils.config import WORKER_SERVICE_URL
+from utils.logu import logger
+
+router = APIRouter()
+
+class Endpoint(BaseModel):
+    service_url: str = WORKER_SERVICE_URL
+    token: Optional[str] = "temp_token_123"
+    health: Optional[Dict] = {}
+
+@router.get("/endpoint", tags=["worker"])
+async def endpoint():
+    try:
+        health = await health_check()
+    except Exception as e:
+        logger.error(e)
+        health = {"err": 1, "msg": str(e)}
+    """获取当前请求的端点"""
+    return Endpoint(health=health)
+
+async def health_check():
+    """健康检查"""
+    async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
+        response = await client.get("/health")
+        response.raise_for_status()
+        ret = response.json()
+        ret.update({"err": 0})
+        return ret
+
+async def get_request_body(request: Request):
+    """获取原始请求体"""
+    return await request.body()
+
+@router.api_route("/fuck/{path:path}", methods=["GET", "POST", "PUT", "DELETE"], tags=["worker"])
+async def proxy_worker_requests(
+    path: str, 
+    request: Request,
+    body: bytes = Depends(get_request_body)
+):
+    """全局反向代理到搜索微服务"""
+    async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
+        try:
+            # 构造目标请求头(过滤掉不需要的 headers)
+            headers = {
+                "Content-Type": request.headers.get("Content-Type", ""),
+                "Accept": request.headers.get("Accept", "application/json")
+            }
+
+            # 转发请求到目标服务
+            response = await client.request(
+                request.method,
+                f"/{path}",
+                content=body,
+                headers=headers,
+                timeout=60
+            )
+
+            # 流式返回响应内容
+            return StreamingResponse(
+                response.aiter_bytes(),
+                status_code=response.status_code,
+                headers=dict(response.headers)
+            )
+            
+        except httpx.ConnectError:
+            raise HTTPException(status_code=502, detail="搜索服务不可用")
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=str(e))
+
+                
+
+        

+ 17 - 0
ui/backend/tests/mytests/t_worker.py

@@ -0,0 +1,17 @@
+from pathlib import Path
+import sys
+# 为了避免耦合,微服务,可能确实要将上级的上级目录作为一个单独的进程来处理,此目录作为一个单独的UI项目
+sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
+from src.services.subscription_manager import SubscriptionManager
+from utils.config import config
+import asyncio
+from utils.logu import get_logger
+from routers.worker import health_check
+logger = get_logger('mytests', file=True)
+
+async def main():
+    res = await health_check()
+    print(res)
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 16 - 0
ui/backend/tests/mytests/tk_html.py

@@ -0,0 +1,16 @@
+import tkinter as tk
+from tkinterweb import HtmlFrame
+
+# 创建主窗口
+root = tk.Tk()
+root.geometry("800x600")
+
+# 创建 HtmlFrame 组件
+browser = HtmlFrame(root)
+browser.pack(fill="both", expand=True)
+
+# 加载网页地址
+browser.load_url("https://aiapi.magong.site/channel")
+
+# 运行主循环
+root.mainloop()

+ 7 - 1
ui/backend/utils/config.py

@@ -4,6 +4,7 @@ from pydantic import BaseModel
 from typing import List, Dict, Union,Optional,Any
 from utils.pydantic_auto_field import AutoLoadModel
 APP_PATH = Path(__file__).parent.parent
+OUTPUT_DIR = APP_PATH / "output"
 CONFIG_PATH = APP_PATH / "config.yaml"
 REPO_BASE_DIR = Path(APP_PATH.parent.parent)
 DOWNLOAD_DIR = REPO_BASE_DIR / "download"
@@ -32,6 +33,7 @@ class Config(BaseModel):
     mimo_exe: Optional[str] = str(PROXY_POLL_DIR / r"mihomo-windows-amd64-go120.exe")
     chrome_exe: Optional[str] = str(REPO_BASE_DIR / r"download\chrome-win\chrome.exe")
     redis_exe: Optional[str] = str(REPO_BASE_DIR / r"download\Redis-x64-5.0.14.1\redis-server.exe")
+    sqluri: Optional[str] = r'G:\code\upwork\zhang_crawl_bio\output\temp.db'
     redis_port: Optional[int] = None  # Changed to int
     def save(self):
         config_path = get_config_path()
@@ -53,7 +55,11 @@ def read_config(config_path: Path):
     return config
 config = read_config(get_config_path())
 
+import os
+
+WORKER_SERVICE_URL = os.getenv("WORKER_SERVICE_URL", "http://localhost:8003")
+
 def main():
     print(config)
 if __name__ == "__main__":
-    main()
+    main()

+ 0 - 2
ui/backend/utils/logu.py

@@ -7,8 +7,6 @@ from loguru._logger import Logger
 from pathlib import Path
 LOG_LEVEL='info'
 LOG_DIR=Path(__file__).parent.parent / 'output' /'logs'
-print(LOG_DIR.absolute())
-# python_xx/site-packages/loguru/_handler.py  _serialize_record
 FORMAT = '<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{file}:{line}</cyan> :<cyan>{function}</cyan> - {message}'
 loguru.logger.remove()
 # logger.add(sys.stderr, format='<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>')

+ 4 - 3
ui/docs/gpt/vue_cache.md

@@ -3,6 +3,7 @@ G:\code\upwork\zhang_crawl_bio\ui 当前项目采用 vue3(语法糖) + eleme
 我想在前端层面首次启动时,就调用 get_sub_url get_sys_proxy 接口,并且记录时间戳存储到本地作为缓存。
 因此前端可能要封装这两个请求让它支持缓存的操作,避免频繁长时间请求,让 UI 切换更加顺滑。
 
-同时,缓存也会过期,因此或许可以设置一个代理守卫,定期获取数据,自动保存到前端本地。这样缓存就会一直保持最新,并且让用户无感体验。
-因为目前来说过期时间是1小时并不
-我这个思路是否符合前后端最佳软件设计架构?你来决定应该如何优化。
+同时,缓存也会过期,因此或许可以设置一个代理守卫,定期获取数据,自动保存到前端本地。这样缓存就会一直保持最新,并且让用户无感体验。因为目前来说过期时间是1小时并不能实时反映进程状态。
+而且一旦用户开启、停止、删除代理,也需要更新缓存,最好是发送信号或者异步调用更新的操作,避免等待太久。
+
+我这个思路是否符合前后端最佳软件设计架构?这些操作由前端来实现还是后端来实现?你来决定应该如何优化。

+ 25 - 0
ui/docs/gpt/worker_api.md

@@ -0,0 +1,25 @@
+# worker api
+上述代码似乎有很多问题,你来帮我判断:
+- 项目采用 SQLmodel 驱动 SQLite 数据库,DB_URL = os.environ.get('DB_URL') or f"sqlite:///{OUTPUT_DIR}/temp.db"  ,在别的地方会用到,而且几乎都是同步操作的代码。只有对外开放API接口时, Fastapi 是使用异步接口。
+- 如何兼容同步与异步的代码?
+- read_keywords_from_file 从表格读取数据,如果涉及到 2k~3w 行的数据读取,是否是一个耗时操作?如果耗时能否移入线程操作
+- bulk_import_keywords_to_db 是导入到数据库,如果关键词存在则跳过, 代码中是否能让 SQLite 优雅的批量导入。并且这同样可能是一个耗时操作
+- 这是一个已知错误: 2025-02-21 20:28:15 | ERROR    | excel_load.py:51 :import_keywords - 文件处理失败: (sqlite3.IntegrityError) UNIQUE constraint failed: keywordtask.keyword
+[SQL: INSERT INTO keywordtask (keyword, total_results, is_completed, fail_count, created_at) VALUES (?, ?, ?, ?, ?) RETURNING id]
+[parameters: ('absorb', None, 0, 0, '2025-02-21 20:28:04.936669')]
+(Background on this error at: https://sqlalche.me/e/20/gkpj)
+INFO:     127.0.0.1:5911 - "POST /keywords/upload HTTP/1.1" 500 Internal Server Error
+
+
+
+
+worker\api\excel_load.py worker\api\main.py worker\api\search_cli.py
+
+假设这3个文件是构成一个微服务,并且可以启动许多个微服务来批量执行任务。不过目前场景中只需要一个微服务 localhost:8003 。
+
+然后我想在另一个独立的项目中发起微服务请求执行各种任务。
+例如 G:\code\upwork\zhang_crawl_bio\ui\backend\main.py 文件是一个UI后端 localhost:5835 ,它跟前端交互。
+
+现在用户在前端上传文件,我应该直接上传到 localhost:8003 还是经过 localhost:5835 进行路由?
+如果不用 5835 路由,那么又该如何通知前端有两个 API 地址?
+如果用 5835 路由,是否又符合最佳软件设计架构?

+ 3 - 2
ui/fontend/src/components.d.ts

@@ -14,8 +14,8 @@ declare module 'vue' {
     ElDescriptions: typeof import('element-plus/es')['ElDescriptions']
     ElDescriptionsItem: typeof import('element-plus/es')['ElDescriptionsItem']
     ElDialog: typeof import('element-plus/es')['ElDialog']
-    ElHeader: typeof import('element-plus/es')['ElHeader']
     ElIcon: typeof import('element-plus/es')['ElIcon']
+    ElInput: typeof import('element-plus/es')['ElInput']
     ElLink: typeof import('element-plus/es')['ElLink']
     ElMain: typeof import('element-plus/es')['ElMain']
     ElMenu: typeof import('element-plus/es')['ElMenu']
@@ -24,10 +24,11 @@ declare module 'vue' {
     ElRadioButton: typeof import('element-plus/es')['ElRadioButton']
     ElRadioGroup: typeof import('element-plus/es')['ElRadioGroup']
     ElRow: typeof import('element-plus/es')['ElRow']
-    ElSubMenu: typeof import('element-plus/es')['ElSubMenu']
     ElTable: typeof import('element-plus/es')['ElTable']
     ElTableColumn: typeof import('element-plus/es')['ElTableColumn']
     ElTag: typeof import('element-plus/es')['ElTag']
+    ElText: typeof import('element-plus/es')['ElText']
+    ElUpload: typeof import('element-plus/es')['ElUpload']
     Home: typeof import('./components/Home.vue')['default']
     Menu: typeof import('./components/Menu.vue')['default']
     Proxy: typeof import('./components/Proxy.vue')['default']

+ 125 - 2
ui/fontend/src/components/Home.vue

@@ -1,5 +1,39 @@
 <template>
   <div>
+    <el-row :span="8" class="child mb-5" justify="center">
+      <el-upload
+        action="#"
+        :http-request="handleUpload"
+        :show-file-list="false"
+        accept=".xlsx,.xls,.csv"
+        :disabled="!isWorkerAvailable"
+      >
+        <el-button 
+          type="primary" 
+          :disabled="!isWorkerAvailable"
+        >
+          {{ isWorkerAvailable ? '选择并上传文件' : '服务不可用' }}
+        </el-button>
+      </el-upload>
+    </el-row>
+
+    <!-- 显示上传结果 -->
+    <el-descriptions v-if="uploadResult" title="导入结果" :column="1" border class="mt-4">
+      <el-descriptions-item label="文件名">{{ uploadResult.name }}</el-descriptions-item>
+      <el-descriptions-item label="文件大小">{{ (uploadResult.size / 1024).toFixed(2) }} KB</el-descriptions-item>
+      <el-descriptions-item label="上传时间">{{ uploadResult.uploadTime }}</el-descriptions-item>
+      <el-descriptions-item label="关键词总数">{{ uploadResult.total_keywords }}</el-descriptions-item>
+      <el-descriptions-item label="成功插入数">{{ uploadResult.inserted_count }}</el-descriptions-item>
+      <el-descriptions-item label="状态">
+        <el-tag :type="uploadResult.success ? 'success' : 'danger'">
+          {{ uploadResult.success ? '导入成功' : '导入失败' }}
+        </el-tag>
+      </el-descriptions-item>
+      <el-descriptions-item v-if="uploadResult.error" label="错误信息">
+        <el-text type="danger">{{ uploadResult.error }}</el-text>
+      </el-descriptions-item>
+    </el-descriptions>
+
     <h2>当前代理状态</h2>
     <el-descriptions :column="1" border>
       <el-descriptions-item label="已选代理类型">{{ store.selectedProxy }}</el-descriptions-item>
@@ -10,9 +44,98 @@
 </template>
 
 <script setup lang="ts">
+import { ref, onMounted } from 'vue'
+import { ElMessage } from 'element-plus'
 import { useProxyStore } from '../stores/proxyStore'
+
+// 定义 worker 地址状态
+const workerUrl = ref('')
+const isWorkerAvailable = ref(false)
+const workerHealth = ref<{err: number, msg: string} | null>(null)
+
+const apiBaseUrl = import.meta.env.VITE_API_BASE_URL || ''
+const serverApiFileUrl = `${apiBaseUrl}/worker`
 const store = useProxyStore()
-console.log(store.selectedProxy);
-console.log(store.proxyData);
 
+// 定义上传结果
+const uploadResult = ref<{
+  name: string;
+  size: number;
+  uploadTime: string;
+  total_keywords?: number;
+  inserted_count?: number;
+  success?: boolean;
+  error?: string;
+} | null>(null)
+
+// 获取 worker endpoint
+const fetchWorkerEndpoint = async () => {
+  try {
+    const response = await fetch(`${apiBaseUrl}/worker/endpoint`)
+    if (!response.ok) throw new Error('获取 worker 地址失败')
+    
+    const data = await response.json()
+    console.log('worker endpoint:', data)
+    workerUrl.value = data.service_url
+    workerHealth.value = data.health
+    // 根据 health 状态判断服务是否可用
+    isWorkerAvailable.value = data.health && data.health.err === 0
+  } catch (error) {
+    console.error('获取 worker 地址错误:', error)
+    ElMessage.error('无法获取 worker 服务地址')
+  }
+}
+
+// 在组件挂载时获取 worker 地址
+onMounted(() => {
+  fetchWorkerEndpoint()
+})
+
+const handleUpload = async (options: any) => {
+  if (!isWorkerAvailable.value) {
+    ElMessage.warning(`worker 服务不可用,无法上传文件。状态码: ${store.workerHealth?.err}`)
+    return
+  }
+  try {
+    const formData = new FormData()
+    formData.append('file', options.file)
+    
+    const response = await fetch(`${workerUrl.value}/keywords/upload`, {
+      method: 'POST',
+      body: formData
+    })
+    
+    if (!response.ok) throw new Error('上传失败')
+    
+    const result = await response.json()
+    console.log('上传成功:', result)
+
+    // 显示上传结果
+    uploadResult.value = {
+      name: options.file.name,
+      size: options.file.size,
+      uploadTime: new Date().toLocaleString(),
+      total_keywords: result.total_keywords,
+      inserted_count: result.inserted_count,
+      success: true
+    }
+
+    ElMessage.success(`成功导入 ${result.inserted_count} 个关键词(共 ${result.total_keywords} 个)`)
+  } catch (error: any) {
+    console.error('上传错误:', error)
+    uploadResult.value = {
+      name: options.file.name,
+      size: options.file.size,
+      uploadTime: new Date().toLocaleString(),
+      success: false,
+      error: error.message || '文件上传失败'
+    }
+  } 
+}
 </script>
+
+<style scoped>
+.child {
+  margin-bottom: 20px;
+}
+</style>

+ 0 - 9
ui/fontend/src/components/Menu.vue

@@ -20,21 +20,12 @@
         <el-icon><Connection /></el-icon>
         <template #title>代理</template>
       </el-menu-item>
-      <el-menu-item index="3" disabled>
-        <el-icon><document /></el-icon>
-        <template #title>Navigator Three</template>
-      </el-menu-item>
-      <el-menu-item index="4">
-        <el-icon><setting /></el-icon>
-        <template #title>Navigator Four</template>
-      </el-menu-item>
     </el-menu>
     </div>
   </template>
   
 <script lang="ts" setup>
 import { ref } from 'vue'
-import Proxy from '@/components/Proxy.vue'
 
 const isCollapse = ref(false)
 const handleOpen = (key: string, keyPath: string[]) => {

+ 14 - 7
ui/fontend/src/components/Proxy.vue

@@ -66,20 +66,27 @@ const get_sys_proxy = async () => {
   }
 }
 
-// 初始化时自动获取一次代理信息
-onMounted(() => {
-  get_sys_proxy()
-  // get_sub_url()
+// 初始化时自动获取并启动后台轮询
+onMounted(async () => {
+  store.startBackgroundPolling()
+  try {
+    await Promise.all([get_sys_proxy(), get_sub_url()])
+  } catch (error) {
+    console.error('初始化失败:', error)
+  }
 })
 
-const proxy_change = (value: any) => {
+// 切换代理类型时强制刷新
+const proxy_change = async (value: any) => {
   store.setSelectedProxy(value)
   if (value === 'system') {
-    get_sys_proxy()
+    await store.cachedFetch(`${apiBaseUrl}/proxy/sys`, 'sys_proxy_cache', true)
     store.setProxyData(sys_proxy.value)
   } else if (value === 'poll') {
-    get_sub_url()
+    await store.cachedFetch(`${apiBaseUrl}/proxy/subs`, 'sub_url_cache', true)
   }
+  // 立即更新显示数据
+  sys_proxy.value = { ...store.proxyData }
 }
 
 </script>

+ 75 - 12
ui/fontend/src/stores/proxyStore.ts

@@ -2,45 +2,81 @@ import { defineStore } from 'pinia'
 import { ref } from 'vue'
 
 // 缓存配置
-const CACHE_TTL = 3600000 // 1小时缓存有效期
+const CACHE_TTL = 300000 // 5分钟缓存有效期(后台会自动刷新)
+const POLL_INTERVAL = 60000 // 1分钟轮询间隔
 
 interface CacheItem {
   timestamp: number
   data: any
+  etag?: string // 新增内容验证标识
 }
 
 export const useProxyStore = defineStore('proxy', () => {
   const selectedProxy = ref(localStorage.getItem('selectedProxy') || 'system')
   const proxyData = ref(JSON.parse(localStorage.getItem('proxyData') || '{}'))
+  const backgroundPollTimer = ref<number>()
 
   // 带缓存的请求方法
-  async function cachedFetch(url: string, cacheKey: string) {
+  async function cachedFetch(url: string, cacheKey: string, forceUpdate = false) {
     const now = Date.now()
     
-    // 尝试从缓存读取
+    // 使用新鲜数据快速返回,同时后台更新
+    let cacheItem: CacheItem | null = null
     const cached = localStorage.getItem(cacheKey)
     if (cached) {
       try {
-        const { timestamp, data }: CacheItem = JSON.parse(cached)
-        if (now - timestamp < CACHE_TTL) {
-          return data
+        cacheItem = JSON.parse(cached)
+        if (cacheItem === null) {
+          console.warn('Invalid cache format', cacheItem)
+          return cacheItem 
+        }
+        // 如果仍在有效期内且不强制更新,直接返回缓存
+        if (!forceUpdate && cacheItem && now - (cacheItem.timestamp || 0) < CACHE_TTL) {
+          // 发起静默后台更新
+          setTimeout(() => cachedFetch(url, cacheKey, true), 0)
+          return cacheItem.data
         }
       } catch (e) {
         console.warn('Invalid cache format', e)
       }
     }
 
-    // 发起新请求
-    const response = await fetch(url)
+    // 发起新请求并验证缓存
+    const headers: HeadersInit = {}
+    if (cacheItem?.etag) {
+      headers['If-None-Match'] = cacheItem.etag
+    }
+
+    const response = await fetch(url, { headers })
+    
+    // 304 Not Modified 处理
+    if (response.status === 304) {
+      if (cacheItem) {
+        const updatedCache: CacheItem = {
+          ...cacheItem,
+          timestamp: now
+        }
+        localStorage.setItem(cacheKey, JSON.stringify(updatedCache))
+        return cacheItem.data
+      } else {
+        // Handle the case where cacheItem is null
+        console.warn('Cache item is null for 304 response')
+        return null
+      }
+    }
+
     if (!response.ok) throw new Error(`Request failed: ${response.status}`)
+    
+    const etag = response.headers.get('ETag')
     const data = await response.json()
     
     // 更新缓存
-    const cacheItem: CacheItem = {
+    const newCacheItem: CacheItem = {
       timestamp: now,
-      data
+      data,
+      etag: etag || undefined
     }
-    localStorage.setItem(cacheKey, JSON.stringify(cacheItem))
+    localStorage.setItem(cacheKey, JSON.stringify(newCacheItem))
     
     return data
   }
@@ -50,6 +86,31 @@ export const useProxyStore = defineStore('proxy', () => {
     localStorage.setItem('selectedProxy', choice)
   }
 
+  function startBackgroundPolling() {
+    if (backgroundPollTimer.value) return
+    
+    backgroundPollTimer.value = window.setInterval(async () => {
+      try {
+        const apiBaseUrl = import.meta.env.VITE_API_BASE_URL || ''
+        await Promise.allSettled([
+          cachedFetch(`${apiBaseUrl}/proxy/sys`, 'sys_proxy_cache', true),
+          cachedFetch(`${apiBaseUrl}/proxy/subs`, 'sub_url_cache', true),
+          cachedFetch(`${apiBaseUrl}/proxy/proxies`,'proxies', true),
+        ])
+        console.log('[Background Poll] 缓存自动刷新完成')
+      } catch (error) {
+        console.warn('[Background Poll] 刷新失败:', error)
+      }
+    }, POLL_INTERVAL) as unknown as number
+  }
+
+  function stopBackgroundPolling() {
+    if (backgroundPollTimer.value) {
+      clearInterval(backgroundPollTimer.value)
+      backgroundPollTimer.value = undefined
+    }
+  }
+
   function setProxyData(data: any) {
     proxyData.value = data
     localStorage.setItem('proxyData', JSON.stringify(data))
@@ -65,6 +126,8 @@ export const useProxyStore = defineStore('proxy', () => {
     proxyData, 
     setSelectedProxy, 
     setProxyData,
-    cachedFetch 
+    cachedFetch,
+    startBackgroundPolling,
+    stopBackgroundPolling
   }
 })

+ 81 - 0
worker/api/excel_load.py

@@ -0,0 +1,81 @@
+from fastapi import FastAPI, File, UploadFile, HTTPException, APIRouter
+import asyncio
+from sqlalchemy.dialects.sqlite import insert
+from pathlib import Path
+import pandas as pd
+from typing import List
+from sqlmodel import select, SQLModel, Session
+from mylib.logu import logger
+from worker.search_engine.search_result_db import SearchResultManager, KeywordTask
+from fastapi.responses import JSONResponse
+from io import BytesIO
+from config.settings import DB_URL
+
+app = APIRouter()
+
@app.post("/upload",
          summary="导入关键词文件",
          description="支持Excel和CSV文件,读取第一列作为关键词")
async def import_keywords(file: UploadFile = File(...)):
    """Import keywords from an uploaded spreadsheet.

    Reads the first column of an .xlsx/.xls/.csv/.tsv upload, drops empty
    cells, and bulk-inserts the remaining keywords into the keyword table.

    Returns:
        JSON with the number of keywords parsed and the number newly inserted.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    try:
        db_manager = SearchResultManager()
        logger.info(f"数据库连接: {DB_URL}")

        # Read the whole upload first; parsing happens off the event loop.
        content = await file.read()
        # file.filename can be None for some clients -- guard before Path().
        suffix = Path(file.filename or "").suffix.lower()

        def _parse_dataframe() -> pd.DataFrame:
            if suffix in ('.xlsx', '.xls'):
                return pd.read_excel(BytesIO(content), header=0)
            # Tab-separated for .tsv (case-insensitive), comma otherwise.
            return pd.read_csv(BytesIO(content), sep='\t' if suffix == '.tsv' else ',')

        # asyncio.to_thread replaces the deprecated get_event_loop()/run_in_executor pair.
        df = await asyncio.to_thread(_parse_dataframe)

        # dropna + strip: without this, empty cells become the literal keyword "nan".
        column = df.iloc[:, 0].dropna().astype(str).str.strip()
        keywords = [kw for kw in column.tolist() if kw]

        # Blocking SQLAlchemy work also runs in a worker thread.
        inserted_count = await asyncio.to_thread(
            bulk_import_keywords_to_db,
            keywords,
            db_manager
        )

        return JSONResponse(content={
            "total_keywords": len(keywords),
            "inserted_count": inserted_count,
            "message": "导入成功"
        })

    except Exception as e:
        logger.error(f"文件处理失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
+
+
def bulk_import_keywords_to_db(keywords: List[str], db_manager: SearchResultManager) -> int:
    """Bulk-insert keywords using SQLite UPSERT, skipping duplicates.

    Issues a single INSERT ... ON CONFLICT (keyword) DO NOTHING so keywords
    that already exist in the table are silently ignored.

    Args:
        keywords: Keyword strings to import (may contain duplicates).
        db_manager: Supplies the SQLAlchemy engine to write through.

    Returns:
        Number of rows actually inserted (conflicting rows excluded).
    """
    # Deduplicate while preserving order.  An empty VALUES clause is a SQL
    # error, so bail out early when there is nothing to insert.
    unique_keywords = list(dict.fromkeys(keywords))
    if not unique_keywords:
        return 0

    try:
        with Session(db_manager.engine) as session:
            stmt = insert(KeywordTask).values(
                [{"keyword": kw} for kw in unique_keywords]
            ).on_conflict_do_nothing(index_elements=["keyword"])

            result = session.exec(stmt)
            session.commit()

            inserted_count = result.rowcount

            logger.info(f"成功导入 {inserted_count} 个新关键词")
            return inserted_count

    except Exception as e:
        # The Session context manager already rolls back and closes on error,
        # so the old `if 'session' in locals(): session.rollback()` hack is
        # unnecessary (and was operating on an already-closed session).
        logger.error(f"批量导入失败: {str(e)}")
        raise
+

+ 54 - 0
worker/api/search_cli.py

@@ -0,0 +1,54 @@
+import asyncio
+from fastapi import APIRouter, FastAPI, HTTPException
+from pydantic import BaseModel
+from typing import Dict, Optional
+import DrissionPage
+from worker.search_engine.drission_google_search import search_keyword_drission
+from DrissionPage import ChromiumPage
+from mylib.drission_page import load_chrome_from_ini
+from mylib.logu import logger
+
+app = APIRouter()
+
class SearchRequest(BaseModel):
    """Request body for POST /search."""

    keyword: str  # search term forwarded to search_keyword_drission
    max_result_items: int = 200  # upper bound on collected result items
    skip_existing: bool = True  # forwarded to the search call; presumably skips already-crawled keywords -- confirm in search_keyword_drission
    browser_config: Dict = {}  # passed through to the browser; pydantic deep-copies field defaults, so this mutable default is safe here
+
class BrowserTestRequest(BaseModel):
    """Request body for POST /browser/test."""

    browser_config: Dict = {}  # kwargs for load_chrome_from_ini; pydantic deep-copies defaults
    init_url: str = "https://www.google.com"  # page opened to verify the browser launched
+
@app.post("/search", summary="执行Google搜索")
async def search(request: SearchRequest):
    """Run a Google search for the requested keyword.

    The blocking crawler call is off-loaded to a worker thread so the event
    loop stays responsive.  Any failure is surfaced as HTTP 500.
    """
    try:
        logger.info(f"Starting search with parameters: {request.model_dump()}")
        search_kwargs = dict(
            keyword=request.keyword,
            max_result_items=request.max_result_items,
            skip_existing=request.skip_existing,
            browser_config=request.browser_config,
        )
        result = await asyncio.to_thread(search_keyword_drission, **search_kwargs)
        logger.info(f"Search completed with result: {result}")
        return result
    except Exception as e:
        logger.error(f"Error during search execution: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
+
@app.post("/browser/test", summary="测试浏览器启动")
async def test_browser(request: BrowserTestRequest):
    """Launch a browser with the given config and open init_url.

    Returns:
        {"status": "success", "url": <final page URL>} on success.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    try:
        logger.info(f"Testing browser launch with parameters: {request.model_dump()}")
        page = await test_browser_launch(request.browser_config, request.init_url)
        return {"status": "success", "url": page.url}
    except Exception as e:
        logger.error(f"Error during browser test: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


async def test_browser_launch(browser_config: Dict, init_url: str) -> ChromiumPage:
    """Launch Chrome and navigate to *init_url*.

    The DrissionPage calls are blocking, so they run in a worker thread --
    the original ran them directly inside this coroutine and stalled the
    event loop for the whole browser startup.
    """
    def _blocking_launch() -> ChromiumPage:
        page = load_chrome_from_ini(**browser_config)
        page.get(init_url)
        return page

    return await asyncio.to_thread(_blocking_launch)
+

+ 32 - 0
worker/api/worker_server.py

@@ -0,0 +1,32 @@
"""Worker microservice entry point: merges the keyword-import and search routers."""
from fastapi import FastAPI
from worker.api.excel_load import app as excel_load_app
from worker.api.search_cli import app as search_cli_app
from fastapi.middleware.cors import CORSMiddleware
from config.settings import DB_URL
app = FastAPI(
    title="搜索微服务",
    description="提供关键词导入和搜索功能的统一API接口",
    version="1.0.0"
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # allow every origin
    allow_credentials=True,  # NOTE(review): browsers reject a wildcard origin combined with credentials -- confirm whether credentials are actually needed
    allow_methods=["*"],  # allow all HTTP methods (GET, POST, PUT, DELETE, ...)
    allow_headers=["*"],  # allow all request headers
)

# Mount the two sub-routers under their own prefixes.
app.include_router(excel_load_app, prefix="/keywords")
app.include_router(search_cli_app, prefix="/search")

@app.get("/health")
async def health_check():
    """Service health check: reports liveness and the configured DB URL."""
    return {"status": "healthy", 'db_url': DB_URL, 'msg': 'ok'}

if __name__ == "__main__":
    import uvicorn
    # Run directly:      python .\worker\api\worker_server.py
    # Or via uvicorn:    python -m uvicorn worker.api.worker_server:app --reload
    uvicorn.run(app, host="127.0.0.1", port=8003)

+ 95 - 29
worker/celery/async_client.py

@@ -1,19 +1,21 @@
-from worker.celery.async_tasks import async_browser_task, async_search_task
+import argparse
+import json
 from pathlib import Path
 import pandas as pd
 import sys
+from typing import List, Dict, Any
+from worker.celery.async_tasks import async_browser_task, async_search_task
 from mylib.logu import logger
-from typing import List
 
 def read_keywords_from_file(file_path: Path) -> List[str]:
-    """读取文件第0列关键词(参考client.py实现)"""
+    """读取文件第0列关键词"""
     try:
         if file_path.suffix.lower() in ['.xlsx', '.xls']:
             df = pd.read_excel(file_path, header=0, engine='openpyxl')
         elif file_path.suffix.lower() in ['.csv', '.tsv']:
             df = pd.read_csv(file_path, header=0, sep='\t' if file_path.suffix == '.tsv' else ',')
         else:
-            raise ValueError(f"Unsupported format: {file_path.suffix}")
+            raise ValueError(f"不支持的格式: {file_path.suffix}")
             
         return df.iloc[:, 0].astype(str).tolist()
         
@@ -21,45 +23,109 @@ def read_keywords_from_file(file_path: Path) -> List[str]:
         logger.error(f"文件读取失败: {str(e)}")
         raise
 
-def submit_keyword_tasks(keywords: List[str]):
-    """提交关键词搜索任务(适配async_tasks.py的接口)"""
def submit_keyword_tasks(keywords: List[str]) -> List[Dict[str, Any]]:
    """Submit one async search task per keyword.

    Keywords are stripped once up front so the status record and the
    submitted task agree (previously the record kept the unstripped text),
    and whitespace-only entries are skipped instead of submitted.

    Returns:
        One dict per processed keyword with either
        {"keyword", "task_id", "status": "submitted"} or
        {"keyword", "error", "status": "failed"}.
    """
    results = []
    for raw_keyword in keywords:
        keyword = raw_keyword.strip()
        if not keyword:
            # Blank entries would create useless search tasks.
            continue
        try:
            task = async_search_task.delay(
                keyword=keyword,
                max_result_items=200,
                skip_existing=True
            )
            results.append({
                "keyword": keyword,
                "task_id": task.id,
                "status": "submitted"
            })
            logger.info(f"已提交搜索任务: {keyword}")
        except Exception as e:
            results.append({
                "keyword": keyword,
                "error": str(e),
                "status": "failed"
            })
            logger.error(f"任务提交失败 [{keyword}]: {str(e)}")
    return results
 
-def main(file_path: str):
-    """主流程(整合文件读取和任务提交)"""
def main(file_path: Path, start: int = None, end: int = None) -> Dict[str, Any]:
    """End-to-end flow: read keywords from a file, slice, submit tasks.

    Args:
        file_path: Keyword spreadsheet path.
        start: Optional slice start index (inclusive).
        end: Optional slice end index (exclusive).

    Returns:
        Summary dict with total/processed counts and per-task records, or
        {"error": ..., "status": "failed"} on any failure.
    """
    try:
        keywords = read_keywords_from_file(file_path)
        if not keywords:
            raise ValueError("文件未包含有效关键词")

        # Python slicing treats None bounds as open ends, so --start or --end
        # alone works too (the old code silently ignored a lone bound).
        processed_keywords = keywords[start:end]

        return {
            "total_keywords": len(keywords),
            "processed": len(processed_keywords),
            "tasks": submit_keyword_tasks(processed_keywords)
        }

    except Exception as e:
        return {"error": str(e), "status": "failed"}
+
def parse_args(argv: List[str] = None):
    """Build and parse the CLI arguments.

    Args:
        argv: Argument list to parse; defaults to sys.argv[1:] when None,
            so existing callers are unaffected while tests and programmatic
            callers can pass an explicit list.

    Returns:
        The parsed argparse.Namespace.
    """
    parser = argparse.ArgumentParser(description="关键词处理工具")
    subparsers = parser.add_subparsers(dest='command', required=True)

    # 'main': full pipeline (read file, slice, submit)
    main_parser = subparsers.add_parser('main', help='完整处理流程')
    main_parser.add_argument('file', type=Path, help='关键词文件路径')
    main_parser.add_argument('--start', type=int, help='起始索引')
    main_parser.add_argument('--end', type=int, help='结束索引')
    main_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    # 'read': only read and print keywords
    read_parser = subparsers.add_parser('read', help='仅读取关键词')
    read_parser.add_argument('file', type=Path, help='关键词文件路径')
    read_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    # 'submit': only submit tasks, from a file or an inline keyword list
    submit_parser = subparsers.add_parser('submit', help='仅提交任务')
    submit_parser.add_argument('-f', '--file', type=Path, help='关键词文件路径')
    submit_parser.add_argument('-k', '--keywords', nargs='+', help='直接输入关键词列表')
    submit_parser.add_argument('--json', action='store_true', help='输出JSON格式')

    return parser.parse_args(argv)
+
def handle_output(result, use_json: bool):
    """Print *result* either as pretty JSON or in a human-readable form.

    Args:
        result: A dict, a list (possibly of dicts, e.g. task records), or
            any printable value.
        use_json: When True, dump the whole result as indented JSON.
    """
    if use_json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return

    if isinstance(result, dict) and "error" in result:
        print(f"错误: {result['error']}")
    elif isinstance(result, list):
        # Items may be dicts (submit_keyword_tasks returns task records);
        # str.join on non-strings raised TypeError before, so stringify each.
        print("\n".join(str(item) for item in result))
    else:
        print(result)
 
 if __name__ == '__main__':
-    if len(sys.argv) != 2:
-        print("使用方法: python -m worker.celery.async_client <关键词文件路径>")
-        # '示例: python -m worker.celery.async_client "G:\code\upwork\zhang_crawl_bio\download\测试-精油-2000.xlsx"'
-        sys.exit(1)
-        
-    main(sys.argv[1])
+    args = parse_args()
+    
+    try:
+        if args.command == 'main':
+            result = main(args.file, args.start, args.end)
+            handle_output(result, args.json)
+            
+        elif args.command == 'read':
+            keywords = read_keywords_from_file(args.file)
+            handle_output(keywords, args.json)
+            
+        elif args.command == 'submit':
+            if args.file:
+                keywords = read_keywords_from_file(args.file)
+            elif args.keywords:
+                keywords = args.keywords
+            else:
+                raise ValueError("必须指定文件或关键词列表")
+                
+            tasks = submit_keyword_tasks(keywords)
+            handle_output(tasks, args.json)
+            
+    except Exception as e:
+        error_result = {"error": str(e), "status": "failed"}
+        handle_output(error_result, args.json if hasattr(args, 'json') else False)
+        sys.exit(1)

+ 5 - 4
worker/search_engine/search_result_db.py

@@ -13,8 +13,9 @@ class KeywordTask(SQLModel, table=True):
     id: Optional[int] = Field(default=None, primary_key=True)
     keyword: str = Field(index=True, unique=True)
     total_results: Optional[int] = None
-    is_completed: bool = Field(default=False)
-    created_at: datetime = Field(default_factory=datetime.now)
+    is_completed: Optional[bool] = Field(default=False)
+    fail_count: Optional[int] = 0    
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)
     
     pages: List["SearchPageResult"] = Relationship(back_populates="keyword_task")
     items: List["SearchResultItem"] = Relationship(back_populates="keyword_task")
@@ -84,12 +85,12 @@ class SearchResultManager:
             
             if keyword_task:
                 # 删除关联的SearchResultItem
-                session.execute(
+                session.exec(
                     delete(SearchResultItem)
                     .where(SearchResultItem.keyword_id == keyword_task.id)
                 )
                 # 删除关联的SearchPageResult
-                session.execute(
+                session.exec(
                     delete(SearchPageResult)
                     .where(SearchPageResult.keyword_id == keyword_task.id)
                 )