
Add detection of ASIN crawl limitations

mrh committed 6 months ago
commit 86a3372fc7

+ 2 - 2
config/celery.py

@@ -29,13 +29,13 @@ def on_worker_shutdown(sender=None, **kwargs):
     logger.info("Worker正在关闭,执行清理操作...")
     
     # 示例:关闭数据库连接、释放资源等
-    # close_db_connections()
     
     logger.info("Worker关闭完成")
     
 app = Celery(
     'copywriting_production',
-    backend=CFG.redis_url,
+    backend=CFG.celery_reulst_url,
+    broker=CFG.redis_url,
     include=[
         'src.tasks.crawl_asin_save_task',
         'src.tasks.crawl_asin_exract_task',
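With this change the app keeps task messages and task results on separate Redis databases: CFG.redis_url becomes the broker and CFG.celery_reulst_url the result backend. A minimal, self-contained sketch of the same split (placeholder URLs, not the project's CFG values):

    # Minimal sketch, not the project's actual wiring: broker and result
    # backend on separate Redis databases, mirroring the backend/broker
    # split above. URLs are placeholders.
    from celery import Celery

    app = Celery(
        "demo",
        broker="redis://localhost:6379/0",   # task messages
        backend="redis://localhost:6379/1",  # task results / states
    )

    @app.task
    def ping():
        return "pong"

Keeping results on their own database makes it possible to flush or expire result keys without touching queued messages.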

+ 1 - 1
config/celery_config.py

@@ -6,7 +6,7 @@ FLOWER_PERSISTENT = True
 FLOWER_DB = str(OUTPUT_DIR / "celery" / "flower_monitor.db")
 broker_url = CFG.redis_url
 
-result_backend = CFG.redis_url
+result_backend = CFG.celery_reulst_url
 task_serializer = 'json'
 result_serializer = 'json'
 accept_content = ['json']
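Celery reads lowercase settings such as result_backend from a configuration module attached with config_from_object; that wiring is not visible in this diff, but the usual pattern looks like this sketch:

    # Sketch only: how a lowercase-settings module like config/celery_config.py
    # is typically attached to an app. Whether this project calls
    # config_from_object is not shown in the diff.
    from celery import Celery

    app = Celery("demo", broker="redis://localhost:6379/0")
    app.config_from_object("config.celery_config")
    print(app.conf.result_backend)  # picked up from the module's result_backend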

+ 34 - 0
config/dp_conf/9324.ini

@@ -0,0 +1,34 @@
+[paths]
+download_path = .
+tmp_path = 
+
+[chromium_options]
+address = 127.0.0.1:9324
+browser_path = C:\Program Files\Google\Chrome\Application\chrome.exe
+arguments = ['--no-default-browser-check', '--disable-suggestions-ui', '--no-first-run', '--disable-infobars', '--disable-popup-blocking', '--hide-crash-restore-bubble', '--disable-features=PrivacySandboxSettings4', '--mute-audio', '--lang=en-US', '--proxy-server=localhost:1881', '--user-data-dir=G:\\code\\amazone\\copywriting_production\\output\\user_data_dir4']
+extensions = []
+prefs = {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}}
+flags = {}
+load_mode = normal
+user = Default
+auto_port = False
+system_user_path = False
+existing_only = False
+new_env = False
+
+[session_options]
+headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'connection': 'keep-alive', 'accept-charset': 'GB2312,utf-8;q=0.7,*;q=0.7'}
+
+[timeouts]
+base = 10
+page_load = 30
+script = 30
+
+[proxies]
+http = localhost:8851
+https = localhost:8851
+
+[others]
+retry_times = 3
+retry_interval = 2
+
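Each of these ini files pins one remote-debugging port, user-data directory and proxy to a dedicated browser profile. A sketch of loading such a file, assuming DrissionPage 4.x is used directly (the repo itself goes through its own utils.drission_page ChromeOptions / load_chrome_from_ini helpers, whose signatures are not shown here):

    # Sketch: loading one of these per-port ini files with DrissionPage 4.x.
    from DrissionPage import ChromiumOptions, ChromiumPage

    co = ChromiumOptions(ini_path=r"config\dp_conf\9324.ini")
    page = ChromiumPage(addr_or_opts=co)  # attaches to 127.0.0.1:9324 per [chromium_options]
    print(page.title)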

+ 6 - 0
config/dp_conf/9324_worker.yaml

@@ -0,0 +1,6 @@
+chrome_config_ini: G:\code\amazone\copywriting_production\config\dp_conf\9324.ini
+s3_access_key: bh9LbfsPHRJgQ44wXIlv
+s3_endpoint: http://vs1.lan:9002
+storage: s3
+redis_url: redis://sv-v2.lan:7777/0
+celery_reulst_url: redis://sv-v2.lan:7777/1
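The _worker.yaml files appear to be per-worker overrides picked up through the CONFIG_PATH environment variable that crawl_asin_save_task.py logs further below. A hypothetical launch sketch; the CLI flags and worker name are illustrative, not taken from this commit:

    # Assumption: each worker is pointed at its own yaml via CONFIG_PATH, which
    # crawl_asin_save_task.py reads at import time. Flags and the worker name
    # below are illustrative.
    import os, subprocess

    env = dict(os.environ, CONFIG_PATH=r"config\dp_conf\9324_worker.yaml")
    subprocess.run(
        ["celery", "-A", "config.celery", "worker", "--concurrency=1", "-n", "worker9324@%h"],
        env=env,
        check=True,
    )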

+ 34 - 0
config/dp_conf/9325.ini

@@ -0,0 +1,34 @@
+[paths]
+download_path = .
+tmp_path = 
+
+[chromium_options]
+address = 127.0.0.1:9325
+browser_path = C:\Program Files\Google\Chrome\Application\chrome.exe
+arguments = ['--no-default-browser-check', '--disable-suggestions-ui', '--no-first-run', '--disable-infobars', '--disable-popup-blocking', '--hide-crash-restore-bubble', '--disable-features=PrivacySandboxSettings4', '--mute-audio', '--lang=en-US', '--proxy-server=localhost:1881', '--user-data-dir=G:\\code\\amazone\\copywriting_production\\output\\user_data_dir5']
+extensions = []
+prefs = {'profile.default_content_settings.popups': 0, 'profile.default_content_setting_values': {'notifications': 2}}
+flags = {}
+load_mode = normal
+user = Default
+auto_port = False
+system_user_path = False
+existing_only = False
+new_env = False
+
+[session_options]
+headers = {'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'connection': 'keep-alive', 'accept-charset': 'GB2312,utf-8;q=0.7,*;q=0.7'}
+
+[timeouts]
+base = 10
+page_load = 30
+script = 30
+
+[proxies]
+http = localhost:8851
+https = localhost:8851
+
+[others]
+retry_times = 3
+retry_interval = 2
+

+ 4 - 0
config/dp_conf/9325.yaml

@@ -0,0 +1,4 @@
+chrome_config_ini: G:\code\amazone\copywriting_production\config\dp_conf\9325.ini
+s3_access_key: bh9LbfsPHRJgQ44wXIlv
+s3_endpoint: http://vs1.lan:9002
+storage: s3

+ 9 - 0
config/dp_conf/9325_worker.yaml

@@ -0,0 +1,9 @@
+chrome_config_ini: G:\code\amazone\copywriting_production\config\dp_conf\9325.ini
+redis_url: redis://sv-v2.lan:7777/0
+celery_reulst_url: redis://sv-v2.lan:7777/1
+s3_access_key: bh9LbfsPHRJgQ44wXIlv
+s3_endpoint: http://s3.vs1.lan
+s3_prefix: s3://public/amazone/copywriting_production
+s3_secret_key: N744RZ60T1b4zlcWG2MROCzjEE2mPTdNQCc7Pk3M
+storage: local
+version: 0.0.1-alpha

+ 1 - 0
config/settings.py

@@ -31,6 +31,7 @@ class Config(BaseModel):
     s3_prefix: Optional[str] = 's3://public/amazone/copywriting_production'
     chrome_config_ini: Optional[str] = r'G:\code\amazone\copywriting_production\config\dp_conf\9322.ini'
     redis_url: Optional[str] = os.environ.get("REDIS_URL", 'redis://localhost:6379/0')
+    celery_reulst_url: Optional[str] = os.environ.get("REDIS_URL", 'redis://localhost:6379/1')
     version: Optional[str] = "0.0.1-alpha"
     def save(self, config_path: Path = None):
         config_path = config_path or get_config_path()
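For reference, the env-var-fallback pattern used by the new field behaves like this standalone sketch (field names mirror config/settings.py; nothing is imported from the repo):

    # Standalone sketch of the env-var-fallback defaults used above.
    import os
    from typing import Optional
    from pydantic import BaseModel

    class DemoConfig(BaseModel):
        redis_url: Optional[str] = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
        celery_reulst_url: Optional[str] = os.environ.get("REDIS_URL", "redis://localhost:6379/1")

    cfg = DemoConfig()
    print(cfg.redis_url, cfg.celery_reulst_url)

Because the defaults are evaluated at import time, both fields fall back to the same REDIS_URL variable; if that variable is set, broker and result backend resolve to the same URL unless a worker yaml overrides celery_reulst_url.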

+ 14 - 1
src/browser/crawl_asin.py

@@ -164,7 +164,20 @@ class Crawler(CrawlerBase):
         if excract_unique_words:
             data['unique_words'] = [item['word'] for item in excract_unique_words]
         return data
-
+    async def extra_limitation(self, html: str, input_schema: dict = {}) -> str:
+        schema = input_schema or {
+            "name": "Limitations",
+            "baseSelector": '//span[@class="text-muted"]',  # element that carries the limitation notice
+            "fields": [
+                {
+                    "name": "limitations",  # limitation text
+                    "type": "text",
+                }
+            ]
+        }
+        result: CrawlResult = await self.excra_strategy_raw_html(html, schema, JsonXPathExtractionStrategy)
+        if result.success:
+            return json.loads(result.extracted_content)[0].get('limitations')
     def get_mpath_html_content(self, mhtml_path:str):
         mhtml_data = read_file(mhtml_path)
         mhtml_path_name = PurePath(mhtml_path).name
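The schema above hands an XPath selector to JsonXPathExtractionStrategy. As a rough illustration of what that selector matches (lxml used here purely for the demo; the sample HTML is invented):

    # Illustration only: what '//span[@class="text-muted"]' matches.
    from lxml import html

    sample = '<html><body><span class="text-muted">Showing limited results.</span></body></html>'
    doc = html.fromstring(sample)
    texts = [s.text_content().strip() for s in doc.xpath('//span[@class="text-muted"]')]
    print(texts)  # ['Showing limited results.']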

+ 6 - 3
src/tasks/crawl_asin_exract_task.py

@@ -22,12 +22,14 @@ logger = get_logger('browser')
 asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
 async def async_run_extractions(crawler:Crawler, html_content:str, upload_s3_dir:str=None):
     """异步运行提取逻辑"""
-    product_info, result_table = await asyncio.gather(
+    product_info, result_table,limitation = await asyncio.gather(
         crawler.extract_product_and_save_resource(html_content,upload_s3_dir),
-        crawler.extra_result_table(html_content)
+        crawler.extra_result_table(html_content),
+        crawler.extra_limitation(html_content),
     )
     res = {
         "result_table": result_table,
+        "limitation": limitation,
     }
     res.update(product_info)
     return res
@@ -49,7 +51,8 @@ async def async_process_mhtml(mhtml_path: str):
         logger.exception(f"异步任务失败: {str(e)}")
         raise
     finally:
-        await asyncio.to_thread(os.unlink, temp_mhtml_path)
+        if 'temp_mhtml_path' in locals() and os.path.exists(temp_mhtml_path):
+            await asyncio.to_thread(os.unlink, temp_mhtml_path)
 
 async def async_process_search_suggestion(search_key: str):
     """异步处理关键词建议"""

+ 35 - 2
src/tasks/crawl_asin_save_task.py

@@ -1,8 +1,9 @@
 # tasks/save_tasks.py
+import os
 from config.celery import app  # Keep celery app
 from src.browser.crawl_asin import Crawler
 from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey,CrawlerSearchKeyInput
-from config.settings import CFG
+from config.settings import CFG, read_config, get_config_path
 from utils.drission_page import ChromeOptions
 from utils.file import check_exists, save_to_file,s3_uri_to_http_url
 from utils.logu import get_logger
@@ -10,6 +11,8 @@ from utils.drission_page import load_chrome_from_ini,ChromeOptions
 
 logger = get_logger('worker')   
 import asyncio
+logger.info(f"CONFIG_PATH {os.environ.get('CONFIG_PATH')}")
+CFG = read_config(get_config_path())
 logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")
 logger.info(f"浏览器配置: {CFG.chrome_config_ini}")
 chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
@@ -61,4 +64,34 @@ def get_amz_search_key_suggestion(self, input_data: CrawlerSearchKeyInput):
         
     except Exception as e:
         logger.exception(f"任务失败:{e}")
-        self.retry(exc=e, countdown=60)
+        self.retry(exc=e, countdown=60)
+
+from pydantic import BaseModel
+from typing import Optional
+
+class TestInput(BaseModel):
+    mhtml_path: Optional[str] = None
+    overwrite: Optional[bool] = False
+@app.task(
+        bind=True,
+        name='tasks.crawl_asin_save_task.test_task',
+        )
+def test_task(self, input_data: dict):
+    logger.info(f"input type {type(input_data)}")
+    inputdata = TestInput(**input_data)
+    logger.info(f"Task started: test_task {inputdata.model_dump()}")
+
+    logger.info(f"{self.request.hostname}")
+    return {'status': 'success', 'worker_name': self.request.hostname, 'args': input_data}
+
+from celery import signals
+@signals.worker_shutdown.connect
+def on_worker_shutdown(sender=None, **kwargs):
+    """Worker关闭时执行清理操作"""
+    logger.info("Worker正在关闭,执行清理操作...")
+    
+    # 示例:关闭数据库连接、释放资源等
+    chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
+    crawler = Crawler(chrome_options=chrome_options)
+    crawler.page.quit()
+    logger.info("Worker关闭完成")