2 commits: 0dc47ed40e ... d670490b0d

Author  SHA1        Message                                        Committed
  mrh   d670490b0d  Complete schema-based fast JSON extraction     1 year ago
  mrh   57b68a12e5  Upload to S3 with extra args and content type  1 year ago
4 files changed, with 346 insertions and 77 deletions
  1. docs/gpt/crawler_Schema.md   (+115, -0)
  2. src/browser/crawl_asin.py    (+102, -57)
  3. tests/mytest/t_crawler.py    (+76, -0)
  4. utils/file.py                (+53, -20)

+ 115 - 0
docs/gpt/crawler_Schema.md

@@ -0,0 +1,115 @@
+3. Advanced Schema & Nested Structures
+Real sites often have nested or repeated data—like categories containing products, which themselves have a list of reviews or features. For that, we can define nested or list (and even nested_list) fields.
+
+Sample E-Commerce HTML
+We have a sample e-commerce HTML file on GitHub (example):
+
+https://gist.githubusercontent.com/githubusercontent/2d7b8ba3cd8ab6cf3c8da771ddb36878/raw/1ae2f90c6861ce7dd84cc50d3df9920dee5e1fd2/sample_ecommerce.html
+This snippet includes categories, products, features, reviews, and related items. Let’s see how to define a schema that fully captures that structure without an LLM.
+schema = {
+    "name": "E-commerce Product Catalog",
+    "baseSelector": "div.category",
+    # (1) We can define optional baseFields if we want to extract attributes 
+    # from the category container
+    "baseFields": [
+        {"name": "data_cat_id", "type": "attribute", "attribute": "data-cat-id"}, 
+    ],
+    "fields": [
+        {
+            "name": "category_name",
+            "selector": "h2.category-name",
+            "type": "text"
+        },
+        {
+            "name": "products",
+            "selector": "div.product",
+            "type": "nested_list",    # repeated sub-objects
+            "fields": [
+                {
+                    "name": "name",
+                    "selector": "h3.product-name",
+                    "type": "text"
+                },
+                {
+                    "name": "price",
+                    "selector": "p.product-price",
+                    "type": "text"
+                },
+                {
+                    "name": "details",
+                    "selector": "div.product-details",
+                    "type": "nested",  # single sub-object
+                    "fields": [
+                        {
+                            "name": "brand",
+                            "selector": "span.brand",
+                            "type": "text"
+                        },
+                        {
+                            "name": "model",
+                            "selector": "span.model",
+                            "type": "text"
+                        }
+                    ]
+                },
+                {
+                    "name": "features",
+                    "selector": "ul.product-features li",
+                    "type": "list",
+                    "fields": [
+                        {"name": "feature", "type": "text"} 
+                    ]
+                },
+                {
+                    "name": "reviews",
+                    "selector": "div.review",
+                    "type": "nested_list",
+                    "fields": [
+                        {
+                            "name": "reviewer", 
+                            "selector": "span.reviewer", 
+                            "type": "text"
+                        },
+                        {
+                            "name": "rating", 
+                            "selector": "span.rating", 
+                            "type": "text"
+                        },
+                        {
+                            "name": "comment", 
+                            "selector": "p.review-text", 
+                            "type": "text"
+                        }
+                    ]
+                },
+                {
+                    "name": "related_products",
+                    "selector": "ul.related-products li",
+                    "type": "list",
+                    "fields": [
+                        {
+                            "name": "name", 
+                            "selector": "span.related-name", 
+                            "type": "text"
+                        },
+                        {
+                            "name": "price", 
+                            "selector": "span.related-price", 
+                            "type": "text"
+                        }
+                    ]
+                }
+            ]
+        }
+    ]
+}
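+
+A minimal sketch of running this schema (the same crawl4ai calls appear in src/browser/crawl_asin.py in this repo; the URL is the sample gist above):
+
+import asyncio, json
+from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode
+from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
+
+# `schema` is the dict defined above
+async def run():
+    async with AsyncWebCrawler() as crawler:
+        result = await crawler.arun(
+            url="https://gist.githubusercontent.com/githubusercontent/2d7b8ba3cd8ab6cf3c8da771ddb36878/raw/1ae2f90c6861ce7dd84cc50d3df9920dee5e1fd2/sample_ecommerce.html",
+            config=CrawlerRunConfig(
+                cache_mode=CacheMode.BYPASS,
+                extraction_strategy=JsonCssExtractionStrategy(schema, verbose=True),
+            ),
+        )
+        # extracted_content is a JSON string: one object per div.category
+        data = json.loads(result.extracted_content)
+        print(json.dumps(data, indent=2, ensure_ascii=False))
+
+asyncio.run(run())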
+Key Takeaways:
+
+Nested vs. List:
+type: "nested" means a single sub-object (like details).
+type: "list" means multiple items that are simple dictionaries or single text fields.
+type: "nested_list" means repeated complex objects (like products or reviews).
+Base Fields: We can extract attributes from the container element via "baseFields". For instance, "data_cat_id" might be data-cat-id="elect123".
+Transforms: We can also define a transform if we want to lower/upper case, strip whitespace, or even run a custom function (see the sketch below).
+
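+For illustration, a hypothetical brand_normalized field with a callable transform (mirroring the lambda style used later in crawl_asin.py) might look like:
+
+{
+    "name": "brand_normalized",
+    "selector": "span.brand",
+    "type": "text",
+    # hypothetical cleanup: trim whitespace and lower-case the extracted text
+    "transform": lambda x: x.strip().lower() if x else None
+}
+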
+Referring to the notes above, help me define a schema that looks up the value of every field in this HTML table, including the hyperlinks.

+ 102 - 57
src/browser/crawl_asin.py

@@ -15,10 +15,15 @@ import httpx
 import ssl
 from sqlmodel import select, Session
 from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
+from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
+from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, JsonXPathExtractionStrategy
+from crawl4ai.content_filter_strategy import BM25ContentFilter
+from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
+
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR
 from utils.drission_page import load_chrome_from_ini,ChromeOptions
-from utils.file import save_to_file,check_exists
+from utils.file import save_to_file, check_exists, s3, read_file
 from utils.config import CFG
 
 logger = get_logger('browser')
@@ -28,19 +33,31 @@ ASIN_HTML_DIR.mkdir(parents=True, exist_ok=True)
 class Crawler():
     def __init__(self, chrome_options:ChromeOptions, storage_config:dict=None):
         self.chrome_options = chrome_options
+        self.page = None
+        self.browser_config = {
+            "headless": self.chrome_options.headless,
+            "use_managed_browser": True,
+        }
+        # BrowserConfig(
+            # headless=chrome_options.headless,
+            # verbose=False,
+            # use_managed_browser=True,
+            # cdp_url='ws://127.0.0.1:9321/devtools/browser/dc75fc3b-352a-4d26-910b-adf5c245e0ce'
+        # )   
+    def get(self, url:str):
+        if not self.page:
+            self.page = load_chrome_from_ini(
+                self.chrome_options 
+            )
+        self.page.get(url)
     
     async def run(self, url:str):
         page = load_chrome_from_ini(
             self.chrome_options
         )
         craw_ai_browser_config = BrowserConfig(
-            headless=self.chrome_options.headless,
-            # verbose=False,
-            # extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
-            # debugging_port=int(port),
-            use_managed_browser=True,
+            **self.browser_config,
             cdp_url=page.browser._driver._websocket_url
-            # cdp_url='ws://127.0.0.1:9321/devtools/browser/dc75fc3b-352a-4d26-910b-adf5c245e0ce'
         )
         try:
             async with AsyncWebCrawler(config=craw_ai_browser_config) as crawler:
@@ -73,70 +90,98 @@ class Crawler():
         url = self.get_asin_url(asin, asin_area)
         page.get(url)
         if mthml_type:
-            return page.save(str(ASIN_HTML_DIR), name=f'{asin}')
+            return page.save()
         else:
             return page.html
     def get_asin_and_save_page(self, asin:str, asin_area:str='JP', mthml_type:bool=True, save_path:str=None, overwrite:bool=False):
         if not overwrite and check_exists(save_path):
             logger.info(f"{save_path} exists")
             return save_path
-        # data = self.get_asin_page_data(asin, asin_area, mthml_type)
-        data = self.get_asin_url(asin, asin_area)
+        data = self.get_asin_page_data(asin, asin_area, mthml_type)
         save_path = save_path or str(ASIN_HTML_DIR / f'{asin}.html')
         return save_to_file(data, save_path)
+    
+    async def cralw4ai_run(self, uri:str) -> CrawlResult:
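+        # Reuse the DrissionPage-managed browser: point crawl4ai at the same
+        # CDP websocket instead of launching a separate Chromium instance.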
+        browser_config = BrowserConfig(
+            **self.browser_config,
+            cdp_url = self.page.browser._driver._websocket_url
+        )
+        schema = {
+            "baseSelector": "table.table tbody tr",  # 每行数据对应一个tr
+            "fields": [
+                {
+                    "name": "traffic_keyword",
+                    "selector": "td:first-child a",  # 关键词文本
+                    "type": "text"
+                },
+                {
+                    "name": "keyword_link",
+                    "selector": "td:first-child a",  # 关键词超链接
+                    "type": "attribute",
+                    "attribute": "href"
+                },
+                {
+                    "name": "monthly_searches",
+                    "selector": "td:nth-child(2) span",  # 搜索量数值
+                    "type": "text",
+                    "transform": lambda x: x.replace(",", "") if x else None  # 移除逗号转数字
+                },
+                {
+                    "name": "search_trend_link",
+                    "selector": "td:nth-child(2) a",  # 搜索量趋势链接(带图表)
+                    "type": "attribute",
+                    "attribute": "href"
+                },
+                {
+                    "name": "weight",
+                    "selector": "td:nth-child(3) i.leaf",  # 统计叶子图标数量
+                    "type": "count"  # 通过计数获取权重值
+                },
+                {
+                    "name": "amazon_search_link",
+                    "selector": "td:last-child a",  # Amazon搜索链接
+                    "type": "attribute",
+                    "attribute": "href"
+                }
+            ]
+        }
         
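+        # Hand the HTML already rendered by DrissionPage to crawl4ai via its
+        # raw:// scheme, so extraction runs on the snapshot without a re-fetch.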
+        dummy_html = self.page.html
+        raw_url = f"raw://{dummy_html}"
+        async with AsyncWebCrawler(config=browser_config) as crawler:
+            result:CrawlResult = await crawler.arun(
+                url=raw_url,
+                config=CrawlerRunConfig(
+                    cache_mode=CacheMode.BYPASS,
+                    extraction_strategy=JsonCssExtractionStrategy(schema,verbose=True)
+                )
+            )
+
+            if not result.success:
+                logger.error(f"Crawl failed: {result.error_message}")
+                return
+            data = json.loads(result.extracted_content)
+            logger.info(f"Extracted {len(data)} coin rows")
+            logger.debug(f"First item: {result.extracted_content}")
+            return data
 async def task():
     asin = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     c = Crawler(ChromeOptions())
+    file_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html.mhtml'
+    # file_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html'
+    # tab.get(file_path)
+    c.get(file_path)
+    res = await c.cralw4ai_run(file_path)
+    # logger.info(f"{res.model_dump()}")
+    # logger.info(f"{json.loads(res.extracted_content)}")
+    # save_to_file(res.model_dump(), OUTPUT_DIR/'page\debug\B0CQ1SHD8V.json')
+    return
     page = c.run_browser()
-    logger.info(f"{str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html')}")
     tab = page.latest_tab
-    tab.get(r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html.mhtml')
-    # c.get_asin_and_save_page(asin[0], 'JP', save_path=str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html'))
-    # logger.info(f"{CFG.s3_secret_key}")
-    c.get_asin_and_save_page(
-        asin[0], 
-        'JP',
-        save_path='s3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
-        overwrite=True
-    )
-    # page.save(str(ASIN_HTML_DIR), name=f'{asin[0]}.html')
-    # save_to_file(page.html, str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html'))
-    # await c.run('https://fr.florame.com/en/essential-oils')
-    return
-    port = page.browser._chromium_options._address.split(':')[-1]
-    logger.info(f"{page.browser._driver.get(f'http://{page.browser._chromium_options._address}/json').json()}")
-    logger.info(f"{page.browser._driver._websocket_url}")
-    item_id = 1
-    # url = 'https://greg.app/acalypha-marissima-overview/'
-    url = 'https://fr.florame.com/en/essential-oils'
-    # url = 'https://repository.arizona.edu/bitstream/10150/550946/1/dp_04_01-04.pdf'
-    # url = 'https://baidu.com'
-    browser_config = BrowserConfig(
-        headless=False,
-        # verbose=False,
-        # extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
-        # debugging_port=int(port),
-        use_managed_browser=True,
-        cdp_url=page.browser._driver._websocket_url
-        # cdp_url='ws://127.0.0.1:9321/devtools/browser/dc75fc3b-352a-4d26-910b-adf5c245e0ce'
-    )
-    # async with AsyncWebCrawler(config=browser_config) as crawler:
-    #     crawler_config = CrawlerRunConfig(
-    #         cache_mode=CacheMode.BYPASS                
-    #     )
-    #     result = await crawler.arun(url=url, config=crawler_config)
-    #     print(result.markdown)
-
-    crawler = AsyncWebCrawler(config=browser_config)
-    await crawler.start()
-    crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
-    result:CrawlResult = await crawler.arun(url=url, config=crawl_config)
-    logger.info(f"{item_id} crawler.arun result.success: {result.model_dump_json(indent=2)} ")
-    print(result.markdown)
-    input('press enter to continue')
-    await crawler.close()
-    # page.quit()
+    data = tab.save()
+    logger.info(f"{type(data)} , {data[:50]}")
+    save_to_file(data, 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',Metadata={'mykey':'myvalue','mykey2':'myvalue2'})
+ 
 
 def main():
     asyncio.run(task())

+ 76 - 0
tests/mytest/t_crawler.py

@@ -0,0 +1,76 @@
+
+async def task():
+    asin = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
+    c = Crawler(ChromeOptions())
+    page = c.run_browser()
+    logger.info(f"{str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html')}")
+    tab = page.latest_tab
+    file_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html.mhtml'
+    # file_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html'
+    # tab.get(file_path)
+    # page.get(r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html.mhtml')
+    # save_to_file(tab.save(), str(ASIN_HTML_DIR / 'B0CQ1SHD8V.png'))
+    data = tab.save()
+    # logger.info(f"{type(data)} , {data[:50]}")
+    save_to_file(data, 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',Metadata={'mykey':'myvalue','mykey2':'myvalue2'})
+    return
+    # Upload with metadata attached: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html#the-extraargs-parameter
+    with open(file_path, 'rb') as data:
+        s3.upload_fileobj(
+            data, "public", 'amazone/copywriting_production/output/B0CQ1SHD8V.mhtml',
+            ExtraArgs={
+                'Metadata': {'mykey': 'myvalue'},
+                'ContentType': 'text/html'
+                })
+    # c.get_asin_and_save_page(asin[0], 'JP', save_path=str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html'))
+    # logger.info(f"{CFG.s3_secret_key}")
+    # c.get_asin_and_save_page(
+    #     asin[0], 
+    #     'JP',
+    #     save_path='s3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
+    #     overwrite=True
+    # )
+    # page.save(str(ASIN_HTML_DIR), name=f'{asin[0]}.html')
+    # save_to_file(page.html, str(ASIN_HTML_DIR / 'B0CQ1SHD8V.html'))
+    # await c.run('https://fr.florame.com/en/essential-oils')
+    return
+    port = page.browser._chromium_options._address.split(':')[-1]
+    logger.info(f"{page.browser._driver.get(f'http://{page.browser._chromium_options._address}/json').json()}")
+    logger.info(f"{page.browser._driver._websocket_url}")
+    item_id = 1
+    # url = 'https://greg.app/acalypha-marissima-overview/'
+    url = 'https://fr.florame.com/en/essential-oils'
+    # url = 'https://repository.arizona.edu/bitstream/10150/550946/1/dp_04_01-04.pdf'
+    # url = 'https://baidu.com'
+    browser_config = BrowserConfig(
+        headless=False,
+        # verbose=False,
+        # extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
+        # debugging_port=int(port),
+        use_managed_browser=True,
+        cdp_url=page.browser._driver._websocket_url
+        # cdp_url='ws://127.0.0.1:9321/devtools/browser/dc75fc3b-352a-4d26-910b-adf5c245e0ce'
+    )
+    # async with AsyncWebCrawler(config=browser_config) as crawler:
+    #     crawler_config = CrawlerRunConfig(
+    #         cache_mode=CacheMode.BYPASS                
+    #     )
+    #     result = await crawler.arun(url=url, config=crawler_config)
+    #     print(result.markdown)
+
+    crawler = AsyncWebCrawler(config=browser_config)
+    await crawler.start()
+    crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
+    result:CrawlResult = await crawler.arun(url=url, config=crawl_config)
+    logger.info(f"{item_id} crawler.arun result.success: {result.model_dump_json(indent=2)} ")
+    print(result.markdown)
+    input('press enter to continue')
+    await crawler.close()
+    # page.quit()
+
+def main():
+    asyncio.run(task())
+    # test()
+
+if __name__ == "__main__":
+    main()

+ 53 - 20
utils/file.py

@@ -1,39 +1,60 @@
 import json
 from pathlib import Path
+import smart_open
 from smart_open import open
 from botocore.exceptions import NoCredentialsError
 import boto3
 from botocore.config import Config
 from utils.config import CFG
+import mimetypes
 
 s3 = boto3.client(
     's3',
     aws_access_key_id=CFG.s3_access_key,
     aws_secret_access_key=CFG.s3_secret_key,
     endpoint_url=CFG.s3_endpoint,
-    config=Config(signature_version='s3v4'),
-    # aws_account_id='ACCOUNT_ID'
+    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
 )
-response = s3.list_buckets()
-
-# Output the bucket names
-print('Existing buckets:')
-for bucket in response['Buckets']:
-    print(f'  {bucket["Name"]}')
-def save_to_file(content, filename:Path):
-    if not isinstance(content, str):
-    # if the content can be JSON-serialized, format it as JSON
-        try:
-            content = json.dumps(content, indent=4, ensure_ascii=False)
-        except:
-            # if it still is not a str, stringify it
-            if not isinstance(content, str):
-                content = str(content)
-    
-    with open(filename, "w", encoding="utf-8", transport_params={'client': s3}) as file:
+resource = boto3.resource('s3')
+
+def upload_to_s3(content, filename:str, **extra_args):
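+    # 's3://<bucket>/<key...>' -> after split('/'), [2] is the bucket and [3:] join into the key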
+    bucket_name = filename.split('/')[2]
+    object_name = '/'.join(filename.split('/')[3:])
+    content_type, _ = mimetypes.guess_type(object_name)
+    content_type = content_type or 'application/octet-stream'
+    upload_args = {
+        'ContentType': content_type,
+    }
+    upload_args.update(extra_args)
+    if isinstance(content, str):
+        content = content.encode('utf-8')
+    print(bucket_name, object_name)
+    s3.put_object(
+        Bucket=bucket_name,
+        Key=object_name,
+        Body=content,
+        **upload_args
+    )
+    return filename
+def save_to_file(content, filename:Path, **extra_args):
+    '''
+    save_to_file(
+        data, 
+        's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
+        Metadata={'mykey':'myvalue','mykey2':'myvalue2'}
+        )
+    '''
+    if str(filename).startswith('s3://'):
+        return upload_to_s3(content, str(filename), **extra_args)
+    with open(filename, "w", encoding="utf-8") as file:
         file.write(content)
     return filename
 
+def read_file(file_uri:str):
+    with open(file_uri, 'r', transport_params={'client': s3}) as f:
+        # file exists, proceed with reading
+        return f.read()
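+
+# Round-trip sketch for the helpers above (bucket and key are illustrative):
+#   save_to_file("hello", "s3://public/demo/hello.txt", Metadata={"k": "v"})
+#   text = read_file("s3://public/demo/hello.txt")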
+
 def check_exists(file_uri:str):
     try:
         with open(file_uri, 'r', transport_params={'client': s3}) as f:
@@ -41,4 +62,16 @@ def check_exists(file_uri:str):
             return file_uri
     except (FileNotFoundError,OSError):
        # the file does not exist; handle accordingly
-        return False
+        return False
+
+def main():
+    response = s3.list_buckets()
+
+    # Output the bucket names
+    print('Existing buckets:')
+    for bucket in response['Buckets']:
+        print(f'  {bucket["Name"]}')
+
+
+if __name__ == "__main__":
+    main()