esp_mqtt.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import usocket as socket
  2. import ustruct as struct
  3. #from ubinascii import hexlify
  4. class MQTTException(Exception):
  5. pass
  6. class MQTTClient:
  7. def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}):
  8. if port == 0:
  9. port = 8883 if ssl else 1883
  10. self.client_id = client_id
  11. self.sock = None
  12. self.addr = socket.getaddrinfo(server, port)[0][-1]
  13. self.ssl = ssl
  14. self.ssl_params = ssl_params
  15. self.pid = 0
  16. self.cb = None
  17. self.user = user
  18. self.pswd = password
  19. self.keepalive = keepalive
  20. self.lw_topic = None
  21. self.lw_msg = None
  22. self.lw_qos = 0
  23. self.lw_retain = False
  24. def _send_str(self, s):
  25. self.sock.write(struct.pack("!H", len(s)))
  26. self.sock.write(s)
  27. def _recv_len(self):
  28. n = 0
  29. sh = 0
  30. while 1:
  31. b = self.sock.read(1)[0]
  32. n |= (b & 0x7f) << sh
  33. if not b & 0x80:
  34. return n
  35. sh += 7
  36. def set_callback(self, f):
  37. self.cb = f
  38. def set_last_will(self, topic, msg, retain=False, qos=0):
  39. assert 0 <= qos <= 2
  40. assert topic
  41. self.lw_topic = topic
  42. self.lw_msg = msg
  43. self.lw_qos = qos
  44. self.lw_retain = retain
  45. def connect(self, clean_session=True):
  46. self.sock = socket.socket()
  47. self.sock.connect(self.addr)
  48. if self.ssl:
  49. import ussl
  50. self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
  51. msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
  52. msg[1] = 10 + 2 + len(self.client_id)
  53. msg[9] = clean_session << 1
  54. if self.user is not None:
  55. msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
  56. msg[9] |= 0xC0
  57. if self.keepalive:
  58. assert self.keepalive < 65536
  59. msg[10] |= self.keepalive >> 8
  60. msg[11] |= self.keepalive & 0x00FF
  61. if self.lw_topic:
  62. msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
  63. msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
  64. msg[9] |= self.lw_retain << 5
  65. self.sock.write(msg)
  66. #print(hex(len(msg)), hexlify(msg, ":"))
  67. self._send_str(self.client_id)
  68. if self.lw_topic:
  69. self._send_str(self.lw_topic)
  70. self._send_str(self.lw_msg)
  71. if self.user is not None:
  72. self._send_str(self.user)
  73. self._send_str(self.pswd)
  74. resp = self.sock.read(4)
  75. assert resp[0] == 0x20 and resp[1] == 0x02
  76. if resp[3] != 0:
  77. raise MQTTException(resp[3])
  78. return resp[2] & 1
  79. def disconnect(self):
  80. self.sock.write(b"\xe0\0")
  81. self.sock.close()
  82. def ping(self):
  83. self.sock.write(b"\xc0\0")
  84. def publish(self, topic, msg, retain=False, qos=0):
  85. pkt = bytearray(b"\x30\0\0\0")
  86. pkt[0] |= qos << 1 | retain
  87. sz = 2 + len(topic) + len(msg)
  88. if qos > 0:
  89. sz += 2
  90. assert sz < 2097152
  91. i = 1
  92. while sz > 0x7f:
  93. pkt[i] = (sz & 0x7f) | 0x80
  94. sz >>= 7
  95. i += 1
  96. pkt[i] = sz
  97. #print(hex(len(pkt)), hexlify(pkt, ":"))
  98. self.sock.write(pkt, i + 1)
  99. self._send_str(topic)
  100. if qos > 0:
  101. self.pid += 1
  102. pid = self.pid
  103. struct.pack_into("!H", pkt, 0, pid)
  104. self.sock.write(pkt, 2)
  105. self.sock.write(msg)
  106. if qos == 1:
  107. while 1:
  108. op = self.wait_msg()
  109. if op == 0x40:
  110. sz = self.sock.read(1)
  111. assert sz == b"\x02"
  112. rcv_pid = self.sock.read(2)
  113. rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
  114. if pid == rcv_pid:
  115. return
  116. elif qos == 2:
  117. assert 0
  118. def subscribe(self, topic, qos=0):
  119. assert self.cb is not None, "Subscribe callback is not set"
  120. pkt = bytearray(b"\x82\0\0\0")
  121. self.pid += 1
  122. struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
  123. #print(hex(len(pkt)), hexlify(pkt, ":"))
  124. self.sock.write(pkt)
  125. self._send_str(topic)
  126. self.sock.write(qos.to_bytes(1, "little"))
  127. while 1:
  128. op = self.wait_msg()
  129. if op == 0x90:
  130. resp = self.sock.read(4)
  131. #print(resp)
  132. assert resp[1] == pkt[2] and resp[2] == pkt[3]
  133. if resp[3] == 0x80:
  134. raise MQTTException(resp[3])
  135. return
  136. # Wait for a single incoming MQTT message and process it.
  137. # Subscribed messages are delivered to a callback previously
  138. # set by .set_callback() method. Other (internal) MQTT
  139. # messages processed internally.
  140. def wait_msg(self):
  141. res = self.sock.read(1)
  142. self.sock.setblocking(True)
  143. if res is None:
  144. return None
  145. if res == b"":
  146. raise OSError(-1)
  147. if res == b"\xd0": # PINGRESP
  148. sz = self.sock.read(1)[0]
  149. assert sz == 0
  150. return None
  151. op = res[0]
  152. if op & 0xf0 != 0x30:
  153. return op
  154. sz = self._recv_len()
  155. topic_len = self.sock.read(2)
  156. topic_len = (topic_len[0] << 8) | topic_len[1]
  157. topic = self.sock.read(topic_len)
  158. sz -= topic_len + 2
  159. if op & 6:
  160. pid = self.sock.read(2)
  161. pid = pid[0] << 8 | pid[1]
  162. sz -= 2
  163. msg = self.sock.read(sz)
  164. self.cb(topic, msg)
  165. if op & 6 == 2:
  166. pkt = bytearray(b"\x40\x02\0\0")
  167. struct.pack_into("!H", pkt, 2, pid)
  168. self.sock.write(pkt)
  169. elif op & 6 == 4:
  170. assert 0
  171. # Checks whether a pending message from server is available.
  172. # If not, returns immediately with None. Otherwise, does
  173. # the same processing as wait_msg.
  174. def check_msg(self):
  175. self.sock.setblocking(False)
  176. return self.wait_msg()