# base.py (3.1 KB)
  1. import base64
  2. import datetime
  3. import json
  4. import pickle
  5. import os
  6. import sys
  7. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  8. from conf.config import OUTPUT,PAGE_OUTPUT,logger
  9. from database.config import minio_block
  10. import prefect.runtime
  11. from dp.page import page
  12. import time
  13. from datetime import datetime
  14. from prefect import runtime
  15. from database.config import minio_client
  16. from database.s3 import S3Object,S3
  17. HOME_URL = 'https://www.douyin.com/user/self'
  18. tab=page.get_tab(url=HOME_URL)
  19. def get_tab(tab_id=None):
  20. if tab_id:
  21. return page.get_tab(tab_id)
  22. else:
  23. tab_id = runtime.flow_run.parameters.get('tab_id', None)
  24. if tab_id:
  25. return page.get_tab(tab_id)
  26. else:
  27. return page.tab
  28. def get_object_name_by_time():
  29. # 获取时分秒毫秒,并且符合路径的格式,如 023213_123.json
  30. now = datetime.now()
  31. # 格式化时间:小时、分钟、秒、毫秒
  32. formatted_time = now.strftime("%Y%m%d/%H%M%S_%f") # %f 提供了微秒,所以我们取前三个数字作为毫秒
  33. return formatted_time + '-{task_run.task_name}'
  34. def get_result(path:str):
  35. if path.startswith(minio_block.basepath):
  36. path = path[len(minio_block.basepath):]
  37. bytes = minio_block.read_path(path)
  38. json_data = json.loads(bytes)
  39. base64_data = json_data['data']
  40. decoded_data = base64.b64decode(base64_data)
  41. result = pickle.loads(decoded_data)
  42. return result
  43. def save_html_to_s3(file_name):
  44. page_dir = Path(r'I:\code\ai-yunying\live-online-people\output\page\\')
  45. file_path = page_dir/file_name
  46. f = open(file_path, 'w')
  47. f.write(tab.html)
  48. s3minio = S3(bucket='public',client=minio_client)
  49. base_path = '/md/ai-yunying/'
  50. obj_name = base_path+ s3minio.get_object_name_by_time() + '-'+ file_name
  51. res = s3minio.fput(r'I:\code\ai-yunying\live-online-people\output\page\\'+file_name, obj_name)
  52. logger.info(f"{res.bucket_name } {res.object_name} {res._http_headers}")
  53. import pathlib
  54. def save_page_info(file_name='', tab_id=None, local=True,s3=False):
  55. base_time_dir = pathlib.Path(datetime.now().strftime("%Y-%m-%d"))
  56. tab = get_tab(tab_id)
  57. png = tab.get_screenshot(as_bytes=True)
  58. if local:
  59. save_dir = OUTPUT/base_time_dir
  60. logger.info(f"{save_dir}")
  61. if not os.path.exists(save_dir):
  62. os.makedirs(save_dir)
  63. with open(save_dir/f'{file_name}.html', 'w') as f:
  64. f.write(tab.html)
  65. with open(save_dir/f'{file_name}.png', 'wb') as f:
  66. f.write(png)
  67. if s3:
  68. s3minio = S3(bucket='swl',client=minio_client)
  69. # base_path = pathlib.Path('/log') / base_time_dir
  70. # obj_name = base_path+ s3minio.get_object_name_by_time() + '-'+ file_name
  71. res = s3minio.put(tab.html, f'{file_name}.html')
  72. logger.info(f"{res.bucket_name } {res.object_name} {res._http_headers}")
  73. res = s3minio.put(png, f'{file_name}.png')
  74. return base_time_dir
  75. def main():
  76. save_page_info(file_name='点击陌生人对话框', tab_id='A017888A62FE53FAD9D85ED2662FEA34')
  77. if __name__ == "__main__":
  78. main()