
celery + Camoufox: complete a keyword task and store the results in the database. Next step: develop proxy loading.

mrh 10 months ago
parent
commit
f114c146d5
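The commit message names proxy loading as the next step. A minimal sketch of how a proxy could be fed to Camoufox, assuming a local proxies.txt file and a Playwright-style proxy launch option (neither is defined anywhere in this commit):

import asyncio
from camoufox.async_api import AsyncCamoufox

async def demo():
    # Hypothetical proxy source: one "http://host:port" entry per line
    with open("proxies.txt") as f:
        proxy_server = f.readline().strip()
    async with AsyncCamoufox(proxy={"server": proxy_server}) as browser:
        page = await browser.new_page()
        await page.goto("https://example.com")

asyncio.run(demo())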

+ 0 - 1
CONVENTIONS.md

@@ -24,6 +24,5 @@
 Important: since you are working in the aider development environment, you must never omit existing code when writing any file; always write the file out in full.
 
 # Project notes:
-- Test modules live in the tests directory; `tests\mytest` is my personal scratch area and can be ignored.
 - In my scenario, Workers run browser search tasks. These tasks run for a long time and demand network and compute performance, so Workers are deployed separately across multiple machines and may be terminated by errors.
 - My scenario targets a company-internal LAN; fewer than 10 machines are needed to run the Workers.
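Because Workers run long tasks and may be terminated, the Celery app should be configured so an interrupted keyword task gets redelivered. A minimal sketch, assuming a Redis broker on the LAN (the real broker URL and app module are not shown in this commit):

from celery import Celery

app = Celery("worker", broker="redis://192.168.0.10:6379/0")  # broker URL is an assumption
app.conf.update(
    task_acks_late=True,              # acknowledge only after the task finishes
    task_reject_on_worker_lost=True,  # requeue tasks whose Worker process died
    worker_prefetch_multiplier=1,     # fetch one long-running task at a time
)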

+ 2 - 2
worker/celery/async_tasks.py

@@ -44,7 +44,7 @@ def async_search_task(keyword: str, max_result_items: int=200, skip_existing: bo
     async def _execute_search():
         try:
             logger.info(f"开始处理关键词搜索任务: {keyword}")
-            result = await search_keyword(keyword, max_result_items=1, skip_existing=skip_existing)
+            result = await search_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
             return {"keyword": keyword, "result": result}
         except Exception as e:
             logger.error(f"关键词搜索任务失败: {str(e)}")
@@ -66,7 +66,7 @@ def main():
     # logger.info(f"url")
     # res = async_browser_task.delay("https://www.baidu.com")
     # res = add.delay(1,2)
-    res = async_search_task.delay("Acalypha psilostachya essential oil",1,False)
+    res = async_search_task.delay("Acalypha psilostachya essential oil",200,False)
     logger.info(f"{res}")
 
 if __name__ == "__main__":
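This hunk does not show how the synchronous Celery task drives the coroutine; presumably something like asyncio.run. A minimal sketch under that assumption (the decorator and app wiring are guesses; search_keyword is the real function):

import asyncio
from celery import shared_task

@shared_task
def async_search_task(keyword: str, max_result_items: int = 200, skip_existing: bool = False):
    async def _execute_search():
        result = await search_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
        return {"keyword": keyword, "result": result}
    # Celery task bodies are synchronous, so run the coroutine to completion here
    return asyncio.run(_execute_search())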

+ 0 - 0
worker/celery/readme.md → worker/readme.md


+ 39 - 32
worker/search_engine/google_search.py

@@ -37,13 +37,14 @@ class GoogleSearchHandler():
         html_path = save_to_file(await self.page.content(), html_dir / f"{filename}.html")
         logger.info(f"save_to_file {html_path}")
         return html_path
+
     async def _process_single_page(self, keyword: str) -> SearchResultEle:
         content = await self.page.content()
         result_ele = self.get_search_result_ele(content)
         
         if not result_ele.search_div:
             logger.warning(f"未找到搜索结果容器,可能遇到验证页面 keyword: {keyword}")
-            return False, result_ele
+            return result_ele
 
         html_path = await self.save_current_page(keyword, filename=f"{result_ele.current_page}")
         page_result = self.db_manager.save_page_results(
@@ -65,36 +66,48 @@ class GoogleSearchHandler():
         return result_ele
 
     async def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
-        if skip_existing:
-            key_model = self.db_manager.get_keyword_task(keyword)
-            if key_model:
-                logger.info(f"关键词任务已完成,跳过处理: {keyword}")
-                return key_model
+        key_model = self.db_manager.get_keyword_task(keyword)
+        if skip_existing and key_model:
+            logger.info(f"关键词任务已完成,跳过处理: {keyword}")
+            return key_model
 
-        self.db_manager.create_keyword_task(keyword)
+        # Delete any stale data and recreate the task
+        if key_model:
+            self.db_manager.delete_keyword_task(keyword)
+        key_model = self.db_manager.create_keyword_task(keyword)
 
         await self.search(keyword)
-        has_next = True
-        result_ele = None
         search_result_item_count = 0
-        while has_next:
+        should_complete = False  # whether the completion condition was met
+        
+        while True:
             result_ele = await self._process_single_page(keyword)
+            # Handle verification pages and other abnormal cases
+            if not result_ele.search_div:
+                break
+
             search_result_item_count += len(result_ele.results) if result_ele.results else 0
-            if search_result_item_count > max_result_items:
-                logger.info(f"关键词 {keyword} 单页结果数量超过 {max_result_items} ,跳过处理下一页")
+            # Mark complete once max results are reached or there is no next page
+            if search_result_item_count >= max_result_items or not result_ele.next_page_url:
+                should_complete = True
                 break
-            if result_ele.next_page_url:
+
+            try:
                 await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                 await asyncio.sleep(3)
                 await self.page.click("//a[@id='pnnext']", timeout=10000)
                 logger.info(f"self.page.url {self.page.url}")
-                logger.debug(f"goto next_page_url {result_ele.next_page_url}")
                 await self.page.wait_for_load_state('load', timeout=10000)
-            else:
+            except Exception as e:
+                logger.warning(f"翻页失败: {str(e)}")
                 break
-                
-        key_model = self.db_manager.mark_task_completed(keyword)
-        logger.info(f"完成关键词处理: {keyword}")
+
+        # Mark the task completed only on a normal finish
+        if should_complete:
+            key_model = self.db_manager.mark_task_completed(keyword)
+            logger.info(f"正常完成关键词处理: {keyword}")
+        else:
+            logger.warning(f"关键词处理被中断: {keyword}")
         return key_model
 
     async def goto_home_page(self):
@@ -110,16 +123,18 @@ class GoogleSearchHandler():
                 raise Exception("用户选择退出,程序终止。")
             
             raise Exception(f"出现人机验证,正在换身份重试。。 {self.page.url}")
+
     def find_search_div(self, html_content: str) -> str:
         return bool(Adaptor(html_content).xpath_first('//div[@id="search"]'))
+
     async def search(self, query: str) -> dict:
         await self.goto_home_page()
         search_ele_dict = get_search_ele(await self.page.content())
         if not search_ele_dict:
             raise Exception("未找到搜索框")
         textarea = self.page.locator(search_ele_dict['xpath'])
-        await textarea.fill(query, timeout=10000)  # use textarea.fill() rather than page.fill()
-        await textarea.press('Enter')  # press simulates hitting the Enter key
+        await textarea.fill(query, timeout=10000)
+        await textarea.press('Enter')
         await self.page.wait_for_load_state(state='load', timeout=10000)
         return await self.page.content()
 
@@ -146,7 +161,7 @@ class GoogleSearchHandler():
             return res
 
         result_list = search_div.xpath('//*[@data-snc]')
-        logger.info(f"当前页结果数量: {len(result_list)}")
+        logger.info(f"当前页结果数量: {len(result_list)}, next_page_url: {next_page_url}")
         
         for result_item in result_list:
             if len(result_item.children) < 2:
@@ -167,11 +182,11 @@ class GoogleSearchHandler():
                 res.results.append(result)
         return res
 
-async def search_keyword(keyword, max_result_items=200, skip_existing=True, config: BrowserConfig = BrowserConfig()):
+async def search_keyword(keyword, max_result_items=200, skip_existing=False, config: BrowserConfig = BrowserConfig()):
     ret = {'error': 0, 'msg': '', 'data': None}
     config_dict = config.model_dump()
     logger.info(f"BrowserConfig {config_dict}")
-    # config_dict['config'] = {'navigator.cookieEnabled': False}
+    logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
     async with AsyncCamoufox(**config_dict) as browser:
         try:
             
@@ -193,6 +208,7 @@ async def search_keyword(keyword, max_result_items=200, skip_existing=True, conf
             ret['msg'] = f"失败: {str(e)}"
             ret['data'] = html_path
             return ret
+
 async def aio_main(config: BrowserConfig = BrowserConfig()):
     try:
         async with BrowserCore(config) as core:
@@ -200,20 +216,13 @@ async def aio_main(config: BrowserConfig = BrowserConfig()):
             await search_handler.goto_home_page()
             keywords = [
                 'Acampe carinata essential oil',
-                # 'Acampe cephalotes essential oil',
-                # 'Acampe hulae essential oil',
-                # 'Acampe rigida essential oil',
-                # 'Acamptopappus shockleyi essential oil'
             ]
-            # for keyword in keywords:
-            #     await search_handler.process_keyword(keyword)
             while True:
                 await asyncio.sleep(1)
                 
     except Exception as e:
         logger.error(f"失败: {str(e)}")
 
-
 def analyze():
     html_file = Path(r"K:\code\upwork\zhang_crawl_bio\output\results\Acampe_rigida_essential_oil\page_1.html")
     class TestPage:
@@ -223,9 +232,7 @@ def analyze():
     logger.info(f"{res.model_dump_json(indent=4,)}")
 
 def main():
-    # analyze()
     asyncio.run(aio_main())
-    # asyncio.run(search_keyword('Acampe carinata essential oil', config=BrowserConfig(), skip_existing=False))
 
 if __name__ == "__main__":
     main()
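For reference, a minimal sketch of calling the updated entry point directly; note the default skip_existing is now False. The keyword is illustrative and BrowserConfig defaults are assumed:

import asyncio

ret = asyncio.run(search_keyword("Acampe carinata essential oil", max_result_items=200, skip_existing=False))
print(ret["error"], ret["msg"])  # ret follows the {'error', 'msg', 'data'} shape built in search_keyword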

+ 35 - 12
worker/search_engine/search_result_db.py

@@ -1,7 +1,8 @@
 from datetime import datetime
 from typing import Optional, List
-from sqlmodel import SQLModel, Field, Relationship, create_engine, Session, select, UniqueConstraint
-from sqlalchemy.sql import func
+from sqlmodel import SQLModel, Field, Relationship, create_engine, Session, select, delete, func
+from sqlalchemy import UniqueConstraint
+from sqlalchemy.sql import text
 from pathlib import Path
 from config.settings import DB_URL
 
@@ -60,11 +61,33 @@ class SearchResultManager:
                 .where(KeywordTask.keyword == keyword)
             ).first()
     
+    def delete_keyword_task(self, keyword: str):
+        """删除关键词及其所有关联数据"""
+        with Session(self.engine) as session:
+            # Fetch the keyword task first
+            keyword_task = session.exec(
+                select(KeywordTask)
+                .where(KeywordTask.keyword == keyword)
+            ).first()
+            
+            if keyword_task:
+                # Delete associated SearchResultItem rows
+                session.execute(
+                    delete(SearchResultItem)
+                    .where(SearchResultItem.keyword_id == keyword_task.id)
+                )
+                # Delete associated SearchPageResult rows
+                session.execute(
+                    delete(SearchPageResult)
+                    .where(SearchPageResult.keyword_id == keyword_task.id)
+                )
+                # Delete the KeywordTask itself
+                session.delete(keyword_task)
+                session.commit()
+    
     def create_keyword_task(self, keyword: str) -> KeywordTask:
         with Session(self.engine) as session:
-            if task := self.get_keyword_task(keyword):
-                return task
-                
+            # Any stale data has already been deleted (handled in process_keyword)
             task = KeywordTask(keyword=keyword)
             session.add(task)
             session.commit()
@@ -88,7 +111,10 @@ class SearchResultManager:
             if existing:
                 return existing
 
-            keyword_task = self.create_keyword_task(keyword)
+            keyword_task = self.get_keyword_task(keyword)
+            if not keyword_task:
+                raise ValueError("Keyword task must exist before saving page results")
+
             page_result = SearchPageResult(
                 keyword_id=keyword_task.id,
                 keyword=keyword,
@@ -110,7 +136,6 @@ class SearchResultManager:
         html_path: Optional[Path] = None
     ) -> int:
         with Session(self.engine) as session:
-            # Fetch the associated keyword_task
             keyword_task = session.exec(
                 select(KeywordTask)
                 .where(KeywordTask.keyword == keyword)
@@ -119,10 +144,8 @@ class SearchResultManager:
             if not keyword_task:
                 raise ValueError(f"Keyword task not found for keyword: {keyword}")
 
-            # Batch-process the new items
             new_items = []
             for item in items:
-                # Check whether an identical item already exists
                 exists = session.exec(
                     select(SearchResultItem)
                     .where(SearchResultItem.url == item.url)
@@ -141,7 +164,6 @@ class SearchResultManager:
                     )
                     new_items.append(new_item)
             
-            # Batch-add the new items
             session.add_all(new_items)
             session.commit()
             return new_items
@@ -152,11 +174,12 @@ class SearchResultManager:
             if not task:
                 raise ValueError(f"Keyword task {keyword} not found")
             
-            # Fix SUM query return-type issue
+            # Sum the per-page result counts for this task
             total_results = session.scalar(
                 select(func.sum(SearchPageResult.results_count))
                 .where(SearchPageResult.keyword_id == task.id)
-            ) or 0  # handle the None case
+            ) or 0  # scalar() returns None when no pages exist
             
             task.is_completed = True
             task.total_results = total_results
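Taken together, the lifecycle this commit enforces is: delete stale data, create the task, save each page and its items, then mark completion. A minimal usage sketch, assuming SearchResultManager takes no constructor arguments (it reads DB_URL from config.settings); the save-method signatures are only partially visible in this diff:

manager = SearchResultManager()
keyword = "Acampe carinata essential oil"  # illustrative

manager.delete_keyword_task(keyword)         # drop any stale pages and items
task = manager.create_keyword_task(keyword)  # fresh KeywordTask row
# ... per results page: manager.save_page_results(...), then save the parsed items ...
manager.mark_task_completed(keyword)         # sums SearchPageResult.results_count into the task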