esp_mqtt.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  # Hardware Platform: FireBeetle-ESP32
  # Result: imports the MQTT library and remotely controls the LED via MQTT communication.
  import json
  import os

  import ustruct as struct
  from machine import Pin
  from machine import Timer
  from umqtt.simple import MQTTClient
  # LED output on pin 2, created with an initial value of 0.
  led = Pin(2, Pin.OUT, value=0)

  SERVER = 'mqtts.heclouds.com'  # server address
  PRO_ID = '522417'  # product ID
  DEV_NAME = 'esp8266'  # device name
  # accesskey = "dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs="  # device secret key
  # Access token, written as adjacent literals (one per query field) that
  # concatenate to the single token string.
  password = (
      "version=2018-10-31"
      "&res=products%2F522417"
      "&et=1685551369"
      "&method=sha1"
      "&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
  )
  class MqttOneNet():
      """MQTT client that periodically publishes generated data points.

      Constructing an instance connects to the broker, subscribes to every
      topic under ``$sys/<product_id>/<device_name>/`` and starts a periodic
      timer that publishes one data point every 2000 ms. The published
      values come from ``os.urandom`` rather than from real sensors.
      """

      def __init__(self, device_name=DEV_NAME) -> None:
          """Connect to the broker and start the periodic upload.

          Args:
              device_name: Device name; used as the MQTT client id and as
                  part of the subscribe/publish topic paths.
          """
          self.broker_addr = 'mqtts.heclouds.com'
          self.broker_port = 1883
          self.product_id = "522417"
          # Produced by the token algorithm; the original note says it is
          # valid until 2023-05-31 01:41:21.
          self.password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
          self.device_name = device_name
          # Not used yet.
          self.accesskey = ""
          self.device_id = ""
          self.client = self.client_init()
          self.data_point_id = 0
          self.upload_init()

      def upload_init(self):
          """Reset the data point counter and start the periodic timer."""
          self.data_point_id = 0
          self.tim = Timer(-1)
          # Call self.publish_timer_callback once every 2000 ms.
          self.tim.init(period=2000, mode=Timer.PERIODIC,
                        callback=self.publish_timer_callback)
          print("init finish, start upload_data")

      def client_init(self):
          """Create the MQTT client, connect it and subscribe to the device topics.

          Returns:
              The connected ``MQTTClient`` instance.
          """
          client = MQTTClient(client_id=self.device_name,
                              server=self.broker_addr,
                              port=self.broker_port,
                              user=self.product_id,
                              password=self.password,
                              keepalive=360)
          client.connect()
          print("mqtt connect")
          client.set_callback(self.client_msg_callback)
          # Subscribe to all messages for this device.
          topic = f"$sys/{self.product_id}/{self.device_name}/#"
          client.subscribe(topic)
          print("subscribed to %s topic" % (topic))
          return client

      # `timer` is the argument the timer passes to its callback (the timer
      # object itself); without this parameter the timer keeps printing an
      # error.
      # http://docs.micropython.org/en/latest/library/machine.Timer.html#machine-timer
      def publish_timer_callback(self, timer):
          """Publish the next data point; used as the periodic timer callback.

          Args:
              timer: The value supplied by the timer; not used here.
          """
          topic_publish = '$sys/%s/%s/dp/post/json' % (
              self.product_id, self.device_name)
          payload = self.get_upload_data()
          print("------------- publish ------------- \n")
          print("topic:%s\n" % topic_publish)
          print("payload:%s\n" % payload)
          self.client.publish(topic=topic_publish, msg=payload, qos=1)

      def client_msg_callback(self, topic, msg):
          """Print the topic and payload of a received message."""
          print("\n------------- receive ------------- \n")
          print("topic:%s\n" % topic)
          print("payload:%s\n" % msg)

      # Collect the board data that is uploaded to the server.
      def get_upload_data(self):
          """Build the payload for the next data point.

          Increments ``self.data_point_id`` and uses it as the message id.

          Returns:
              The message serialized as UTF-8 encoded JSON bytes.
          """
          self.data_point_id += 1
          message = {
              "id": int(self.data_point_id),
              "dp": {
                  "sensor": [{  # data collected by the distance sensor
                      "v": {
                          "temperature": self.get_temperature(),
                          "humidity": self.get_humidity(),
                      }
                  }],
                  "system": [{
                      "v": self.get_system()
                  }],
              },
          }
          # Serialize with json.dumps: "{}".format(message) produces the Python
          # repr of the dict (single-quoted keys), which is not valid JSON even
          # though the payload is published to the .../dp/post/json topic.
          return bytes(json.dumps(message), 'utf8')

      def get_temperature(self):
          """Return a random unsigned 16-bit value as the temperature reading."""
          return struct.unpack('@H', os.urandom(2))[0]

      def get_humidity(self):
          """Return a random unsigned 16-bit value as the humidity reading."""
          return struct.unpack('@H', os.urandom(2))[0]

      def get_system(self):
          """Return a dict of randomly generated system values.

          Returns:
              A dict with the keys ``"power"`` (0-41), ``"GPS"`` (formatted
              string) and ``"altitude"`` (unsigned 16-bit value).
          """
          info = {}
          info["power"] = os.urandom(1)[0] % 42
          gps_location = struct.unpack('@BBBB', os.urandom(4))
          info["GPS"] = "%d°%d'N',%d°%d'E" % (gps_location[0], gps_location[1], gps_location[2], gps_location[3])
          info["altitude"] = struct.unpack('@H', os.urandom(2))[0]
          return info
  # device = MqttOneNet()
  if __name__ == '__main__':
      # Constructing the client connects to the broker and starts the
      # periodic upload; an exception raised while doing so is printed.
      try:
          MqttOneNet()
      except Exception as err:
          print(err)