base.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import asyncio
  2. import base64
  3. from crawl4ai import *
  4. from pathlib import Path
  5. import json
  6. import pickle
  # Absolute path of the "output" directory, anchored at the working directory
  # that is current when this module is imported.
  OUTPUT_DIR = Path.cwd() / "output"
  8. from crawl4ai.async_configs import BrowserConfig
  # Browser configuration used by the crawl scripts: headless mode off, verbose
  # output on, and the browser profile kept under OUTPUT_DIR / "user_data_dir".
  # The options that are currently disabled (including the proxy URL) are kept
  # below as comments so they can be switched back on.
  browser_config = BrowserConfig(
      headless=False,
      verbose=True,
      user_data_dir=OUTPUT_DIR / "user_data_dir",
      # proxy="http://localhost:1881",
      # user_agent_mode="random",
      # use_managed_browser=True,
      # use_persistent_context=True,
  )
  def ensure_output_dir(output_dir: Path):
      """Create ``output_dir`` (with any missing parents) unless the path exists.

      Args:
          output_dir: Directory path to create.
      """
      if output_dir.exists():
          # Something is already at this path; leave it untouched.
          return
      output_dir.mkdir(parents=True, exist_ok=True)
  def save_to_file(content, filename: Path):
      """Write ``content`` to ``filename`` as UTF-8 text.

      The parent directory of ``filename`` is created first if needed. Strings
      are written unchanged. Any other value is rendered as indented JSON
      (non-ASCII characters are kept as-is); if it cannot be serialized as
      JSON, ``str(content)`` is written instead.

      Args:
          content: Text, or an arbitrary object to serialize.
          filename: Destination path; a ``Path`` or anything ``Path()`` accepts.
      """
      filename = Path(filename)
      ensure_output_dir(filename.parent)
      if not isinstance(content, str):
          try:
              content = json.dumps(content, indent=4, ensure_ascii=False)
          except (TypeError, ValueError, RecursionError):
              # Not JSON-serializable (unsupported type, circular reference,
              # excessive nesting): fall back to the plain string form.
              content = str(content)
      with open(filename, "w", encoding="utf-8") as file:
          file.write(content)
  def save_to_pickle(obj, filename):
      """Pickle ``obj`` into ``filename`` and return ``filename``.

      Args:
          obj: Object to serialize with ``pickle``.
          filename: Destination path, opened in binary write mode.

      Returns:
          The ``filename`` argument, unchanged.
      """
      with open(filename, "wb") as sink:
          pickle.Pickler(sink).dump(obj)
      return filename
  def save_base64_to_file(base64_str, file_path):
      """Decode a Base64 payload and write the resulting bytes to ``file_path``.

      The payload is decoded before the file is opened, so invalid input
      raises without creating or truncating ``file_path``.

      Args:
          base64_str: Base64-encoded data (``str`` or bytes-like).
          file_path: Destination path, opened in binary write mode.

      Returns:
          The ``file_path`` argument, unchanged.

      Raises:
          binascii.Error: If ``base64_str`` is not valid Base64.
      """
      decoded = base64.b64decode(base64_str)
      with open(file_path, "wb") as file:
          file.write(decoded)
      return file_path
  def load_from_pickle(filename):
      """Read ``filename`` in binary mode and return the object unpickled from it.

      NOTE(review): unpickling can execute arbitrary code, so only load files
      that come from a trusted source.

      Args:
          filename: Path of the pickle file to read.

      Returns:
          The object stored in the file.
      """
      with open(filename, "rb") as source:
          restored = pickle.Unpickler(source).load()
      return restored
  def save_all_result(result: CrawlResult, output_dir):
      """Dump a crawl result into ``output_dir``.

      Writes ``result.pickle`` holding the whole object, then, for every name
      returned by ``dir(result)`` that does not start with ``"__"``, writes
      ``<name>.txt`` containing ``"<name>: <value>"``.

      Args:
          result: The crawl result to dump.
          output_dir: Target directory (``Path`` or string); created if missing.
      """
      output_dir = Path(output_dir)
      # The pickle is written first and save_to_pickle does not create
      # directories, so make sure the target directory exists up front.
      ensure_output_dir(output_dir)
      save_to_pickle(result, output_dir / "result.pickle")

      missing = object()
      for attr in dir(result):
          if attr.startswith("__"):
              continue
          # Single lookup; names listed by dir() that cannot be read are skipped.
          value = getattr(result, attr, missing)
          if value is missing:
              continue
          save_to_file(f"{attr}: {value}", output_dir / f"{attr}.txt")
  def replace_space(search_key: str):
      """Return ``search_key`` with every space replaced by ``"+"``.

      A key that contains no space is returned as-is.

      Args:
          search_key: The search string to convert.
      """
      return search_key.replace(" ", "+") if " " in search_key else search_key