| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import csv
- import chardet
- from ai_trans import translate_sentences
- from brand_add_url_link import create_hyperlink
def detect_encoding(file_path):
    """Guess the character encoding of a file with chardet.

    The whole file is read as raw bytes and handed to ``chardet.detect``;
    the ``'encoding'`` entry of its result is returned unchanged.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        Whatever chardet reports under ``'encoding'``. ``read_csv`` in this
        file treats a falsy value as "nothing detected".
    """
    with open(file_path, 'rb') as handle:
        guess = chardet.detect(handle.read())
    return guess['encoding']
def read_csv(file_path):
    """Read a CSV file into a list of rows, trying several encodings.

    The encoding reported by ``detect_encoding`` (when truthy) is tried
    first, followed by a fixed list of fallbacks. The first encoding that
    reads the whole file without error wins.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A list of rows as produced by ``csv.reader`` (each row a list of
        strings).

    Raises:
        Exception: If none of the candidate encodings could read the file.
    """
    fallback_encodings = ['utf-8-sig', 'gb18030', 'shift_jis', 'euc-jp']
    detected_encoding = detect_encoding(file_path)
    print(f"Detected encoding: {detected_encoding}")

    candidates = [detected_encoding] if detected_encoding else []
    candidates.extend(fallback_encodings)

    # Encoding names are case-insensitive; skip repeats so the same
    # encoding is not attempted twice when the detected one is also a
    # fallback.
    attempted = set()
    for encoding in candidates:
        key = encoding.lower()
        if key in attempted:
            continue
        attempted.add(key)

        try:
            # newline='' lets the csv module handle line endings itself, so
            # newlines embedded in quoted fields are parsed correctly.
            with open(file_path, 'r', encoding=encoding, newline='') as f:
                return list(csv.reader(f))
        except UnicodeDecodeError:
            # Wrong encoding guess; move on to the next candidate.
            continue
        except Exception as e:
            # e.g. LookupError for an encoding name Python does not know.
            print(f"Error with encoding {encoding}: {e}")
            continue

    raise Exception("Failed to read file with all attempted encodings")
def process_row(row, search_term_index):
    """Add a translation column and an Amazon search link to one row.

    The cell at ``search_term_index`` is translated with
    ``translate_sentences``; the translation (or a failure placeholder) is
    inserted immediately after it. The search-term cell itself is then
    replaced by the result of ``create_hyperlink`` pointing at an
    amazon.co.jp search for the term.

    Args:
        row: The row to process. It is modified in place.
        search_term_index: Index of the search-term cell within ``row``.

    Returns:
        The same ``row`` object, after modification.

    Raises:
        IndexError: If ``row`` has no cell at ``search_term_index``.
    """
    from urllib.parse import quote_plus

    search_term = row[search_term_index]
    try:
        print(f"Translating: {search_term}")
        translations = translate_sentences([search_term])
        print(f"Translation result: {translations}")

        # The error-code check must come before any truthiness/len() test:
        # len() on an int raises TypeError, which previously made this
        # branch unreachable.
        if isinstance(translations, int) and not isinstance(translations, bool):
            translated = f"翻译失败(错误码:{translations})"
        elif not translations:
            translated = "翻译失败(无结果)"
        else:
            translated = translations[0]

    except Exception as e:
        # Best effort: a failed translation must not abort the row.
        print(f"Translation error for '{search_term}': {str(e)}")
        translated = f"翻译失败(异常:{str(e)})"

    row.insert(search_term_index + 1, translated)

    # Percent-encode the term so spaces, '&', '#' and non-ASCII characters
    # cannot break or truncate the query string.
    # NOTE(review): assumes create_hyperlink does not encode the URL itself.
    amazon_url = f"https://www.amazon.co.jp/s?k={quote_plus(search_term)}"
    row[search_term_index] = create_hyperlink(search_term, amazon_url)

    return row
def save_csv(data, file_path):
    """Write rows to a CSV file encoded as UTF-8 with a BOM.

    Args:
        data: Iterable of rows, each an iterable of cell values.
        file_path: Destination path; an existing file is overwritten.
    """
    with open(file_path, mode='w', newline='', encoding='utf-8-sig') as out:
        csv_out = csv.writer(out)
        for record in data:
            csv_out.writerow(record)
def main(input_file, output_file):
    """Translate the search-term column of a CSV file and save the result.

    Reads ``input_file`` with ``read_csv``, runs ``process_row`` on every
    row after the first (header) row, and writes everything to
    ``output_file`` with ``save_csv``. Each data row gains one column right
    after the search term; the header row gains a matching column so that
    headers stay aligned with the data.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the CSV file to write.

    Raises:
        Exception: Any error from reading or saving is printed and
            re-raised. Errors in individual rows are printed and replaced
            by a placeholder cell instead of being raised.
    """
    try:
        # Read CSV with proper encoding
        data = read_csv(input_file)

        search_term_index = 1  # Search term is in second column
        new_column_index = search_term_index + 1

        # Data rows get an extra column, so the header needs one too;
        # otherwise every later header sits one column to the left of its
        # data.
        if data:
            data[0].insert(new_column_index, "翻译")

        # Process each row (skip header row)
        for i, row in enumerate(data[1:], start=1):
            try:
                print(f"\nProcessing row {i}")
                # Work on a copy: process_row mutates its argument, and a
                # failure part-way through must not leave the original row
                # with a column already inserted.
                data[i] = process_row(list(row), search_term_index)
                print(f"Processed row {i} successfully")
            except Exception as e:
                print(f"Error processing row {i}: {str(e)}")
                # Pad short rows (e.g. blank lines) so the placeholder
                # lands in the translation column, then keep going: one
                # bad row should not leave the rest unprocessed.
                missing = new_column_index - len(row)
                if missing > 0:
                    row.extend([""] * missing)
                row.insert(new_column_index, "翻译失败(处理错误)")
                data[i] = row

        # Save processed data
        save_csv(data, output_file)
        print(f"Successfully processed and saved to {output_file}")

    except Exception as e:
        print(f"Error processing file: {e}")
        raise
# Script entry point: process the sample file in the working directory.
if __name__ == "__main__":
    input_file, output_file = "测试.csv", "processed_测试.csv"
    main(input_file, output_file)
|