
Complete dp search and pagination

mrh 10 months ago
parent
commit
faae2ca9f0

+ 2 - 1
.gitignore

@@ -5,4 +5,5 @@ __pycache__
 .aider*
 .env
 .pytest_cache
-download
+download
+local_proxy_pool/

+ 1 - 2
CONVENTIONS.md

@@ -21,8 +21,7 @@
 
 - The current environment is Python 3.12; always develop against the latest interfaces. For example, FastAPI no longer uses app.event but lifespan, and pydantic.BaseModel no longer supports dict(); use model_dump() instead
 
-Important: since you are working in the aider development environment, when you write code for any file you must not omit existing code; the file must be written out in full
-
+- You must use the REPLACE and SEARCH features to add and delete code
# Project description:
 - In my scenario, the Workers execute browser search tasks. These tasks run for a long time and are demanding on network and compute performance, so the Workers are deployed on separate machines and may be terminated by errors.
 - My scenario targets the company's internal LAN; fewer than 10 machines are needed for the Workers.

+ 3 - 1
config/settings.py

@@ -15,4 +15,6 @@ HTTPS_PROXY='http://127.0.0.1:1881'
 
 REDIS_URL = 'redis://127.0.0.1:6379/1'
 
-PC_NAME=os.getenv('PC_NAME', 'pc1') 
+PC_NAME=os.getenv('PC_NAME', 'pc1') 
+
+PROXY_POOL_BASE_URL = 'http://localhost:5010'

+ 3 - 2
mylib/drission_page.py

@@ -75,14 +75,15 @@ def test_random_ua_chrome():
 
 def test_normal_chrome():
     # genarate_chrome_ini()
-    page = load_chrome_from_ini()
+    page = load_chrome_from_ini(proxy='http://localhost:9356')
     tab = page.latest_tab
     keyword = "Acalypha rivularis essential oil"
     url = f"https://www.google.com/search?q={keyword}"
-    # url = "https://bot.sannysoft.com/"
+    url = "https://bot.sannysoft.com/"
     # recaptcha CAPTCHA detection
     # url = "https://patrickhlauke.github.io/recaptcha/"
     tab.get(url)
+    tab.scroll.to_bottom()
     # tab.get_screenshot('./1.png')
     # page.quit()
 

+ 58 - 8
worker/celery/async_client.py

@@ -1,15 +1,65 @@
-from worker.celery.async_tasks import async_browser_task, add
+from worker.celery.async_tasks import async_browser_task, async_search_task
+from pathlib import Path
+import pandas as pd
 import sys
 from mylib.logu import logger
+from typing import List
 
-def submit_async_task(url: str):
-    """Submit an async browser test task"""
+def read_keywords_from_file(file_path: Path) -> List[str]:
+    """Read keywords from column 0 of the file (follows the client.py implementation)"""
     try:
-        async_browser_task.delay(url)  # fixed how arguments are passed
-        logger.info(f"Submitted async browser task: {url}")
+        if file_path.suffix.lower() in ['.xlsx', '.xls']:
+            df = pd.read_excel(file_path, header=0, engine='openpyxl')
+        elif file_path.suffix.lower() in ['.csv', '.tsv']:
+            df = pd.read_csv(file_path, header=0, sep='\t' if file_path.suffix == '.tsv' else ',')
+        else:
+            raise ValueError(f"Unsupported format: {file_path.suffix}")
+            
+        return df.iloc[:, 0].astype(str).tolist()
+        
     except Exception as e:
-        logger.error(f"Task submission failed: {str(e)}")
+        logger.error(f"Failed to read the file: {str(e)}")
+        raise
+
+def submit_keyword_tasks(keywords: List[str]):
+    """Submit keyword search tasks (matches the async_tasks.py interface)"""
+    for keyword in keywords:
+        try:
+            # use the native argument format defined by async_search_task
+            async_search_task.delay(
+                keyword=keyword.strip(),
+                max_result_items=200,
+                skip_existing=True,
+                # browser_config={}  # use an empty dict instead of the default BrowserConfig object
+            )
+            logger.info(f"Submitted search task: {keyword}")
+        except Exception as e:
+            logger.error(f"Task submission failed [{keyword}]: {str(e)}")
+
+def main(file_path: str):
+    """Main flow (file reading plus task submission)"""
+    try:
+        path = Path(file_path)
+        if not path.exists():
+            raise FileNotFoundError(f"File does not exist: {path}")
+            
+        keywords = read_keywords_from_file(path)
+        if not keywords:
+            raise ValueError("The file contains no valid keywords")
+            
+        logger.info(f"Read {len(keywords)} keywords")
+        logger.info(f"Sample keywords: {keywords[:3]}...")  # show only the first 3
+        submit_keyword_tasks(keywords[5:10])
+        logger.info("All search tasks have been submitted")
+        
+    except Exception as e:
+        logger.error(f"Program terminated abnormally: {str(e)}")
+        sys.exit(1)
 
 if __name__ == '__main__':
-    submit_async_task('https://www.baidu.com')
-    # res = add.delay(1,2)
+    if len(sys.argv) != 2:
+        print("Usage: python -m worker.celery.async_client <keyword file path>")
+        # 'Example: python -m worker.celery.async_client "G:\code\upwork\zhang_crawl_bio\download\测试-精油-2000.xlsx"'
+        sys.exit(1)
+        
+    main(sys.argv[1])
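The client above submits tasks fire-and-forget and only logs whether `.delay()` succeeded. If the submitter also needs the search results back, the `AsyncResult` handle returned by `.delay()` can be polled. A minimal sketch, assuming the same `async_search_task` and a configured Celery result backend; the 600-second timeout is an illustrative value:

```python
# Minimal sketch, not part of this commit: collect results instead of fire-and-forget.
from typing import List

from worker.celery.async_tasks import async_search_task
from mylib.logu import logger

def submit_and_wait(keywords: List[str], timeout: int = 600) -> dict:
    """Submit one search task per keyword and wait for each result (requires a result backend)."""
    pending = {
        kw: async_search_task.delay(keyword=kw.strip(), max_result_items=200, skip_existing=True)
        for kw in keywords
    }
    results = {}
    for kw, async_result in pending.items():
        try:
            # AsyncResult.get() blocks until the worker returns or the timeout expires.
            results[kw] = async_result.get(timeout=timeout)
            logger.info(f"finished: {kw}")
        except Exception as e:
            logger.error(f"failed: {kw}: {e}")
    return results
```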

+ 19 - 3
worker/celery/async_tasks.py

@@ -1,3 +1,4 @@
+import httpx
 from worker.celery.app import app
 from camoufox.async_api import AsyncCamoufox
 from worker.search_engine.google_search import search_keyword
@@ -5,6 +6,7 @@ from worker.search_engine.camoufox_broswer import BrowserConfig
 from mylib.logu import logger
 import asyncio
 import sys
+from config.settings import PROXY_POOL_BASE_URL
 
 @app.task
 def async_browser_task(url: str):
@@ -35,16 +37,30 @@ def async_browser_task(url: str):
         loop.close()
         asyncio.set_event_loop_policy(None)  # Cleanup policy after use
 
+async def get_random_proxy():
+    """Get an available proxy from the local proxy pool and return its local HTTP address"""
+    url = f"{PROXY_POOL_BASE_URL}/get"
+    async with httpx.AsyncClient() as client:
+        response = await client.get(url, timeout=30)
+        response.raise_for_status()
+        results = response.json()
+        logger.info(f"results {results}")
+        port = results["port"]
+        addr = f'http://127.0.0.1:{port}'
+        logger.info(f"curl -i -x {addr} https://www.google.com")
+        return addr
 @app.task(name='search_worker.search')
-def async_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True):
+def async_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True, browser_config: BrowserConfig=BrowserConfig()):
     """Async keyword search task"""
     if sys.platform == 'win32':
         asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
-
     async def _execute_search():
         try:
+            proxy = {'server': await get_random_proxy()} 
+            browser_config.proxy.update(proxy)
+            logger.info(f"{browser_config.proxy}")
+            logger.info(f"Starting keyword search task: {keyword}")
-            result = await search_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
+            result = await search_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing, config=browser_config)
             return {"keyword": keyword, "result": result}
         except Exception as e:
             logger.error(f"Keyword search task failed: {str(e)}")
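Two details of the new task are easy to miss: `get_random_proxy()` assumes the pool's `/get` endpoint returns JSON containing a `port` field (for example `{"port": 9356}`), and the default `browser_config: BrowserConfig = BrowserConfig()` is evaluated once at definition time, so the same instance, including its `proxy` dict, is shared and mutated across every call. A hedged sketch of the signature with a per-call config; the module path comes from this diff and the body is elided:

```python
# Hedged sketch, not the commit's code: a per-call BrowserConfig instead of a shared default.
from worker.search_engine.camoufox_broswer import BrowserConfig

def async_search_task(keyword: str, max_result_items: int = 200,
                      skip_existing: bool = True,
                      browser_config: BrowserConfig | None = None):
    # A fresh config per call keeps proxy updates from leaking between task invocations.
    browser_config = browser_config or BrowserConfig()
    ...
```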

+ 2 - 8
worker/readme.md

@@ -12,7 +12,8 @@ search_keyword_task.delay(task_data)
 ```
 命令行方式
 ```shell
-celery -A worker.celery.tasks flower --persistent=True --db=".\output\celery\flower_db"
+G:\code\upwork\zhang_crawl_bio\download\Redis-x64-5.0.14.1\redis-server.exe
+celery -A worker.celery.app flower --persistent=True --db=".\output\flower_db"
 # specify the config when starting the worker on each PC
 $env:PC_NAME="w1"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
 $env:PC_NAME="w2"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
@@ -20,11 +21,4 @@ $env:PC_NAME="w3"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
 $env:PC_NAME="w4"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
 $env:PC_NAME="w5"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
 
-
-
-# multiple workers on a single machine with different configs (using different config files)
-
-PC_NAME=worker1_config celery -A worker.celery.app worker --hostname=worker1@%h
-
-PC_NAME=worker2_config celery -A worker.celery.app worker --hostname=worker2@%h
 ```

+ 76 - 38
worker/search_engine/drission_google_search.py

@@ -1,16 +1,17 @@
-from pathlib import Path
 import time
 import re
-from typing import Optional
+import logging
+from pathlib import Path
 from DrissionPage import ChromiumPage
-from DrissionPage.common import Keys
 from pydantic import BaseModel
 from scrapling import Adaptor
 from mylib.logu import logger
 from mylib.base import save_to_file
-from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
 from config.settings import OUTPUT_DIR
 from mylib.drission_page import load_chrome_from_ini
+from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
+from worker.search_engine.smart_selector import get_search_ele
+from DrissionPage.common import Keys
 
 class SearchResultEle(BaseModel):
     search_div: bool | None = None
@@ -24,7 +25,7 @@ class GoogleSearchHandlerDrission:
         self.db_manager = SearchResultManager()
         self.save_dir = OUTPUT_DIR / 'results'
     
-    def save_current_page(self, keyword: str, filename: str = time.strftime("%Y%m%d_%H%M%S")) -> Path:
+    def save_current_page(self, keyword: str, filename: str=time.strftime("%Y%m%d_%H%M%S")) -> Path:
         html_dir = self.save_dir / keyword
         html_dir.mkdir(parents=True, exist_ok=True)
         html_path = save_to_file(self.page.html, html_dir / f"{filename}.html")
@@ -58,61 +59,78 @@ class GoogleSearchHandlerDrission:
             
         return result_ele
 
-    def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False) -> KeywordTask:
-        if skip_existing:
-            key_model = self.db_manager.get_keyword_task(keyword)
-            if key_model:
-                logger.info(f"Keyword task already completed, skipping: {keyword}")
-                return key_model
+    def check_keyword(self, keyword: str, skip_existing: bool) -> tuple[bool, KeywordTask]:
+        key_model = self.db_manager.get_keyword_task(keyword)
+        if skip_existing and key_model.is_completed:
+            logger.info(f"Keyword task already completed, skipping: id={key_model.id} {keyword}")
+            return True, key_model
+
+        if key_model:
+            self.db_manager.delete_keyword_task(keyword)
+        key_model = self.db_manager.create_keyword_task(keyword)
+        return False, key_model
 
-        self.db_manager.create_keyword_task(keyword)
+    def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
+        _, key_model = self.check_keyword(keyword, skip_existing)
         self.search(keyword)
-        
-        has_next = True
         search_result_item_count = 0
-        finitsh_flag = False
-        while has_next:
+        should_complete = False
+        
+        while True:
             result_ele = self._process_single_page(keyword)
+            if not result_ele.search_div:
+                break
+
             search_result_item_count += len(result_ele.results) if result_ele.results else 0
-            
-            if search_result_item_count > max_result_items:
-                logger.info(f"Keyword {keyword}: result count exceeded {max_result_items}, skipping the next page")
-                finitsh_flag = True
+            if search_result_item_count >= max_result_items or not result_ele.next_page_url:
+                should_complete = True
                 break
-            if result_ele.next_page_url:
+
+            try:
                 self.page.scroll.to_bottom()
                 time.sleep(3)
                 next_btn = self.page.ele('#pnnext')
                 if next_btn:
                     next_btn.click()
                     logger.info(f"Jumping to next page: {self.page.url}")
+                    self.page.wait.load_start()
                 else:
-                    finitsh_flag = True
                     break
-            else:
+            except Exception as e:
+                logger.warning(f"Failed to turn the page: {str(e)}")
                 break
-                
+
+        if should_complete:
+            key_model = self.db_manager.mark_task_completed(keyword)
+            logger.info(f"Keyword processing completed normally: {keyword}")
+        else:
+            logger.warning(f"Keyword processing was interrupted: {keyword}")
         return key_model
 
     def goto_home_page(self):
         url = "https://www.google.com"
         if self.page.url != url:
             self.page.get(url)
+            self.page.wait.load_start()
         if 'sorry/' in self.page.url:
-            raise Exception("Human verification encountered, manual intervention required")
+            raise Exception(f"Human verification encountered, retrying with a new identity.. {self.page.url}")
 
     def search(self, query: str):
         self.goto_home_page()
-        search_box = self.page.ele('textarea')
+        search_ele_dict = get_search_ele(self.page.html)
+        if not search_ele_dict:
+            raise Exception("Search box not found")
+        search_box = self.page.ele(f'xpath:{search_ele_dict['xpath']}')
         search_box.input(query)
         self.page.actions.type(Keys.ENTER)
+        self.page.wait.load_start()
 
     def get_current_page_num(self) -> int:
         if '/search?q=' in self.page.url:
             match = re.search(r'&start=(\d+)', self.page.url)
             return int(match.group(1)) // 10 + 1 if match else 1
 
-    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
+    def get_search_result_ele(self, html_content: str):
         res = SearchResultEle(
             search_div=None,
             next_page_url=None,
@@ -126,12 +144,11 @@ class GoogleSearchHandlerDrission:
         next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
         res.search_div = bool(search_div)
         res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
-        
         if not search_div:
             return res
 
         result_list = search_div.xpath('//*[@data-snc]')
-        logger.info(f"Results on current page: {len(result_list)}")
+        logger.info(f"Results on current page: {len(result_list)}, next_page_url: {next_page_url}")
         
         for result_item in result_list:
             if len(result_item.children) < 2:
@@ -152,18 +169,39 @@ class GoogleSearchHandlerDrission:
                 res.results.append(result)
         return res
 
-def search_keyword_drission(keyword: str, max_result_items: int = 1, skip_existing: bool = False):
-    # page = load_chrome_from_ini(proxy='http://localhost:1881')
-    page = load_chrome_from_ini(proxy='http://localhost:1881')
+def search_keyword_drission(keyword, max_result_items=200, skip_existing=True):
+    ret = {'error': 0, 'msg': '', 'data': None}
+    logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
+    search_handler = GoogleSearchHandlerDrission(None)
+    exist, keyword_model = search_handler.check_keyword(keyword, skip_existing)
+    if exist and keyword_model.is_completed:
+        ret['data'] = keyword_model.model_dump()
+        return ret
+    
+    page = load_chrome_from_ini(proxy='http://127.0.0.1:1881')
     try:
-        handler = GoogleSearchHandlerDrission(page)
-        return handler.process_keyword(keyword, max_result_items, skip_existing)
+        search_handler = GoogleSearchHandlerDrission(page)
+        kw = search_handler.process_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
+        if not kw:
+            ret['error'] = 1
+            html_path = search_handler.save_current_page(keyword, filename=f"warning_{time.strftime('%Y%m%d_%H%M%S')}")
+            logger.warning(f"Keyword task not completed: {keyword} html_path: {html_path}")
+            ret['msg'] = f"Keyword task not completed: {keyword}"
+            ret['data'] = html_path
+            return ret
+        ret['data'] = kw.model_dump()
+        return ret
     except Exception as e:
-        logger.exception(f"Keyword processing failed: {keyword}")
-        raise
+        html_path = search_handler.save_current_page(keyword, filename=f"error_{time.strftime('%Y%m%d_%H%M%S')}")
+        logger.exception(f"Failed: {str(e)} html_path: {html_path}")
+        ret['error'] = 1
+        ret['msg'] = f"Failed: {str(e)}"
+        ret['data'] = html_path
+        page.quit()
+        return ret
 
 def main():
-    search_keyword_drission("drission")
+    search_keyword_drission("drission", max_result_items=15)
 
 if __name__ == "__main__":
-    main()
+    main()
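For callers, `search_keyword_drission()` now returns a `{'error', 'msg', 'data'}` dict instead of raising: `data` carries the dumped `KeywordTask` on success, or the path of the saved HTML snapshot on failure. A small usage sketch; the keyword and limit are illustrative:

```python
# Usage sketch, not part of the commit; the keyword and limit are illustrative values.
from worker.search_engine.drission_google_search import search_keyword_drission
from mylib.logu import logger

def run_one(keyword: str = "Acalypha rivularis essential oil") -> dict:
    ret = search_keyword_drission(keyword, max_result_items=50, skip_existing=True)
    if ret['error']:
        # On failure, data holds the HTML snapshot path written by save_current_page().
        logger.warning(f"search failed: {ret['msg']} (snapshot: {ret['data']})")
    else:
        logger.info(f"keyword task: {ret['data']}")
    return ret
```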

+ 19 - 11
worker/search_engine/google_search.py

@@ -65,17 +65,19 @@ class GoogleSearchHandler():
             
         return result_ele
 
-    async def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
+    def check_keyword(self, keyword: str, skip_existing: bool) -> tuple[bool, KeywordTask]:
         key_model = self.db_manager.get_keyword_task(keyword)
         if skip_existing and key_model:
-            logger.info(f"Keyword task already completed, skipping: {keyword}")
-            return key_model
+            logger.info(f"Keyword task already completed, skipping: id={key_model.id} {keyword}")
+            return True, key_model
 
         # delete the old data and recreate the task
         if key_model:
             self.db_manager.delete_keyword_task(keyword)
         key_model = self.db_manager.create_keyword_task(keyword)
-
+        return False, key_model
+    async def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
+        _, key_model = self.check_keyword(keyword, skip_existing)
         await self.search(keyword)
         search_result_item_count = 0
         should_complete = False  # flags whether the completion conditions were met
@@ -116,11 +118,11 @@ class GoogleSearchHandler():
             await self.page.goto(url)
             await self.page.wait_for_load_state('load', timeout=10000)
         if 'sorry/' in self.page.url:
-            user_input = await async_input("Human verification encountered, continue? (y/n): ")
-            if user_input.lower() != 'y':
-                self.goto_home_page()
-            else:
-                raise Exception("User chose to exit, terminating.")
+            # user_input = await async_input("Human verification encountered, continue? (y/n): ")
+            # if user_input.lower() != 'y':
+            #     self.goto_home_page()
+            # else:
+            #     raise Exception("User chose to exit, terminating.")
 
             raise Exception(f"Human verification encountered, retrying with a new identity.. {self.page.url}")
 
@@ -135,7 +137,7 @@ class GoogleSearchHandler():
         textarea = self.page.locator(search_ele_dict['xpath'])
         await textarea.fill(query, timeout=10000)
         await textarea.press('Enter')
-        await self.page.wait_for_load_state(state='load', timeout=10000)
+        await self.page.wait_for_load_state(state='domcontentloaded', timeout=10000)
         return await self.page.content()
 
     def get_current_page_num(self) -> int:
@@ -182,11 +184,17 @@ class GoogleSearchHandler():
                 res.results.append(result)
         return res
 
-async def search_keyword(keyword, max_result_items=200, skip_existing=False, config: BrowserConfig = BrowserConfig()):
+async def search_keyword(keyword, max_result_items=200, skip_existing=True, config: BrowserConfig = BrowserConfig()):
     ret = {'error': 0, 'msg': '', 'data': None}
     config_dict = config.model_dump()
     logger.info(f"BrowserConfig {config_dict}")
     logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
+    search_handler = GoogleSearchHandler(None)
+    exist, keyword_model = search_handler.check_keyword(keyword, skip_existing)
+    if exist and keyword_model.is_completed:
+        ret['data'] = keyword_model.model_dump()
+        return ret
+    
     async with AsyncCamoufox(**config_dict) as browser:
         try:
             
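`search_keyword()` is a coroutine, so running it outside Celery requires an event loop; on Windows the task in async_tasks.py installs the proactor policy first. A hedged standalone sketch mirroring that setup; the keyword and limit are illustrative:

```python
# Standalone usage sketch, not part of the commit; mirrors the loop setup in async_tasks.py.
import asyncio
import sys

from worker.search_engine.google_search import search_keyword
from worker.search_engine.camoufox_broswer import BrowserConfig

def run_search(keyword: str = "Acalypha rivularis essential oil") -> dict:
    if sys.platform == 'win32':
        # Browser subprocesses need the proactor event loop on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    return asyncio.run(
        search_keyword(keyword, max_result_items=50, skip_existing=True, config=BrowserConfig())
    )
```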

+ 1 - 0
worker/search_engine/smart_selector.py

@@ -189,6 +189,7 @@ def get_search_rule():
         {"type": "attribute_contains", "name": "title", "value": "search", "weight": 25},
         {"type": "attribute_contains", "name": "aria-label", "value": "search", "weight": 25},
         {"type": "attribute_contains", "name": "role", "value": "search", "weight": 30},
+        {"type": "attribute_contains", "name": "name", "value": "1", "weight": 30},
         
         # hierarchy relationship checks
         {