# -*- coding: UTF-8 -*-
import ujson
import utime as time
from aliyunIoT import Device
import ota
import kv
mynet = None
device = None
deviceSecret = None
deviceName=None
productKey=None
productSecret=None
heartBeat=None
device_dyn_resigter_succed = False
# 定义需要升级的模块和版本号
module_name = 'default'
app_version = '1.0.1'
# 定义升级包的下载和安装路径,其中url,hash_type和hash 会通过服务端推送被保存下来
info = {
'url': '',
'store_path': '/data/pyamp/app.zip',
'install_path': '/data/pyamp/',
'length': 0,
'hash_type': '',
'hash': ''
}
modle = 1
def get_modle():
global modle
return modle
onoff = 1
def get_onoff():
global onoff
return onoff
light = 200
def get_light():
global light
return light
time_begin = 18
def get_time_begin():
global time_begin
return time_begin
time_end = 6
def get_time_end():
global time_end
return time_end
mqtt_connect_flag=False
def get_mqtt_connect_flag():
global mqtt_connect_flag
return mqtt_connect_flag
# ota 消息推送的接受函数
def on_trigger(data):
global info
# 保存服务端推送的ota信息
info['url'] = data['url']
info['length'] = data['length']
info['module_name'] = data['module_name']
info['version'] = data['version']
info['hash'] = data['hash']
info['hash_type'] = data['hash_type']
# 开始ota 包下载
dl_data = {}
dl_data['url'] = info['url']
dl_data['store_path'] = info['store_path']
ota.download(dl_data)
# ota 升级包下载结果回调函数
def on_download(data):
global info
if data >= 0:
print('Ota download succeed')
# 开始ota包校验
param = {}
param['length'] = info['length']
param['store_path'] = info['store_path']
param['hash_type'] = info['hash_type']
param['hash'] = info['hash']
ota.verify(param)
# ota 升级包校验结果回调函数
def on_verify(data):
global info
print(data)
if data >= 0 :
print('Ota verify succeed')
print('Start Upgrade')
# 开始ota升级
param = {}
param['length'] = info['length']
param['store_path'] = info['store_path']
param['install_path'] = info['install_path']
ota.upgrade(param)
# ota 升级包结果回调函数
def on_upgrade(data):
if data >= 0 :
print('Ota succeed')
#ota升完级后 重启设备
reboot()
#当iot设备连接到物联网平台的时候触发'connect' 事件
def on_connect(data):
global mqtt_connect_flag
mqtt_connect_flag=True
print('***** connect lp succeed****')
global module_name,app_version,productKey,deviceName
data_handle = {}
data_handle['device_handle'] = device.getDeviceHandle()
# 初始化ota服务
ota.init(data_handle)
# ota 回调函数注册
ota.on(1,on_trigger)
ota.on(2,on_download)
ota.on(3,on_verify)
ota.on(4,on_upgrade)
report_info = {
"device_handle": data_handle['device_handle'],
"product_key": productKey ,
"device_name": deviceName ,
"module_name": module_name ,
"version": app_version
}
# 上报本机ota相关信息,上报版本信息返回以后程序返回,知道后台推送ota升级包,才会调用on_trigger函数
ota.report(report_info)
#当连接断开时,触发'disconnect'事件
def on_disconnect(data):
print('linkkit is disconnected')
#当iot云端下发属性设置时,触发'props'事件
def on_props(request):
global onoff,light,time_begin,time_end,modle
print('clound req data is {}'.format(request))
# # # #获取消息中的params数据
params=request['params']
# #去除字符串的'',得到字典数据
params=eval(params)
if "modle" in params:
modle = params["modle"]
if "onoff" in params:
onoff = params["onoff"]
update('onoff',onoff)
if "light" in params:
light = params["light"]
if "time_beg" in params:
time_begin = params["time_beg"]
if "time_end" in params:
time_end = params["time_end"]
#当iot云端调用设备service时,触发'service'事件
def on_service(id,request):
print('clound req id is {} , req is {}'.format(id,request))
#当设备跟iot平台通信过程中遇到错误时,触发'error'事件
def on_error(err):
print('err msg is {} '.format(err))
#动态注册回调函数
def on_dynreg_cb(data):
global deviceSecret,device_dyn_resigter_succed
deviceSecret = data
device_dyn_resigter_succed = True
# 连接物联网平台
def dyn_register_device(productKey,productSecret,deviceName):
global on_dynreg_cb,device,deviceSecret,device_dyn_resigter_succed
key = '_amp_customer_devicesecret'
deviceSecretdict = kv.get(key)
if isinstance(deviceSecretdict,str):
deviceSecret = deviceSecretdict
if deviceSecretdict is None or deviceSecret is None:
key_info = {
'productKey': productKey ,
'productSecret': productSecret ,
'deviceName': deviceName
}
print('key_info',key_info)
# 动态注册一个设备,获取设备的deviceSecret
#下面的if防止多次注册,当前若是注册过一次了,重启设备再次注册就会卡住,
if not device_dyn_resigter_succed:
device.register(key_info,on_dynreg_cb)
def aliyun_login(device_name,product_key,product_secret,device_Secret,area):
global device,deviceName,productKey,productSecret,heartBeat,region
deviceName=device_name
productKey=product_key
productSecret=product_secret
deviceSecret =device_Secret
region=area
#初始化物联网平台Device类,获取device实例
device = Device()
key_info = {
'region' : region ,
'productKey': productKey ,
'deviceName': deviceName ,
'deviceSecret': deviceSecret ,
'keepaliveSec': 60,
}
#打印设备信息
print(key_info)
#device.ON_CONNECT 是事件,on_connect是事件处理函数/回调函数
device.on(device.ON_CONNECT,on_connect)
device.on(device.ON_DISCONNECT,on_disconnect)
device.on(device.ON_PROPS,on_props)
device.on(device.ON_SERVICE,on_service)
device.on(device.ON_ERROR,on_error)
device.connect(key_info)
def update(key,value):
global device
post_data_str=ujson.dumps({key:value})
data={
"params":post_data_str
}
print("----------------",data)
device.postProps(data)
没有合适的资源?快使用搜索试试~ 我知道了~
7.1 HD1 - 智能路灯代码
共6个文件
py:5个
json:1个
需积分: 19 2 下载量 39 浏览量
2022-10-14
10:41:19
上传
评论
收藏 6KB ZIP 举报
温馨提示
7.1 HD1 - 智能路灯代码
资源详情
资源评论
资源推荐
收起资源包目录
7.1 RTU智能路灯.zip (6个子文件)
main.py 3KB
board.json 2KB
app_network.py 984B
app_server_aliyun.py 7KB
app_led.py 1KB
app_relay.py 744B
共 6 条
- 1
智云服
- 粉丝: 135
- 资源: 20
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0