| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # -*- coding: utf-8 -*-
- # pip install paho-mqtt
- import paho.mqtt.client as mqtt
- import base64
- import hmac
- import time
- from urllib.parse import quote
- DEV_ID="950698937" #设备ID
- PRO_ID = "522417" #产品ID
- DEV_NAME='Air_Conditioning_IR'
- accesskey="dgEP7iOL2/1kVNc1ykDvIhXSVqhb0tLnKinjU2RYpDs="
- def token(id,access_key): #官方文档给出的核心秘钥计算算法
- version = '2018-10-31'
- res = 'products/%s' % id # 通过产品ID访问产品API
- # 用户自定义token过期时间
- et = str(int(time.time()) + 3600)
- # 签名方法,支持md5、sha1、sha256
- method = 'sha1'
- # 对access_key进行decode
- key = base64.b64decode(access_key)
- # 计算sign
- org = et + '\n' + method + '\n' + res + '\n' + version
- sign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)
- sign = base64.b64encode(sign_b.digest()).decode()
- # value 部分进行url编码,method/res/version值较为简单无需编码
- sign = quote(sign, safe='')
- res = quote(res, safe='')
- # token参数拼接
- token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)
- return token
- # 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
- def on_connect(client, userdata, flags, rc):
- print("连接结果:" + mqtt.connack_string(rc))
- # 从服务器接收发布消息时的回调。
- def on_message(client, userdata, msg):
- print(str(msg.payload,'utf-8'))
- #当消息已经被发送给中间人,on_publish()回调将会被触发
- def on_publish(client, userdata, mid):
- print(str(mid))
- import random
- import json
- # 从树莓派发布到服务器的数据内容
- def data(ds_id,value):
- message = {
- "id": int(ds_id),
- "dp": {
- "count": [{ # 距离传感器采集的数据
- "v": value
- }],
- "random": [{ # Python产生的随机数
- "v": random.randint(20,80)
- }],
- "talk": [{ # Python产生的随机数
- "v": "fuck you bitch!%d"%(value*random.randint(20,80))
- }]
- }
- }
- # print(message)
- message = json.dumps(message).encode('ascii')
- return message
- def main():
- passw=token(PRO_ID,accesskey)
- print(passw)
- username = PRO_ID
- client = mqtt.Client(DEV_NAME,protocol=mqtt.MQTTv311)
- #client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书
- client.on_connect = on_connect
- client.on_publish = on_publish
- client.on_message = on_message
- client.username_pw_set(PRO_ID, passw)
- client.connect('183.230.40.96', port=1883, keepalive=1200)
- # 按照OneENT要求的格式,配置数据发布和订阅的主题
- topic_dp = '$sys/%s/%s/dp/post/json/+' % (username, DEV_NAME) # 设备上报数据主题
- topic_cmd = '$sys/%s/%s/cmd/#' % (username, DEV_NAME) # 设备接受命令主题
- topic_cmds = '$sys/%s/%s/cmd/request/' % (username, DEV_NAME) # 设备接受命令主题
- topic_publish = '$sys/%s/%s/dp/post/json' %(username,DEV_NAME)
- client.loop_start()
- count = 0
- while True:
- count += 1
- # 树莓派循环发布数据到OneNET
- client.publish(topic=topic_publish, payload=data(count, random.randint(0,100)), qos=1)
- print("-------------------------------------------------------------------------------")
- time.sleep(3)
- if __name__ == '__main__':
- main()
|