prefect_s3_block.py 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import asyncio
  2. import base64
  3. import os
  4. import pickle
  5. from uuid import UUID
  6. import uuid
  7. prefect_api = f"http://10.0.0.1:8612/api"
  8. # 在导入 prefect 之前设置环境变量
  9. os.environ["PREFECT_API_URL"] = prefect_api
  10. from prefect import flow,task
  11. from prefect.filesystems import LocalFileSystem, RemoteFileSystem
  12. from prefect.results import PersistedResult
  13. import os
  14. import sys
  15. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  16. from conf.config import logger,OUTPUT
  17. from prefect.states import State
  18. import database.config
  19. @task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
  20. def task_flow():
  21. print(task_flow.result_serializer)
  22. print(task_flow.result_storage_key)
  23. return "task_flow"
  24. @flow(persist_result=True)
  25. def miniofilesystem(msg:str="none"):
  26. logger.info(f"{miniofilesystem}")
  27. print(f"Hello, {msg}! minio quickstart flow is running! ")
  28. # task_flow()
  29. return f"Hello, {msg}! "
  30. def read_loacal_result():
  31. res = PersistedResult(type= 'reference', artifact_type='result', serializer_type= 'pickle', storage_block_id=UUID('767df9e7-2669-4270-904d-ebdd43aaf30d'), storage_key='c0849a099d6e467a8c6a20093ba78226')
  32. logger.info(res)
  33. logger.info(res.get())
  34. base64_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
  35. base64_data = "gAWVJAAAAAAAAACMIEhlbGxvLCBsb2FjYWwgZmlsZXN5c3RlbSB0ZXN0ISAglC4=\n"
  36. decoded_data = base64.b64decode(base64_data)
  37. result = pickle.loads(decoded_data)
  38. print(result)
  39. def load_block_data():
  40. minio_block = RemoteFileSystem(
  41. basepath="s3://swl",
  42. settings={
  43. "key": os.environ["s3_access_key"],
  44. "secret": os.environ["s3_secret_key"],
  45. "client_kwargs": {"endpoint_url": os.environ["s3_endpoint"]} ,
  46. },)
  47. block_document, block_document_name = asyncio.run(minio_block._get_block_document("minio"))
  48. logger.info(f"{block_document} {type(block_document)}")
  49. logger.info(f"{block_document.data}")
  50. logger.info(f"{block_document_name} {type(block_document_name)}")
  51. # block_document, block_document_name = asyncio.run(minio_block._get_block_document("anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6"))
  52. def minio_file_system_test():
  53. logger.info(f"s3_endpoint " + os.environ["s3_endpoint"])
  54. # https://docs.prefect.io/latest/concepts/filesystems/#s3
  55. minio_block = RemoteFileSystem(
  56. basepath="s3://swl",
  57. settings={
  58. "key": os.environ["s3_access_key"],
  59. "secret": os.environ["s3_secret_key"],
  60. "client_kwargs": {"endpoint_url": 'http://' + os.environ["s3_endpoint"]} ,
  61. },)
  62. minio_block.write_path("foo", b"hello")
  63. res = minio_block.read_path("foo")
  64. logger.info(f"{res}")
  65. # base64_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
  66. # data = minio_block.parse_obj(base64_data)
  67. # logger.info(f"{data}")
  68. # minio_block.save("minio")
  69. return
  70. new_flow = miniofilesystem.with_options(result_storage=minio_block)
  71. logger.info(f"new_flow {new_flow}")
  72. res = new_flow(msg="loacal filesystem test",return_state=True)
  73. logger.info(f"res {res}")
  74. logger.info(f"name {res.name}")
  75. logger.info(f"state_details {res.state_details}")
  76. # data
  77. logger.info(f"data {res.data} {type(res.data)}")
  78. # res.data.get()
  79. logger.info(f"data {res}")
  80. def main():
  81. # load_block_data()
  82. minio_file_system_test()
  83. if __name__ == "__main__":
  84. main()