import logging
from typing import List

import pandas as pd

from mylib.logging_config import setup_logging

# Setup custom logging.
# NOTE(review): setup_logging() is a project helper that runs at import time;
# its exact effect is not visible from this file.
setup_logging()
logger = logging.getLogger('excel_tool')


class ExcelProcessor:
    """Read, modify and write Excel sheets through pandas DataFrames.

    The class keeps no instance state; every method works on the arguments
    it is given.
    """

    def __init__(self):
        """Initialize Excel processor (nothing to set up)."""
        pass

    def _column_letter_to_index(self, col: str) -> int:
        """Convert an Excel column letter to a zero-based index (A=0, B=1, ...).

        Args:
            col: Column letters such as 'A', 'b' or 'AA' (case-insensitive).

        Returns:
            The zero-based column index.

        Raises:
            ValueError: If ``col`` is empty or contains a character outside
                A-Z after upper-casing.
        """
        # An empty string would otherwise fall through the loop and yield -1.
        if not col:
            raise ValueError(f"无效的列字母: {col}")
        index = 0
        for char in col.upper():
            if not 'A' <= char <= 'Z':
                raise ValueError(f"无效的列字母: {col}")
            # Letters act as base-26 digits valued 1..26 ('A' is 1, not 0).
            index = index * 26 + (ord(char) - ord('A') + 1)
        return index - 1

    def read_excel_file(self, file_path: str) -> pd.DataFrame:
        """Read an Excel file and return its content as a DataFrame.

        Args:
            file_path: Path of the Excel file to read.

        Returns:
            The DataFrame produced by ``pd.read_excel``.

        Raises:
            Exception: Any error raised while reading is logged and re-raised
                unchanged.
        """
        try:
            df = pd.read_excel(file_path)
            logger.info(f"成功读取文件: {file_path}")
            return df
        except Exception as e:
            logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}")
            raise

    def save_excel_file(self, df: pd.DataFrame, output_path: str) -> None:
        """Write a DataFrame to an Excel file without the index column.

        Args:
            df: The DataFrame to save.
            output_path: Destination path of the Excel file.

        Raises:
            Exception: Any error raised while writing is logged and re-raised
                unchanged.
        """
        try:
            df.to_excel(output_path, index=False)
            logger.info(f"成功保存文件: {output_path}")
        except Exception as e:
            logger.error(f"保存文件失败: {output_path}, 错误: {str(e)}")
            raise

    def _locate_reference_column(self, df: pd.DataFrame, ref_column: str) -> tuple:
        """Resolve ``ref_column`` to a ``(position, label)`` pair.

        A purely alphabetic reference is first read as an Excel column letter.
        If that fails (invalid letters or index past the last column) and a
        column with exactly that name exists, the name is used instead;
        otherwise the original error is re-raised.
        """
        if ref_column.isalpha():
            try:
                col_index = self._column_letter_to_index(ref_column)
                # Use the position directly so duplicated labels cannot
                # confuse the lookup.
                return col_index, df.columns[col_index]
            except (ValueError, IndexError):
                # Alphabetic column names ("Name", "姓名") land here; only
                # give up when no such column exists.
                if ref_column not in df.columns:
                    raise
        return df.columns.get_loc(ref_column), ref_column

    def insert_column(self, df: pd.DataFrame, ref_column: str, new_column_name: str, position: str = 'right') -> pd.DataFrame:
        """Insert an empty column next to a reference column.

        The DataFrame is modified in place and also returned.

        Args:
            df: The DataFrame to modify.
            ref_column: Reference column, given either as an Excel column
                letter (such as 'A', 'B') or as a column name. A purely
                alphabetic value is tried as a letter first and as a name
                second.
            new_column_name: Name of the column to insert; its cells are
                filled with empty strings.
            position: 'right' inserts after the reference column; any other
                value inserts before it. Defaults to 'right'.

        Returns:
            The same DataFrame object, now containing the new column.

        Raises:
            IndexError: If a letter reference points past the last column and
                no column has that name.
            ValueError: If an alphabetic reference is neither a valid letter
                nor an existing column name.
            KeyError: If a non-alphabetic column name does not exist.
            Exception: Any other error from pandas (for example when the new
                column cannot be inserted) is logged and re-raised.
        """
        try:
            ref_index, ref_column = self._locate_reference_column(df, ref_column)

            # Compute the insertion position.
            insert_index = ref_index + 1 if position == 'right' else ref_index

            # Insert the new, empty column.
            df.insert(insert_index, new_column_name, '')

            logger.info(f"成功在列 {ref_column} 的{position}插入新列 {new_column_name}")
            return df

        except Exception as e:
            logger.error(f"插入列失败: {str(e)}")
            raise


if __name__ == '__main__':
    # Manual smoke test: read a workbook, insert an empty column to the
    # right of column B, and write the result to a second workbook.
    input_path = '/home/mrh/code/excel_tool/temp/测试.xlsx'
    output_path = '/home/mrh/code/excel_tool/temp/测试_process.xlsx'

    processor = ExcelProcessor()
    df = processor.read_excel_file(input_path)
    df = processor.insert_column(df, 'B', 'New Column')
    processor.save_excel_file(df, output_path)