|
|
@@ -1,47 +1,74 @@
|
|
|
-from typing import List, Dict
|
|
|
-import pandas as pd
|
|
|
-from urllib.parse import quote
|
|
|
-from mylib.logging_config import setup_logging
|
|
|
-import logging
|
|
|
-
|
|
|
-# Setup logging
|
|
|
-setup_logging()
|
|
|
-logger = logging.getLogger('excel_tool')
|
|
|
-
|
|
|
-class ExcelProcessor:
|
|
|
- def __init__(self, file_path: str):
|
|
|
- """Initialize Excel processor with file path"""
|
|
|
- self.file_path = file_path
|
|
|
- self.df = pd.read_excel(file_path)
|
|
|
- logger.info(f"Loaded Excel file: {file_path}")
|
|
|
-
|
|
|
- def add_translation_column(self, column_name: str, translations: Dict[str, str]):
|
|
|
- """Add translated column next to specified column"""
|
|
|
- try:
|
|
|
- new_col = f"{column_name}_中文"
|
|
|
- self.df[new_col] = self.df[column_name].map(translations)
|
|
|
- logger.info(f"Added translation column for {column_name}")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error adding translation column: {str(e)}")
|
|
|
- raise
|
|
|
-
|
|
|
- def add_hyperlink_column(self, column_name: str, base_url: str):
|
|
|
- """Add hyperlink column for specified column"""
|
|
|
- try:
|
|
|
- new_col = f"{column_name}_链接"
|
|
|
- self.df[new_col] = self.df[column_name].apply(
|
|
|
- lambda x: f'=HYPERLINK("{base_url}{quote(x)}", "{x}")'
|
|
|
- )
|
|
|
- logger.info(f"Added hyperlink column for {column_name}")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error adding hyperlink column: {str(e)}")
|
|
|
- raise
|
|
|
-
|
|
|
- def save(self, output_path: str):
|
|
|
- """Save processed Excel file"""
|
|
|
+import csv
|
|
|
+import sys
|
|
|
+import os
|
|
|
+
|
|
|
+# 添加项目根目录到Python路径
|
|
|
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
+
|
|
|
+from mylib.read_encoding_cvs import read_csv, detect_encoding
|
|
|
+
|
|
|
+def _column_letter_to_index(col):
|
|
|
+ """将Excel列字母转换为索引(A=0, B=1,...)"""
|
|
|
+ index = 0
|
|
|
+ for char in col.upper():
|
|
|
+ if not 'A' <= char <= 'Z':
|
|
|
+ raise ValueError(f"无效的列字母: {col}")
|
|
|
+ index = index * 26 + (ord(char) - ord('A') + 1)
|
|
|
+ return index - 1
|
|
|
+
|
|
|
+def _read_csv_file(csv_file):
|
|
|
+ """读取CSV文件并返回数据"""
|
|
|
+ encoding = detect_encoding(csv_file)
|
|
|
+ return read_csv(csv_file, specified_encoding=encoding)
|
|
|
+
|
|
|
+def _get_column_index(headers, column_name):
|
|
|
+ """获取指定列的索引"""
|
|
|
+ try:
|
|
|
+ # 先尝试作为列字母处理
|
|
|
+ ref_index = _column_letter_to_index(column_name)
|
|
|
+ if ref_index >= len(headers):
|
|
|
+ raise ValueError(f"列索引 {ref_index} 超出范围")
|
|
|
+ except ValueError:
|
|
|
+ # 如果不是列字母,尝试作为列名处理
|
|
|
try:
|
|
|
- self.df.to_excel(output_path, index=False)
|
|
|
- logger.info(f"Saved processed file to {output_path}")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error saving file: {str(e)}")
|
|
|
- raise
|
|
|
+ ref_index = headers.index(column_name)
|
|
|
+ except ValueError:
|
|
|
+ raise ValueError(f"列 '{column_name}' 不存在")
|
|
|
+ return ref_index
|
|
|
+
|
|
|
+def _insert_new_column(rows, headers, ref_index, new_column_name, position):
|
|
|
+ """在指定位置插入新列"""
|
|
|
+ insert_index = ref_index + 1 if position == 'right' else ref_index
|
|
|
+ headers.insert(insert_index, new_column_name)
|
|
|
+ for row in rows[1:]:
|
|
|
+ row.insert(insert_index, '')
|
|
|
+ return rows
|
|
|
+
|
|
|
+def _write_csv_file(output_file, rows):
|
|
|
+ """将数据写入CSV文件"""
|
|
|
+ with open(output_file, 'w', encoding='utf-8', newline='') as file:
|
|
|
+ writer = csv.writer(file)
|
|
|
+ writer.writerows(rows)
|
|
|
+
|
|
|
+def insert_column(csv_file, column_name, new_column_name, position='right'):
|
|
|
+ """
|
|
|
+ 在指定列旁边插入空列
|
|
|
+
|
|
|
+ Args:
|
|
|
+ csv_file (str): CSV文件路径
|
|
|
+ column_name (str): 参考列名或Excel列字母(如'A', 'B')
|
|
|
+ new_column_name (str): 新列名
|
|
|
+ position (str): 插入位置,'left'或'right',默认为'right'
|
|
|
+ """
|
|
|
+ rows = _read_csv_file(csv_file)
|
|
|
+ headers = rows[0]
|
|
|
+
|
|
|
+ ref_index = _get_column_index(headers, column_name)
|
|
|
+ rows = _insert_new_column(rows, headers, ref_index, new_column_name, position)
|
|
|
+
|
|
|
+ output_file = '/home/mrh/code/excel_tool/temp/测试_process.csv'
|
|
|
+ _write_csv_file(output_file, rows)
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ # 测试代码
|
|
|
+ insert_column('/home/mrh/code/excel_tool/temp/测试.csv', 'B', 'New Column')
|