mrh 3 жил өмнө
parent
commit
c7f26da6ab

+ 67 - 180
esp_mqtt.py

@@ -1,190 +1,77 @@
-import usocket as socket
-import ustruct as struct
-#from ubinascii import hexlify
+# Hardware Platform: FireBeetle-ESP32
+# Result: input MQTTlibrary and remote controls LED by mqtt communication.
+import uos
+from umqtt.simple import MQTTClient
+from machine import Pin
+import network
+import time
 
-class MQTTException(Exception):
-    pass
+led = Pin(2, Pin.OUT, value=0)
 
-class MQTTClient:
+SERVER = 'mqtts.heclouds.com'  # 服务器地址
+PRO_ID = '522417'  # 产品ID
+DEV_NAME = 'esp8266'  # 设备名称
+accesskey = "dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs="  # 设备秘钥
 
-  def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}):
-    if port == 0:
-      port = 8883 if ssl else 1883
-    self.client_id = client_id
-    self.sock = None
-    self.addr = socket.getaddrinfo(server, port)[0][-1]
-    self.ssl = ssl
-    self.ssl_params = ssl_params
-    self.pid = 0
-    self.cb = None
-    self.user = user
-    self.pswd = password
-    self.keepalive = keepalive
-    self.lw_topic = None
-    self.lw_msg = None
-    self.lw_qos = 0
-    self.lw_retain = False
+TOPIC = b"esp-status"
+username = PRO_ID
+password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
 
-  def _send_str(self, s):
-    self.sock.write(struct.pack("!H", len(s)))
-    self.sock.write(s)
 
-  def _recv_len(self):
-    n = 0
-    sh = 0
-    while 1:
-      b = self.sock.read(1)[0]
-      n |= (b & 0x7f) << sh
-      if not b & 0x80:
-        return n
-      sh += 7
+def data(ds_id, value):
+    message = {
+        "id": int(ds_id),
+        "dp": {
+            "count": [{      # 距离传感器采集的数据
+                "v": value
+            }],
+            "random": [{        # Python产生的随机数
+                "v": uos.urandom(2)[0]
+            }],
+            "talk": [{        # Python产生的随机数
+                "v": "fuck you bitch!%d" % (uos.urandom(2)[0])
+            }]
+        }
+    }
+    # print("publish:", message)
+    # message = json.dumps(message).encode('ascii')
+    # print("publish json:", bytes("{}".format(message), 'ascii'))
+    message = bytes("{}".format(message), 'utf8')
+    return message
 
-  def set_callback(self, f):
-    self.cb = f
 
-  def set_last_will(self, topic, msg, retain=False, qos=0):
-    assert 0 <= qos <= 2
-    assert topic
-    self.lw_topic = topic
-    self.lw_msg = msg
-    self.lw_qos = qos
-    self.lw_retain = retain
+def my_mqtt():
+    client = None
+    # Catch exceptions,stop program if interrupted accidentally in the 'try'
+    try:
+        client = MQTTClient(client_id=DEV_NAME, server=SERVER, port=1883, user=username,
+                       password=password, keepalive=360)  # create a mqtt client
+        print("mqtt init")
+        client.connect()  # connect mqtt
+        print("mqtt connect")
+        # client.subscribe(TOPIC)  # client subscribes to a topic
+        # print("Connected to %s, subscribed to %s topic" % (SERVER, TOPIC))
 
