mqtt_onenet.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # -*- coding: utf-8 -*-
  2. # pip install paho-mqtt
  3. import paho.mqtt.client as mqtt
  4. import time
  5. from urllib.parse import quote
  6. from mylib import cal_token, payload_data
  7. import random
  8. SERVER = 'mqtts.heclouds.com'
  9. PRO_ID = "522417" # 产品ID
  10. DEV_NAME = 'local_python'
  11. # 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
  12. class MqttOneNet():
  13. def __init__(self, device_name) -> None:
  14. self.broker_addr = 'mqtts.heclouds.com'
  15. self.broker_port = 1883
  16. self.product_id = "522417"
  17. # 由 token 算法得出,有效期为 2023年5月31日01:41:21
  18. self.password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
  19. self.device_name = 'local_python'
  20. # 暂未用到
  21. self.accesskey = ""
  22. self.device_id = ""
  23. self.client_init()
  24. self.publish_dat()
  25. def publish_dat(self):
  26. topic_publish = '$sys/%s/%s/dp/post/json' % (
  27. self.product_id, self.device_name)
  28. count = 0
  29. while True:
  30. count += 1
  31. dat = self.read_dat()
  32. payload = self.payload_format(count, dat)
  33. self.client.publish(topic=topic_publish, payload=payload, qos=1)
  34. print("topic_publish: ", topic_publish)
  35. print("payload: ", payload)
  36. print("-------------------------------------------------------------------------------")
  37. time.sleep(3)
  38. def read_dat(self):
  39. dat = {
  40. "temperatrue": [{
  41. "v": random.randint(0, 100),
  42. }],
  43. "power": [{
  44. "v": random.randint(33, 55)/10,
  45. "t": time.time()
  46. }],
  47. "status": [{
  48. "v": {
  49. "color": "blue",
  50. "led1": "on",
  51. "led2": "off",
  52. }
  53. },
  54. {
  55. "v": {
  56. "color": "red"
  57. },
  58. "t": time.time()
  59. }
  60. ]
  61. }
  62. return dat
  63. '''
  64. publish 时,OneNet 对 payload 数据的格式要求:
  65. dictionaries 必须为字典
  66. 字典的 key 可以是 object
  67. 字典 value 必须是数组,数组元素必须是字典,哪怕数组只有一个元素。
  68. 参考文件: readme.md -> publish 消息
  69. 参考官网文档: https://open.iot.10086.cn/doc/v5/develop/detail/251
  70. '''
  71. @staticmethod
  72. def payload_format(ds_id, dictionaries: dict):
  73. message = {
  74. "id": int(ds_id),
  75. "dp": dictionaries
  76. }
  77. message = bytes("{}".format(message), 'ascii')
  78. print(message)
  79. return message
  80. def client_init(self):
  81. self.client = mqtt.Client(self.device_name, protocol=mqtt.MQTTv311)
  82. self.client.on_connect = self.on_connect
  83. self.client.on_publish = self.on_publish
  84. self.client.on_message = self.on_message
  85. self.client.connect(self.broker_addr, self.broker_port, keepalive=300)
  86. self.client.username_pw_set(self.product_id, self.password)
  87. self.client.loop_start()
  88. @staticmethod
  89. def on_connect(client, userdata, flags, rc):
  90. print("connet result:" + mqtt.connack_string(rc))
  91. @staticmethod
  92. def on_message(client, userdata, msg):
  93. print("on_message:", str(msg.payload, 'utf-8'))
  94. #当消息已经被发送给中间人,on_publish()回调将会被触发
  95. @staticmethod
  96. def on_publish(client, userdata, mid):
  97. print("on_publish: ", str(mid))
  98. if __name__ == '__main__':
  99. MqttOneNet(DEV_NAME)