esp_mqtt.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # Hardware Platform: FireBeetle-ESP32
  2. # Result: input MQTTlibrary and remote controls LED by mqtt communication.
  3. import os
  4. from machine import Pin
  5. from machine import Timer
  6. import ustruct as struct
  7. from umqtt.simple import MQTTClient
  8. import re
  9. SERVER = 'mqtts.heclouds.com' # 服务器地址
  10. PRO_ID = '522417' # 产品ID
  11. DEV_NAME = 'esp8266' # 设备名称
  12. # accesskey = "dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs=" # 设备秘钥
  13. password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
  14. class MqttOneNet():
  15. def __init__(self, device_name=DEV_NAME) -> None:
  16. self.broker_addr = 'mqtts.heclouds.com'
  17. self.broker_port = 1883
  18. self.product_id = "522417"
  19. # 由 token 算法得出,有效期为 2023年5月31日01:41:21
  20. self.password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
  21. self.device_name = device_name
  22. # 暂未用到
  23. self.accesskey = ""
  24. self.device_id = ""
  25. self.client = None
  26. self.client_init()
  27. self.data_point_id = 0
  28. # self.upload_start(2000)
  29. def upload_start(self, ms):
  30. self.tim = Timer(-1)
  31. # 每 2000ms 定时回调一次 self.upload_data()
  32. self.tim.init(period=ms, mode=Timer.PERIODIC,
  33. callback=self.publish_timer_callback)
  34. print("start upload_data")
  35. def upload_stop(self):
  36. self.tim.deinit()
  37. def client_init(self):
  38. self.client = MQTTClient(client_id=self.device_name, server=self.broker_addr, port=self.broker_port, user=self.product_id,
  39. password=self.password, keepalive=360) # create a mqtt client
  40. self.client.connect() # connect mqtt
  41. print("mqtt connect")
  42. self.client.set_callback(self.client_msg_callback)
  43. # 每 50ms 定时回调一次检查消息是否到来
  44. self.tim = Timer(-2)
  45. self.tim.init(period=50, mode=Timer.PERIODIC,
  46. callback=self.check_mag_callback)
  47. # 订阅所有消息
  48. topic = f"$sys/{self.product_id}/{self.device_name}/#"
  49. self.client.subscribe(topic, qos=1)
  50. print("subscribed to %s topic" % (topic))
  51. print("client init finish")
  52. def check_mag_callback(self, timer):
  53. self.client.check_msg()
  54. # timer 是定时器回调函数需要的参数,实际上是定时器本身(self),如果没有这个参数,定时器会一直打印一个错误
  55. # http://docs.micropython.org/en/latest/library/machine.Timer.html#machine-timer
  56. def publish_timer_callback(self, timer):
  57. topic_publish = '$sys/%s/%s/dp/post/json' % (
  58. self.product_id, self.device_name)
  59. payload = self.get_upload_data()
  60. print("------------- publish ------------- \n")
  61. print("topic:%s\n" % topic_publish)
  62. print("payload:%s\n" % payload)
  63. self.publish(topic_publish, payload, qos=1)
  64. def response_cmd(self, topic_list, msg):
  65. if topic_list[4] == 'request':
  66. print("receive cmd:", msg)
  67. cmdid = topic_list[5]
  68. response_topic = f"$sys/{self.product_id}/{self.device_name}/cmd/response/{cmdid}"
  69. payload = "12gfok"
  70. print("------------- response ------------- \n")
  71. print("topic:%s\n" % response_topic)
  72. print("payload:%s\n" % payload)
  73. self.publish(response_topic, payload)
  74. def client_msg_callback(self, topic, msg):
  75. print("\n------------- receive ------------- \n")
  76. print("topic:%s\n" % topic)
  77. print("payload:%s\n" % msg)
  78. topic_list = str(topic, 'utf8').split('/')
  79. # 收到服务器命令:$sys/{pid}/{device-name}/cmd/request/{cmdid}
  80. if topic_list[3] == 'cmd':
  81. self.response_cmd(topic_list, msg)
  82. def publish(self, topic, msg, retain=False, qos=0):
  83. # 必须转换成bytes类型发送
  84. msg = bytes("{}".format(msg), 'utf8')
  85. self.client.publish(topic=topic, msg=msg, retain=retain, qos=qos)
  86. # 获取板子数据,用于上传到服务器
  87. def get_upload_data(self):
  88. self.data_point_id += 1
  89. message = {
  90. "id": int(self.data_point_id),
  91. "dp": {
  92. "sensor": [{ # 距离传感器采集的数据
  93. "v": {
  94. "temperature": self.get_temperature(),
  95. "humidity": self.get_humidity()
  96. }
  97. }],
  98. "system": [{
  99. "v": self.get_system()
  100. }]
  101. }
  102. }
  103. return message
  104. def get_temperature(self):
  105. return struct.unpack('@H', os.urandom(2))[0]
  106. def get_humidity(self):
  107. return struct.unpack('@H', os.urandom(2))[0]
  108. def get_system(self):
  109. info = {}
  110. info["power"]=os.urandom(1)[0] % 42
  111. gps_location = struct.unpack('@BBBB', os.urandom(4))
  112. info["GPS"] = "%d°%d'N',%d°%d'E" % (gps_location[0], gps_location[1], gps_location[2], gps_location[3])
  113. info["altitude"] = struct.unpack('@H', os.urandom(2))[0]
  114. return info
  115. # device = MqttOneNet()
  116. if __name__ == '__main__':
  117. try:
  118. mq = MqttOneNet()
  119. except Exception as e:
  120. print(e)