-  def connect(self, clean_session=True):
-    self.sock = socket.socket()
-    self.sock.connect(self.addr)
-    if self.ssl:
-      import ussl
-      self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
-    msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
-    msg[1] = 10 + 2 + len(self.client_id)
-    msg[9] = clean_session << 1
-    if self.user is not None:
-      msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
-      msg[9] |= 0xC0
-    if self.keepalive:
-      assert self.keepalive < 65536
-      msg[10] |= self.keepalive >> 8
-      msg[11] |= self.keepalive & 0x00FF
-    if self.lw_topic:
-      msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
-      msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
-      msg[9] |= self.lw_retain << 5
-    self.sock.write(msg)
-    #print(hex(len(msg)), hexlify(msg, ":"))
-    self._send_str(self.client_id)
-    if self.lw_topic:
-      self._send_str(self.lw_topic)
-      self._send_str(self.lw_msg)
-    if self.user is not None:
-      self._send_str(self.user)
-      self._send_str(self.pswd)
-    resp = self.sock.read(4)
-    assert resp[0] == 0x20 and resp[1] == 0x02
-    if resp[3] != 0:
-      raise MQTTException(resp[3])
-    return resp[2] & 1
+        topic_publish = '$sys/%s/%s/dp/post/json' % (username, DEV_NAME)
 
-  def disconnect(self):
-    self.sock.write(b"\xe0\0")
-    self.sock.close()
+        count = 0
+        while True:
+            count += 1
+            value = uos.urandom(2)[0]
+            type(value)
+            payload = data(count, value)
+            client.publish(topic=topic_publish, msg=payload, qos=1)
+            print("topic_publish: ", topic_publish)
+            print("topic_publish: ", topic_publish)
+            print("payload: ", payload)
+            print("-------------------------------------------------------------------------------")
+            # client.wait_msg()  # wait message
+            time.sleep(3)
 
-  def ping(self):
-    self.sock.write(b"\xc0\0")
-
-  def publish(self, topic, msg, retain=False, qos=0):
-    pkt = bytearray(b"\x30\0\0\0")
-    pkt[0] |= qos << 1 | retain
-    sz = 2 + len(topic) + len(msg)
-    if qos > 0:
-      sz += 2
-    assert sz < 2097152
-    i = 1
-    while sz > 0x7f:
-      pkt[i] = (sz & 0x7f) | 0x80
-      sz >>= 7
-      i += 1
-    pkt[i] = sz
-    #print(hex(len(pkt)), hexlify(pkt, ":"))
-    self.sock.write(pkt, i + 1)
-    self._send_str(topic)
-    if qos > 0:
-      self.pid += 1
-      pid = self.pid
-      struct.pack_into("!H", pkt, 0, pid)
-      self.sock.write(pkt, 2)
-    self.sock.write(msg)
-    if qos == 1:
-      while 1:
-        op = self.wait_msg()
-        if op == 0x40:
-          sz = self.sock.read(1)
-          assert sz == b"\x02"
-          rcv_pid = self.sock.read(2)
-          rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
-          if pid == rcv_pid:
-            return
-    elif qos == 2:
-      assert 0
-
-  def subscribe(self, topic, qos=0):
-    assert self.cb is not None, "Subscribe callback is not set"
-    pkt = bytearray(b"\x82\0\0\0")
-    self.pid += 1
-    struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
-    #print(hex(len(pkt)), hexlify(pkt, ":"))
-    self.sock.write(pkt)
-    self._send_str(topic)
-    self.sock.write(qos.to_bytes(1, "little"))
-    while 1:
-      op = self.wait_msg()
-      if op == 0x90:
-        resp = self.sock.read(4)
-        #print(resp)
-        assert resp[1] == pkt[2] and resp[2] == pkt[3]
-        if resp[3] == 0x80:
-          raise MQTTException(resp[3])
-        return
-
-  # Wait for a single incoming MQTT message and process it.
-  # Subscribed messages are delivered to a callback previously
-  # set by .set_callback() method. Other (internal) MQTT
-  # messages processed internally.
-  def wait_msg(self):
-    res = self.sock.read(1)
-    self.sock.setblocking(True)
-    if res is None:
-      return None
-    if res == b"":
-      raise OSError(-1)
-    if res == b"\xd0":  # PINGRESP
-      sz = self.sock.read(1)[0]
-      assert sz == 0
-      return None
-    op = res[0]
-    if op & 0xf0 != 0x30:
-      return op
-    sz = self._recv_len()
-    topic_len = self.sock.read(2)
-    topic_len = (topic_len[0] << 8) | topic_len[1]
-    topic = self.sock.read(topic_len)
-    sz -= topic_len + 2
-    if op & 6:
-      pid = self.sock.read(2)
-      pid = pid[0] << 8 | pid[1]
-      sz -= 2
-    msg = self.sock.read(sz)
-    self.cb(topic, msg)
-    if op & 6 == 2:
-      pkt = bytearray(b"\x40\x02\0\0")
-      struct.pack_into("!H", pkt, 2, pid)
-      self.sock.write(pkt)
-    elif op & 6 == 4:
-      assert 0
-
-  # Checks whether a pending message from server is available.
-  # If not, returns immediately with None. Otherwise, does
-  # the same processing as wait_msg.
-  def check_msg(self):
-    self.sock.setblocking(False)
-    return self.wait_msg()
+    finally:
+        if(client is not None):
+            client.disconnect()
+            
+            
+if __name__ == '__main__':
+    my_mqtt()

