Luat系列教程:6、mqtt代码详解

LUAT系列全部教程可以点击下面链接查看(建议保存书签):

https://www.chenxublog.com/tag/luat系列教程

写在前面:
由于本人并未学习过具体原理,所以本文可能会有多处常识性错误,如有发现请留言指出,谢谢!


阅读本文需要具有的技能:
看过该系列前几篇文章或明白前几篇文章内容的
熟悉lua语法,尤其是数组部分
可以明白字符串、字节码之间的区别
可以自己实践操作
对mqtt通讯有基本的了解
用过这东西

官方demo代码

官方代码可以在github(https://github.com/openLuat/Luat_2G_RDA_8955/)的Luat_2G_RDA_8955/script_LuaTask/demo/mqtt目录或luatools的LuaTools 1.x.x\script\script_LuaTask\demo\mqtt目录找到

如果你能看懂官方例程,那么可以直接去使用,不需要再看本文了

先定义一个假装能用来测试的mqtt需求

客户端订阅主题:
/d/test/#

设备订阅主题:
/s/test/设备的imei值

  • 设备联网后向/d/test/设备的imei值的topic发送payload为当前设备ICCID的字符串
  • 服务器收到后向/s/test/设备的imei值的topic发送payload为ok两个字节的字符串
  • 设备收到topic为/s/test/设备的imei值,payload为ok的数据后,再次向/d/test/设备的imei值的topic发送payload为done四个字节的字符串

需求十分简单:

如果你知道mqtt是什么,肯定能明白

建立文件

首先先新建两个文件,用于测试这个工程

main.lua

PROJECT = "MQTT-TEST"
VERSION = "1.0.0"

require "log"
LOG_LEVEL = log.LOGLEVEL_TRACE

require "sys"

--每1分钟查询一次GSM信号强度,每1分钟查询一次基站信息
require "net"
net.startQueryAll(60000, 60000)

--加载硬件看门狗功能模块
require "wdt"
wdt.setup(pio.P0_30, pio.P0_31)

--加载网络指示灯功能模块
require "netLed"
netLed.setup(true,pio.P1_1)

require"mqttTest"

--启动系统框架
sys.init(0, 0)
sys.run()

mqttTest.lua

module(...,package.seeall)

require"misc"
require"mqtt"
--下面代码一会儿写

建立mqtt线程

一般来说,mqtt连接都是异步运行的,何时应该发送数据,何时应该接收数据,这些逻辑应该让mqtt收发的进程自己进行控制

所以我们在mqttTest.lua中添加一个新的线程(看不懂的回去看前几篇文章),文件改成如下(注意改一下测试服务器信息):

mqttTest.lua

module(...,package.seeall)

require"misc"
require"mqtt"

--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600

--启动mqtt客户端任务
sys.taskInit(
function()
    while true do
    --该区域的代码会永久循环运行(除非出现语法错误)
    end
end)

进行mqtt连接

一般来说,我们会在模块成功获取基站分配的ip后,才会进行网络的连接操作,所以我们需要使用socket.isReady()函数来判断是否连接网络,然后再进行网络操作

在成功获取ip后,我们才能新建一个mqtt对象,对其进行联网操作,mqtt客户端线程代码改为如下:

--启动mqtt客户端任务
sys.taskInit(
function()
    while true do
        --是否获取到了分配的ip(是否连上网)
        if socket.isReady() then
            local imei = misc.getImei()
            --新建一个mqtt对象
            local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
            --尝试连接指定服务器
            if mqttClient:connect(mqttip,mqttport,"tcp") then
                --连接成功
                log.info("mqttTest.mqttClient","connect success")
            else
                log.info("mqttTest.mqttClient","connect fail")
                --连接失败
            end
        else
            --没连上网,原地等待一秒,一秒后会循环回去重试
            sys.wait(1000)
        end
    end
end)

对连接失败的处理

上述代码只是一个简单的连接服务器的代码,并且连上之后没有进行任何的其他操作

为了增加代码的稳健性,我们可以利用sys.waitUntil()函数,设置五分钟内没有获取到ip就开启飞行模式几秒,再关闭,让模块重新去获取GPRS连接:

--启动mqtt客户端任务
sys.taskInit(
function()
    while true do
        --是否获取到了分配的ip(是否连上网)
        if socket.isReady() then
            local imei = misc.getImei()
            --新建一个mqtt对象
            local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
            --尝试连接指定服务器
            if mqttClient:connect(mqttip,mqttport,"tcp") then
                --连接成功
                log.info("mqttTest.mqttClient","connect success")
            else
                log.info("mqttTest.mqttClient","connect fail")
                --连接失败
            end
        else
            --没连上网
            --等待网络环境准备就绪,超时时间是5分钟
            sys.waitUntil("IP_READY_IND",300000)
            --等完了还没连上?
            if not socket.isReady() then
                --进入飞行模式,20秒之后,退出飞行模式
                net.switchFly(true)
                sys.wait(20000)
                net.switchFly(false)
            end
        end
    end
end)

同样,我们也可以给mqttClient:connect(mqttip,mqttport,"tcp")的连接加上错误次数的判断,连接错误超过五次,强制断开socket连接,等待五秒后重试:

--启动mqtt客户端任务
sys.taskInit(
function()
    while true do
        local retryConnectCnt = 0   --失败次数统计
        --是否获取到了分配的ip(是否连上网)
        if socket.isReady() then
            local imei = misc.getImei()
            --新建一个mqtt对象
            local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
            --尝试连接指定服务器
            if mqttClient:connect(mqttip,mqttport,"tcp") then
                --连接成功
                log.info("mqttTest.mqttClient","connect success")
                retryConnectCnt = 0 --失败次数清零
            else
                log.info("mqttTest.mqttClient","connect fail")--连接失败
                retryConnectCnt = retryConnectCnt+1 --失败次数加一
            end
            --断开MQTT连接
            mqttClient:disconnect()
            if retryConnectCnt>=5 then
                link.shut()
                retryConnectCnt=0
            end
            sys.wait(5000)
        else
            --没连上网
            --等待网络环境准备就绪,超时时间是5分钟
            sys.waitUntil("IP_READY_IND",300000)
            --等完了还没连上?
            if not socket.isReady() then
                --进入飞行模式,20秒之后,退出飞行模式
                net.switchFly(true)
                sys.wait(20000)
                net.switchFly(false)
            end
        end
    end
end)

添加发送/接收处理函数,订阅主题

到了这一步,整个的mqtt线程只剩下循环处理接收和发送的数据这一部分和订阅topic部分与demo不同了,我们直接把这两部分加到mqtt线程的代码中吧:

--启动mqtt客户端任务
sys.taskInit(
function()
    while true do
        local retryConnectCnt = 0   --失败次数统计
        --是否获取到了分配的ip(是否连上网)
        if socket.isReady() then
            local imei = misc.getImei()
            --新建一个mqtt对象
            local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
            --尝试连接指定服务器
            if mqttClient:connect(mqttip,mqttport,"tcp") then
                --连接成功
                log.info("mqttTest.mqttClient","connect success")
                retryConnectCnt = 0 --失败次数清零
                --订阅主题
                if mqttClient:subscribe({["/s/test/"..imei]=0}) then
                    --循环处理接收和发送的数据
                    while true do
                        if not mqttInMsgProc(mqttClient) then
                            log.error("mqttTest.mqttInMsgProc error")
                            break
                        end
                        if not mqttOutMsgProc(mqttClient) then
                            log.error("mqttTest.mqttOutMsgProc error")
                            break
                        end
                    end
                end
            else
                log.info("mqttTest.mqttClient","connect fail")--连接失败
                retryConnectCnt = retryConnectCnt+1 --失败次数加一
            end
            --断开MQTT连接
            mqttClient:disconnect()
            if retryConnectCnt>=5 then
                link.shut()
                retryConnectCnt=0
            end
            sys.wait(5000)
        else
            --没连上网
            --等待网络环境准备就绪,超时时间是5分钟
            sys.waitUntil("IP_READY_IND",300000)
            --等完了还没连上?
            if not socket.isReady() then
                --进入飞行模式,20秒之后,退出飞行模式
                net.switchFly(true)
                sys.wait(20000)
                net.switchFly(false)
            end
        end
    end
end)

可以看到,在接收和发送函数不返回false的情况下,接收和发送循环会一直进行下去;只有当两个函数之一返回false时,才会触发break导致退出该接收和发送循环

mqttInMsgProc(mqttClient)函数

这段的代码相对来说比较简单,我们可以直接使用mqttClient:receive(毫秒数)来接收我们的tcp消息。
我们在合适的地方,新建一个mqttInMsgProc(mqttClient)函数:

function mqttInMsgProc(mqttClient)
    local result,data
    while true do
        result,data = mqttClient:receive(2000)
        --接收到数据
        if result then
            log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
            --TODO:根据需求自行处理data.payload
        else
            break
        end
    end
    return result or data=="timeout"
end

这段代码就是循环获取mqtt消息,如果没获取到,mqttClient:receive(2000)就会返回false,"timeout";如果获取到了,就会返回true,获取到的数据字符串;如果返回了false,不为"timeout",则表示数据处理出错,说明mqtt连接有了什么问题

细心的读者可能看出来了,如果接收函数一直在2秒内有接收到数据,那么这段函数会永远无限循环下去,没办法到达mqttOutMsgProc(mqttClient)函数进行发送数据的操作,所以我们先去讲mqttOutMsgProc(mqttClient)函数的实现过程,再回来改进mqttInMsgProc(mqttClient)函数

mqttOutMsgProc(mqttClient)函数

由于发送函数在mqtt线程中是一个循环的小部分,所以我们要建立一个消息发送的队列:有要发送的发数据时,将数据放到这个队列中;等运行到mqttOutMsgProc(mqttClient)函数时,将队列中的数据一个一个发出去

首先我们要建一个放这种队列的数组,在合适位置声明一下这个数组:

--数据发送的消息队列
local msgQuene = {}

接着我们构造一个可以往数组里插入数据的函数,table.insert()可以向数组添加数据,所以我们新建一个insertMsg函数:

local function insertMsg(topic,payload,qos,user)
    table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end

还记得上面说过的消息接收函数函数会永远无限循环下去的问题吗?我们在合适的地方新建一个判断发送消息队列是否为空的函数:

function waitForSend()
    return #msgQuene > 0
end

在数组有数据时,这个函数会返回true,我们可以将mqttInMsgProc(mqttClient)接收到数据后的代码添加一行判断发送队列是否有数据的代码,当检测到发送队列有数据时,就立即退出接收函数,转而去进行发送动作,接收函数最终改为了这样:

function mqttInMsgProc(mqttClient)
    local result,data
    while true do
        result,data = mqttClient:receive(2000)
        --接收到数据
        if result then
            log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
            --TODO:根据需求自行处理data.payload
            --如果msgQuene中有等待发送的数据,则立即退出本循环
            if waitForSend() then return true end
        else
            break
        end
    end
    return result or data=="timeout"
end

最后我们终于可以开始写消息发送函数了,整体的函数就是检查队列是否为空,不为空的话就发一条消息并将其从队列中删除,然后重复这一操作,函数代码如下:

function mqttOutMsgProc(mqttClient)
    while #msgQuene>0 do    --数组大于零?
        local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
        local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
        if outMsg.user and outMsg.user.cb then  --如果存在回调函数
            outMsg.user.cb(result,outMsg.user.para)--执行回调函数
        end
        if not result then return end
    end
    return true
end

outMsg.user即为消息队列数组中的,消息数组中的,包含了回调函数的,数组(反正挺绕的)

具体就像下面这样用:

insertMsg("/d/test/123","done",{cb=testcb})

local function testcb()
    log.info("test.testcb","test message sent")
end

这样,该条消息发送后就会执行指定的回调函数

完成基本的mqtt线程

经过上述的更改,最终,mqttTest.lua已经实现了连接服务器并自动处理错误的功能,并且预留了消息接收以及向发送队列添加数据的接口,文件的所有代码如下:

mqttTest.lua

require"misc"
require"mqtt"

--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600

--数据发送的消息队列
local msgQuene = {}

local function insertMsg(topic,payload,qos,user)
    table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end

function waitForSend()
    return #msgQuene > 0
end

function mqttOutMsgProc(mqttClient)
    while #msgQuene>0 do    --数组大于零?
        local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
        local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
        if outMsg.user and outMsg.user.cb then  --如果存在回调函数
            outMsg.user.cb(result,outMsg.user.para)--执行回调函数
        end
        if not result then return end
    end
    return true
end

function mqttInMsgProc(mqttClient)
    local result,data
    while true do
        result,data = mqttClient:receive(2000)
        --接收到数据
        if result then
            log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
            --TODO:根据需求自行处理data.payload
            --如果msgQuene中有等待发送的数据,则立即退出本循环
            if waitForSend() then return true end
        else
            break
        end
    end
    return result or data=="timeout"
end


--启动mqtt客户端任务
sys.taskInit(
function()
    while true do
        local retryConnectCnt = 0   --失败次数统计
        --是否获取到了分配的ip(是否连上网)
        if socket.isReady() then
            local imei = misc.getImei()
            --新建一个mqtt对象
            local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
            --尝试连接指定服务器
            if mqttClient:connect(mqttip,mqttport,"tcp") then
                --连接成功
                log.info("mqttTest.mqttClient","connect success")
                retryConnectCnt = 0 --失败次数清零
                --订阅主题
                if mqttClient:subscribe({["/s/test/"..imei]=0}) then
                    --循环处理接收和发送的数据
                    while true do
                        if not mqttInMsgProc(mqttClient) then
                            log.error("mqttTest.mqttInMsgProc error")
                            break
                        end
                        if not mqttOutMsgProc(mqttClient) then
                            log.error("mqttTest.mqttOutMsgProc error")
                            break
                        end
                    end
                end
            else
                log.info("mqttTest.mqttClient","connect fail")--连接失败
                retryConnectCnt = retryConnectCnt+1 --失败次数加一
            end
            --断开MQTT连接
            mqttClient:disconnect()
            if retryConnectCnt>=5 then
                link.shut()
                retryConnectCnt=0
            end
            sys.wait(5000)
        else
            --没连上网
            --等待网络环境准备就绪,超时时间是5分钟
            sys.waitUntil("IP_READY_IND",300000)
            --等完了还没连上?
            if not socket.isReady() then
                --进入飞行模式,20秒之后,退出飞行模式
                net.switchFly(true)
                sys.wait(20000)
                net.switchFly(false)
            end
        end
    end
end)

实现协议需求

设备联网后向/d/test/设备的imei值的topic发送payload为当前设备ICCID的字符串

我们只需要在连接成功处添加代码即可,在if mqttClient:subscribe({["/s/test/"..imei]=0}) then所在代码下一行添加:

insertMsg("/d/test/"..misc.getImei(),sim.getIccid())

设备收到topic为/s/test/设备的imei值,payload为ok的数据后,再次向/d/test/设备的imei值的topic发送payload为done四个字节的字符串

我们只需要在接收处理消息处添加代码即可,在--TODO:根据需求自行处理data.payload所在代码下一行添加:

if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then
    insertMsg("/d/test/"..misc.getImei(),"done")
end

完整代码

经过上面的删删改改,功能以及基本实现了,整个文件的代码如下:

mqttTest.lua

require"misc"
require"mqtt"

--测试用的服务器信息,按需求自己改
local mqttip,mqttport,mqttuser,mqttpassword,mqttheartbeat = "x.x.x.x","xxxx","user","password",600

--数据发送的消息队列
local msgQuene = {}

local function insertMsg(topic,payload,qos,user)
    table.insert(msgQuene,{t=topic,p=payload,q=qos,user=user})
end

function waitForSend()
    return #msgQuene > 0
end

function mqttOutMsgProc(mqttClient)
    while #msgQuene>0 do    --数组大于零?
        local outMsg = table.remove(msgQuene,1)--取出并删除一个元素
        local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)--推送对应的mqtt消息
        if outMsg.user and outMsg.user.cb then  --如果存在回调函数
            outMsg.user.cb(result,outMsg.user.para)--执行回调函数
        end
        if not result then return end
    end
    return true
