"""Translate one column of a CSV file and turn its cells into Amazon search links.

Pipeline (see ``process_csv``): read the CSV with a detected encoding, insert
an empty column after the target column, fill it with translations of the
target column, replace the target cells with ``HYPERLINK`` formulas, and save
the result as UTF-8 with BOM.
"""
import codecs
import csv
import os
import sys
from urllib.parse import quote

import chardet

from ai_trans import translate_sentences

# Number of leading bytes sampled from a file for encoding detection.
_DETECTION_SAMPLE_BYTES = 10000


def create_search_link(value):
    """Build a spreadsheet HYPERLINK formula that searches amazon.co.jp for *value*.

    :param value: search term; used percent-encoded in the URL and verbatim
        as the visible link label
    :return: formula string of the form ``=HYPERLINK("<url>", "<label>")``
    """
    # Inside a spreadsheet formula string a literal double quote must be
    # doubled; otherwise a quote in the search term ends the label early and
    # corrupts the formula. The URL part is safe because quote() encodes '"'.
    label = value.replace('"', '""')
    return f'=HYPERLINK("https://www.amazon.co.jp/s?k={quote(value)}", "{label}")'


def detect_encoding(file_path):
    """Guess the text encoding of a file from its leading bytes using chardet.

    :param file_path: path of the file to inspect
    :return: an encoding name; falls back to ``'utf-8'`` when chardet reports
        no encoding
    """
    with open(file_path, 'rb') as f:
        raw_data = f.read(_DETECTION_SAMPLE_BYTES)

    result = chardet.detect(raw_data)
    encoding = result['encoding']

    # Compare case-insensitively so the special cases below do not depend on
    # the exact spelling chardet uses for the encoding name.
    normalized = (encoding or '').upper()

    if normalized == 'SHIFT_JIS':
        return 'shift_jis'
    if normalized == 'EUC-JP':
        return 'euc-jp'
    if normalized == 'ISO-8859-1':
        # UTF-8 may be misreported as ISO-8859-1: accept UTF-8 if the sample
        # decodes cleanly, otherwise fall back to cp932.
        # The sample may end in the middle of a multi-byte character when the
        # file is longer than the sample, so only treat the input as final
        # (i.e. reject a truncated tail) when the whole file was read.
        sample_is_whole_file = len(raw_data) < _DETECTION_SAMPLE_BYTES
        decoder = codecs.getincrementaldecoder('utf-8')()
        try:
            decoder.decode(raw_data, final=sample_is_whole_file)
        except UnicodeDecodeError:
            return 'cp932'
        return 'utf-8'

    return encoding or 'utf-8'


def read_csv(file_path):
    """Read a CSV file into a list of rows.

    The encoding comes from ``detect_encoding`` and the dialect from
    ``csv.Sniffer`` (falling back to ``csv.excel`` when sniffing fails).
    Undecodable bytes are replaced rather than raising.

    :param file_path: path of the CSV file
    :return: tuple ``(rows, encoding)`` where ``rows`` is a list of lists of
        strings
    :raises ValueError: if the file has no rows or its first row is empty
    :raises Exception: any I/O or parsing error is printed and re-raised
    """
    encoding = detect_encoding(file_path)
    print(f"Detected encoding: {encoding}")

    try:
        # newline='' lets the csv module handle line endings itself, which is
        # required for quoted fields containing embedded newlines.
        with open(file_path, mode='r', encoding=encoding, errors='replace',
                  newline='') as file:
            sample = file.read(1024)
            file.seek(0)
            try:
                dialect = csv.Sniffer().sniff(sample)
            except csv.Error:
                # The delimiter could not be determined; use the default dialect.
                dialect = csv.excel

            data = list(csv.reader(file, dialect))

        # The first row must contain at least one cell.
        if data and data[0]:
            return data, encoding
        raise ValueError("Empty or invalid CSV file")
    except Exception as e:
        print(f"Error reading CSV file: {e}")
        raise


def insert_empty_column(data, column_index):
    """Insert an empty cell after *column_index* in every row, in place.

    Rows shorter than ``column_index + 1`` get the empty cell appended at
    their end (``list.insert`` semantics).

    :param data: list of rows (lists); modified in place
    :param column_index: index of the column after which the cell is inserted
    :return: the same *data* list
    """
    for row in data:
        row.insert(column_index + 1, '')
    return data


