Finish customizing the database path via an environment variable

mrh 9 months ago
parent
commit
560e038387
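
This commit threads the database location through a single `DB_URL` environment variable: the backend exports it in `CeleryWorker.__init__`, and the worker processes read it back via `config.settings`. `config/settings.py` itself is not part of this diff, so the consuming side is only a sketch here; the fallback URI is an assumption:

```python
# Hypothetical sketch of config/settings.py (not in this commit):
# prefer the DB_URL exported by the backend, fall back to a default URI.
import os

DB_URL = os.environ.get("DB_URL", "sqlite:///output/temp.db")
PROXIES = None  # placeholder; the real module also defines PROXIES
```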

BIN
%SystemDrive%/ProgramData/SogouInput/Components/Picface/Cloud/sgim_picface_cloud.bin

BIN
%SystemDrive%/ProgramData/SogouInput/Components/Picface/Cloud/sgim_picface_cloud_bak.bin
+ 1 - 1
ui/backend/config.yaml

@@ -11,7 +11,7 @@ redis:
   host: localhost
   port: 6379
 select_proxy: system
-sqluri: G:\code\upwork\zhang_crawl_bio\output\temp.db
+sqluri: sqlite:///G:\code\upwork\zhang_crawl_bio\output\temp.db
 sub:
   auto_start: true
   file: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\6137e542.yaml
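
With the `sqlite:///` scheme added, `sqluri` is a well-formed SQLAlchemy URL rather than a bare Windows path, so it can be handed straight to `create_engine`. A minimal check of that assumption:

```python
# SQLAlchemy accepts backslashes in the path part of a sqlite:/// URL,
# so the Windows path above works unchanged.
from sqlmodel import create_engine

engine = create_engine(r"sqlite:///G:\code\upwork\zhang_crawl_bio\output\temp.db")
```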

+ 13 - 4
ui/backend/src/services/celery_worker.py

@@ -1,3 +1,4 @@
+import os
 from pathlib import Path
 import subprocess
 import sys
@@ -18,11 +19,17 @@ py_client: Optional[Dict[str,Any]] = {
     'crawl': WORKER_DIR_BASE / r'worker\celery\crawl_client.py',
     'convert': WORKER_DIR_BASE / r'worker\celery\html_convert_tasks.py'
 }
+class SearchBrowserConfig(BaseModel):
+    # mirrors the keyword arguments of load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None, browser_path=None, no_imgs=True)
+    headless: Optional[bool] = False
+    proxy: Optional[str] = None
+    browser_path: Optional[str] = None
+    no_imgs: Optional[bool] = True
 
 class SearchTaskConfig(BaseModel):
     max_result_items: Optional[int] = 200
     skip_existing: Optional[bool] = True
-    browser_config: Optional[dict] = {}
+    browser_config: Optional[SearchBrowserConfig] = SearchBrowserConfig()
     proxy_pool_url: Optional[str] = None
     dry_run: Optional[bool] = True
 
@@ -58,6 +65,7 @@ class CeleryWorker:
     def __init__(self, python_exe: str=sys.executable, config = config):
         self.python_exe = python_exe
         self.config = config
+        os.environ['DB_URL'] = self.config.sqluri
         self.workers_model: Dict[str, WorkerModel] = {}
         self.redis_url = f"redis://{config.redis.host}:{config.redis.port}/{config.redis.db}"
         self.redis_client = redis.Redis(host=config.redis.host, port=config.redis.port, db=config.redis.db)
@@ -131,16 +139,16 @@ class CeleryWorker:
         if not worker_model:
             raise ValueError(f"Invalid worker name: {name}")
         if in_cmd_windows:
+            logger.info(f"self.config.sqluri {self.config.sqluri}")
             cmd = ['start','cmd', '/c' ]
             sub_cmd = ' '.join(worker_model.cmd)
             cmd.append(f'{sub_cmd}')
             logger.info(f"run {' '.join(cmd)}")
             process = subprocess.Popen(cmd, shell=True, cwd=WORKER_DIR_BASE)
-            # immediately record the CMD process PID as a fallback
-            worker_model.pid = process.pid
            # wait for flower to report the real worker PID
             if not await self.wait_for_worker_online(name, timeout):
-                logger.warning(f"Worker {name} did not register with flower in time, falling back to CMD process PID {process.pid}")
+                # logger.warning(f"Worker {name} did not register with flower in time, falling back to CMD process PID {process.pid}")
+                raise ValueError(f"{name} failed to start")
             logger.info(f"start sucess {worker_model}")
         else:
             worker_model.pid = await process_manager.start_process(name, worker_model.cmd, cwd=WORKER_DIR_BASE)
@@ -240,6 +248,7 @@ class CeleryWorker:
         return queue_lengths
     def _prepare_search_task(self, data: Dict, select_proxy: Optional[str] = None):
             task_model = SearchTaskInput(**data)
+            task_model.config.browser_config.browser_path = self.config.browser.exe_path
             if select_proxy == 'pool':
                 task_model.config.proxy_pool_url = f"http://{self.config.backend.host}:{self.config.backend.port}/api/proxy/proxies-pool"
             return task_model
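
Exporting `os.environ['DB_URL']` in `__init__` is enough to reach the workers because processes started with `subprocess` inherit the parent's environment by default. A stand-alone illustration of the mechanism (the URI is an example value):

```python
# Children spawned via subprocess inherit os.environ unless env= is
# overridden; this is what carries DB_URL into the celery workers.
import os
import subprocess
import sys

os.environ["DB_URL"] = "sqlite:///output/temp.db"
subprocess.run([sys.executable, "-c", "import os; print(os.environ['DB_URL'])"])
```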

+ 10 - 0
utils/base.py

@@ -0,0 +1,10 @@
+from config.settings import DB_URL
+from pathlib import Path
+def get_db_file_path():
+    if DB_URL.startswith("sqlite:///"):
+        db_file_path = Path(DB_URL.replace("sqlite:///", ""))
+        # a valid absolute path is enough; the file itself need not exist yet
+        if db_file_path.is_absolute():
+            return db_file_path
+    return None
+
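
`get_db_file_path` returns a `pathlib.Path` only for absolute `sqlite:///` URIs and `None` for anything else (e.g. a PostgreSQL URL), so callers can branch on the result. A quick usage sketch, assuming `DB_URL` points at the `temp.db` configured above:

```python
# Assumes DB_URL = "sqlite:///G:\\code\\upwork\\zhang_crawl_bio\\output\\temp.db"
from utils.base import get_db_file_path

db_file = get_db_file_path()
if db_file is not None:
    db_file.parent.mkdir(parents=True, exist_ok=True)  # make sure the output dir exists
```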

+ 2 - 2
worker/api/worker_server.py

@@ -4,7 +4,7 @@ from worker.api.excel_load import app as excel_load_app
 from worker.api.worker_router import app as worker_app
 from fastapi.middleware.cors import CORSMiddleware
 from config.settings import DB_URL, GOOGLE_SEARCH_DIR
-
+from utils.base import get_db_file_path
 app = FastAPI(
     title="Search microservice",
     description="Unified API for keyword import and search",
@@ -30,7 +30,7 @@ async def health_check():
         "status": "healthy",
         "host": args.host,
         "port": args.port,
-        "db_url": DB_URL,
+        "db_url": get_db_file_path(),
         "google_search_dir": GOOGLE_SEARCH_DIR
     }
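
Since `get_db_file_path()` returns a `pathlib.Path` (or `None`), the endpoint now reports the resolved file path instead of the raw URI; FastAPI's `jsonable_encoder` serializes `Path` objects to plain strings, so the JSON shape is unchanged:

```python
# FastAPI renders pathlib.Path values as strings in JSON responses.
from pathlib import Path

from fastapi.encoders import jsonable_encoder

print(jsonable_encoder({"db_url": Path(r"G:\output\temp.db")}))  # {'db_url': 'G:\\output\\temp.db'}
```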
 

+ 15 - 4
worker/celery/search_tasks.py

@@ -1,3 +1,4 @@
+import os
 import random
 from typing import List, Optional
 
@@ -9,13 +10,22 @@ import sys
 import asyncio
 import httpx
 from utils.proxy_pool import get_random_proxy
-from config.settings import PROXIES
+from config.settings import PROXIES, DB_URL
+from utils.base import get_db_file_path
 from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchResultItem, SearchPageResult
 from sqlmodel import select, Session, exists, distinct
 from celery import group
 import redis
+from mylib.drission_page import load_chrome_from_ini
 # redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)
-
+logger.info(f"DB_URL {DB_URL}, environ: {os.environ.get('DB_URL')}")
+logger.info(f"数据库路径: {get_db_file_path()}")
+class SearchBrowserConfig(BaseModel):
+    # mirrors the keyword arguments of load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None, browser_path=None, no_imgs=True)
+    headless: Optional[bool] = False
+    proxy: Optional[str] = None
+    browser_path: Optional[str] = None
+    no_imgs: Optional[bool] = True
 
 class SearchTaskConfig(BaseModel):
     max_result_items: Optional[int] = 200
@@ -76,13 +86,14 @@ def drission_search_task(task_input: SearchTaskInput):
     # unified parsing of the task configuration
     input_model = SearchTaskInput(**task_input)
     config = input_model.config or SearchTaskConfig()
+    browser_config = SearchBrowserConfig(**(config.browser_config or {}))  # tolerate a missing dict
     async def _execute_search():
         try:
             # use the parameters from the config model
             logger.info(f"Start processing keyword search task{' (dry_run mode)' if config.dry_run else ''}: {input_model.keyword}")
             if config.proxy_pool_url:
                 proxy = await asyncio.to_thread(get_random_proxy, config.proxy_pool_url)
-                config.browser_config['proxy'] = proxy
+                browser_config.proxy = proxy
                 logger.info(f"使用代理池: {config.proxy_pool_url}  -->  {proxy}")
             else:
                 logger.info(f"使用代理: 跟随系统")
@@ -95,7 +106,7 @@ def drission_search_task(task_input: SearchTaskInput):
                     input_model.keyword,
                     max_result_items=config.max_result_items,
                     skip_existing=config.skip_existing,
-                    browser_config=config.browser_config)
+                    browser_config=browser_config.model_dump())
             ret = {"keyword": input_model.keyword, "result": result}
             logger.info(f"关键词搜索任务完成: {ret}")
             return ret
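
Because celery serializes `SearchTaskInput` to JSON, `browser_config` arrives on the worker as a plain dict and is re-validated into the typed model before being passed on. A small round-trip sketch with illustrative values:

```python
# dict payload (as received from celery) -> typed model -> dict again
payload = {"headless": False, "no_imgs": True}
cfg = SearchBrowserConfig(**payload)
cfg.proxy = "http://127.0.0.1:8080"  # e.g. filled in from the proxy pool
assert cfg.model_dump() == {
    "headless": False,
    "proxy": "http://127.0.0.1:8080",
    "browser_path": None,
    "no_imgs": True,
}
```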