mrh 3 år sedan
incheckning
9029ffb7f3
7 ändrade filer med 533 tillägg och 0 borttagningar
  1. 122 0
      esp_http.py
  2. 190 0
      esp_mqtt.py
  3. 0 0
      led.py
  4. 36 0
      python38/OneNet_communicate-RaspberryPi.py
  5. 99 0
      python38/OneNet_communicate.py
  6. 69 0
      python38/OneNet_connect.py
  7. 17 0
      python38/readme.md

+ 122 - 0
esp_http.py

@@ -0,0 +1,122 @@
+import usocket
+
+class Response:
+
+  def __init__(self, f):
+    self.raw = f
+    self.encoding = "utf-8"
+    self._cached = None
+
+  def close(self):
+    if self.raw:
+      self.raw.close()
+      self.raw = None
+    self._cached = None
+
+  @property
+  def content(self):
+    if self._cached is None:
+      self._cached = self.raw.read()
+      self.raw.close()
+      self.raw = None
+    return self._cached
+
+  @property
+  def text(self):
+    return str(self.content, self.encoding)
+
+  def json(self):
+    import ujson
+    return ujson.loads(self.content)
+
+
+def request(method, url, data=None, json=None, headers={}, stream=None,params=None):
+  try:
+    proto, dummy, host, path = url.split("/", 3)
+  except ValueError:
+    proto, dummy, host = url.split("/", 2)
+    path = ""
+  if proto == "http:":
+    port = 80
+  elif proto == "https:":
+    import ussl
+    port = 443
+  else:
+    raise ValueError("Unsupported protocol: " + proto)
+
+  if ":" in host:
+    host, port = host.split(":", 1)
+    port = int(port)
+    
+  if params:
+    path = path + "?"
+    for k in params:
+      path = path + '&'+k+'='+params[k]
+
+  ai = usocket.getaddrinfo(host, port)
+  addr = ai[0][4]
+  s = usocket.socket()
+  s.connect(addr)
+  if proto == "https:":
+    s = ussl.wrap_socket(s)
+  s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
+  if not "Host" in headers:
+    s.write(b"Host: %s\r\n" % host)
+  # Iterate over keys to avoid tuple alloc
+  for k in headers:
+    s.write(k)
+    s.write(b": ")
+    s.write(headers[k])
+    s.write(b"\r\n")
+  if json is not None:
+    assert data is None
+    import ujson
+    data = ujson.dumps(json)
+  if data:
+    s.write(b"Content-Length: %d\r\n" % len(data))
+  s.write(b"\r\n")
+  if data:
+    s.write(data)
+
+  l = s.readline()
+  protover, status, msg = l.split(None, 2)
+  status = int(status)
+  #print(protover, status, msg)
+  while True:
+    l = s.readline()
+    if not l or l == b"\r\n":
+      break
+        #print(l)
+    if l.startswith(b"Transfer-Encoding:"):
+      if b"chunked" in l:
+        raise ValueError("Unsupported " + l)
+    elif l.startswith(b"Location:") and not 200 <= status <= 299:
+      raise NotImplementedError("Redirects not yet supported")
+
+  resp = Response(s)
+  resp.status_code = status
+  resp.reason = msg.rstrip()
+  return resp
+
+
+def head(url, **kw):
+  return request("HEAD", url, **kw)
+
+def get(url, **kw):
+  return request("GET", url, **kw)
+
+def post(url, **kw):
+  return request("POST", url, **kw)
+
+def put(url, **kw):
+  return request("PUT", url, **kw)
+
+def patch(url, **kw):
+  return request("PATCH", url, **kw)
+
+def delete(url, **kw):
+  return request("DELETE", url, **kw)
+
+
+
+

+ 190 - 0
esp_mqtt.py

