import io
import json
import logging
import mimetypes
import urllib.parse
from pathlib import Path
from typing import Any, Dict, List

import boto3
import pandas as pd
import requests
from botocore.config import Config
from botocore.exceptions import ClientError
from smart_open import open

from config.settings import CFG

s3 = boto3.client(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
)
# resource = boto3.resource('s3')

class S3Client:
    """S3 client wrapper exposing common S3 operations."""

    def __init__(self, client=None):
        """Initialize the S3 client."""
        self.client = client or s3
        self.logger = logging.getLogger(__name__)

    def list_s3_files(self, s3_path: str) -> Dict[str, Any]:
        """List all files under the given S3 path."""
        self.logger.info(f"Listing files under S3 path: {s3_path}")
        try:
            # Parse the S3 path
            if s3_path.startswith('s3://'):
                # s3://bucket/path form
                path_parts = s3_path[5:].split('/', 1)
                bucket_name = path_parts[0]
                prefix = path_parts[1] if len(path_parts) > 1 else ''
            else:
                # Bare /path form: fall back to the default bucket
                bucket_name = 'public'
                prefix = s3_path.lstrip('/')
                # Drop a leading 'public/' (if present) so the bucket name is not duplicated
                if prefix.startswith('public/'):
                    prefix = prefix[7:]
            # Ensure the prefix ends with '/' so directory contents are listed correctly
            if prefix and not prefix.endswith('/'):
                prefix += '/'
            self.logger.info(f"Using bucket: {bucket_name}, prefix: {prefix}")
            # List the objects via boto3
            response = self.client.list_objects_v2(
                Bucket=bucket_name,
                Prefix=prefix
            )
            files = []
            if 'Contents' in response:
                for obj in response['Contents']:
                    # Skip the directory placeholder itself (keys ending with '/')
                    if not obj['Key'].endswith('/'):
                        files.append({
                            'key': obj['Key'],
                            'size': obj['Size'],
                            'last_modified': obj['LastModified'].isoformat(),
                            'etag': obj['ETag'].strip('"')
                        })
            self.logger.info(f"Found {len(files)} files")
            return {
                'path': s3_path,
                'bucket': bucket_name,
                'prefix': prefix,
                'files': files,
                'count': len(files)
            }
        except Exception as e:
            self.logger.error(f"Failed to list S3 files: {e}")
            raise Exception(f"Failed to list S3 files: {e}") from e

    def generate_http_urls(self, s3_files_result: Dict[str, Any], base_url: str = "http://s3.vs1.lan") -> List[str]:
        """Build HTTP URLs for each file in an S3 listing result."""
        http_urls = []
        for file_info in s3_files_result['files']:
            # Take the file name from the full key
            file_name = file_info['key'].split('/')[-1]
            # Assemble the full HTTP URL
            http_url = f"{base_url}/{s3_files_result['bucket']}/{s3_files_result['prefix']}{file_name}"
            http_urls.append(http_url)
        return http_urls

    def list_s3_files_with_urls(self, s3_path: str, base_url: str = "http://s3.vs1.lan") -> Dict[str, Any]:
        """List all files under the given S3 path and attach HTTP URLs."""
        # Fetch the file listing first
        files_result = self.list_s3_files(s3_path)
        # Generate the HTTP URL list
        http_urls = self.generate_http_urls(files_result, base_url)
        # Attach the URLs to a copy of the listing result
        result = files_result.copy()
        result['http_urls'] = http_urls
        return result

# Module-level S3 client instance
s3_client = S3Client()

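# Example usage (a minimal sketch; 's3://public/reports/' is a hypothetical
# path and assumes the endpoint configured above is reachable):
#
#   listing = s3_client.list_s3_files_with_urls('s3://public/reports/')
#   print(listing['count'])
#   for url in listing['http_urls']:
#       print(url)
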
def s3_uri_to_http_url(s3_uri):
    """
    Convert an s3://bucket/key URI into an HTTP access URL on the MinIO endpoint.
    Only suitable for publicly readable buckets.
    """
    if not s3_uri.startswith('s3://'):
        # Not an S3 URI; return it unchanged
        # raise ValueError("Invalid S3 URI. Must start with 's3://'")
        return s3_uri
    # Extract bucket and key
    path = s3_uri[5:]  # strip 's3://'
    parts = path.split('/', 1)
    bucket = parts[0]
    key = parts[1] if len(parts) > 1 else ''
    # URL-encode the key (keeping path slashes)
    encoded_key = urllib.parse.quote(key, safe='/')
    # Fetch and normalize the endpoint (no trailing slash)
    endpoint = CFG.s3_endpoint.rstrip('/')
    # Assemble the full URL
    return f"{endpoint}/{bucket}/{encoded_key}"

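# Example (hypothetical URI; the host portion depends on CFG.s3_endpoint):
#
#   s3_uri_to_http_url('s3://public/docs/my file.pdf')
#   # -> '<CFG.s3_endpoint>/public/docs/my%20file.pdf'
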
def create_presigned_url_expanded(client_method_name, method_parameters=None,
                                  expiration=3600, http_method=None):
    """Generate a presigned URL to invoke an S3.Client method.

    Not all the client methods provided in the AWS Python SDK are supported.

    :param client_method_name: Name of the S3.Client method, e.g., 'list_buckets'
    :param method_parameters: Dictionary of parameters to send to the method
    :param expiration: Time in seconds for the presigned URL to remain valid
    :param http_method: HTTP method to use (GET, etc.)
    :return: Presigned URL as string. If error, returns None.
    """
    # Generate a presigned URL for the S3 client method, reusing the
    # module-level client so the configured endpoint and credentials apply
    try:
        response = s3.generate_presigned_url(ClientMethod=client_method_name,
                                             Params=method_parameters,
                                             ExpiresIn=expiration,
                                             HttpMethod=http_method)
    except ClientError as e:
        logging.error(e)
        return None
    # The response contains the presigned URL
    return response

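# Example: presign a GET for one object (hypothetical bucket/key); the URL
# stays valid for an hour by default:
#
#   url = create_presigned_url_expanded(
#       'get_object',
#       method_parameters={'Bucket': 'public', 'Key': 'reports/latest.html'},
#   )
#   if url is not None:
#       print(url)
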
def get_s3_uri_info(s3_uri: str):
    """Split an s3:// URI into (bucket, key, default upload args)."""
    bucket_name = s3_uri.split('/')[2]
    object_name = '/'.join(s3_uri.split('/')[3:])
    if object_name.endswith('.mhtml'):
        # mimetypes does not know .mhtml, so pin its MIME type explicitly
        content_type = 'multipart/related'
    else:
        content_type, _ = mimetypes.guess_type(object_name)
        content_type = content_type or 'application/octet-stream'
    upload_args = {
        'ContentType': content_type,
    }
    return bucket_name, object_name, upload_args

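# Example (hypothetical URI): the content type is guessed from the extension.
#
#   get_s3_uri_info('s3://public/data/report.json')
#   # -> ('public', 'data/report.json', {'ContentType': 'application/json'})
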
def upload_to_s3(content, filename: str, **extra_args):
    """Upload in-memory content (dict, str, or bytes) to an s3:// URI."""
    bucket_name, object_name, upload_args = get_s3_uri_info(filename)
    upload_args.update(extra_args)
    if isinstance(content, dict):  # serialize dicts as JSON
        content = json.dumps(content, ensure_ascii=False)
    if isinstance(content, str):
        content = content.encode('utf-8')
    logging.debug(f"Uploading to bucket={bucket_name}, key={object_name}")
    s3.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=content,
        **upload_args
    )
    return filename

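# Example: upload a dict as UTF-8 JSON (hypothetical target URI); the key's
# extension drives the ContentType chosen above:
#
#   upload_to_s3({'status': 'ok'}, 's3://public/data/status.json')
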
def upload_file_to_s3(file_path: str, s3_uri: str, **extra_args):
    """Upload a local file to an s3:// URI."""
    bucket_name, object_name, upload_args = get_s3_uri_info(s3_uri)
    upload_args.update(extra_args)
    s3.upload_file(
        file_path,
        bucket_name,
        object_name,
        ExtraArgs=upload_args
    )

def save_to_file(content, filename: Path, **extra_args):
    '''
    Write content to a local path, or upload it when given an s3:// URI, e.g.:

    save_to_file(
        data,
        's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
        Metadata={'mykey': 'myvalue', 'mykey2': 'myvalue2'}
    )
    '''
    if str(filename).startswith('s3://'):
        return upload_to_s3(content, str(filename), **extra_args)
    if isinstance(content, str):
        with open(filename, "w", encoding="utf-8") as file:
            file.write(content)
    else:
        with open(filename, "wb") as file:
            file.write(content)
    return filename

def read_file(file_uri: str, mode='r'):
    """Read a local file or an s3:// object and return its contents."""
    if str(file_uri).startswith('s3://'):
        # smart_open streams the object through the configured boto3 client
        with open(file_uri, mode or 'r', transport_params={'client': s3}) as f:
            return f.read()
    else:
        with open(file_uri, mode) as f:
            return f.read()

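# Example (hypothetical URIs): the same call reads local files and S3 objects;
# pass mode='rb' for binary content:
#
#   text = read_file('s3://public/data/status.json')
#   raw = read_file('/tmp/archive.zip', mode='rb')
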
def check_exists(file_uri: str):
    """Return a truthy value if the local path or s3:// object exists."""
    if not file_uri.startswith('s3://'):
        return Path(file_uri).exists()
    bucket_name, object_name, _ = get_s3_uri_info(file_uri)
    try:
        s3.head_object(Bucket=bucket_name, Key=object_name)
        return file_uri
    except ClientError as e:
        # head_object raises ClientError with a 404 code for missing objects
        if e.response['Error']['Code'] == '404':
            return False
        raise

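# Note: check_exists returns the URI itself (truthy) when the object exists
# and False when it does not, so it reads naturally in boolean contexts:
#
#   if check_exists('s3://public/data/status.json'):  # hypothetical URI
#       print('object is present')
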
def delete_s3_file(s3_uri: str):
    """Delete an S3 object; returns True on success."""
    if not s3_uri or not s3_uri.startswith('s3://'):
        return False
    try:
        bucket_name, object_name, _ = get_s3_uri_info(s3_uri)
        s3.delete_object(Bucket=bucket_name, Key=object_name)
        return True
    except ClientError as e:
        logging.error(f"Failed to delete S3 file: {s3_uri}, error: {e}")
        return False

def read_excel_from_url(url):
    """
    Read an Excel file from a URL with pandas.

    Args:
        url (str): URL of the Excel file

    Returns:
        dict: Dictionary containing the data of all worksheets
    """
    try:
        # Download the file content
        response = requests.get(url)
        response.raise_for_status()
        # Wrap the content in a file-like object with io.BytesIO
        file_content = io.BytesIO(response.content)
        # Read the Excel file with pandas: load every sheet, with header=None
        # so the first row is kept as data rather than column names
        excel_data = pd.read_excel(file_content, sheet_name=None, header=None)
        return excel_data
    except Exception as e:
        logging.error(f"Error while reading Excel file: {e}")
        return None

def get_all_cells_text(excel_data):
    """
    Extract the text of every cell from the Excel data, including empty cells,
    preserving the original structure.

    Args:
        excel_data (dict): Dictionary containing the data of all worksheets

    Returns:
        dict: Text content of all worksheets, keyed by sheet name in sorted order
    """
    if not excel_data:
        return {}
    result = {}
    # Process the worksheets in sorted (dictionary) order
    for sheet_name in sorted(excel_data.keys()):
        df = excel_data[sheet_name]
        sheet_text = []
        # Walk every row and column, including empty cells
        for _, row in df.iterrows():
            row_text = []
            for value in row:
                if pd.isna(value):
                    row_text.append("")
                else:
                    row_text.append(str(value))
            # Join columns with tabs to preserve the table structure
            sheet_text.append("\t".join(row_text))
        # Store the current worksheet's text in the result dictionary
        result[sheet_name] = "\n".join(sheet_text)
    return result

def extract_excel_text_from_url(url):
    """
    Read an Excel file from a URL and extract the text of every cell.

    Args:
        url (str): URL of the Excel file

    Returns:
        dict: Text content of all worksheets, keyed by sheet name in sorted order
    """
    # Read the Excel file
    excel_data = read_excel_from_url(url)
    if excel_data:
        # Extract every cell's content (including empty cells)
        return get_all_cells_text(excel_data)
    else:
        return {}

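# Example end-to-end (hypothetical URL): fetch a workbook and dump each sheet
# as tab-separated text:
#
#   sheets = extract_excel_text_from_url('http://s3.vs1.lan/public/data/report.xlsx')
#   for name, text in sheets.items():
#       print(f'--- {name} ---')
#       print(text)
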
def main():
    response = s3.list_buckets()
    # Output the bucket names
    print('Existing buckets:')
    for bucket in response['Buckets']:
        print(f'  {bucket["Name"]}')


if __name__ == "__main__":
    main()