
Improve the parsing worker

mrh, 8 months ago
Parent commit e1ac77b527

+ 2 - 1
config/celery.py

@@ -1,7 +1,7 @@
 from celery import Celery
+from config.settings import CFG
 from config import celery_config
 from celery import signals
-from config.settings import CFG
 import os
 from utils.logu import logger
 
@@ -23,6 +23,7 @@ def on_worker_init(sender=None, **kwargs):
 
 app = Celery(
     'copywriting_production',
+    backend=CFG.redis_url,
     include=[
         'src.tasks.crawl_asin_save_task',
         'src.tasks.crawl_asin_exract_task',
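
Adding `backend=CFG.redis_url` is what lets callers block on task results by id (the manager introduced below relies on this). A minimal sketch, assuming a worker from this commit is running with the task registered:

```
# Rough sketch: with the Redis result backend configured, .get() can retrieve
# the payload the task returns. Requires a running worker.
from src.tasks.crawl_asin_save_task import get_asin_and_save_page

task = get_asin_and_save_page.delay('B0CQ1SHD8V', 'JP', False)
print(task.get(timeout=300))
# e.g. {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V/B0CQ1SHD8V.mhtml'}
```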

+ 4 - 4
config/celery_config.py

@@ -1,12 +1,12 @@
-from config.settings import CFG,DB_URL
+from config.settings import CFG,DB_URL,OUTPUT_DIR
 
 # celery_result_backend = 'db+' + DB_URL
 # Flower persistence settings
-# FLOWER_PERSISTENT = True
-# FLOWER_DB = "flower_monitor.db"
+FLOWER_PERSISTENT = True
+FLOWER_DB = str(OUTPUT_DIR / "celery" / "flower_monitor.db")
 broker_url = CFG.redis_url
 
-# result_backend = 'db+' + DB_URL
+result_backend = CFG.redis_url
 task_serializer = 'json'
 result_serializer = 'json'
 accept_content = ['json']

+ 0 - 1
config/dp_conf/9321_worker.yaml

@@ -1,5 +1,4 @@
 chrome_config_ini: G:\code\amazone\copywriting_production\config\dp_conf\9321.ini
 s3_access_key: bh9LbfsPHRJgQ44wXIlv
 s3_endpoint: http://vs1.lan:9002
-s3_secret_key: 
 storage: s3

+ 3 - 1
config/settings.py

@@ -14,6 +14,8 @@ LOG_DIR = Path(__file__).resolve().parent.parent/'output/logs'
 
 CONFIG_DIR = Path(__file__).resolve().parent.parent/'config'
 BROWSER_CONFIG_DIR = CONFIG_DIR / 'dp_conf'
+TEMP_PAGE_DIR = OUTPUT_DIR / 'page' / 'temp'
+TEMP_PAGE_DIR.mkdir(parents=True, exist_ok=True)
 
 
 class Config(BaseModel):
@@ -21,7 +23,7 @@ class Config(BaseModel):
     s3_access_key: Optional[str] = os.environ.get("S3_ACCESS_KEY", 'bh9LbfsPHRJgQ44wXIlv')
     s3_secret_key: Optional[str] = os.environ.get("S3_SECRET_KEY", 'N744RZ60T1b4zlcWG2MROCzjEE2mPTdNQCc7Pk3M')
     s3_endpoint: Optional[str] = os.environ.get("S3_ENDPOINT", 'http://vs1.lan:9002')
-    s3_prefix: Optional[str] = 'amazone/copywriting_production'
+    s3_prefix: Optional[str] = 's3://public/amazone/copywriting_production'
     chrome_config_ini: Optional[str] = r'G:\code\amazone\copywriting_production\config\dp_conf\9321.ini'
     redis_url: Optional[str] = os.environ.get("REDIS_URL", 'redis://localhost:6379/0')
     version: Optional[str] = "0.0.1-alpha"
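
With `s3_prefix` now a full `s3://` URI, paths composed from it can be written directly through the S3 helpers in utils/file.py. A small worked example using the defaults above:

```
from config.settings import CFG

asin = 'B0CQ1SHD8V'
save_path = f"{CFG.s3_prefix}/output/{asin}/{asin}.mhtml"
# -> 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V/B0CQ1SHD8V.mhtml'
```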

+ 23 - 12
src/browser/crawl_asin.py

@@ -21,7 +21,7 @@ from crawl4ai.content_filter_strategy import BM25ContentFilter
 from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
 import base64
 from utils.logu import get_logger
-from config.settings import OUTPUT_DIR
+from config.settings import OUTPUT_DIR,TEMP_PAGE_DIR
 from utils.drission_page import load_chrome_from_ini,ChromeOptions
 from utils.file import save_to_file,check_exists,s3,read_file
 from config.settings import CFG
@@ -29,8 +29,6 @@ from config.settings import CFG
 logger = get_logger('browser')
 ASIN_HTML_DIR = OUTPUT_DIR / 'page' / 'asin'
 ASIN_HTML_DIR.mkdir(parents=True, exist_ok=True)
-TEMP_PAGE_DIR = OUTPUT_DIR / 'page' / 'temp'
-TEMP_PAGE_DIR.mkdir(parents=True, exist_ok=True)
 
 class Crawler():
     def __init__(self, chrome_options:ChromeOptions):
@@ -49,7 +47,7 @@ class Crawler():
         self.browser_config.update({
             "cdp_url": self.page.browser._driver._websocket_url 
         })
-    
+        logger.info(f"get {url}, browser_config: {self.browser_config}")
     async def run(self, url:str):
         page = load_chrome_from_ini(
             self.chrome_options
@@ -94,16 +92,20 @@ class Crawler():
             return page.html
     def get_asin_and_save_page(self, asin:str, asin_area:str='JP', mthml_type:bool=True, save_path:str=None, overwrite:bool=False):
         if not overwrite and check_exists(save_path):
-            logger.info(f"{save_path} exists")
+            logger.info(f"exists {save_path} ")
             return save_path
         data = self.get_asin_page_data(asin, asin_area, mthml_type)
-        save_path = save_path or str(ASIN_HTML_DIR / f'{asin}.html')
+        save_path = save_path or str(ASIN_HTML_DIR / f'{asin}{".mhtml" if mthml_type else ".html"}')
         return save_to_file(data, save_path)
     
     async def excra_strategy_raw_html(self, raw_html:str, schema:dict, strategy:ExtractionStrategy=JsonXPathExtractionStrategy):
         browser_config = BrowserConfig(
-            **self.browser_config,
+            headless=self.chrome_options.headless,
+            use_managed_browser=True,
+            cdp_url=self.page.browser._driver._websocket_url
         )
+        logger.info(f"{self.browser_config}")
+        logger.info(f"len {len(raw_html)} {type(raw_html)} {raw_html[:150]}")
         async with AsyncWebCrawler(config=browser_config) as crawler:
             result:CrawlResult = await crawler.arun(
                 url=f"raw://{raw_html}",
@@ -162,6 +164,7 @@ class Crawler():
         data = json.loads(result.extracted_content)
         logger.info(f"Extracted {len(data)} coin rows")
         logger.debug(f"First item: {result.extracted_content}")
+        # [{"traffic_keyword":"","keyword_link":"..."}, {}]
         return data
     
     async def excra_product_info(self, html:str, input_schema:dict={}, strategy:ExtractionStrategy=JsonXPathExtractionStrategy) -> CrawlResult:
@@ -205,7 +208,15 @@ class Crawler():
                 }
             ]
         }
-        return await self.excra_strategy_raw_html(html, schema, JsonCssExtractionStrategy)
+        result:CrawlResult = await self.excra_strategy_raw_html(html, schema, JsonCssExtractionStrategy)
+        if not result.success:
+            logger.error(f"Crawl failed: {result.error_message}")
+            return
+        data = json.loads(result.extracted_content)
+        logger.info(f"Extracted {len(data)} coin rows")
+        logger.debug(f"result.extracted_content: {result.extracted_content}")
+        return data
+
 
     def download_img(self,url:str,save_dir:str=TEMP_PAGE_DIR, page:str=None,as_img_base64:bool=True, upload_s3_dir:str=''):
         # ('success', '{abs_current_path}\\notice.svg')
@@ -237,10 +248,10 @@ async def task():
     # file_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html'
     # tab.get(file_path)
     c.get(r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html.mhtml')
-    res = c.download_img(
-        'https://www.asinseed.com/assets/svg/flat-icons/notice.svg?v=20181122',
-        upload_s3_dir='s3://public/amazone/copywriting_production/output/B0CQ1SHD8V/')
-    logger.info(f"{res}")
+    # res = c.download_img(
+    #     'https://www.asinseed.com/assets/svg/flat-icons/notice.svg?v=20181122',
+    #     upload_s3_dir='s3://public/amazone/copywriting_production/output/B0CQ1SHD8V/')
+    # logger.info(f"{res}")
     # logger.info(f"{res.extracted_content}")
     
     # res = await c.cralw4ai_run(file_path)
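
The default save path of `get_asin_and_save_page` now derives its suffix from `mthml_type`. A hedged usage sketch, assuming the Chrome instance configured by `9321.ini` is reachable:

```
# Sketch: when no save_path is given, the file lands in ASIN_HTML_DIR with the
# suffix chosen by mthml_type ('.mhtml' here, '.html' otherwise).
from config.settings import CFG
from src.browser.crawl_asin import Crawler
from utils.drission_page import ChromeOptions

crawler = Crawler(chrome_options=ChromeOptions(ini_path=CFG.chrome_config_ini))
path = crawler.get_asin_and_save_page('B0CQ1SHD8V', 'JP', mthml_type=True)
# -> <OUTPUT_DIR>/page/asin/B0CQ1SHD8V.mhtml
```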

+ 16 - 1
src/cli/manage_tasks.py → src/manager/cli_tasks.py

@@ -32,8 +32,23 @@ def worker():
         '--loglevel=info',
         '--pool=solo'  # pool implementation suited to Windows
     ]
-    # python -m src.cli.manage_tasks --help
     app.worker_main(argv)
 
+@cli.command()
+@click.option('--queue', default='default', help='Name of the queue to purge (default: default)')
+@click.option('--force', is_flag=True, help='Purge the queue without asking for confirmation')
+def purge_queue(queue, force):
+    """Purge all pending messages from the given Celery queue."""
+    if not force:
+        click.confirm(f'Really purge all messages from queue {queue}? This cannot be undone!', abort=True)
+    
+    with app.connection_for_write() as conn:
+        try:
+            # Purge the queue and report how many messages were removed
+            count = conn.default_channel.queue_purge(queue)
+            click.echo(f"Queue '{queue}' purged; {count} pending messages removed.")
+        except Exception as e:
+            click.echo(f"Error while purging queue '{queue}': {str(e)}", err=True)
+
 if __name__ == '__main__':
     cli()

+ 64 - 0
src/manager/core/db.py

@@ -0,0 +1,64 @@
+from datetime import datetime
+from typing import Optional
+from sqlmodel import SQLModel, create_engine, Session, select, Field
+from config.settings import DB_URL
+from utils.sql_engine import create_db_and_tables,drop_table,engine
+
+class AsinSeed(SQLModel, table=True):
+    id: Optional[int] = Field(default=None, primary_key=True)
+    asin: str
+    asin_area: str = 'JP'
+    extra_result_path: Optional[str] = None
+    mhtml_path: Optional[str] = None
+    error: Optional[str] = None
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)
+
+class DbManager:
+    def __init__(self, engine: str=None):
+        self.engine = engine or create_engine(DB_URL)
+        create_db_and_tables()
+
+    def save_asin_seed(self, asin_model: AsinSeed):
+        with Session(self.engine) as session:
+            session.add(asin_model)
+            session.commit()
+            session.refresh(asin_model)
+            return asin_model
+
+    def get_asin_seed(self, asin: str):
+        with Session(self.engine) as session:
+            statement = select(AsinSeed).where(AsinSeed.asin == asin)
+            results = session.exec(statement)
+            return results.first()
+    
+    def add_or_ignore_asin_seed(self, asin_model: AsinSeed):
+        exist = self.get_asin_seed(asin_model.asin)
+        if exist:
+            return exist
+        else:
+            return self.save_asin_seed(asin_model)
+    
+    def update_asin_seed(self, asin_model: AsinSeed):
+        with Session(self.engine) as session:
+            statement = select(AsinSeed).where(AsinSeed.asin == asin_model.asin)
+            results = session.exec(statement)
+            exist = results.first()
+            if exist:
+                for key, value in asin_model.model_dump().items():
+                    setattr(exist, key, value)
+                session.add(exist)
+                session.commit()
+                session.refresh(exist)
+                return exist
+            else:
+                return None
+            
+def main():
+    asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
+    db_manager = DbManager()
+    for asin in asinseed_list:
+        db_manager.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area='JP'))
+
+
+if __name__ == "__main__":
+    main()
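
A hedged sketch of how these helpers are meant to be combined: fetch the row, mutate it, and pass it back to `update_asin_seed`, which matches on `asin` and copies the fields over (this is the pattern ManagerTask uses below):

```
from src.manager.core.db import DbManager

db = DbManager()
seed = db.get_asin_seed('B0CQ1SHD8V')
if seed:
    seed.mhtml_path = 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V/B0CQ1SHD8V.mhtml'
    db.update_asin_seed(seed)  # matched by asin; all fields are copied onto the stored row
```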

+ 101 - 0
src/manager/manager_task.py

@@ -0,0 +1,101 @@
+from pathlib import Path
+from config.settings import CFG
+from src.manager.core.db import DbManager,AsinSeed
+from utils.file import save_to_file, read_file
+from src.tasks.crawl_asin_save_task import get_asin_and_save_page
+from src.tasks.crawl_asin_exract_task import extra_result
+from celery.result import AsyncResult
+
+class ManagerTask:
+    s3_prefix = CFG.s3_prefix + '/output/page'
+    def __init__(self):
+        self.db = DbManager()
+
+    def submit_task_and_wait(self, asin: str, asin_area: str = 'JP',overwrite:bool=False, timeout: int = 300):
+        """Submit the crawl task, wait for it to finish, and save the result path to the DB."""
+        # Submit the celery task
+        task = get_asin_and_save_page.delay(asin, asin_area, overwrite)
+        
+        # Wait for the task to complete
+        result = AsyncResult(task.id)
+        result.get(timeout=timeout)
+        
+        # Handle the task result
+        if result.successful():
+            task_result = result.result
+            self.save_task_asin_crawl_result(asin, asin_area, task_result)
+        return None
+    
+    def submit_extract_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
+        """Submit the page-extraction task, wait for it to finish, and save the result to the DB."""
+        # Look up the mhtml path in the DB
+        asin_seed = self.db.get_asin_seed(asin)
+        if not asin_seed or not asin_seed.mhtml_path:
+            print(f"No mhtml path found for {asin}")
+            return None
+        
+        # Submit the celery task
+        task = extra_result.delay(asin_seed.mhtml_path)
+        
+        # Wait for the task to complete
+        result = AsyncResult(task.id)
+        result.get(timeout=timeout)
+        
+        # Handle the task result
+        if result.successful():
+            task_result = result.result
+            if task_result['status'] == 'success':
+                # Save the extraction result to a file and upload it to S3
+                filename = f"{asin}_extract.json"
+                save_path = self.upload_file(
+                    file_path=task_result['data'],
+                    filename=filename
+                )
+                # Persist the DB record
+                self.save_task_asin_page_extract_result(asin, asin_area, {
+                    'status': 'success',
+                    'path': save_path
+                })
+        return task_result
+    
+    def save_task_asin_crawl_result(self, asin: str, asin_area:str=None, task_result: dict={}):
+        if task_result['status'] == 'success':
+            # Update the DB record
+            asin_seed = self.db.get_asin_seed(asin)
+            if asin_seed:
+                asin_seed.mhtml_path = task_result['path']
+                self.db.update_asin_seed(asin_seed)
+            else:
+                self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
+            return asin_seed
+    def save_task_asin_page_extract_result(self, asin: str, asin_area:str=None, task_result: dict={}):
+        if task_result.get('status') == 'success':
+            asin_seed = self.db.get_asin_seed(asin)
+            if asin_seed:
+                asin_seed.extract_path = task_result['path']
+                self.db.update_asin_seed(asin_seed)
+            else:
+                new_seed = AsinSeed(
+                    asin=asin,
+                    asin_area=asin_area,
+                    extract_path=task_result['path']
+                )
+                self.db.add_or_ignore_asin_seed(new_seed)
+            return asin_seed
+    def upload_file(self, file_path: str, filename: str):
+        res = save_to_file(Path(file_path).read_text(), self.s3_prefix + '/' + filename)
+        return res
+    def upload_mhtml(self, file_path: str, s3_filename: str=None):
+        if not s3_filename:
+            s3_filename = Path(file_path).stem + '.mhtml'
+        res = self.upload_file(file_path, s3_filename)
+
+def main():
+    asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
+    manager = ManagerTask()    
+    # manager.submit_task_and_wait('B0B658JC22', overwrite=False)
+    manager.submit_extract_task_and_wait('B0B658JC22')
+    # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
+    # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)
+if __name__ == "__main__":
+    main()

+ 0 - 0
src/sql/create_user.sql → src/models/sql/create_user.sql


+ 0 - 0
src/sql/grant_permissions.sql → src/models/sql/grant_permissions.sql


+ 6 - 0
src/tasks/readme.md → src/readme.md

@@ -3,6 +3,12 @@
 ssh mrh@sv-v2
 cd ~/program/redis
 dc up -d
+$env:CONFIG_PATH="G:\code\amazone\copywriting_production\config\dp_conf\9321_worker.yaml";celery -A src.tasks.crawl_asin_save_task worker --loglevel=info --hostname=9321@%h
+
+celery -A config.celery flower 
+
+python -m src.manager.cli_tasks --help
+python -m src.manager.cli_tasks purge-queue --force
 ```
 
 # Separating celery workers per task

+ 74 - 30
src/tasks/crawl_asin_exract_task.py

@@ -1,41 +1,85 @@
 # tasks/extract_tasks.py
 import asyncio
+import json
+import os
+from pathlib import Path
+import tempfile
 from config.celery import app
 from src.browser.crawl_asin import Crawler
-from config.settings import CFG
+from config.settings import CFG,TEMP_PAGE_DIR
 from utils.drission_page import ChromeOptions
-from utils.file import read_file
+from utils.file import read_file,save_to_file
+from utils.logu import get_logger
+logger = get_logger('browser')
 
-@app.task(bind=True, max_retries=3)
-def extra_result(self, mhtml_path: str):
-    """Celery task for data extraction"""
+async def async_run_extractions(crawler:Crawler, html_content):
+    """异步运行提取逻辑"""
+    product_info, result_table = await asyncio.gather(
+        crawler.excra_product_info(html_content),
+        crawler.extra_result_table(html_content)
+    )
+    res = {
+        "result_table": result_table,
+    }
+    if product_info:
+        res.update(product_info.pop())
+    return res
+
+async def async_process_mhtml(mhtml_path: str):
+    """异步处理MHTML文件"""
+    chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
+    crawler = Crawler(chrome_options=chrome_options)
+    
     try:
-        # 读取存储的页面内容
-        html_content = read_file(mhtml_path)
-        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
-        crawler = Crawler(chrome_options=chrome_options)
-        
-        # 执行异步提取操作
-        async def _run_extractions():
-            return {
-                'product_info': await crawler.excra_product_info(html_content),
-                'result_table': await crawler.extra_result_table(html_content)
-            }
-        
-        results = asyncio.run(_run_extractions())
-        
-        # 执行图片下载(示例)
-        download_status,imgbase64 = crawler.download_img(
-            url=results['product_info']['image_url'],
-            as_img_base64=True
+        # Offload the blocking file IO with asyncio.to_thread
+        data = await asyncio.to_thread(read_file, mhtml_path)
+        mhtml_path_name = Path(mhtml_path).name
+        temp_mhtml_path = await asyncio.to_thread(
+            save_to_file, data, str(TEMP_PAGE_DIR / mhtml_path_name)
         )
-        if download_status == 'success':
-            results['product_info']['imgbase64'] = imgbase64
         
-        return {
-            'status': 'success',
-            'data': results
-        }
+        # Load the saved page in the browser
+        crawler.get(temp_mhtml_path)
+        html_content = crawler.page.html
+        
+        # Run the extractions concurrently
+        results = await async_run_extractions(crawler, html_content)
+        logger.info(f"{json.dumps(results, indent=4)}")
+        # Handle the product image download; the product-info fields are merged
+        # into the top level of results by async_run_extractions above.
+        if results.get('image_url'):
+            download_status, imgbase64 = await asyncio.to_thread(
+                crawler.download_img,
+                results['image_url'],
+                as_img_base64=True
+            )
+            if download_status == 'success':
+                results['imgbase64'] = imgbase64
         
+        # Clean up the temporary file
+        await asyncio.to_thread(os.unlink, temp_mhtml_path)
+        # return json.dumps(results, indent=4,ensure_ascii=False)
+        return {'status': 'success', 'data': json.dumps(results, indent=4,ensure_ascii=False)}
+    
     except Exception as e:
-        self.retry(exc=e, countdown=60)
+        logger.exception(f"异步任务失败: {str(e)}")
+        raise
+
+@app.task(bind=True, max_retries=0)
+def extra_result(self, mhtml_path: str):
+    """异步Celery任务"""
+    try:
+        logger.info(f"异步任务启动: {mhtml_path}")
+        res = asyncio.run(async_process_mhtml(mhtml_path))
+        logger.info(f"异步任务完成: {res}")
+        return res
+    except Exception as e:
+        logger.error(f"任务重试中: {str(e)}")
+        self.retry(exc=e, countdown=60)
+
+def main():
+    mhtml_path = Path(r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.mhtml')
+    res = extra_result(mhtml_path)
+    save_to_file(res, mhtml_path.with_suffix('.json'))
+
+if __name__ == "__main__":
+    main()
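
On success the task returns `{'status': 'success', 'data': <JSON string>}`, so callers need a `json.loads` step. A short sketch, reusing the debug path from `main()` above:

```
# Sketch: enqueue the extraction task and decode the JSON string it returns.
import json

from src.tasks.crawl_asin_exract_task import extra_result

task = extra_result.delay(r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.mhtml')
payload = task.get(timeout=300)
if payload['status'] == 'success':
    results = json.loads(payload['data'])  # {'result_table': [...], plus the product-info fields}
```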

+ 12 - 9
src/tasks/crawl_asin_save_task.py

@@ -3,31 +3,34 @@ from config.celery import app
 from src.browser.crawl_asin import Crawler
 from config.settings import CFG
 from utils.drission_page import ChromeOptions
-from utils.file import check_exists, save_to_file
+from utils.file import check_exists, save_to_file,s3_uri_to_http_url
+from utils.logu import get_logger
+
+logger = get_logger('worker')
 import asyncio
 
-@app.task(bind=True, max_retries=3)
+
+@app.task(bind=True, max_retries=3, rate_limit='10/m')
 def get_asin_and_save_page(self, asin: str, asin_area: str = 'JP', 
                           overwrite: bool = False):
     """Celery task for saving ASIN page"""
     try:
+        logger.info(f"任务开始: asin {asin} , asin_area {asin_area} overwrite {overwrite}  ")
         # 初始化浏览器配置
         chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
         crawler = Crawler(chrome_options=chrome_options)
-        save_path = f"{CFG.s3_prefix}/output/{asin}/{asin}.mhtml"
-        # 检查文件是否已存在
-        if not overwrite and check_exists(save_path):
-            return {'status': 'exists', 'path': save_path}
-            
+        save_path = f"{CFG.s3_prefix}/output/{asin}/{asin}.mhtml"  # 保持.mhtml后缀与内容类型一致            
         # 执行保存操作
         final_path = crawler.get_asin_and_save_page(
             asin=asin,
             asin_area=asin_area,
             save_path=save_path,
-            mthml_type=True
+            mthml_type=True,
+            overwrite=overwrite,
         )
-        
+        logger.info(f"任务成功: {final_path}")
         return {'status': 'success', 'path': final_path}
         
     except Exception as e:
+        logger.exception(f"任务失败:{e}")
         self.retry(exc=e, countdown=60)

+ 8 - 0
tests/mytest/t_boto3.py

@@ -0,0 +1,8 @@
+from utils.file import s3_uri_to_http_url
+
+def main():
+    s3_uri = 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'
+    print(s3_uri_to_http_url(s3_uri))
+
+if __name__ == "__main__":
+    main()

+ 58 - 2
utils/file.py

@@ -1,12 +1,16 @@
 import json
 from pathlib import Path
+import mimetypes
+import urllib.parse
 import smart_open
 from smart_open import open
 from botocore.exceptions import NoCredentialsError
 import boto3
+import logging
+from botocore.exceptions import ClientError
+
 from botocore.config import Config
 from config.settings import CFG
-import mimetypes
 
 s3 = boto3.client(
     's3',
@@ -17,10 +21,62 @@ s3 = boto3.client(
 )
 resource = boto3.resource('s3')
 
+def s3_uri_to_http_url(s3_uri):
+    """
+    将 s3://bucket/key 格式的 URI 转换为 Minio 的 HTTP 访问链接。
+    适用于公共可读的存储桶。
+    """
+    if not s3_uri.startswith('s3://'):
+        raise ValueError("Invalid S3 URI. Must start with 's3://'")
+    
+    # 提取 bucket 和 key
+    path = s3_uri[5:]  # 去除 's3://'
+    parts = path.split('/', 1)
+    bucket = parts[0]
+    key = parts[1] if len(parts) > 1 else ''
+    
+    # 对 key 进行 URL 编码(保留路径斜杠)
+    encoded_key = urllib.parse.quote(key, safe='/')
+    
+    # 获取并清理 endpoint(确保无末尾斜杠)
+    endpoint = CFG.s3_endpoint.rstrip('/')
+    
+    # 拼接完整 URL
+    return f"{endpoint}/{bucket}/{encoded_key}"
+
+def create_presigned_url_expanded(client_method_name, method_parameters=None,
+                                  expiration=3600, http_method=None):
+    """Generate a presigned URL to invoke an S3.Client method
+
+    Not all the client methods provided in the AWS Python SDK are supported.
+
+    :param client_method_name: Name of the S3.Client method, e.g., 'list_buckets'
+    :param method_parameters: Dictionary of parameters to send to the method
+    :param expiration: Time in seconds for the presigned URL to remain valid
+    :param http_method: HTTP method to use (GET, etc.)
+    :return: Presigned URL as string. If error, returns None.
+    """
+
+    # Generate a presigned URL for the S3 client method
+    s3_client = boto3.client('s3')
+    try:
+        response = s3_client.generate_presigned_url(ClientMethod=client_method_name,
+                                                    Params=method_parameters,
+                                                    ExpiresIn=expiration,
+                                                    HttpMethod=http_method)
+    except ClientError as e:
+        logging.error(e)
+        return None
+
+    # The response contains the presigned URL
+    return response
 def upload_to_s3(content, filename:str, **extra_args):
     bucket_name = filename.split('/')[2]
     object_name = '/'.join(filename.split('/')[3:])
-    content_type, _ = mimetypes.guess_type(object_name)
+    if object_name.endswith('.mhtml'):
+        content_type = 'multipart/related'
+    else:
+        content_type, _ = mimetypes.guess_type(object_name)
     content_type = content_type or 'application/octet-stream'
     upload_args = {
         'ContentType': content_type,
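
A hedged usage sketch for the new `create_presigned_url_expanded` helper; note it builds a fresh `boto3.client('s3')` internally, so the endpoint and credentials come from boto3's default configuration rather than the Minio client defined at the top of this module. Bucket and key below reuse the object from tests/mytest/t_boto3.py:

```
from utils.file import create_presigned_url_expanded

url = create_presigned_url_expanded(
    'get_object',
    method_parameters={
        'Bucket': 'public',
        'Key': 'amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml',
    },
    expiration=3600,
    http_method='GET',
)
print(url)  # None if boto3 raises a ClientError
```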