@@ -0,0 +1,190 @@
+import usocket as socket
+import ustruct as struct
+#from ubinascii import hexlify
+
+class MQTTException(Exception):
+    pass
+
+class MQTTClient:
+
+  def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}):
+    if port == 0:
+      port = 8883 if ssl else 1883
+    self.client_id = client_id
+    self.sock = None
+    self.addr = socket.getaddrinfo(server, port)[0][-1]
+    self.ssl = ssl
+    self.ssl_params = ssl_params
+    self.pid = 0
+    self.cb = None
+    self.user = user
+    self.pswd = password
+    self.keepalive = keepalive
+    self.lw_topic = None
+    self.lw_msg = None
+    self.lw_qos = 0
+    self.lw_retain = False
+
+  def _send_str(self, s):
+    self.sock.write(struct.pack("!H", len(s)))
+    self.sock.write(s)
+
+  def _recv_len(self):
+    n = 0
+    sh = 0
+    while 1:
+      b = self.sock.read(1)[0]
+      n |= (b & 0x7f) << sh
+      if not b & 0x80:
+        return n
+      sh += 7
+
+  def set_callback(self, f):
+    self.cb = f
+
+  def set_last_will(self, topic, msg, retain=False, qos=0):
+    assert 0 <= qos <= 2
+    assert topic
+    self.lw_topic = topic
+    self.lw_msg = msg
+    self.lw_qos = qos
+    self.lw_retain = retain
+
+  def connect(self, clean_session=True):
+    self.sock = socket.socket()
+    self.sock.connect(self.addr)
+    if self.ssl:
+      import ussl
+      self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
+    msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
+    msg[1] = 10 + 2 + len(self.client_id)
+    msg[9] = clean_session << 1
+    if self.user is not None:
+      msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
+      msg[9] |= 0xC0
+    if self.keepalive:
+      assert self.keepalive < 65536
+      msg[10] |= self.keepalive >> 8
+      msg[11] |= self.keepalive & 0x00FF
+    if self.lw_topic:
+      msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
+      msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
+      msg[9] |= self.lw_retain << 5
+    self.sock.write(msg)
+    #print(hex(len(msg)), hexlify(msg, ":"))
+    self._send_str(self.client_id)
+    if self.lw_topic:
+      self._send_str(self.lw_topic)
+      self._send_str(self.lw_msg)
+    if self.user is not None:
+      self._send_str(self.user)
+      self._send_str(self.pswd)
+    resp = self.sock.read(4)
+    assert resp[0] == 0x20 and resp[1] == 0x02
+    if resp[3] != 0:
+      raise MQTTException(resp[3])
+    return resp[2] & 1
+
+  def disconnect(self):
+    self.sock.write(b"\xe0\0")
+    self.sock.close()
+
+  def ping(self):
+    self.sock.write(b"\xc0\0")
+
+  def publish(self, topic, msg, retain=False, qos=0):
+    pkt = bytearray(b"\x30\0\0\0")
+    pkt[0] |= qos << 1 | retain
+    sz = 2 + len(topic) + len(msg)
+    if qos > 0:
+      sz += 2
+    assert sz < 2097152
+    i = 1
+    while sz > 0x7f:
+      pkt[i] = (sz & 0x7f) | 0x80
+      sz >>= 7
+      i += 1
+    pkt[i] = sz
+    #print(hex(len(pkt)), hexlify(pkt, ":"))
+    self.sock.write(pkt, i + 1)
+    self._send_str(topic)
+    if qos > 0:
+      self.pid += 1
+      pid = self.pid
+      struct.pack_into("!H", pkt, 0, pid)
+      self.sock.write(pkt, 2)
+    self.sock.write(msg)
+    if qos == 1:
+      while 1:
+        op = self.wait_msg()
+        if op == 0x40:
+          sz = self.sock.read(1)
+          assert sz == b"\x02"
+          rcv_pid = self.sock.read(2)
+          rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
+          if pid == rcv_pid:
+            return
+    elif qos == 2:
+      assert 0
+
+  def subscribe(self, topic, qos=0):
+    assert self.cb is not None, "Subscribe callback is not set"
+    pkt = bytearray(b"\x82\0\0\0")
+    self.pid += 1
+    struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
+    #print(hex(len(pkt)), hexlify(pkt, ":"))
+    self.sock.write(pkt)
+    self._send_str(topic)
+    self.sock.write(qos.to_bytes(1, "little"))
+    while 1:
+      op = self.wait_msg()
+      if op == 0x90:
+        resp = self.sock.read(4)
+        #print(resp)
+        assert resp[1] == pkt[2] and resp[2] == pkt[3]
+        if resp[3] == 0x80:
+          raise MQTTException(resp[3])
+        return
+
+  # Wait for a single incoming MQTT message and process it.
+  # Subscribed messages are delivered to a callback previously
+  # set by .set_callback() method. Other (internal) MQTT
+  # messages processed internally.
+  def wait_msg(self):
+    res = self.sock.read(1)
+    self.sock.setblocking(True)
+    if res is None:
+      return None
+    if res == b"":
+      raise OSError(-1)
+    if res == b"\xd0":  # PINGRESP
+      sz = self.sock.read(1)[0]
+      assert sz == 0
+      return None
+    op = res[0]
+    if op & 0xf0 != 0x30:
+      return op
+    sz = self._recv_len()
+    topic_len = self.sock.read(2)
+    topic_len = (topic_len[0] << 8) | topic_len[1]
+    topic = self.sock.read(topic_len)
+    sz -= topic_len + 2
+    if op & 6:
+      pid = self.sock.read(2)
+      pid = pid[0] << 8 | pid[1]
+      sz -= 2
+    msg = self.sock.read(sz)
+    self.cb(topic, msg)
+    if op & 6 == 2:
+      pkt = bytearray(b"\x40\x02\0\0")
+      struct.pack_into("!H", pkt, 2, pid)
+      self.sock.write(pkt)
+    elif op & 6 == 4:
+      assert 0
+
+  # Checks whether a pending message from server is available.
+  # If not, returns immediately with None. Otherwise, does
+  # the same processing as wait_msg.
+  def check_msg(self):
+    self.sock.setblocking(False)
+    return self.wait_msg()

