|
|
@@ -1,19 +1,36 @@
|
|
|
import csv
|
|
|
import chardet
|
|
|
+import logging
|
|
|
+import sys
|
|
|
from pathlib import Path
|
|
|
from ai_trans import translate_sentences
|
|
|
from brand_add_url_link import create_hyperlink
|
|
|
|
|
|
# Configure logging once at import time: every record goes both to a log file
# in the current working directory and to the console stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: log messages embed search terms and translation
        # results, which may be non-ASCII; the default file encoding is
        # locale-dependent and might not be able to represent them.
        logging.FileHandler('process_data.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
def detect_encoding(file_path):
    """Guess the text encoding of a file from its raw bytes.

    The file is opened in binary mode, read in full, and passed to
    ``chardet.detect``; the ``'encoding'`` entry of the result is returned.
    NOTE(review): callers in this file test the result for truthiness, so it
    is presumably ``None`` when chardet cannot decide -- confirm against the
    chardet documentation.

    On any exception the error is logged and the process exits with status 1.
    """
    try:
        with open(file_path, 'rb') as stream:
            guess = chardet.detect(stream.read())
            return guess['encoding']
    except Exception as e:
        logger.error(f"Error detecting encoding for {file_path}: {e}")
        sys.exit(1)
|
|
|
|
|
|
def read_csv(file_path):
|
|
|
encodings_to_try = ['utf-8-sig', 'gb18030', 'shift_jis', 'euc-jp']
|
|
|
detected_encoding = detect_encoding(file_path)
|
|
|
- print(f"Detected encoding: {detected_encoding}")
|
|
|
+ logger.info(f"Detected encoding: {detected_encoding}")
|
|
|
|
|
|
if detected_encoding:
|
|
|
encodings_to_try.insert(0, detected_encoding)
|
|
|
@@ -26,45 +43,61 @@ def read_csv(file_path):
|
|
|
except UnicodeDecodeError:
|
|
|
continue
|
|
|
except Exception as e:
|
|
|
- print(f"Error with encoding {encoding}: {e}")
|
|
|
+ logger.error(f"Error with encoding {encoding}: {e}")
|
|
|
continue
|
|
|
|
|
|
- raise Exception("Failed to read file with all attempted encodings")
|
|
|
+ logger.error("Failed to read file with all attempted encodings")
|
|
|
+ sys.exit(1)
|
|
|
|
|
|
def insert_empty_column(data, column_index):
    """Insert an empty column right after the given column.

    Every row in ``data`` is modified in place: an empty string is inserted
    at position ``column_index + 1``. The same ``data`` object is returned.

    On any exception the error is logged and the process exits with status 1.
    """
    try:
        for record in data:
            # Insert directly behind the target column.
            record.insert(column_index + 1, '')
    except Exception as e:
        logger.error(f"Error inserting empty column at index {column_index}: {e}")
        sys.exit(1)
    else:
        return data
|
|
|
|
|
|
def process_row(row, search_term_index):
    """Translate a row's search term and turn the term into an Amazon link.

    Args:
        row: Mutable sequence of cell values; it is modified in place.
            Assumes a cell already exists at ``search_term_index + 1`` to
            receive the translation (presumably created by
            ``insert_empty_column`` -- verify against the caller).
        search_term_index: Index of the cell holding the search term.

    Returns:
        The same ``row`` object, with the first translation stored at
        ``search_term_index + 1`` and the search-term cell replaced by the
        result of ``create_hyperlink(search_term, amazon_url)``.

    Exits:
        Logs the problem and calls ``sys.exit(1)`` when the translation call
        returns an int (treated as an error code), returns nothing, or when
        any other exception occurs while processing the row.
    """
    # Local import keeps this block self-contained; only needed for the URL.
    from urllib.parse import quote_plus

    try:
        search_term = row[search_term_index]
        logger.info(f"Translating: {search_term}")
        translations = translate_sentences([search_term])
        logger.info(f"Translation result: {translations}")

        # Check for an int error code BEFORE looking at emptiness: len() of an
        # int raises TypeError, which would otherwise be misreported by the
        # generic handler below instead of as a translation error.
        if isinstance(translations, int):
            logger.error(f"Translation error for '{search_term}': {translations}")
            sys.exit(1)
        if not translations:
            logger.error(f"Translation failed for '{search_term}': No result")
            sys.exit(1)

        # Store the translation in the column right after the search term.
        row[search_term_index + 1] = translations[0]

        # Percent-encode the term so characters such as '&', '#', '+' or
        # spaces cannot truncate or corrupt the query string.
        amazon_url = f"https://www.amazon.co.jp/s?k={quote_plus(str(search_term))}"
        row[search_term_index] = create_hyperlink(search_term, amazon_url)

        return row
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the deliberate exits above pass through this handler untouched.
        logger.error(f"Error processing row: {e}")
        sys.exit(1)
|
|
|
|
|
|
def save_csv(data, file_path):
    """Write ``data`` (an iterable of rows) to ``file_path`` as CSV.

    The file is opened for writing with the ``utf-8-sig`` codec and
    ``newline=''`` so the ``csv`` module controls line endings itself.

    On any exception the error is logged and the process exits with status 1.
    """
    try:
        with open(file_path, 'w', encoding='utf-8-sig', newline='') as out_file:
            csv.writer(out_file).writerows(data)
    except Exception as e:
        logger.error(f"Error saving CSV to {file_path}: {e}")
        sys.exit(1)
|
|
|
|
|
|
def main(input_file, output_file):
|
|
|
try:
|
|
|
@@ -81,23 +114,20 @@ def main(input_file, output_file):
|
|
|
# Process each row (skip header row)
|
|
|
for i, row in enumerate(data[1:], start=1):
|
|
|
try:
|
|
|
- print(f"\nProcessing row {i}")
|
|
|
+ logger.info(f"\nProcessing row {i}")
|
|
|
data[i] = process_row(row, search_term_index)
|
|
|
- print(f"Processed row {i} successfully")
|
|
|
+ logger.info(f"Processed row {i} successfully")
|
|
|
except Exception as e:
|
|
|
- print(f"Error processing row {i}: {str(e)}")
|
|
|
- # Insert empty translation column to maintain structure
|
|
|
- row.insert(search_term_index + 1, "翻译失败(处理错误)")
|
|
|
- data[i] = row
|
|
|
- continue
|
|
|
+ logger.error(f"Error processing row {i}: {str(e)}")
|
|
|
+ sys.exit(1)
|
|
|
|
|
|
# Save processed data
|
|
|
save_csv(data, output_file)
|
|
|
- print(f"Successfully processed and saved to {output_file}")
|
|
|
+ logger.info(f"Successfully processed and saved to {output_file}")
|
|
|
|
|
|
except Exception as e:
|
|
|
- print(f"Error processing file: {e}")
|
|
|
- raise
|
|
|
+ logger.error(f"Error processing file: {e}")
|
|
|
+ sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
output_dir = Path('temp')
|