base.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import asyncio
  2. import base64
  3. from crawl4ai import *
  4. from pathlib import Path
  5. import json
  6. import pickle
  7. from crawl4ai.async_configs import BrowserConfig
  # Shared browser configuration: non-headless browser, verbose=True, and a
  # 1920x1080 viewport.
  #
  # Options that are currently disabled (kept here for quick re-enabling):
  #   proxy="http://localhost:1881"
  #   user_agent_mode="random"
  #   use_managed_browser=True
  #   use_persistent_context=True
  #   user_data_dir=OUTPUT_DIR / "user_data_dir"
  # NOTE(review): OUTPUT_DIR is not defined in the visible part of this file;
  # define it before enabling user_data_dir.
  browser_config = BrowserConfig(
      viewport_width=1920,
      viewport_height=1080,
      headless=False,
      verbose=True,
  )
  # Create the directory if it does not exist.
  def ensure_output_dir(output_dir: Path) -> None:
      """Create ``output_dir`` and any missing parent directories.

      Calling this on a directory that already exists is a no-op.

      Args:
          output_dir: Directory path to create.

      Raises:
          FileExistsError: If ``output_dir`` exists but is not a directory.
      """
      # mkdir(exist_ok=True) already tolerates an existing directory, so a
      # separate exists() check is redundant and only opens a window for a
      # race with another process creating the same directory.
      output_dir.mkdir(parents=True, exist_ok=True)
  # Save content to a file.
  def save_to_file(content, filename: Path):
      """Write ``content`` to ``filename`` as UTF-8 text and return the path.

      Missing parent directories are created first. A ``str`` is written
      unchanged. Any other object is serialized as JSON with a 4-space indent
      and non-ASCII characters kept as-is; if it cannot be serialized,
      ``str(content)`` is written instead.

      Args:
          content: Text or object to save.
          filename: Destination file, as a ``Path`` or a path string.

      Returns:
          The destination as a ``Path``.
      """
      target = Path(filename)
      target.parent.mkdir(parents=True, exist_ok=True)

      if not isinstance(content, str):
          try:
              content = json.dumps(content, indent=4, ensure_ascii=False)
          except (TypeError, ValueError):
              # json.dumps raises TypeError for unsupported types and
              # ValueError for circular references. Only those trigger the
              # str() fallback, so unrelated exceptions such as
              # KeyboardInterrupt are no longer swallowed.
              content = str(content)

      with open(target, "w", encoding="utf-8") as file:
          file.write(content)
      return target
  def save_to_pickle(obj, filename):
      """Pickle ``obj`` into the file at ``filename`` and return ``filename``.

      Missing parent directories are created first, so the function behaves
      like ``save_to_file`` when the output directory does not exist yet.

      Args:
          obj: Object to serialize with ``pickle``.
          filename: Destination file, as a ``Path`` or a path string.

      Returns:
          ``filename``, exactly as it was passed in.
      """
      Path(filename).parent.mkdir(parents=True, exist_ok=True)
      with open(filename, "wb") as file:
          pickle.dump(obj, file)
      return filename
  def save_base64_to_file(base64_str, file_path):
      """Decode ``base64_str`` and write the resulting bytes to ``file_path``.

      Missing parent directories are created first.

      Args:
          base64_str: Base64-encoded data (``str`` or bytes-like).
          file_path: Destination file, as a ``Path`` or a path string.

      Returns:
          ``file_path``, exactly as it was passed in.

      Raises:
          binascii.Error: If ``base64_str`` is incorrectly padded. It is a
              ``ValueError`` subclass.
      """
      # Decode before opening the file: opening in "wb" mode truncates it, so
      # decoding first keeps an existing file intact when the input is invalid.
      decoded = base64.b64decode(base64_str)
      Path(file_path).parent.mkdir(parents=True, exist_ok=True)
      with open(file_path, "wb") as file:
          file.write(decoded)
      return file_path
  def load_from_pickle(filename):
      """Open ``filename`` in binary mode and return the object unpickled from it.

      NOTE: unpickling can execute arbitrary code, so only load files that
      come from a trusted source.
      """
      with open(filename, "rb") as source:
          loaded = pickle.Unpickler(source).load()
      return loaded
  def save_all_result(result: CrawlResult, output_dir):
      """Dump a crawl result into ``output_dir``.

      Writes the pickled object to ``result.pickle`` and then, for every name
      returned by ``dir(result)`` that does not start with ``__``, a file
      ``<name>.txt`` containing ``"<name>: <value>"``. Because ``dir()`` also
      lists methods, those get a file as well.

      Args:
          result: The crawl result to persist.
          output_dir: Destination directory, as a ``Path`` or a path string.
              It is created if it does not exist.
      """
      output_dir = Path(output_dir)
      # The pickle is written before any text file, so the directory must
      # exist up front.
      ensure_output_dir(output_dir)
      save_to_pickle(result, output_dir / "result.pickle")

      for attr in dir(result):
          if attr.startswith("__"):
              continue
          try:
              # A single lookup: the value is evaluated once instead of once
              # for an existence check and again for formatting.
              value = getattr(result, attr)
          except AttributeError:
              # dir() may list names whose lookup fails; skip those.
              continue
          save_to_file(f"{attr}: {value}", output_dir / f"{attr}.txt")
  # Replace any spaces in the string with plus signs.
  def replace_space(search_key: str):
      """Return ``search_key`` with every space replaced by ``+``.

      A string that contains no space is returned unchanged.
      """
      has_space = " " in search_key
      return search_key.replace(" ", "+") if has_space else search_key