+ 0 - 0
led.py


+ 36 - 0
python38/OneNet_communicate-RaspberryPi.py

@@ -0,0 +1,36 @@
+if __name__ == '__main__':
+    # 配置树莓派GPIO引脚
+    GPIO.setmode(GPIO.BOARD)    # BOARD编号方式,基于插座引脚编号
+    GPIO.setup(11, GPIO.OUT)    # 输出模式
+
+    # 配置MQTT连接信息
+    client_id = DEV_NAME
+    username = PRO_ID
+    password = token(PRO_ID, DEV_NAME, DEV_KEY)
+    print('username:' + username)
+    print('password:' + password)
+    client = mqtt.Client(client_id=client_id, clean_session=True, protocol=mqtt.MQTTv311)
+    client.on_connect = on_connect
+    client.on_message = on_message
+    client.on_publish = on_publish
+    client.on_subscribe = on_subscribe
+    client.on_disconnect = on_disconnect
+    client.username_pw_set(username=username, password=password)
+    # client.tls_set(ca_certs='MQTTS-certificate.pem')              # 加密方式需要使用鉴权证书
+    # client.tls_insecure_set(True) #关验证
+    client.connect(HOST, int(PORT), keepalive=1200)
+
+    # 按照OneENT要求的格式,配置数据发布和订阅的主题
+    topic_dp = '$sys/%s/%s/dp/post/json/+' % (username, DEV_NAME)   # 设备上报数据主题
+    topic_cmd = '$sys/%s/%s/cmd/#' % (username, DEV_NAME)           # 设备接受命令主题
+    topic_cmds = '$sys/%s/%s/cmd/request/' % (username, DEV_NAME)   # 设备接受命令主题
+    topic_publish = '$sys/%s/%s/dp/post/json' %(username,DEV_NAME)
+    client.loop_start()
+
+    count = 0
+    while True:
+        count += 1
+        # 树莓派循环发布数据到OneNET
+        client.publish(topic=topic_publish, payload=data(count, random.randint(0,100)), qos=1)
+        print("-------------------------------------------------------------------------------")
+        time.sleep(3)

+ 99 - 0
python38/OneNet_communicate.py

@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# pip install paho-mqtt
+import paho.mqtt.client as mqtt
+import base64
+import hmac
+import time
+from urllib.parse import quote
+
+DEV_ID="950698937" #设备ID
+PRO_ID = "522417" #产品ID
+DEV_NAME='Air_Conditioning_IR'
+accesskey="dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs="
+
+def token(id,access_key):  #官方文档给出的核心秘钥计算算法
+
+    version = '2018-10-31'
+
+    res = 'products/%s' % id  # 通过产品ID访问产品API
+
+    # 用户自定义token过期时间
+    et = str(int(time.time()) + 3600)
+
+    # 签名方法,支持md5、sha1、sha256
+    method = 'sha1'
+
+    # 对access_key进行decode
+    key = base64.b64decode(access_key)
+
+    # 计算sign
+    org = et + '\n' + method + '\n' + res + '\n' + version
+    sign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)
+    sign = base64.b64encode(sign_b.digest()).decode()
+
+    # value 部分进行url编码,method/res/version值较为简单无需编码
+    sign = quote(sign, safe='')
+    res = quote(res, safe='')
+
+    # token参数拼接
+    token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)
+    return token
+
+# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
+def on_connect(client, userdata, flags, rc):
+    print("连接结果:" + mqtt.connack_string(rc))
+# 从服务器接收发布消息时的回调。
+def on_message(client, userdata, msg):
+    print(str(msg.payload,'utf-8'))
+#当消息已经被发送给中间人,on_publish()回调将会被触发
+def on_publish(client, userdata, mid):
+    print(str(mid))
+
+import random
+import json
+# 从树莓派发布到服务器的数据内容
+def data(ds_id,value):
+    message = {
+        "id": int(ds_id),
+        "dp": {
+            "home": [{      # 距离传感器采集的数据
+                "v": value
+            }],
+            "random": [{        # Python产生的随机数
+                "v": random.randint(20,80)
+            }]
+        }
+    }
+    # print(message)
+    message = json.dumps(message).encode('ascii')
+    return message
+
+def main():
+    passw=token(PRO_ID,accesskey)
+    print(passw)
+    username = PRO_ID
+    client = mqtt.Client(DEV_NAME,protocol=mqtt.MQTTv311)
+    #client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书
+    client.on_connect = on_connect
+    client.on_publish = on_publish
+    client.on_message = on_message
+    client.username_pw_set(PRO_ID, passw)
+    client.connect('183.230.40.96', port=1883, keepalive=1200)
+
+    # 按照OneENT要求的格式,配置数据发布和订阅的主题
+    topic_dp = '$sys/%s/%s/dp/post/json/+' % (username, DEV_NAME)   # 设备上报数据主题
+    topic_cmd = '$sys/%s/%s/cmd/#' % (username, DEV_NAME)           # 设备接受命令主题
+    topic_cmds = '$sys/%s/%s/cmd/request/' % (username, DEV_NAME)   # 设备接受命令主题
+    topic_publish = '$sys/%s/%s/dp/post/json' %(username,DEV_NAME)
+    client.loop_start()
+
+    count = 0
+    while True:
+        count += 1
+        # 树莓派循环发布数据到OneNET
+        client.publish(topic=topic_publish, payload=data(count, random.randint(0,100)), qos=1)
+        print("-------------------------------------------------------------------------------")
+        time.sleep(3)
+
+if __name__ == '__main__':
+    main()

