# Hardware Platform: FireBeetle-ESP32
# Result: uses the MQTT library to control an LED remotely over MQTT.
import json
import os
import re
import ustruct as struct

from machine import Pin
from machine import Timer
from umqtt.simple import MQTTClient
# Connection settings for the OneNET MQTT broker.
SERVER = 'mqtts.heclouds.com'  # broker address
PRO_ID = '522417'  # product ID
DEV_NAME = 'esp8266'  # device name
# accesskey = "dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs="  # device secret key
# NOTE(review): the token below is hard-coded; its `et=` field presumably holds
# the expiry time, so it has to be regenerated once it lapses -- confirm.
password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
class MqttOneNet():
    """MQTT client wrapper for one OneNET device.

    Creating an instance connects to the broker, subscribes to every topic
    under ``$sys/{product_id}/{device_name}/`` and starts a periodic timer
    that polls for incoming messages.  Periodic data upload is optional and
    is controlled with ``upload_start`` / ``upload_stop``.
    """

    def __init__(self, device_name=DEV_NAME) -> None:
        """Store the connection settings and connect.

        Args:
            device_name: name used both as the MQTT client id and in the
                ``$sys/...`` topic paths.
        """
        # Reuse the module-level settings instead of repeating the literals.
        self.broker_addr = SERVER
        self.broker_port = 1883
        self.product_id = PRO_ID
        # Produced by the token algorithm; original note says it is valid
        # until 2023-05-31 01:41:21.
        self.password = password
        self.device_name = device_name
        # Not used yet.
        self.accesskey = ""
        self.device_id = ""
        # Initialise all state before client_init() starts any timer, so a
        # callback never sees a half-built instance.
        self.data_point_id = 0
        self.tim = None          # upload timer, created by upload_start()
        self.check_timer = None  # message-polling timer, created by client_init()
        self.client = None
        self.client_init()
        # self.upload_start(2000)

    def upload_start(self, ms):
        """Start publishing one data point every ``ms`` milliseconds."""
        # Stop a previously started upload timer so it is not left running.
        self.upload_stop()
        self.tim = Timer(-1)
        self.tim.init(period=ms, mode=Timer.PERIODIC,
                      callback=self.publish_timer_callback)
        print("start upload_data")

    def upload_stop(self):
        """Stop the periodic upload; does nothing if it was never started."""
        if self.tim is not None:
            self.tim.deinit()
            self.tim = None

    def client_init(self):
        """Connect to the broker, subscribe, and start polling for messages."""
        self.client = MQTTClient(client_id=self.device_name,
                                 server=self.broker_addr,
                                 port=self.broker_port,
                                 user=self.product_id,
                                 password=self.password,
                                 keepalive=360)  # create a mqtt client
        self.client.connect()  # connect mqtt
        print("mqtt connect")
        self.client.set_callback(self.client_msg_callback)

        # Subscribe to every topic of this device.
        topic = f"$sys/{self.product_id}/{self.device_name}/#"
        self.client.subscribe(topic, qos=1)
        print("subscribed to %s topic" % (topic))

        # Poll for incoming messages every 50 ms.  The timer is started only
        # after subscribe() has returned so that check_msg() cannot read from
        # the socket while subscribe() is still waiting for its reply.  It is
        # kept in its own attribute so upload_start()/upload_stop() cannot
        # overwrite or stop it.
        self.check_timer = Timer(-2)
        self.check_timer.init(period=50, mode=Timer.PERIODIC,
                              callback=self.check_mag_callback)

        print("client init finish")

    def check_mag_callback(self, timer):
        """Timer callback: process one pending MQTT message, if any.

        ``timer`` is the Timer instance that the timer passes to its callback.
        """
        self.client.check_msg()

    # `timer` is required by the timer callback signature (it is the timer
    # itself); without it the timer keeps printing an error.
    # http://docs.micropython.org/en/latest/library/machine.Timer.html#machine-timer
    def publish_timer_callback(self, timer):
        """Timer callback: build one data point and publish it with qos=1."""
        topic_publish = '$sys/%s/%s/dp/post/json' % (
            self.product_id, self.device_name)
        payload = self.get_upload_data()

        print("------------- publish ------------- \n")
        print("topic:%s\n" % topic_publish)
        print("payload:%s\n" % payload)
        self.publish(topic_publish, payload, qos=1)

    def response_cmd(self, topic_list, msg):
        """Answer a server command.

        Args:
            topic_list: the received topic split on ``/``; a command topic is
                ``$sys/{pid}/{device-name}/cmd/request/{cmdid}``.
            msg: the command payload (only printed).
        """
        # Ignore topics too short to carry a command id instead of raising
        # IndexError inside the message callback.
        if len(topic_list) > 5 and topic_list[4] == 'request':
            print("receive cmd:", msg)

            cmdid = topic_list[5]
            response_topic = f"$sys/{self.product_id}/{self.device_name}/cmd/response/{cmdid}"
            payload = "12gfok"
            print("------------- response ------------- \n")
            print("topic:%s\n" % response_topic)
            print("payload:%s\n" % payload)
            self.publish(response_topic, payload)

    def client_msg_callback(self, topic, msg):
        """Callback registered on the MQTT client for received messages."""
        print("\n------------- receive ------------- \n")
        print("topic:%s\n" % topic)
        print("payload:%s\n" % msg)

        topic_list = str(topic, 'utf8').split('/')
        # Server command: $sys/{pid}/{device-name}/cmd/request/{cmdid}
        if len(topic_list) > 3 and topic_list[3] == 'cmd':
            self.response_cmd(topic_list, msg)

    def publish(self, topic, msg, retain=False, qos=0):
        """Publish ``msg`` on ``topic`` as UTF-8 bytes.

        Containers (dict/list/tuple) are serialised as JSON; formatting them
        with ``str()`` would produce a Python repr (single quotes), which is
        not valid JSON for the ``dp/post/json`` topic.  Bytes are sent as-is
        and anything else is sent as its text form.
        """
        if isinstance(msg, (bytes, bytearray)):
            payload = bytes(msg)
        elif isinstance(msg, (dict, list, tuple)):
            payload = bytes(json.dumps(msg), 'utf8')
        else:
            payload = bytes("{}".format(msg), 'utf8')
        self.client.publish(topic=topic, msg=payload, retain=retain, qos=qos)

    def get_upload_data(self):
        """Return the next data point (a dict) to upload to the server.

        Each call increments ``data_point_id`` and uses it as the message id.
        """
        self.data_point_id += 1
        message = {
            "id": int(self.data_point_id),
            "dp": {
                "sensor": [{  # sensor readings
                    "v": {
                        "temperature": self.get_temperature(),
                        "humidity": self.get_humidity()
                    }
                }],
                "system": [{
                    "v": self.get_system()
                }]
            }
        }
        return message

    def get_temperature(self):
        """Return a placeholder temperature: a random unsigned 16-bit value."""
        return struct.unpack('@H', os.urandom(2))[0]

    def get_humidity(self):
        """Return a placeholder humidity: a random unsigned 16-bit value."""
        return struct.unpack('@H', os.urandom(2))[0]

    def get_system(self):
        """Return placeholder system info (power, GPS text, altitude) from random bytes."""
        info = {}
        info["power"] = os.urandom(1)[0] % 42
        gps_location = struct.unpack('@BBBB', os.urandom(4))
        info["GPS"] = "%d°%d'N',%d°%d'E" % (gps_location[0], gps_location[1], gps_location[2], gps_location[3])
        info["altitude"] = struct.unpack('@H', os.urandom(2))[0]
        return info
# Script entry point: create the client, which connects and starts polling.
# `mq` is kept at module level so it stays reachable after the script has run.
if __name__ == '__main__':
    try:
        mq = MqttOneNet()
    except Exception as e:
        # Top-level boundary: report the failure instead of crashing the script.
        print(e)