excel_processor.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. import pandas as pd
  2. from typing import List, Optional, Union, Tuple
  3. import logging
  4. from dotenv import load_dotenv
  5. from mylib.pdfzh_translator import OpenAITranslator
  6. load_dotenv()
  7. from mylib.logging_config import setup_logging
  8. # Setup custom logging
  9. setup_logging()
  10. logger = logging.getLogger('excel_tool')
  11. class ExcelProcessor:
  12. def __init__(self, file_path, header_row=1):
  13. """Initialize Excel processor"""
  14. self.file_path = file_path
  15. self.header_row = header_row
  16. self.df = self.read_file(self.file_path, self.header_row)
  17. self.translator = OpenAITranslator()
  18. def _column_letter_to_index(self, col: str) -> int:
  19. """将Excel列字母转换为索引(A=0, B=1,...)"""
  20. index = 0
  21. for char in col.upper():
  22. if not 'A' <= char <= 'Z':
  23. raise ValueError(f"无效的列字母: {col}")
  24. index = index * 26 + (ord(char) - ord('A') + 1)
  25. return index - 1
  26. def _column_index_to_letter(self, idx: int) -> str:
  27. """将Excel列索引转换为字母(0=A, 1=B,...)"""
  28. if idx < 0:
  29. raise ValueError("列索引不能为负数")
  30. letters = []
  31. while idx >= 0:
  32. letters.append(chr(ord('A') + (idx % 26)))
  33. idx = (idx // 26) - 1
  34. return ''.join(reversed(letters))
  35. def read_column_data(self, column: Union[str, int], start_row: Optional[int] = None, end_row: Optional[int] = None) -> List[str]:
  36. """
  37. 读取指定列的数据,不含表头
  38. Args:
  39. column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
  40. start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
  41. end_row (Optional[int]): 结束行号(从0开始),默认为最后一行
  42. Returns:
  43. List[str]: 读取到的数据列表
  44. """
  45. try:
  46. # 如果输入是列字母,转换为列索引
  47. if isinstance(column, str) and column.isalpha():
  48. column = self._column_letter_to_index(column)
  49. # 设置默认值
  50. if start_row is None:
  51. start_row = 0
  52. if end_row is None:
  53. end_row = len(self.df) - 1
  54. # 读取数据
  55. data = self.df.iloc[start_row:end_row + 1, column].tolist()
  56. logger.info(f"成功读取列 {column} ,行 {start_row} : {end_row} 的数据")
  57. return data
  58. except Exception as e:
  59. logger.error(f"读取列数据失败: {str(e)}")
  60. raise
  61. def write_column_data(self, column: Union[str, int], data: List[str], start_row: Optional[int] = None) -> None:
  62. """
  63. 写入数据到指定列
  64. Args:
  65. column (Union[str, int]): 列字母(如'A')或列索引(从0开始)
  66. data (List[str]): 要写入的数据列表
  67. start_row (Optional[int]): 起始行号(从0开始),默认为表头下方第一行
  68. """
  69. try:
  70. # 如果输入是列字母,转换为列索引
  71. if isinstance(column, str) and column.isalpha():
  72. column = self._column_letter_to_index(column)
  73. # 设置默认值
  74. if start_row is None:
  75. start_row = 0
  76. # 检查数据长度
  77. end_row = start_row + len(data)
  78. if end_row > len(self.df):
  79. raise ValueError("数据长度超出表格范围")
  80. # 写入数据
  81. self.df.iloc[start_row:end_row, column] = data
  82. logger.info(f"成功写入 {len(data)} 条数据到列 {column} 从 {start_row} 行开始")
  83. except Exception as e:
  84. logger.error(f"写入列数据失败: {str(e)}")
  85. raise
  86. def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, int]]:
  87. """
  88. 从表头搜索关键词,返回匹配的列名和列索引
  89. Args:
  90. keywords (Union[str, List[str]]): 要搜索的关键词或关键词列表
  91. Returns:
  92. List[Tuple[str, int]]: 匹配的列名和列索引列表,格式为[(列名, 列索引), ...]
  93. """
  94. try:
  95. # 如果输入是单个字符串,转换为列表
  96. if isinstance(keywords, str):
  97. keywords = [keywords]
  98. # 获取所有列名
  99. columns = self.df.columns.tolist()
  100. # 查找匹配的列
  101. matches = []
  102. for idx, col in enumerate(columns):
  103. if any(keyword.lower() in str(col).lower() for keyword in keywords):
  104. matches.append((col, idx))
  105. logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列 {matches}")
  106. return matches
  107. except Exception as e:
  108. logger.error(f"搜索表头失败: {str(e)}")
  109. raise
  110. def read_file(self, file_path: str, header_row: Optional[int] = 0) -> pd.DataFrame:
  111. """
  112. 读取文件并返回DataFrame
  113. 支持Excel和CSV文件
  114. Args:
  115. file_path (str): 文件路径
  116. header_row (int, optional): 表头所在行号,从0开始计数. Defaults to 0.
  117. """
  118. try:
  119. if file_path.endswith('.csv'):
  120. df = pd.read_csv(file_path, header=header_row)
  121. logger.info(f"成功读取CSV文件: {file_path}, 表头行: {header_row}")
  122. else:
  123. # 对于Excel文件,指定engine参数
  124. df = pd.read_excel(file_path, engine='openpyxl', header=header_row)
  125. # 打印表头行
  126. logger.info(f"成功读取Excel文件: {file_path}, 表头行: {header_row}")
  127. logger.info(f"表头行: {df.columns.tolist()}")
  128. return df
  129. except Exception as e:
  130. logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}")
  131. raise
  132. def save_file(self, output_path: str) -> None:
  133. """
  134. 将DataFrame保存为文件
  135. 支持Excel和CSV格式
  136. """
  137. df = self.df
  138. try:
  139. if output_path.endswith('.csv'):
  140. df.to_csv(output_path, index=False)
  141. logger.info(f"成功保存CSV文件: {output_path}")
  142. else:
  143. # 对于Excel文件,指定engine参数
  144. df.to_excel(output_path, index=False, engine='openpyxl')
  145. logger.info(f"成功保存Excel文件: {output_path}")
  146. except Exception as e:
  147. logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}")
  148. raise
  149. def _pad_values_to_match_index(self, values: list, length: int) -> list:
  150. """
  151. 如果values长度不足,用空字符串填充到指定长度
  152. Args:
  153. values (list): 原始值列表
  154. length (int): 目标长度
  155. Returns:
  156. list: 填充后的列表
  157. """
  158. if len(values) < length:
  159. return values + [''] * (length - len(values))
  160. return values
  161. def insert_column_with_header(self, column: Union[str, int], header: str) -> int:
  162. """
  163. 在指定列之后插入一个空列,并在新列的第一行写入表头
  164. 如果列名已存在,则返回现有列的索引
  165. Args:
  166. column (Union[str, int]): 要插入空列的列字母或列索引(从0开始)
  167. header (str): 新列的表头
  168. Returns:
  169. int: 新列或现有列的索引
  170. """
  171. try:
  172. # 如果输入是列字母,转换为列索引
  173. if isinstance(column, str) and column.isalpha():
  174. column_index = self._column_letter_to_index(column)
  175. else:
  176. column_index = int(column)
  177. # 检查列名是否已存在
  178. if header in self.df.columns:
  179. logger.info(f"列 {header} 已存在,返回现有列索引")
  180. return list(self.df.columns).index(header)
  181. # 插入空列
  182. self.df.insert(column_index + 1, header, '')
  183. logger.info(f"成功在列 {column} 后插入空列,并在第一行写入表头 {header}")
  184. return column_index + 1
  185. except Exception as e:
  186. logger.error(f"插入列失败: {str(e)}")
  187. raise
  188. def set_cell_value(self, column: str, row: int, value: str) -> None:
  189. """
  190. 设置指定单元格的值
  191. Args:
  192. column (str): 列字母
  193. row (int): 行号(从0开始)
  194. value (str): 要设置的值
  195. """
  196. try:
  197. # 将列字母转换为列索引
  198. column_index = self._column_letter_to_index(column)
  199. # 设置单元格的值
  200. self.df.at[row, column_index] = value
  201. logger.info(f"成功设置单元格 {column}{row} 的值为 {value}")
  202. except Exception as e:
  203. logger.error(f"设置单元格值失败: {str(e)}")
  204. raise
  205. def translate_column(self, column: Union[str, int]) -> int:
  206. """
  207. 翻译指定列的内容,并在该列右侧插入翻译结果列
  208. Args:
  209. column (Union[str, int]): 要翻译的列字母或列索引(从0开始)
  210. Returns:
  211. int: 新插入的翻译结果列的索引
  212. """
  213. try:
  214. # 在指定列右侧插入新列
  215. new_col_index = self.insert_column_with_header(column, '翻译结果')
  216. # 读取原列数据
  217. data = self.read_column_data(column)
  218. logger.info(f"读取到的数据: {data}")
  219. # 批量翻译
  220. translated_texts = self.translator._batch_translate(data)
  221. # 写入翻译结果
  222. self.write_column_data(new_col_index, translated_texts)
  223. # 返回新列索引
  224. return new_col_index
  225. except Exception as e:
  226. logger.error(f"翻译列失败: {str(e)}")
  227. raise
  228. def translate_columns_by_keywords(self, keywords: List[str]) -> None:
  229. """
  230. 根据关键词搜索表头,批量翻译匹配的列
  231. Args:
  232. keywords (List[str]): 要搜索的关键词列表
  233. """
  234. try:
  235. # 搜索匹配的列
  236. matches = self.search_headers(keywords)
  237. ref_column_indices = [match[1] for match in matches]
  238. logger.info(f"找到的匹配列索引: {ref_column_indices}")
  239. adjusted_col_idx = 0
  240. # 从左到右依次翻译
  241. for title, col_idx in enumerate(ref_column_indices):
  242. col_idx += adjusted_col_idx
  243. logger.info(f"正在翻译第 {col_idx} 列")
  244. # 翻译当前列
  245. new_col_idx = self.translate_column(adjusted_col_idx)
  246. # 写入后的数据
  247. new_data = self.read_column_data(new_col_idx)
  248. logger.info(f"写入 {new_col_idx} 列后的数据: {new_data}")
  249. # 由于前面插入新列会影响后续列的索引,需要调整
  250. adjusted_col_idx += 1
  251. except Exception as e:
  252. logger.error(f"批量翻译列失败: {str(e)}")
  253. raise
  254. if __name__ == '__main__':
  255. # 测试代码
  256. processor = ExcelProcessor('/home/mrh/code/excel_tool/temp/测试.csv.utf8.csv')
  257. processor.translate_columns_by_keywords(['类别', '搜索词'])
  258. # matches = processor.search_headers(['类别', '搜索词'])
  259. # ref_column_indices = [match[1] for match in matches]
  260. # logger.info(f"找到的匹配列索引: {ref_column_indices}")
  261. # process_col = ref_column_indices[0]
  262. # new_col_index = processor.translate_column(process_col)
  263. # new_data = processor.read_column_data(new_col_index)
  264. # print(f"写入后的数据: {new_data}")
  265. # 获得某一列的所有数据
  266. # processor.set_cell_value('C', 0, '测试')
  267. # 测试搜索表头
  268. # matches = processor.search_headers(['类别', '搜索词'])
  269. # logger.info(f"找到的匹配列号: {ref_column_list}")
  270. # # 测试读取列数据
  271. # data = processor.read_column_data('B')
  272. # print(f"读取到的数据: {data}")
  273. # # 测试写入列数据
  274. # new_data = ['新数据1', '新数据2', '新数据3']
  275. # processor.write_column_data('C', new_data)
  276. processor.save_file('/home/mrh/code/excel_tool/temp/测试_process.csv')