|
|
@@ -25,32 +25,14 @@ def index_to_column_letter(index: int) -> str:
|
|
|
|
|
|
def read_csv_with_header(
|
|
|
file_path: str,
|
|
|
- encoding: str = 'cp936',
|
|
|
- header_row: int = 1,
|
|
|
- skip_rows: int = 0
|
|
|
+ encoding: str = 'cp936'
|
|
|
) -> List[List[str]]:
|
|
|
- """读取CSV文件并返回数据和表头
|
|
|
-
|
|
|
- Args:
|
|
|
- file_path: 文件路径
|
|
|
- encoding: 文件编码
|
|
|
- header_row: 表头所在行号(1-based)
|
|
|
- skip_rows: 要跳过的行数(包括表头前的行)
|
|
|
- """
|
|
|
+ """读取CSV文件并返回数据和表头"""
|
|
|
try:
|
|
|
with open(file_path, 'r', encoding=encoding) as f:
|
|
|
reader = csv.reader(f)
|
|
|
-
|
|
|
- # 跳过指定行数
|
|
|
- for _ in range(skip_rows):
|
|
|
- next(reader)
|
|
|
-
|
|
|
- # 读取并跳过表头前的行
|
|
|
- for _ in range(header_row - 1):
|
|
|
- next(reader)
|
|
|
-
|
|
|
- header = next(reader)
|
|
|
- data = [row for row in reader]
|
|
|
+ header = next(reader) # 读取表头(第1行)
|
|
|
+ data = [row for row in reader] # 读取剩余数据
|
|
|
|
|
|
logger.info(f"成功读取文件:{file_path}")
|
|
|
logger.debug(f"表头:{header}")
|
|
|
@@ -63,61 +45,26 @@ def search_keywords(
|
|
|
data: List[List[str]],
|
|
|
header: List[str],
|
|
|
keywords: Union[str, List[str]],
|
|
|
- row_index: int = 0,
|
|
|
- search_header: bool = False,
|
|
|
- header_search_start: int = 1,
|
|
|
- data_search_start: int = 2
|
|
|
+ row_index: int = 0
|
|
|
) -> List[str]:
|
|
|
- """搜索指定行中包含关键词的单元格并返回列名列表
|
|
|
-
|
|
|
- Args:
|
|
|
- data: 数据行列表
|
|
|
- header: 表头行
|
|
|
- keywords: 要搜索的关键词
|
|
|
- row_index: 要搜索的行索引(0-based)
|
|
|
- search_header: 是否搜索表头行
|
|
|
- header_search_start: 表头搜索起始行(1-based)
|
|
|
- data_search_start: 数据搜索起始行(1-based)
|
|
|
- """
|
|
|
+ """搜索指定行中包含关键词的单元格并返回列名列表"""
|
|
|
if isinstance(keywords, str):
|
|
|
keywords = [keywords]
|
|
|
|
|
|
found_columns = set()
|
|
|
|
|
|
- # 如果要搜索表头行
|
|
|
- if search_header:
|
|
|
- # 检查header_search_start是否有效
|
|
|
- if header_search_start < 1:
|
|
|
- logger.warning(f"header_search_start {header_search_start} 无效,使用默认值1")
|
|
|
- header_search_start = 1
|
|
|
-
|
|
|
- # 搜索表头行
|
|
|
- for col_index, cell in enumerate(header):
|
|
|
- if any(keyword in cell for keyword in keywords):
|
|
|
- col_letter = index_to_column_letter(col_index)
|
|
|
- found_columns.add(col_letter)
|
|
|
- logger.debug(f"在表头 {col_letter} 列找到关键词: {cell}")
|
|
|
- else:
|
|
|
- # 检查data_search_start是否有效
|
|
|
- if data_search_start < 1:
|
|
|
- logger.warning(f"data_search_start {data_search_start} 无效,使用默认值2")
|
|
|
- data_search_start = 2
|
|
|
-
|
|
|
- # 计算实际行索引
|
|
|
- actual_row_index = row_index + data_search_start - 1
|
|
|
-
|
|
|
- # 检查行索引是否在数据范围内
|
|
|
- if actual_row_index >= len(data):
|
|
|
- logger.warning(f"行索引 {actual_row_index} 超出数据范围")
|
|
|
- return []
|
|
|
-
|
|
|
- # 搜索数据行
|
|
|
- row = data[actual_row_index]
|
|
|
- for col_index, cell in enumerate(row):
|
|
|
- if any(keyword in cell for keyword in keywords):
|
|
|
- col_letter = index_to_column_letter(col_index)
|
|
|
- found_columns.add(col_letter)
|
|
|
- logger.debug(f"在 {col_letter}{actual_row_index + 2} 找到关键词: {cell}")
|
|
|
+ # 检查行索引是否在数据范围内
|
|
|
+ if row_index >= len(data):
|
|
|
+ logger.warning(f"行索引 {row_index} 超出数据范围")
|
|
|
+ return []
|
|
|
+
|
|
|
+ # 搜索数据行(从第2行开始)
|
|
|
+ row = data[row_index]
|
|
|
+ for col_index, cell in enumerate(row):
|
|
|
+ if any(keyword in cell for keyword in keywords):
|
|
|
+ col_letter = index_to_column_letter(col_index)
|
|
|
+ found_columns.add(col_letter)
|
|
|
+ logger.debug(f"在 {col_letter}{row_index + 2} 找到关键词: {cell}")
|
|
|
|
|
|
found_columns = sorted(found_columns, key=lambda x: column_letter_to_index(x))
|
|
|
logger.info(f"找到包含关键词的列: {', '.join(found_columns)}")
|
|
|
@@ -133,20 +80,13 @@ def translate_columns_data(
|
|
|
target_lang: str = 'zh-CN'
|
|
|
) -> List[List[str]]:
|
|
|
"""翻译多个指定列的数据"""
|
|
|
- # 记录用户传入的参数
|
|
|
- logger.info(f"翻译参数:源语言={source_lang}, 目标语言={target_lang}")
|
|
|
- logger.info(f"翻译范围:从第 {start_row} 行到第 {end_row if end_row else '最后'} 行")
|
|
|
-
|
|
|
translator = OpenAITranslator(lang_out=target_lang, lang_in=source_lang)
|
|
|
-
|
|
|
end_row = end_row if end_row is not None else len(data)
|
|
|
rows_to_translate = data[start_row - 1:end_row] # 转换为0-based索引
|
|
|
|
|
|
- logger.info(f"开始翻译 {start_row} 到 {end_row} 行的数据")
|
|
|
-
|
|
|
# 按顺序处理每一列
|
|
|
for i, col_index in enumerate(column_indices):
|
|
|
- # 计算当前列的实际索引(考虑之前插入的列)
|
|
|
+ # 计算当前列的实际索引
|
|
|
current_col_index = col_index + i
|
|
|
|
|
|
# 插入新列
|
|
|
@@ -159,21 +99,13 @@ def translate_columns_data(
|
|
|
# 提取要翻译的文本
|
|
|
texts_to_translate = [row[current_col_index] for row in rows_to_translate]
|
|
|
|
|
|
- # 在翻译前log出提取的内容
|
|
|
- logger.info(f"列 {current_col_index} 提取的内容示例:")
|
|
|
- for idx, text in enumerate(texts_to_translate[:3], start=start_row):
|
|
|
- logger.info(f"第 {idx + 1} 行: {text}")
|
|
|
-
|
|
|
# 批量翻译
|
|
|
translated_texts = translator._batch_translate(texts_to_translate)
|
|
|
|
|
|
# 将翻译结果插入新列
|
|
|
for j, row in enumerate(rows_to_translate):
|
|
|
row[current_col_index + 1] = translated_texts[j]
|
|
|
-
|
|
|
- logger.info(f"列 {current_col_index} 翻译完成")
|
|
|
|
|
|
- logger.info("所有列翻译完成")
|
|
|
return data, header
|
|
|
|
|
|
def save_csv(
|
|
|
@@ -201,51 +133,17 @@ def process_csv(
|
|
|
end_row: Optional[int] = None,
|
|
|
source_lang: str = 'auto',
|
|
|
target_lang: str = 'zh-CN',
|
|
|
- encoding: str = 'cp936',
|
|
|
- header_row: int = 1,
|
|
|
- skip_rows: int = 0,
|
|
|
- header_search_start: int = 1, # 默认从第1行开始搜索表头
|
|
|
- data_search_start: int = 2 # 默认从第2行开始搜索数据
|
|
|
+ encoding: str = 'cp936'
|
|
|
):
|
|
|
- """处理CSV文件的主函数
|
|
|
-
|
|
|
- Args:
|
|
|
- input_file: 输入文件路径
|
|
|
- output_file: 输出文件路径
|
|
|
- columns: 要处理的列
|
|
|
- start_row: 开始行(1-based)
|
|
|
- end_row: 结束行
|
|
|
- source_lang: 源语言
|
|
|
- target_lang: 目标语言
|
|
|
- encoding: 文件编码
|
|
|
- header_row: 表头所在行号(1-based)
|
|
|
- skip_rows: 要跳过的行数(包括表头前的行)
|
|
|
- header_search_start: 表头搜索起始行(1-based)
|
|
|
- data_search_start: 数据搜索起始行(1-based)
|
|
|
- """
|
|
|
+ """处理CSV文件的主函数"""
|
|
|
try:
|
|
|
- # 记录用户传入的参数
|
|
|
- logger.info(f"处理文件:{input_file}")
|
|
|
- logger.info(f"输出文件:{output_file}")
|
|
|
- logger.info(f"处理列:{columns}")
|
|
|
- logger.info(f"编码:{encoding}")
|
|
|
- logger.info(f"表头行号:{header_row}")
|
|
|
- logger.info(f"跳过行数:{skip_rows}")
|
|
|
- logger.info(f"表头搜索起始行:{header_search_start}")
|
|
|
- logger.info(f"数据搜索起始行:{data_search_start}")
|
|
|
-
|
|
|
# 转换列字母为索引
|
|
|
if isinstance(columns, str):
|
|
|
columns = [columns]
|
|
|
column_indices = [column_letter_to_index(col) for col in columns]
|
|
|
|
|
|
# 读取文件
|
|
|
- header, data = read_csv_with_header(
|
|
|
- input_file,
|
|
|
- encoding=encoding,
|
|
|
- header_row=header_row,
|
|
|
- skip_rows=skip_rows
|
|
|
- )
|
|
|
+ header, data = read_csv_with_header(input_file, encoding=encoding)
|
|
|
|
|
|
# 翻译指定列
|
|
|
data, header = translate_columns_data(
|
|
|
@@ -271,25 +169,18 @@ if __name__ == "__main__":
|
|
|
|
|
|
# 示例用法
|
|
|
file_path = "/home/mrh/code/excel_tool/temp/测试.csv"
|
|
|
+ output_path = "/home/mrh/code/excel_tool/temp/测试_processed.csv"
|
|
|
|
|
|
- # 单独测试search_keywords
|
|
|
- header, data = read_csv_with_header(file_path, header_row=1, skip_rows=1)
|
|
|
- # 搜索表头
|
|
|
- found_columns = search_keywords(data, header, ["搜索词", "类别"], search_header=True)
|
|
|
- print(f"在表头找到的列: {found_columns}")
|
|
|
-
|
|
|
+ # 读取文件并搜索关键词
|
|
|
+ header, data = read_csv_with_header(file_path)
|
|
|
+ found_columns = search_keywords(data, header, ["搜索词", "类别"])
|
|
|
|
|
|
- # 完整流程测试
|
|
|
- output_path = "/home/mrh/code/excel_tool/temp/测试_processed.csv"
|
|
|
+ # 处理文件
|
|
|
process_csv(
|
|
|
input_file=file_path,
|
|
|
output_file=output_path,
|
|
|
- columns=found_columns, # 使用搜索到的列
|
|
|
- start_row=1,
|
|
|
+ columns=found_columns,
|
|
|
+ start_row=2,
|
|
|
source_lang='auto',
|
|
|
- target_lang='zh-CN',
|
|
|
- header_row=1,
|
|
|
- skip_rows=1,
|
|
|
- header_search_start=1,
|
|
|
- data_search_start=2
|
|
|
+ target_lang='zh-CN'
|
|
|
)
|