end

function mqttInMsgProc(mqttClient)
    local result,data
    while true do
        result,data = mqttClient:receive(2000)
        --接收到数据
        if result then
            log.info("mqttTest.mqttInMsgProc",data.topic,string.toHex(data.payload))
            --TODO:根据需求自行处理data.payload
            if data.topic == "/s/test/"..misc.getImei() and data.payload == "ok" then
                insertMsg("/d/test/"..misc.getImei(),"done")
            end
            --如果msgQuene中有等待发送的数据,则立即退出本循环
            if waitForSend() then return true end
        else
            break
        end
    end
    return result or data=="timeout"
end


--启动mqtt客户端任务
sys.taskInit(
function()
    while true do
        local retryConnectCnt = 0   --失败次数统计
        --是否获取到了分配的ip(是否连上网)
        if socket.isReady() then
            local imei = misc.getImei()
            --新建一个mqtt对象
            local mqttClient = mqtt.client(imei,mqttheartbeat,mqttuser,mqttpassword)
            --尝试连接指定服务器
            if mqttClient:connect(mqttip,mqttport,"tcp") then
                --连接成功
                log.info("mqttTest.mqttClient","connect success")
                retryConnectCnt = 0 --失败次数清零
                --订阅主题
                if mqttClient:subscribe({["/s/test/"..imei]=0}) then
                    insertMsg("/d/test/"..misc.getImei(),sim.getIccid())
                    --循环处理接收和发送的数据
                    while true do
                        if not mqttInMsgProc(mqttClient) then
                            log.error("mqttTest.mqttInMsgProc error")
                            break
                        end
                        if not mqttOutMsgProc(mqttClient) then
                            log.error("mqttTest.mqttOutMsgProc error")
                            break
                        end
                    end
                end
            else
                log.info("mqttTest.mqttClient","connect fail")--连接失败
                retryConnectCnt = retryConnectCnt+1 --失败次数加一
            end
            --断开MQTT连接
            mqttClient:disconnect()
            if retryConnectCnt>=5 then
                link.shut()
                retryConnectCnt=0
            end
            sys.wait(5000)
        else
            --没连上网
            --等待网络环境准备就绪,超时时间是5分钟
            sys.waitUntil("IP_READY_IND",300000)
            --等完了还没连上?
            if not socket.isReady() then
                --进入飞行模式,20秒之后,退出飞行模式
                net.switchFly(true)
                sys.wait(20000)
                net.switchFly(false)
            end
        end
    end
end)

验证功能

我没条件测试这玩意儿。。。谁有空帮我测试下的?

3 Comments

  1. Sogou Explorer Sogou Explorer Windows 7 x64 Edition Windows 7 x64 Edition

    调试助手发送的:abc 111

    下面是设备接收到的
    [2018-08-14 23:38:02.458]: [I]-[mqttTest.mqttInMsgProc] /s/test/869300031720745 61626320313131 7

  2. Google Chrome 87.0.4280.141 Google Chrome 87.0.4280.141 Windows 10 x64 Edition Windows 10 x64 Edition

    MQTT教程6的订阅和推送的流程图,文字描述是否有错误?有点看不懂

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注