+ 69 - 0
python38/OneNet_connect.py

@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# pip install paho-mqtt
+import paho.mqtt.client as mqtt
+import base64
+import hmac
+import time
+from urllib.parse import quote
+
+DEV_ID="950698937" #设备ID
+PRO_ID = "522417" #产品ID
+DEV_NAME='Air_Conditioning_IR'
+accesskey="dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs="
+
+def token(id,access_key):  #官方文档给出的核心秘钥计算算法
+
+    version = '2018-10-31'
+
+    res = 'products/%s' % id  # 通过产品ID访问产品API
+
+    # 用户自定义token过期时间
+    et = str(int(time.time()) + 3600)
+
+    # 签名方法,支持md5、sha1、sha256
+    method = 'sha1'
+
+    # 对access_key进行decode
+    key = base64.b64decode(access_key)
+
+    # 计算sign
+    org = et + '\n' + method + '\n' + res + '\n' + version
+    sign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)
+    sign = base64.b64encode(sign_b.digest()).decode()
+
+    # value 部分进行url编码,method/res/version值较为简单无需编码
+    sign = quote(sign, safe='')
+    res = quote(res, safe='')
+
+    # token参数拼接
+    token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)
+    return token
+
+# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
+def on_connect(client, userdata, flags, rc):
+    print("连接结果:" + mqtt.connack_string(rc))
+# 从服务器接收发布消息时的回调。
+def on_message(client, userdata, msg):
+    print(str(msg.payload,'utf-8'))
+#当消息已经被发送给中间人,on_publish()回调将会被触发
+def on_publish(client, userdata, mid):
+    print(str(mid))
+
+def main():
+    passw=token(PRO_ID,accesskey)
+    print(passw)
+    client = mqtt.Client(DEV_NAME,protocol=mqtt.MQTTv311)
+    #client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书
+    client.on_connect = on_connect
+    client.on_publish = on_publish
+    client.on_message = on_message
+    client.connect('183.230.40.96', port=1883, keepalive=1200)
+    client.username_pw_set(PRO_ID, passw)
+    while True:
+        client.publish(topic='home', payload='fuck you', qos=0, retain=False)
+        time.sleep(2)
+        # client.on_publish("fuck you", "form python", 1)
+    # client.loop_forever()
+
+if __name__ == '__main__':
+    main()

+ 17 - 0
python38/readme.md

@@ -0,0 +1,17 @@
+# 连接OneNet
+
+## OneNet官方参考指南
+https://open.iot.10086.cn/doc/mqtt/book/device-develop/mqtt-c-demo.html
+
+
+## token算法
+https://open.iot.10086.cn/doc/mqtt/book/manual/auth/token.html
+
+## 网友参考
+https://blog.csdn.net/weixin_42410959/article/details/105897629?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165375763916782388012981%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=165375763916782388012981&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-105897629-null-null.142^v11^pc_search_result_control_group,157^v12^control&utm_term=python+mqtt+onenet&spm=1018.2226.3001.4187
+
+# mqtt通信
+由于上述只能连接,无法通信,参考其他文档:
+https://blog.csdn.net/ZHJ123CSDN/article/details/115137258?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165376451316782246419436%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=165376451316782246419436&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-10-115137258-null-null.142^v11^pc_search_result_control_group,157^v12^control&utm_term=OneNet+mqtt&spm=1018.2226.3001.4187
+
+在 OneNet_communicate-RaspberryPi.py 中有代码示例,但这是关于树莓派的,还需要移植一下,详见:OneNet_communicate.py