mrh 2 months ago
parent commit 977a4d43de

+ 1 - 0
pyproject.toml

@@ -27,6 +27,7 @@ dependencies = [
     "pandas>=2.2.3",
     "prefect>=3.4.11",
     "psycopg2>=2.9.10",
+    "pymysql>=1.1.2",
     "python-dotenv>=1.1.0",
     "pyyaml>=6.0.2",
     "redis>=5.2.1",

+ 4 - 1
src/flow_task/crawl_asin.py

@@ -115,7 +115,10 @@ class CrawlAsinFlow(BaseCrawlFlow):
         local_dir.mkdir(parents=True, exist_ok=True)
         extension = ".mhtml" if mthml_type else ".html"
         local_path = local_dir / f"{asin}{extension}"
-        self.crawler.save_mhtml(local_path)
+        if asin in self.crawler.page.html:
+            self.crawler.save_mhtml(local_path)
+        else:
+            raise Exception(f"ASIN {asin} not found in page")
         self.run_log.info(f"Saved to local temp directory: {local_path}")
         return str(local_path)
 

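The guard added here keeps captcha and error pages out of the temp directory: the page is only persisted when the expected ASIN actually occurs in the rendered HTML. A minimal standalone sketch of the same check, written against an arbitrary save callback (the helper name and the use of `RuntimeError` are ours, not the repo's):

```python
from typing import Callable

def save_if_asin_present(html: str, asin: str, save_fn: Callable[[], None]) -> None:
    """Run save_fn only when the expected ASIN occurs in the page HTML."""
    if asin not in html:
        # Refuse to persist captcha or error pages as if they were valid snapshots
        raise RuntimeError(f"ASIN {asin} not found in page; refusing to save")
    save_fn()
```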
+ 60 - 65
src/flow_task/extra_excel_product_flow.py

@@ -9,8 +9,9 @@ from prefect.states import Failed, Running, Completed
 from prefect.cache_policies import INPUTS
 from prefect.futures import wait
 from utils.logu import get_logger
-from utils.file import extract_excel_text_from_url
+from utils.file import extract_excel_text_from_url, extract_excel_text_from_file
 from utils.url_utils import extract_urls_from_text, extract_filename_from_url
+from config.settings import OPENAI_API_KEY, OPENAI_API_BASE
 from llama_index.llms.litellm import LiteLLM
 from llama_index.core.program import LLMTextCompletionProgram
 from llama_index.core.output_parsers import PydanticOutputParser
@@ -89,49 +90,49 @@ def parse_url_to_markdown_task(url: str):
     """
     logger.info(f"开始解析URL表格文件: {url}")
     
-    try:
-        # Check the file type; use the pandas path for Excel files
-        if url.lower().endswith(('.xlsx', '.xls')):
-            logger.info(f"Detected Excel file, reading with pandas: {url}")
-            
-            # Read the Excel file with pandas
-            all_cells_text_dict = extract_excel_text_from_url(url)
-            
-            if not all_cells_text_dict:
-                logger.warning(f"Failed to read Excel file, or file is empty: {url}")
-                return ""
-            
-            # Convert the Excel content to Markdown
-            markdown_content = ""
-            for sheet_name, sheet_content in all_cells_text_dict.items():
-                markdown_content += f"## Sheet: {sheet_name}\n\n"
-                markdown_content += "```\n"
-                markdown_content += sheet_content
-                markdown_content += "\n```\n\n"
-            
-            logger.info(f"Parsed Excel file, read {len(all_cells_text_dict)} sheet(s): {url}")
-            return markdown_content
+    # Check the file type; use the pandas path for Excel files
+    if url.lower().endswith(('.xlsx', '.xls')):
+        logger.info(f"Detected Excel file, reading with pandas: {url}")
        
+        # Decide whether this is a local file or an HTTP URL
+        if url.startswith(('http://', 'https://')):
+            # HTTP URL: use extract_excel_text_from_url
+            all_cells_text_dict = extract_excel_text_from_url(url)
        else:
-            # Non-Excel files use the original markitdown path
-            logger.info(f"Detected non-Excel file, reading with markitdown: {url}")
-            
-            # Create a MarkItDown instance
-            md = MarkItDown(enable_plugins=False)
-            
-            # Convert the document
-            result = md.convert(url)
-            
-            # Get the Markdown content
-            markdown_content = result.text_content
-            
-            logger.info(f"Parsed spreadsheet file from URL: {url}")
-            return markdown_content
+            # Local file path: use extract_excel_text_from_file
+            all_cells_text_dict = extract_excel_text_from_file(url)
        
-    except Exception as e:
-        logger.error(f"Error while parsing spreadsheet file from URL: {e}")
-        raise Exception(f"Failed to parse spreadsheet file from URL: {e}")
-
+        if not all_cells_text_dict:
+            logger.warning(f"Failed to read Excel file, or file is empty: {url}")
+            return ""
+        
+        # Convert the Excel content to Markdown
+        markdown_content = ""
+        for sheet_name, sheet_content in all_cells_text_dict.items():
+            markdown_content += f"## Sheet: {sheet_name}\n\n"
+            markdown_content += "```\n"
+            markdown_content += sheet_content
+            markdown_content += "\n```\n\n"
+        
+        logger.info(f"Parsed Excel file, read {len(all_cells_text_dict)} sheet(s): {url}")
+        return markdown_content
+    
+    else:
+        # Non-Excel files use the original markitdown path
+        logger.info(f"Detected non-Excel file, reading with markitdown: {url}")
+        
+        # Create a MarkItDown instance
+        md = MarkItDown(enable_plugins=False)
+        
+        # Convert the document
+        result = md.convert(url)
+        
+        # Get the Markdown content
+        markdown_content = result.text_content
+        
+        logger.info(f"Parsed spreadsheet file from URL: {url}")
+        return markdown_content
+    
 
 @task(name="Excel处理",
     persist_result=True,
@@ -161,30 +162,24 @@ def get_or_create_product_import_by_url(file_url: str):
     
     logger.info(f"数据库中不存在文件 {file_name} 的记录,开始解析Excel并保存到数据库")
     
-    try:
-        # Parse the Excel file into Markdown
-        markdown_content = parse_url_to_markdown_task(file_url)
-        
-        if not markdown_content:
-            logger.warning(f"Excel parsing failed or returned empty content: {file_url}")
-            raise Exception(f"Excel parsing failed or returned empty content: {file_url}")
-        
-        # Extract product information from the Markdown content with an LLM
-        product_import = extract_product_from_text(
-            text=markdown_content,
-            uri=file_url,
-            filename=file_name
-        )
-        
-        # Save to the database
-        saved_record = product_import_manager.save_product_import(product_import)
-        
-        logger.info(f"Parsed Excel and saved to database: {file_name}")
-        return saved_record
-        
-    except Exception as e:
-        logger.error(f"Error while processing file {file_name}: {e}")
-        raise Exception(f"Failed to process file: {e}")
+    markdown_content = parse_url_to_markdown_task(file_url)
+    
+    if not markdown_content:
+        logger.warning(f"Excel parsing failed or returned empty content: {file_url}")
+        raise Exception(f"Excel parsing failed or returned empty content: {file_url}")
+    
+    # Extract product information from the Markdown content with an LLM
+    product_import = extract_product_from_text(
+        text=markdown_content,
+        uri=file_url,
+        filename=file_name
+    )
+    
+    # Save to the database
+    saved_record = product_import_manager.save_product_import(product_import)
+    
+    logger.info(f"Parsed Excel and saved to database: {file_name}")
+    return saved_record
 
 
 class ProductImportInput(BaseModel):

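After this refactor the task accepts both HTTP(S) URLs and local Windows paths, dispatching on the scheme prefix, and it lets exceptions propagate so Prefect records the original traceback instead of a re-wrapped one. A hedged usage sketch (the import path assumes the `src` layout; both inputs are illustrative, not real files):

```python
# Prefect 3 tasks can also be called like plain functions outside a flow run.
from flow_task.extra_excel_product_flow import parse_url_to_markdown_task

md_remote = parse_url_to_markdown_task("http://example.lan/files/demo.xlsx")  # hypothetical URL
md_local = parse_url_to_markdown_task(r"G:\data\demo.xlsx")                   # hypothetical local path
print(md_remote[:200])
print(md_local[:200])
```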
+ 19 - 0
src/flow_task/readme.md

@@ -0,0 +1,19 @@
+```powershell
+prefect server start --host pc.lan 
+& g:/code/amazone/copywriting_production/.venv/Scripts/python.exe g:/code/amazone/copywriting_production/src/flow_task/depoly.py
+
+# Parse Excel spreadsheets
+& g:/code/amazone/copywriting_production/.venv/Scripts/python.exe g:/code/amazone/copywriting_production/tests/flow_run/t_flow_run_extra_product.py 
+
+# Crawl database ASINs; automatic login is not solved yet
+& g:/code/amazone/copywriting_production/.venv/Scripts/python.exe g:/code/amazone/copywriting_production/tests/flow_run/t_flow_run_crawl_asin.py 
+
+# Parse ASIN mhtml files
+& g:/code/amazone/copywriting_production/.venv/Scripts/python.exe g:/code/amazone/copywriting_production/tests/flow_run/t_flow_run_asin_mhtml_parser.py
+
+# Generate Excel files
+& g:/code/amazone/copywriting_production/.venv/Scripts/python.exe g:/code/amazone/copywriting_production/tests/flow_run/t_flow_run_excel_generator.py
+
+# Generate markdown from spreadsheets
+& g:/code/amazone/copywriting_production/.venv/Scripts/python.exe g:/code/amazone/copywriting_production/tests/mytest/t_pandas_excel_reader.py
+```

+ 1 - 1
tests/flow_run/t_flow_run_crawl_asin.py

@@ -135,8 +135,8 @@ def t_crawl_multiple_competitors():
             print("本月没有找到任何产品数据")
             return
         
-        print(f"本月共找到 {len(monthly_products)} 个产品数据")
         pprint(monthly_products)
+        print(f"本月共找到 {len(monthly_products)} 个产品数据")
         y = input("是否继续?")
         print(f"input: {y}")
         if y != 'y':

+ 3 - 6
tests/flow_run/t_flow_run_extra_product.py

@@ -12,12 +12,9 @@ logger = get_logger('flow_run_test')
 
 # Test URL list
 test_urls = [
-    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/1P镊子压刀.xlsx",
-    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/3P一体不锈钢迷你园艺铲.xlsx",
-    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/磁吸固定夹.xlsx",
-    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/锯齿固定夹.xlsx",
-    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/魔术贴金属扣.xlsx",
-    "http://s3.vs1.lan/public/amazone/copywriting_production/product/202508/黑白轧带.xlsx"
+r"G:\xwechat_files\wxid_1fmirgx3vudo21_b3e8\msg\file\2025-10\手指粉扑.xlsx",
+r"G:\xwechat_files\wxid_1fmirgx3vudo21_b3e8\msg\file\2025-10\压缩毛巾.xlsx",
+r"G:\xwechat_files\wxid_1fmirgx3vudo21_b3e8\msg\file\2025-10\折叠剪刀橙色.xlsx"
 ]
 
 

+ 9 - 0
tests/mytest/t_browser.py

@@ -0,0 +1,9 @@
+from utils.drission_page import create_browser
+
+def test_create_browser():
+    """
+    Test: create a browser instance
+    """
+    browser = create_browser()
+
+test_create_browser()

+ 67 - 2
tests/mytest/t_pandas_excel_reader.py

@@ -1,5 +1,5 @@
 import sys
-from utils.file import extract_excel_text_from_url, read_excel_from_url, get_all_cells_text
+from utils.file import extract_excel_text_from_url, read_excel_from_url, get_all_cells_text, read_excel, extract_excel_text_from_file
 
 def test_pandas_excel_reader():
     """
@@ -46,5 +46,70 @@ def test_pandas_excel_reader():
     else:
         print("Excel 文件读取失败")
 
+def test_local_excel_files():
+    """
+    Test: read local Excel files and extract their text
+    """
+    import os
+    
+    # Excel files to read
+    excel_files = [
+        r"G:\code\amazone\copywriting_production\output\generated_excels\extra-data-手指粉扑.xlsx",
+        r"G:\code\amazone\copywriting_production\output\generated_excels\extra-data-压缩毛巾.xlsx",
+        r"G:\code\amazone\copywriting_production\output\generated_excels\extra-data-便携折叠剪刀.xlsx",
+    ]
+    
+    for file_path in excel_files:
+        print(f"\n=== Reading file: {file_path} ===")
+        
+        # Skip files that do not exist
+        if not os.path.exists(file_path):
+            print(f"File does not exist: {file_path}")
+            continue
+        
+        # Method 1: extract the text directly with the combined helper
+        print("\n--- Using extract_excel_text_from_file ---")
+        all_cells_text_dict = extract_excel_text_from_file(file_path)
+        
+        if all_cells_text_dict:
+            # Merge the content of all sheets into one markdown document
+            combined_markdown = ""
+            
+            # Print the content of every sheet
+            for sheet_name, sheet_content in all_cells_text_dict.items():
+                print(f"\n--- Sheet: {sheet_name} ---")
+                print(f"Content preview (first 200 chars): {sheet_content[:200]}...")
+                
+                # Convert the Excel content to Markdown, matching extra_excel_product_flow.py
+                combined_markdown += f"## Sheet: {sheet_name}\n\n```\n{sheet_content}\n```\n\n"
+            
+            # Save the merged markdown next to the original file
+            file_dir = os.path.dirname(file_path)
+            file_name = os.path.basename(file_path).replace('.xlsx', '.md')
+            output_file = os.path.join(file_dir, file_name)
+            
+            with open(output_file, 'w', encoding='utf-8') as f:
+                f.write(combined_markdown)
+            print(f"\nSaved to: {output_file}")
+            print(f"Read {len(all_cells_text_dict)} sheet(s)")
+        else:
+            print("Failed to read the Excel file")
+        
+        # Method 2: call read_excel and get_all_cells_text separately
+        print("\n--- Calling read_excel and get_all_cells_text separately ---")
+        excel_data = read_excel(file_path)
+        
+        if excel_data:
+            print("Excel file read successfully!")
+            
+            # Extract the text of every cell (including empty values)
+            all_cells_text = get_all_cells_text(excel_data)
+            print(f"Extracted text content for {len(all_cells_text)} sheet(s)")
+            
+        else:
+            print("Failed to read the Excel file")
+        
+        print("=" * 80)
+
 if __name__ == "__main__":
-    test_pandas_excel_reader()
+    # test_pandas_excel_reader()
+    test_local_excel_files()

+ 56 - 102
utils/drission_page.py

@@ -1,119 +1,73 @@
 import os
+import random
 import time
 from typing import Optional
 from DrissionPage import Chromium, ChromiumOptions, ChromiumPage
 from pathlib import Path
-from config.settings import OUTPUT_DIR, WORK_DIR, BROWSER_CONFIG_DIR
-from utils.logu import logger
-from pydantic import BaseModel
+from .logu import logger
+from DrissionPage._elements.chromium_element import ChromiumElement
+def create_browser(address='127.0.0.1:16800', user_data_dir='', browser_path=''):
 
-BROWSER_PATH=r"C:\Program Files\Google\Chrome\Application\chrome.exe"
-
-def genarate_chrome_ini(address="localhost:9321"):
-    port = address.split(':')[1]
-    chrome_options = ChromiumOptions().set_browser_path(BROWSER_PATH)
+    chrome_options = ChromiumOptions(read_file=False)
+    # The debug port must not be below 10000, or environment issues can cause errors
     chrome_options.set_address(address)
-    chrome_options.set_user_data_path(str(OUTPUT_DIR / f'user_data_dir_{port}'))
-    # chrome_options.no_imgs(True).mute(True)
-    # chrome_options.incognito(True)
-    path = chrome_options.save(BROWSER_CONFIG_DIR / f'{port}.ini')
-    return path
-
-class ChromeOptions(BaseModel):
-    ini_path: Optional[str] = BROWSER_CONFIG_DIR / '9321.ini'
-    browser_path: Optional[str] = None
-    user_data_dir: Optional[str] = None
-    address: Optional[str] = None
-    headless: Optional[bool] = False
-    proxy: Optional[str] = None
-    no_imgs: Optional[bool] = False
-    auto_port: Optional[bool] = False
-    save: Optional[bool] = False
-
-def load_chrome_from_ini(options:ChromeOptions):
-    chrome_options = ChromiumOptions(ini_path=options.ini_path)
-    if options.browser_path:
-        chrome_options.set_browser_path(options.browser_path)
-    if options.proxy:
-        chrome_options.set_proxy(options.proxy)
-    if options.user_data_dir:
-        chrome_options.set_user_data_path(options.user_data_dir)
-    # If a proxy environment variable is set
-    elif 'HTTP_PROXY' in os.environ:
-        chrome_options.set_proxy(os.environ['HTTP_PROXY'])
-    if options.auto_port:
-        chrome_options.auto_port(options.auto_port)
-    if options.no_imgs:
-        chrome_options.no_imgs(options.no_imgs)
-    if options.address:
-        chrome_options.headless(options.headless)
-    if options.address:
-        chrome_options.set_address(options.address)
-    if options.save:
-        chrome_options.save(options.ini_path)
-    logger.info(f"proxy {options.proxy}")
-    page = ChromiumPage(chrome_options)
-    return page
-
-def fake_ua():
-
-    # Create a UserAgent object
-    ua = UserAgent()
+    if user_data_dir:
+        chrome_options.set_user_data_path(user_data_dir)
+    if browser_path:
+        chrome_options.set_browser_path(browser_path)
+    driver = ChromiumPage(addr_or_opts=chrome_options)
+    return driver
 
-    # Generate User-Agent strings for the supported browsers
-    chrome_ua = ua.chrome  # Chrome
-    firefox_ua = ua.firefox  # Firefox
-    safari_ua = ua.safari  # Safari
-    edge_ua = ua.edge  # Chromium Edge
 
-    # Print the generated User-Agent strings
-    print("Chrome User-Agent:", chrome_ua)
-    print("Firefox User-Agent:", firefox_ua)
-    print("Safari User-Agent:", safari_ua)
-    print("Edge User-Agent:", edge_ua)
-    return chrome_ua
 
-def load_random_ua_chrome(headless=False):
-    chrome_options = ChromiumOptions()
-    chrome_options.auto_port(True)
-    chrome_options.no_imgs(False)
-    chrome_options.set_user_agent(fake_ua())
-    chrome_options.arguments.append("--lang=en")
-    chrome_options.headless(headless)
-    page = ChromiumPage(chrome_options)
-    # page.set.auto_handle_alert(True)
-    return page
+def click_random_pos(ele:ChromiumElement, delay_random=(0,5), safe_zone=0.2, wait_timeout=35):
 
-def test_random_ua_chrome():
-    page = load_random_ua_chrome()
-    tab = page.latest_tab
-    keyword = "Acalypha rivularis essential oil"
-    url = f"https://www.google.com/search?q={keyword}"
-    # url = f"https://www.google.com/"
-    # url = "https://bot.sannysoft.com/"
-    tab.get(url)
-    print(tab.url)
-    if page.browser._chromium_options.is_headless:
-        tab.get_screenshot('./1.png')
-    # page.quit()
+    """在元素中心区域随机点击
+    
+    Args:
+        ele: 要点击的元素
+        delay_random: 点击前的随机延迟时间范围(秒)
+        safe_zone: 安全区域比例,0.2表示在中心80%区域内随机点击
+        wait_timeout: 等待元素出现并具有可点击矩形区域的超时时间(秒)
+    """
+    # 等待元素出现并具有可点击的矩形区域
+    logger.debug(f"等待元素出现并具有可点击矩形区域,超时时间:{wait_timeout}秒")
+    ele.wait.has_rect(timeout=wait_timeout)
+    
+    # Get the element's size and position
+    width, height = ele.rect.size
+    logger.debug(f"Element size: {width}x{height}")
+    center_x, center_y = width/2, height/2
+    
+    max_offset_x = width * safe_zone
+    max_offset_y = height * safe_zone
+    
+    # Generate a random offset around the center point
+    offset_x = center_x + random.uniform(-max_offset_x, max_offset_x)
+    offset_y = center_y + random.uniform(-max_offset_y, max_offset_y)
+    logger.debug(f"Center: {center_x},{center_y}")
+    logger.debug(f"Offset: {offset_x},{offset_y}")
+    # Click with the offset applied
+    time.sleep(random.uniform(*delay_random))
+    ele.click.at(offset_x, offset_y)
 
-def test_normal_chrome():
-    # genarate_chrome_ini()
-    page = load_chrome_from_ini(proxy='http://localhost:1881')
-    tab = page.latest_tab
-    keyword = "Acalypha rivularis essential oil"
-    url = f"https://www.google.com/search?q={keyword}"
-    url = "https://bot.sannysoft.com/"
-    # recaptcha CAPTCHA detection
-    # url = "https://patrickhlauke.github.io/recaptcha/"
-    tab.get(url)
-    tab.scroll.to_bottom()
-    # tab.get_screenshot('./1.png')
-    # page.quit()
+def find_and_click_random(driver:ChromiumElement|ChromiumPage, locator, *args, **kwargs):
+    """查找元素并随机点击
+    
+    Args:
+        driver: 浏览器驱动或元素
+        locator: 元素定位器
+        *args: 传递给 ele() 方法的位置参数
+        **kwargs: 传递给 click_random_pos() 方法的关键字参数
+    """
+    ele = driver.ele(locator, *args)
+    return click_random_pos(ele, **kwargs)
 
 def main():
-    test_random_ua_chrome()
-    # test_normal_chrome()
+    page = create_browser()
+    page.get("chrome://version")
+    # Print the DevTools websocket URL of the connected browser
+    print(page._driver._websocket_url)
     
 if __name__ == "__main__":
     main()

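A hedged usage sketch of the new helpers: attach to a Chromium instance on the debug port, navigate, then click an element at a randomized offset after a random delay. The URL and locator below are illustrative, not from the repo:

```python
from utils.drission_page import create_browser, find_and_click_random

# Attach to (or start) a Chromium instance on the local debug port
page = create_browser(address='127.0.0.1:16800')
page.get("https://example.com")
# DrissionPage locator syntax; keyword arguments are forwarded to click_random_pos()
find_and_click_random(page, 'tag:a', delay_random=(1, 3), safe_zone=0.2)
```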
+ 40 - 0
utils/file.py

@@ -373,6 +373,46 @@ def get_all_cells_text(excel_data):
     
     return result
 
+def read_excel(file_path):
+    """
+    Read an Excel file from a local path with pandas
+    
+    Args:
+        file_path (str): local path of the Excel file
+    
+    Returns:
+        dict: data of every sheet, keyed by sheet name
+    """
+    try:
+        # Read all sheets with pandas;
+        # header=None keeps the first row as data rather than column names
+        excel_data = pd.read_excel(file_path, sheet_name=None, header=None)
+        
+        return excel_data
+        
+    except Exception as e:
+        print(f"Error while reading the Excel file: {e}")
+        return None
+
+def extract_excel_text_from_file(file_path):
+    """
+    Read an Excel file from a local path and extract the text of every cell
+    
+    Args:
+        file_path (str): local path of the Excel file
+    
+    Returns:
+        dict: text content of every sheet, keyed by sheet name
+    """
+    # Read the Excel file
+    excel_data = read_excel(file_path)
+    
+    if excel_data:
+        # Extract the text of every cell (including empty values)
+        return get_all_cells_text(excel_data)
+    else:
+        return {}
+
 def extract_excel_text_from_url(url):
     """
     Read an Excel file from a URL and extract the text of every cell

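The new helpers build on one pandas behavior worth seeing in isolation: with `sheet_name=None`, `pd.read_excel` returns a dict mapping sheet name to DataFrame, and `header=None` keeps row 0 as data instead of column names. A self-contained demo, assuming a throwaway workbook (path and values are illustrative):

```python
import pandas as pd

# Build a tiny workbook to read back (requires openpyxl, as Excel I/O does)
pd.DataFrame([["sku", "A1"], ["price", 9.9]]).to_excel(
    "demo.xlsx", sheet_name="info", index=False, header=False)

# sheet_name=None -> {sheet name: DataFrame}; header=None keeps row 0 as data
sheets = pd.read_excel("demo.xlsx", sheet_name=None, header=None)
for name, df in sheets.items():
    print(name, df.shape)  # -> info (2, 2)
```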
+ 11 - 0
uv.lock

@@ -514,6 +514,7 @@ dependencies = [
     { name = "pandas" },
     { name = "prefect" },
     { name = "psycopg2" },
+    { name = "pymysql" },
     { name = "python-dotenv" },
     { name = "pyyaml" },
     { name = "redis" },
@@ -541,6 +542,7 @@ requires-dist = [
     { name = "pandas", specifier = ">=2.2.3" },
     { name = "prefect", specifier = ">=3.4.11" },
     { name = "psycopg2", specifier = ">=2.9.10" },
+    { name = "pymysql", specifier = ">=1.1.2" },
     { name = "python-dotenv", specifier = ">=1.1.0" },
     { name = "pyyaml", specifier = ">=6.0.2" },
     { name = "redis", specifier = ">=5.2.1" },
@@ -2904,6 +2906,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/b9/26/a5ef980305f5be4edd1c2523ae3127ad0e490b60585714c56428b8a24395/pymongo-4.13.1-cp313-cp313t-win_amd64.whl", hash = "sha256:6492565cd7bb10cb6104401af446926141249095953b57c108c4bdcf3452fa3d", size = 1010935, upload-time = "2025-06-11T19:24:04.677Z" },
 ]
 
+[[package]]
+name = "pymysql"
+version = "1.1.2"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/f5/ae/1fe3fcd9f959efa0ebe200b8de88b5a5ce3e767e38c7ac32fb179f16a388/pymysql-1.1.2.tar.gz", hash = "sha256:4961d3e165614ae65014e361811a724e2044ad3ea3739de9903ae7c21f539f03", size = 48258, upload-time = "2025-08-24T12:55:55.146Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/7c/4c/ad33b92b9864cbde84f259d5df035a6447f91891f5be77788e2a3892bce3/pymysql-1.1.2-py3-none-any.whl", hash = "sha256:e6b1d89711dd51f8f74b1631fe08f039e7d76cf67a42a323d3178f0f25762ed9", size = 45300, upload-time = "2025-08-24T12:55:53.394Z" },
+]
+
 [[package]]
 name = "pyopenssl"
 version = "25.1.0"