def translate_column(data, column_index, start_row=0, target_language='zh'):
    """Translate one column and store each result in the following column.

    Rows that do not reach *column_index* are skipped. All remaining cells
    are sent to ``translate_sentences`` in a single call, and the returned
    ``"translations"`` list is paired with those rows in order.

    :param data: list of CSV rows; modified in place
    :param column_index: index of the column to translate
    :param start_row: index of the first row to translate (default 0)
    :param target_language: target language code (default ``'zh'``)
    :return: the same *data* list
    :raises Exception: any error from the translation call is printed and
        re-raised
    """
    # Keep the row objects themselves so that translations are paired with
    # exactly the rows whose text was submitted; indexing by row position
    # would shift every translation after a skipped (short) row.
    rows_to_translate = [row for row in data[start_row:] if len(row) > column_index]
    if not rows_to_translate:
        return data

    sentences_to_translate = [row[column_index] for row in rows_to_translate]

    try:
        print(f"Translating {len(sentences_to_translate)} sentences to {target_language}.")
        translated_output = translate_sentences(sentences_to_translate, target_language)
        translations = translated_output.get("translations", [])

        # zip stops at the shorter sequence, so rows without a returned
        # translation are left untouched.
        for row, translated in zip(rows_to_translate, translations):
            if len(row) > column_index + 1:
                row[column_index + 1] = translated
            else:
                # The source column is the row's last cell: add the target
                # cell instead of failing with IndexError.
                row.append(translated)
    except Exception as e:
        print(f"Error translating rows: {e}")
        raise

    return data


def add_search_links(data, column_index, start_row=0):
    """Replace each non-empty cell of a column with an Amazon search hyperlink.

    :param data: list of CSV rows; modified in place
    :param column_index: index of the column holding the search terms
    :param start_row: index of the first row to process (default 0)
    :return: the same *data* list
    """
    for row in data[start_row:]:
        # Skip rows that do not reach the column, and empty cells: an empty
        # search term would only yield a link with no label and no query.
        if len(row) > column_index and row[column_index]:
            row[column_index] = create_search_link(row[column_index])
    return data


def save_csv(data, file_path, encoding='utf-8-sig'):
    """Write rows to a CSV file.

    :param data: list of rows to write
    :param file_path: destination path
    :param encoding: output encoding (default ``'utf-8-sig'``); ``gb2312`` and
        ``gbk`` are upgraded to ``gb18030``
    :raises Exception: any write error is printed and re-raised
    """
    try:
        # gb18030 is a superset that covers more characters than gb2312/gbk.
        if encoding.lower() in ['gb2312', 'gbk']:
            encoding = 'gb18030'

        with open(file_path, mode='w', newline='', encoding=encoding,
                  errors='replace') as file:
            writer = csv.writer(file)
            writer.writerows(data)
    except Exception as e:
        print(f"Error saving CSV file: {e}")
        raise


def process_csv(input_file, output_file, column_index, start_row=0, target_language='zh'):
    """Run the full pipeline: read, insert column, translate, link, save.

    On any error the message is printed and the process exits with status 1.

    :param input_file: path of the CSV file to read
    :param output_file: path of the CSV file to write
    :param column_index: index of the column to translate and link
    :param start_row: index of the first row to translate and link (default 0)
    :param target_language: target language code (default ``'zh'``)
    """
    try:
        data, detected_encoding = read_csv(input_file)

        if not data:
            raise ValueError("No data found in CSV file")

        # Make room for the translations right after the target column.
        data = insert_empty_column(data, column_index)

        # Translate the target column into the newly inserted column.
        data = translate_column(data, column_index, start_row, target_language)

        # Turn the search terms into hyperlinks.
        data = add_search_links(data, column_index, start_row)

        # Always save as utf-8-sig, regardless of the input encoding.
        save_csv(data, output_file, encoding='utf-8-sig')
        print(f"Successfully processed and saved to {output_file}")

    except Exception as e:
        print(f"Error processing CSV file: {e}")
        # sys.exit is always available; the bare exit() builtin is only
        # injected by the site module.
        sys.exit(1)


if __name__ == "__main__":
    input_file = "测试.csv"
    output_file = "测试_processed.csv"
    column_index = 1  # column to translate; the empty column is inserted after it
    start_row = 2  # first row index to translate and link

    try:
        process_csv(input_file, output_file, column_index, start_row)
    except Exception as e:
        print(f"Fatal error: {e}")
        sys.exit(1)