|
|
@@ -0,0 +1,171 @@
|
|
|
+from pathlib import Path
|
|
|
+import shutil
|
|
|
+
|
|
|
+
|
|
|
def list_files(directory) -> list[Path]:
    """Recursively collect every regular file found under *directory*.

    :param directory: Directory to walk; it is resolved to an absolute path
        before the traversal starts.
    :return: List of paths of all files below the directory (sub-directories
        themselves are not included).
    """
    root = Path(directory).resolve()
    return [entry for entry in root.rglob("*") if entry.is_file()]
|
|
|
+
|
|
|
+
|
|
|
def parse_file_path(file_path, base_dir="output/copy_exported_files"):
    """Split an exported file's path into the parts needed to rebuild its target.

    The path is interpreted relative to *base_dir*: the first component is the
    keyword and the last component is the file name. PDF files are returned
    with their whole stem as the id and ``"pdf"`` as the file type; any other
    file name is split on its last ``_`` into ``{id}_{file_type}``.

    :param file_path: Path of a file located below *base_dir*.
    :param base_dir: Root directory the path is made relative to. Defaults to
        ``output/copy_exported_files`` for backward compatibility.
    :return: ``(keyword, id, file_type, extension)``, or ``None`` when a
        non-PDF file name contains no ``_`` separator.
    :raises ValueError: If the path is not located under *base_dir* or has
        fewer than two components below it.
    """
    file_path = Path(file_path).resolve()
    # relative_to raises ValueError when file_path is outside base_dir.
    rel_path = file_path.relative_to(Path(base_dir).resolve())
    parts = rel_path.parts
    if len(parts) < 2:
        raise ValueError(f"Invalid file path: {file_path}")

    keyword = parts[0]
    filename = parts[-1]
    name = Path(filename).stem
    ext = Path(filename).suffix

    # PDF files carry no "_{file_type}" suffix; the stem is the id itself.
    if ext.lower() == ".pdf":
        return keyword, name, "pdf", ext

    id_part, separator, file_type = name.rpartition("_")
    if not separator:
        print(f"Skipping invalid filename format: {filename}")
        return None

    return keyword, id_part, file_type, ext


def restore_files(source_dir, target_dir, dry_run=False):
    """Copy the files under *source_dir* into the structured layout of *target_dir*.

    Only ``.md``, ``.docx`` and ``.pdf`` files are handled. PDF files go to
    ``{target_dir}/{keyword}/crawled_urls/{id}{ext}``; all other files go to
    ``{target_dir}/{keyword}/html_convert/{id}_{file_type}{ext}``. Files whose
    target already exists are skipped. Errors on a single file are printed
    and do not stop the run.

    :param source_dir: Directory containing the exported files.
    :param target_dir: Root directory of the restored layout.
    :param dry_run: When true, only print what would be copied; nothing is
        created or written on disk.
    """
    target_root = Path(target_dir)
    wanted_suffixes = (".md", ".docx", ".pdf")

    for file_path in list_files(source_dir):
        try:
            if file_path.suffix.lower() not in wanted_suffixes:
                print(f"Skipping non-target file type: {file_path}")
                continue

            # Parse relative to the directory actually being walked, not the
            # parser's default base directory.
            parsed = parse_file_path(file_path, base_dir=source_dir)
            if not parsed:
                continue

            keyword, id_part, file_type, ext = parsed

            if file_type == "pdf":
                target_subdir = target_root / keyword / "crawled_urls"
                target_file = target_subdir / f"{id_part}{ext}"
            else:
                target_subdir = target_root / keyword / "html_convert"
                target_file = target_subdir / f"{id_part}_{file_type}{ext}"

            if target_file.exists():
                print(f"File already exists, skipping: {target_file}")
                continue

            if dry_run:
                print(f"[Dry Run] Would copy: {file_path} -> {target_file.absolute()}")
                continue

            # Create the directory only for a real copy so that a dry run
            # leaves the file system untouched.
            target_subdir.mkdir(parents=True, exist_ok=True)
            shutil.copy(file_path, target_file)
            print(f"Copied: {file_path} -> {target_file}")

        except Exception as e:
            # Deliberate best-effort: report the failing file and carry on.
            print(f"Error processing {file_path}: {e}")
|
|
|
+
|
|
|
+
|
|
|
def reverse_restore_files(source_dir, target_dir, dry_run=False, start_index=0):
    """Copy files from the structured *source_dir* back into the flat *target_dir*.

    Files are expected at ``{source_dir}/{keyword}/{folder}/.../{filename}``
    where ``folder`` is ``html_convert`` or ``crawled_urls``; each one is
    copied to ``{target_dir}/{keyword}/{filename}``. Only ``.md``, ``.docx``
    and ``.pdf`` files are handled, and files whose target already exists are
    skipped. Errors on a single file are printed and do not stop the run.

    :param source_dir: Source directory (the structured "results" tree).
    :param target_dir: Target directory (the "copy_exported_files" tree).
    :param dry_run: When true, only print what would be copied; nothing is
        created or written on disk. Disabled by default.
    :param start_index: Number of leading entries of the file listing to
        skip. Defaults to 0 so that every file is processed.
    """
    source_root = Path(source_dir).resolve()
    target_root = Path(target_dir)
    wanted_suffixes = (".md", ".docx", ".pdf")
    known_folders = ("html_convert", "crawled_urls")

    file_paths = list_files(source_dir)

    for file_path in file_paths[start_index:]:
        try:
            if file_path.suffix.lower() not in wanted_suffixes:
                continue

            parts = file_path.relative_to(source_root).parts

            # Need at least keyword / folder / filename.
            if len(parts) < 3:
                print(f"Skipping invalid path: {file_path}")
                continue

            keyword = parts[0]
            folder_type = parts[1]
            filename = parts[-1]

            if folder_type not in known_folders:
                print(f"Unknown folder type: {folder_type}")
                continue

            # Both known folder types map to the same flat location.
            target_file = target_root / keyword / filename

            if target_file.exists():
                print(f"File already exists, skipping: {target_file}")
                continue

            if dry_run:
                print(f"[Dry Run] Would copy: {file_path} -> {target_file}")
                continue

            target_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(file_path, target_file)
            print(f"Copied: {file_path} -> {target_file.absolute()}")

        except Exception as e:
            # Deliberate best-effort: report the failing file and carry on.
            print(f"Error processing {file_path}: {e}")
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
    # Forward direction (exported files -> results), kept for reference:
    #   restore_files("output/copy_exported_files", "output/results")

    # Reverse direction: copy the results tree back into the exported-files tree.
    results_dir = "output/results"
    exported_dir = "output/copy_exported_files"
    reverse_restore_files(results_dir, exported_dir)
|