| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- # -*- coding: utf-8 -*-
- # pip install paho-mqtt
- import paho.mqtt.client as mqtt
- import time
- from urllib.parse import quote
- from mylib import cal_token, payload_data
- import random
- SERVER = 'mqtts.heclouds.com'  # same host string MqttOneNet assigns to broker_addr; NOTE(review): the class does not read this constant
- PRO_ID = "522417" # product ID; NOTE(review): MqttOneNet sets its own product_id and does not read this constant
- DEV_NAME = 'local_python'  # device name handed to MqttOneNet in the __main__ guard
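- # Callback invoked when the client receives the server's CONNACK response, i.e. the server's answer to the connection request (whether it succeeded or not). NOTE: this describes MqttOneNet.on_connect further down, not the class statement directly below.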
class MqttOneNet():
    """Small OneNET MQTT client that publishes simulated datapoints forever.

    Constructing an instance connects to the broker and then enters the
    endless publish loop, so the constructor never returns.
    """

    def __init__(self, device_name) -> None:
        """Store the connection settings, connect, and start publishing.

        Args:
            device_name: OneNET device name. It is used as the MQTT client id
                and as part of the publish topic.
        """
        self.broker_addr = 'mqtts.heclouds.com'
        self.broker_port = 1883
        self.product_id = "522417"
        # Original author's note: produced by the token algorithm,
        # valid until 2023-05-31 01:41:21.
        self.password = "version=2018-10-31&res=products%2F522417&et=1685551369&method=sha1&sign=9l9gkFQx7A%2B7B3fu8uU%2F089azps%3D"
        # Honour the caller's device name instead of a hard-coded literal.
        self.device_name = device_name
        # Not used yet.
        self.accesskey = ""
        self.device_id = ""
        self.client_init()
        self.publish_dat()

    def publish_dat(self):
        """Publish one freshly generated datapoint message every 3 seconds.

        This loop never terminates; the message id starts at 1 and increases
        by one per message.
        """
        topic_publish = '$sys/%s/%s/dp/post/json' % (
            self.product_id, self.device_name)
        count = 0
        while True:
            count += 1
            dat = self.read_dat()
            payload = self.payload_format(count, dat)
            self.client.publish(topic=topic_publish, payload=payload, qos=1)
            print("topic_publish: ", topic_publish)
            print("payload: ", payload)
            print("-------------------------------------------------------------------------------")
            time.sleep(3)

    def read_dat(self):
        """Return a dict of simulated datastreams in the OneNET "dp" layout.

        Each key maps to a list of datapoint dicts holding a value "v" and,
        for some entries, a timestamp "t" taken from time.time().
        """
        # NOTE(review): the key is spelled "temperatrue". It is kept as-is
        # because it is sent to the platform as the datastream name; confirm
        # the name configured on the platform before renaming it.
        dat = {
            "temperatrue": [{
                "v": random.randint(0, 100),
            }],
            "power": [{
                "v": random.randint(33, 55) / 10,
                "t": time.time(),
            }],
            "status": [
                {
                    "v": {
                        "color": "blue",
                        "led1": "on",
                        "led2": "off",
                    },
                },
                {
                    "v": {
                        "color": "red",
                    },
                    "t": time.time(),
                },
            ],
        }
        return dat

    @staticmethod
    def payload_format(ds_id, dictionaries: dict):
        """Wrap datapoints in the publish envelope and return JSON bytes.

        Payload format OneNET requires when publishing (author's notes):
          * ``dictionaries`` must be a dict;
          * a dict key may be an object;
          * every dict value must be an array whose elements are dicts,
            even when the array holds a single element.
        See readme.md -> "publish messages" and the official documentation:
        https://open.iot.10086.cn/doc/v5/develop/detail/251

        Args:
            ds_id: Message id; converted with int().
            dictionaries: Datapoints keyed by datastream name.

        Returns:
            The UTF-8 encoded JSON document ``{"id": ..., "dp": ...}``.
        """
        # Local import so this method does not depend on a file-level import.
        import json

        message = {
            "id": int(ds_id),
            "dp": dictionaries,
        }
        # json.dumps emits real JSON (double quotes, escaped non-ASCII);
        # formatting the dict with str.format() produced a Python repr that
        # is not valid JSON.
        encoded = json.dumps(message).encode('utf-8')
        print(encoded)
        return encoded

    def client_init(self):
        """Create the paho client, register callbacks, connect, start the loop."""
        self.client = mqtt.Client(self.device_name, protocol=mqtt.MQTTv311)
        self.client.on_connect = self.on_connect
        self.client.on_publish = self.on_publish
        self.client.on_message = self.on_message
        # Credentials must be set before connect(): the CONNECT packet is
        # built during connect(), so setting them afterwards has no effect
        # on that connection attempt.
        self.client.username_pw_set(self.product_id, self.password)
        self.client.connect(self.broker_addr, self.broker_port, keepalive=300)
        self.client.loop_start()

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """Print the broker's CONNACK result as human-readable text."""
        print("connet result:" + mqtt.connack_string(rc))

    @staticmethod
    def on_message(client, userdata, msg):
        """Print the payload of a received message decoded as UTF-8."""
        print("on_message:", str(msg.payload, 'utf-8'))

    # Fired once a message has been handed over to the broker.
    @staticmethod
    def on_publish(client, userdata, mid):
        """Print the message id of a published message."""
        print("on_publish: ", str(mid))
- if __name__ == '__main__':  # run only when executed as a script
- MqttOneNet(DEV_NAME)  # the constructor connects and then publishes in an endless loop, so this call does not return
|