LUAT系列全部教程可以点击下面链接查看(建议保存书签):
https://www.chenxublog.com/tag/luat系列教程
写在前面:
由于本人并未学习过具体原理,所以本文可能会有多处常识性错误,如有发现请留言指出,谢谢!
阅读本文需要具有的技能:
看过该系列前几篇文章或明白前几篇文章内容的
熟悉lua语法,尤其是数组部分
可以明白字符串、字节码之间的区别
可以自己实践操作
对mqtt通讯有基本的了解
用过这东西
官方demo代码
官方代码可以在github(https://github.com/openLuat/Luat_2G_RDA_8955/)的Luat_2G_RDA_8955/script_LuaTask/demo/mqtt
目录或luatools的LuaTools 1.x.x\script\script_LuaTask\demo\mqtt
目录找到
如果你能看懂官方例程,那么可以直接去使用,不需要再看本文了
先定义一个假装能用来测试的mqtt需求
客户端订阅主题:
/d/test/#
设备订阅主题:
/s/test/设备的imei值
- 设备联网后向
/d/test/设备的imei值
的topic发送payload为当前设备ICCID的字符串 - 服务器收到后向
/s/test/设备的imei值
的topic发送payload为ok
两个字节的字符串 - 设备收到topic为
/s/test/设备的imei值
,payload为ok
的数据后,再次向/d/test/设备的imei值
的topic发送payload为done
四个字节的字符串
需求十分简单:
如果你知道mqtt是什么,肯定能明白
建立文件
首先先新建两个文件,用于测试这个工程
main.lua
PROJECT = "MQTT-TEST"
VERSION = "1.0.0"
require "log"
LOG_LEVEL = log.LOGLEVEL_TRACE
require "sys"
--每1分钟查询一次GSM信号强度,每1分钟查询一次基站信息
require "net"
net.startQueryAll(60000, 60000)
--加载硬件看门狗功能模块
require "wdt"
wdt.setup(pio.P0_30, pio.P0_31)
--加载网络指示灯功能模块
require "netLed"
netLed.setup(true,pio.P1_1)
require"mqttTest"
--启动系统框架
sys.init(0, 0)
sys.run()
mqttTest.lua
module(...,package.seeall)
require"misc"
require"mqtt"
--下面代码一会儿写
建立mqtt线程
一般来说,mqtt连接都是异步运行的,何时应该发送数据,何时应该接收数据,这些逻辑应该让mqtt收发的进程自己进行控制
所以我们在mqttTest.lua
中添加一个新的线程(看不懂的回去看前几篇文章),文件改成如下(注意改一下测试服务器信息
):
mqttTest.lua
module(...,package.seeall)
require"misc"
require"mqtt"
--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
--该区域的代码会永久循环运行(除非出现语法错误)
end
end)
进行mqtt连接
一般来说,我们会在模块成功获取基站分配的ip后,才会进行网络的连接操作,所以我们需要使用socket.isReady()
函数来判断是否连接网络,然后再进行网络操作
在成功获取ip后,我们才能新建一个mqtt对象,对其进行联网操作,mqtt客户端线程代码改为如下:
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
else
log.info("mqttTest.mqttClient","connect fail")
--连接失败
end
else
--没连上网,原地等待一秒,一秒后会循环回去重试
sys.wait(1000)
end
end
end)
对连接失败的处理
上述代码只是一个简单的连接服务器的代码,并且连上之后没有进行任何的其他操作
为了增加代码的稳健性,我们可以利用sys.waitUntil()
函数,设置五分钟内没有获取到ip就开启飞行模式几秒,再关闭,让模块重新去获取GPRS连接:
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
else
log.info("mqttTest.mqttClient","connect fail")
--连接失败
end
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)
同样,我们也可以给mqttClient:connect(mqttip,mqttport,"tcp")
的连接加上错误次数的判断,连接错误超过五次,强制断开socket连接,等待五秒后重试:
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)
添加发送/接收处理函数,订阅主题
到了这一步,整个的mqtt线程只剩下循环处理接收和发送的数据
这一部分和订阅topic部分与demo不同了,我们直接把这两部分加到mqtt线程的代码中吧:
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
--订阅主题
if mqttClient:subscribe({["/s/test/"..imei]=0}) then
--循环处理接收和发送的数据
while true do
if not mqttInMsgProc(mqttClient) then
log.error("mqttTest.mqttInMsgProc error")
break
end
if not mqttOutMsgProc(mqttClient) then
log.error("mqttTest.mqttOutMsgProc error")
break
end
end
end
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)
可以看到,在接收和发送函数不返回false的情况下,接收和发送循环会一直进行下去;只有当两个函数之一返回false时,才会触发break导致退出该接收和发送循环
mqttInMsgProc(mqttClient)函数
这段的代码相对来说比较简单,我们可以直接使用mqttClient:receive(毫秒数)
来接收我们的tcp消息。
我们在合适的地方,新建一个mqttInMsgProc(mqttClient)
函数:
function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
else
break
end
end
return result or data=="timeout"
end
这段代码就是循环获取mqtt消息,如果没获取到,mqttClient:receive(2000)
就会返回false,"timeout"
;如果获取到了,就会返回true,获取到的数据字符串
;如果返回了false,不为"timeout"
,则表示数据处理出错,说明mqtt连接有了什么问题
细心的读者可能看出来了,如果接收函数一直在2秒内有接收到数据,那么这段函数会永远无限循环下去,没办法到达mqttOutMsgProc(mqttClient)
函数进行发送数据的操作,所以我们先去讲mqttOutMsgProc(mqttClient)
函数的实现过程,再回来改进mqttInMsgProc(mqttClient)
函数
mqttOutMsgProc(mqttClient)函数
由于发送函数在mqtt线程中是一个循环的小部分,所以我们要建立一个消息发送的队列:有要发送的发数据时,将数据放到这个队列中;等运行到mqttOutMsgProc(mqttClient)
函数时,将队列中的数据一个一个发出去
首先我们要建一个放这种队列的数组,在合适位置声明一下这个数组:
--数据发送的消息队列
local msgQuene = {}
接着我们构造一个可以往数组里插入数据的函数,table.insert()
可以向数组添加数据,所以我们新建一个insertMsg
函数:
local function insertMsg(topic,payload,qos,user)
table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end
还记得上面说过的消息接收函数函数会永远无限循环下去
的问题吗?我们在合适的地方新建一个判断发送消息队列是否为空的函数:
function waitForSend()
return #msgQuene > 0
end
在数组有数据时,这个函数会返回true,我们可以将mqttInMsgProc(mqttClient)
接收到数据后的代码添加一行判断发送队列是否有数据的代码,当检测到发送队列有数据时,就立即退出接收函数,转而去进行发送动作,接收函数最终改为了这样:
function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
--如果msgQuene中有等待发送的数据,则立即退出本循环
if waitForSend() then return true end
else
break
end
end
return result or data=="timeout"
end
最后我们终于可以开始写消息发送函数了,整体的函数就是检查队列是否为空,不为空的话就发一条消息并将其从队列中删除,然后重复这一操作,函数代码如下:
function mqttOutMsgProc(mqttClient)
while #msgQuene>0 do --数组大于零?
local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
if outMsg.user and outMsg.user.cb then --如果存在回调函数
outMsg.user.cb(result,outMsg.user.para)--执行回调函数
end
if not result then return end
end
return true
end
outMsg.user
即为消息队列数组中的,消息数组中的,包含了回调函数的,数组(反正挺绕的)具体就像下面这样用:
insertMsg("/d/test/123","done",{cb=testcb})
local function testcb()
log.info("test.testcb","test message sent")
end
这样,该条消息发送后就会执行指定的回调函数
完成基本的mqtt线程
经过上述的更改,最终,mqttTest.lua
已经实现了连接服务器并自动处理错误的功能,并且预留了消息接收以及向发送队列添加数据的接口,文件的所有代码如下:
mqttTest.lua
require"misc"
require"mqtt"
--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600
--数据发送的消息队列
local msgQuene = {}
local function insertMsg(topic,payload,qos,user)
table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end
function waitForSend()
return #msgQuene > 0
end
function mqttOutMsgProc(mqttClient)
while #msgQuene>0 do --数组大于零?
local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
if outMsg.user and outMsg.user.cb then --如果存在回调函数
outMsg.user.cb(result,outMsg.user.para)--执行回调函数
end
if not result then return end
end
return true
end
function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
--如果msgQuene中有等待发送的数据,则立即退出本循环
if waitForSend() then return true end
else
break
end
end
return result or data=="timeout"
end
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
--订阅主题
if mqttClient:subscribe({["/s/test/"..imei]=0}) then
--循环处理接收和发送的数据
while true do
if not mqttInMsgProc(mqttClient) then
log.error("mqttTest.mqttInMsgProc error")
break
end
if not mqttOutMsgProc(mqttClient) then
log.error("mqttTest.mqttOutMsgProc error")
break
end
end
end
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)
实现协议需求
设备联网后向/d/test/设备的imei值
的topic发送payload为当前设备ICCID的字符串
我们只需要在连接成功处添加代码即可,在if mqttClient:subscribe({["/s/test/"..imei]=0}) then
所在代码下一行添加:
insertMsg("/d/test/"..misc.getImei(),sim.getIccid())
设备收到topic为/s/test/设备的imei值
,payload为ok
的数据后,再次向/d/test/设备的imei值
的topic发送payload为done
四个字节的字符串
我们只需要在接收处理消息处添加代码即可,在--TODO:根据需求自行处理data.payload
所在代码下一行添加:
if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then
insertMsg("/d/test/"..misc.getImei(),"done")
end
完整代码
经过上面的删删改改,功能以及基本实现了,整个文件的代码如下:
mqttTest.lua
require"misc"
require"mqtt"
--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600
--数据发送的消息队列
local msgQuene = {}
local function insertMsg(topic,payload,qos,user)
table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end
function waitForSend()
return #msgQuene > 0
end
function mqttOutMsgProc(mqttClient)
while #msgQuene>0 do --数组大于零?
local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
if outMsg.user and outMsg.user.cb then --如果存在回调函数
outMsg.user.cb(result,outMsg.user.para)--执行回调函数
end
if not result then return end
end
return true
end
function mqttInMsgProc(mqttClient)
local result,data
while true do
result,data = mqttClient:receive(2000)
--接收到数据
if result then
log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
--TODO:根据需求自行处理data.payload
if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then
insertMsg("/d/test/"..misc.getImei(),"done")
end
--如果msgQuene中有等待发送的数据,则立即退出本循环
if waitForSend() then return true end
else
break
end
end
return result or data=="timeout"
end
--启动mqtt客户端任务
sys.taskInit(
function()
while true do
local retryConnectCnt = 0 --失败次数统计
--是否获取到了分配的ip(是否连上网)
if socket.isReady() then
local imei = misc.getImei()
--新建一个mqtt对象
local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
--尝试连接指定服务器
if mqttClient:connect(mqttip,mqttport,"tcp") then
--连接成功
log.info("mqttTest.mqttClient","connect success")
retryConnectCnt = 0 --失败次数清零
--订阅主题
if mqttClient:subscribe({["/s/test/"..imei]=0}) then
insertMsg("/d/test/"..misc.getImei(),sim.getIccid())
--循环处理接收和发送的数据
while true do
if not mqttInMsgProc(mqttClient) then
log.error("mqttTest.mqttInMsgProc error")
break
end
if not mqttOutMsgProc(mqttClient) then
log.error("mqttTest.mqttOutMsgProc error")
break
end
end
end
else
log.info("mqttTest.mqttClient","connect fail")--连接失败
retryConnectCnt = retryConnectCnt+1 --失败次数加一
end
--断开MQTT连接
mqttClient:disconnect()
if retryConnectCnt>=5 then
link.shut()
retryConnectCnt=0
end
sys.wait(5000)
else
--没连上网
--等待网络环境准备就绪,超时时间是5分钟
sys.waitUntil("IP_READY_IND",300000)
--等完了还没连上?
if not socket.isReady() then
--进入飞行模式,20秒之后,退出飞行模式
net.switchFly(true)
sys.wait(20000)
net.switchFly(false)
end
end
end
end)
验证功能
我没条件测试这玩意儿。。。谁有空帮我测试下的?
转载保留版权:晨旭的博客 » 《Luat系列教程:6、mqtt代码详解》如果喜欢可以: 点击右侧上方的邮件订阅,订阅本站
可以正常接收调试助手发过来的数据,内容写的很漂亮,喜欢
调试助手发送的:abc 111
下面是设备接收到的
[2018-08-14 23:38:02.458]: [I]-[mqttTest.mqttInMsgProc] /s/test/869300031720745 61626320313131 7
MQTT教程6的订阅和推送的流程图,文字描述是否有错误?有点看不懂