excel_processor.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. import pandas as pd
  2. from typing import List, Optional, Union, Tuple
  3. import logging
  4. from mylib.logging_config import setup_logging
  5. # Setup custom logging
  6. setup_logging()
  7. logger = logging.getLogger('excel_tool')
  8. class ExcelProcessor:
  9. def __init__(self, file_path, header_row=1):
  10. """Initialize Excel processor"""
  11. self.file_path = file_path
  12. self.header_row = header_row
  13. self.df = self.read_file(self.file_path, self.header_row)
  14. def _column_letter_to_index(self, col: str) -> int:
  15. """将Excel列字母转换为索引(A=0, B=1,...)"""
  16. index = 0
  17. for char in col.upper():
  18. if not 'A' <= char <= 'Z':
  19. raise ValueError(f"无效的列字母: {col}")
  20. index = index * 26 + (ord(char) - ord('A') + 1)
  21. return index - 1
  22. def _index_to_column_letter(self, index: int) -> str:
  23. """将列索引转换为Excel列字母(0=A, 1=B,...)"""
  24. if index < 0:
  25. raise ValueError("列索引不能为负数")
  26. letters = []
  27. while index >= 0:
  28. letters.append(chr(ord('A') + (index % 26)))
  29. index = index // 26 - 1
  30. return ''.join(reversed(letters))
  31. def search_headers(self, keywords: Union[str, List[str]]) -> List[Tuple[str, str]]:
  32. """
  33. 从表头搜索关键词,返回匹配的列名和列字母
  34. Args:
  35. keywords (Union[str, List[str]]): 要搜索的关键词或关键词列表
  36. Returns:
  37. List[Tuple[str, str]]: 匹配的列名和列字母列表,格式为[(列名, 列字母), ...]
  38. """
  39. try:
  40. # 如果输入是单个字符串,转换为列表
  41. if isinstance(keywords, str):
  42. keywords = [keywords]
  43. # 获取所有列名
  44. columns = self.df.columns.tolist()
  45. # 查找匹配的列
  46. matches = []
  47. for idx, col in enumerate(columns):
  48. if any(keyword.lower() in str(col).lower() for keyword in keywords):
  49. col_letter = self._index_to_column_letter(idx)
  50. matches.append((col, col_letter))
  51. logger.info(f"搜索关键词 {keywords} 找到 {len(matches)} 个匹配列")
  52. return matches
  53. except Exception as e:
  54. logger.error(f"搜索表头失败: {str(e)}")
  55. raise
  56. def read_file(self, file_path: str, header_row: Optional[int] = 0) -> pd.DataFrame:
  57. """
  58. 读取文件并返回DataFrame
  59. 支持Excel和CSV文件
  60. Args:
  61. file_path (str): 文件路径
  62. header_row (int, optional): 表头所在行号,从0开始计数. Defaults to 0.
  63. """
  64. try:
  65. if file_path.endswith('.csv'):
  66. df = pd.read_csv(file_path, header=header_row)
  67. logger.info(f"成功读取CSV文件: {file_path}, 表头行: {header_row}")
  68. else:
  69. # 对于Excel文件,指定engine参数
  70. df = pd.read_excel(file_path, engine='openpyxl', header=header_row)
  71. # 打印表头行
  72. logger.info(f"成功读取Excel文件: {file_path}, 表头行: {header_row}")
  73. logger.info(f"表头行: {df.columns.tolist()}")
  74. return df
  75. except Exception as e:
  76. logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}")
  77. raise
  78. def save_file(self, df: pd.DataFrame, output_path: str) -> None:
  79. """
  80. 将DataFrame保存为文件
  81. 支持Excel和CSV格式
  82. """
  83. try:
  84. if output_path.endswith('.csv'):
  85. df.to_csv(output_path, index=False)
  86. logger.info(f"成功保存CSV文件: {output_path}")
  87. else:
  88. # 对于Excel文件,指定engine参数
  89. df.to_excel(output_path, index=False, engine='openpyxl')
  90. logger.info(f"成功保存Excel文件: {output_path}")
  91. except Exception as e:
  92. logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}")
  93. raise
  94. def insert_column(self, ref_column: str, new_column_name: str, position: str = 'right') -> pd.DataFrame:
  95. """
  96. 在指定列旁边插入空列
  97. Args:
  98. ref_column (str): 参考列名或Excel列字母(如'A', 'B')
  99. new_column_name (str): 新列名
  100. position (str): 插入位置,'left'或'right',默认为'right'
  101. Returns:
  102. pd.DataFrame: 包含新列的DataFrame
  103. """
  104. df = self.df
  105. try:
  106. # 如果ref_column是字母,转换为列索引
  107. if ref_column.isalpha():
  108. col_index = self._column_letter_to_index(ref_column)
  109. ref_column = df.columns[col_index]
  110. # 获取参考列的位置
  111. ref_index = df.columns.get_loc(ref_column)
  112. # 计算插入位置
  113. insert_index = ref_index + 1 if position == 'right' else ref_index
  114. # 插入新列
  115. df.insert(insert_index, new_column_name, '')
  116. logger.info(f"成功在列 '{ref_column}' 的 '{position}' 插入新列 '{new_column_name}' ")
  117. return df
  118. except Exception as e:
  119. logger.error(f"插入列失败: {str(e)}")
  120. raise
  121. if __name__ == '__main__':
  122. # 测试代码
  123. processor = ExcelProcessor('/home/mrh/code/excel_tool/temp/测试_process.csv')
  124. # 测试CSV文件
  125. processor.insert_column('B', '翻译结果')
  126. # 测试搜索表头
  127. matches = processor.search_headers(['名称', '描述'])
  128. print(f"找到的匹配列: {matches}")
  129. # processor.save_file(df, '/home/mrh/code/excel_tool/temp/测试_process.csv')