+ 14 - 10
python38/OneNet_communicate.py

@@ -7,13 +7,15 @@ import time
 from urllib.parse import quote
 from OneNet_token import cal_token
 
-DEV_ID="950698937" #设备ID
+SERVER = 'mqtts.heclouds.com'
 PRO_ID = "522417" #产品ID
-DEV_NAME='Air_Conditioning_IR'
+DEV_NAME='local_python'
 accesskey="dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs="
-SERVER = 'mqtts.heclouds.com'
+# DEV_ID = "950698937"  # 设备ID
 
 
+username = PRO_ID
+password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
 # 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
 def on_connect(client, userdata, flags, rc):
     print("连接结果:" + mqtt.connack_string(rc))
@@ -41,13 +43,14 @@ def data(ds_id,value):
         }
     }
     # message = json.dumps(message).encode('ascii')
-    print("publish json:", bytes("{}".format(message), 'ascii'))
+    # print("publish json:", bytes("{}".format(message), 'ascii'))
     message = bytes("{}".format(message), 'ascii')
     return message
 
 def main():
-    passw = cal_token(PRO_ID, accesskey)
-    print(passw)
+    # password = cal_token(PRO_ID, accesskey)
+    # passw = password
+    print(password)
     username = PRO_ID
     client = mqtt.Client(DEV_NAME, protocol=mqtt.MQTTv311)
     #client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书
@@ -55,7 +58,7 @@ def main():
     client.on_publish = on_publish
     client.on_message = on_message
     client.connect(SERVER, port=1883, keepalive=300)
-    client.username_pw_set(PRO_ID, passw)
+    client.username_pw_set(PRO_ID, password)
 
     # 按照OneENT要求的格式,配置数据发布和订阅的主题
     topic_dp = '$sys/%s/%s/dp/post/json/+' % (username, DEV_NAME)   # 设备上报数据主题
@@ -63,12 +66,13 @@ def main():
     topic_cmds = '$sys/%s/%s/cmd/request/' % (username, DEV_NAME)   # 设备接受命令主题
     topic_publish = '$sys/%s/%s/dp/post/json' %(username,DEV_NAME)
     client.loop_start()
-
     count = 0
     while True:
         count += 1
-        # 树莓派循环发布数据到OneNET
-        client.publish(topic=topic_publish, payload=data(count, random.randint(0,100)), qos=1)
+        payload = data(count, random.randint(0, 100))
+        client.publish(topic=topic_publish, payload=payload, qos=1)
+        print("topic_publish: ", topic_publish)
+        print("payload: ", payload)
         print("-------------------------------------------------------------------------------")
         time.sleep(3)
 

+ 6 - 0
python38/readme.md

@@ -1,3 +1,9 @@
+# 开发板
+参考安信可官网:
+https://docs.ai-thinker.com/esp8266/boards/nodemcu
+
+https://docs.ai-thinker.com/_media/esp8266/boards/nodemcu8266_v1.2_e8_a7_84_e6_a0_bc_e4_b9_a6.pdf
+
 # 连接OneNet
 
 ## OneNet官方参考指南