浏览代码

add: 添加mqtt2.lua,异步通知,低内存消耗,自动重连. 已在air640w,仍需在air302进行深入测试

Wendal Chen 5 年之前
父节点
当前提交
06687c973e
共有 2 个文件被更改,包括 358 次插入0 次删除
  1. 97 0
      bsp/air640w/demo/73.mqtt2/main.lua
  2. 261 0
      bsp/air640w/lib/mqtt2.lua

+ 97 - 0
bsp/air640w/demo/73.mqtt2/main.lua

@@ -0,0 +1,97 @@
+--[[
+demo说明:
+1. 演示wifi联网操作
+2. 演示长连接操作
+3. 演示简易的网络状态灯
+]]
+_G.sys = require("sys")
+_G.mqtt2 = require("mqtt2")
+_G.mine = require("my_demo")
+
+log.info("main", "simple mqtt2 demo")
+
+-- //////////////////////////////////////////////////////////////////////////////////////
+-- wifi 相关的代码
+if wlan ~= nil then
+    log.info("mac", wlan.get_mac())
+    local ssid = mine.wifi_ssid
+    local password = mine.wifi_passwd
+    -- 方式1 直接连接, 简单快捷
+    wlan.connect(ssid, password) -- 直接连
+    -- 方式2 先扫描,再连接. 例如根据rssi(信号强度)的不同, 择优选择ssid
+    -- sys.taskInit(function()
+    --     wlan.scan()
+    --     sys.waitUntil("WLAN_SCAN_DONE", 30000)
+    --     local re = wlan.scan_get_info()
+    --     log.info("wlan", "scan done", #re)
+    --     for i in ipairs(re) do
+    --         log.info("wlan", "info", re[i].ssid, re[i].rssi)
+    --     end
+    --     log.info("wlan", "try connect to wifi")
+    --     wlan.connect(ssid, password)
+    --     sys.waitUntil("WLAN_READY", 15000)
+    --     log.info("wifi", "self ip", socket.ip())
+    -- end)
+    -- 方法3 airkiss配网, 可参考 app/playit/main.lua
+end
+
+-- airkiss.auto(27) -- 预留的功能,未完成 
+-- //////////////////////////////////////////////////////////////////////////////////////
+
+--- 从这里开始, 代码与具体网络无关
+
+-- 联网后自动同步时间
+-- sys.subscribe("NET_READY", function ()
+--     log.info("net", "!!! network ready event !!! send ntp")
+--     sys.taskInit(function()
+--         sys.wait(2000)
+--         socket.ntpSync()
+--     end)
+-- end)
+
+gpio.setup(21, 0)
+_G.use_netled = 1 -- 启用1, 关闭0
+sys.taskInit(function()
+    while 1 do
+        --log.info("wlan", "ready?", wlan.ready())
+        if socket.isReady() then
+            --log.info("netled", "net ready, slow")
+            gpio.set(21, 1 * use_netled)
+            sys.wait(1900)
+            gpio.set(21, 0)
+            sys.wait(100)
+        else
+            --log.info("netled", "net not ready, fast")
+            gpio.set(21, 1 * use_netled)
+            sys.wait(100)
+            gpio.set(21, 0)
+            sys.wait(100)
+        end
+        --log.info("mem", rtos.meminfo())
+    end
+end)
+
+sys.taskInit(function()
+    local host, port, selfid = "lbsmqtt.airm2m.com", 1884, wlan.getMac():lower()
+
+    local topic_req = string.format("/device/%s/req", selfid)
+    local topic_report = string.format("/device/%s/report", selfid)
+    local topic_resp = string.format("/device/%s/resp", selfid)
+
+    local sub_topics = {}
+    sub_topics[topic_req] = 1
+
+    local mqttc = mqtt2.new(selfid, 300, "wendal", "123456", 0, host, port, sub_topics, function(pkg)
+        log.info("mqtt", "Oh", json.encode(pkg))
+    end)
+
+    --log.info("mqtt", json.encode(mqttc))
+
+    while not socket.isReady() do sys.waitUntil("NET_READY", 1000) end
+    sys.wait(3000)
+    log.info("go", "GoGoGo")
+    mqttc:start()
+end)
+
+
+sys.run()

+ 261 - 0
bsp/air640w/lib/mqtt2.lua

@@ -0,0 +1,261 @@
+--[[
+异步MQTT客户端
+1. 自动重连
+2. 异步收发信息
+
+暂不支持的特性:
+1. qos 2的消息不被支持,以后也不会添加
+2. 不支持取消订阅(也许会添加,也许不会)
+
+用法请参考demo
+
+]]
+
+-- MQTT 指令id
+local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
+local CLIENT_COMMAND_TIMEOUT = 60000
+
+local packCONNECT = mqttcore.packCONNECT
+local packPUBLISH = mqttcore.packPUBLISH
+local packSUBSCRIBE = mqttcore.packSUBSCRIBE
+local packACK = mqttcore.packACK
+local packZeroData = mqttcore.packZeroData
+
+
+local mclog = log.debug
+
+local function unpack(s) -- TODO 做成纯C实现
+    if #s < 2 then return end
+    --mclog("mqtt", "unpack", #s, string.toHex(string.sub(s, 1, 50)))
+
+    -- read remaining length
+    local len = 0
+    local multiplier = 1
+    local pos = 2
+
+    repeat
+        if pos > #s then return end
+        local digit = string.byte(s, pos)
+        len = len + ((digit % 128) * multiplier)
+        multiplier = multiplier * 128
+        pos = pos + 1
+    until digit < 128
+
+    if #s < len + pos - 1 then return end
+
+    local header = string.byte(s, 1)
+
+    --local packet = {id = (header - (header % 16)) / 16, dup = ((header % 16) - ((header % 16) % 8)) / 8, qos = bit.band(header, 0x06) / 2, retain = bit.band(header, 0x01)}
+    local packet = {id = (header - (header % 16)) >> 4, dup = ((header % 16) - ((header % 16) % 8)) >> 3, qos = (header & 0x06) >> 1, retain = (header & 0x01)}
+    local nextpos
+
+    if packet.id == CONNACK then
+        nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
+    elseif packet.id == PUBLISH then
+        nextpos, packet.topic = pack.unpack(s, ">P", pos)
+        if packet.qos > 0 then
+            nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
+        end
+        packet.payload = string.sub(s, nextpos, pos + len - 1)
+    elseif packet.id ~= PINGRESP then
+        if len >= 2 then
+            nextpos, packet.packetId = pack.unpack(s, ">H", pos)
+        else
+            packet.packetId = 0
+        end
+    end
+
+    return packet, pos + len
+end
+
+local mqtt2 = {}
+
+local mqttc = {}
+mqttc.__index = mqttc
+
+function mqtt2.new(clientId, keepAlive, username, password, cleanSession, host, port, topics, cb)
+    local c = {
+        clientId = clientId,
+        keepAlive = keepAlive or 300,
+        username = username or "",
+        password = password or "",
+        cleanSession = cleanSession,
+        host = host,
+        port = port,
+        lping = 0,
+        stat = 0, -- 状态 0, 未连接, 1 已连接成功
+        nextid = 0, -- pkg的ID
+        running = true,
+        ckey = "",
+        inpkgs = {},
+        outpkgs = {},
+        buff = "",
+        topics = topics or {},
+        cb = cb
+    }
+    c.ckey = "mqtt_" .. tostring(c)
+    --mclog("mqtt", "MQTT Client Key", c.ckey)
+    setmetatable(c, mqttc)
+    return c
+end
+
+function mqttc:genId()
+    self.nextid = self.nextid == 65535 and 1 or (self.nextid + 1)
+    --mclog("mqtt", "next packet id", self.nextid)
+    return self.nextid
+end
+
+function mqttc:handle(netc)
+    local mc = self
+    -- 先处理服务器下发的数据包
+    if #mc.inpkgs > 0 then
+        --mclog("mqtt", "inpkgs count", #mc.inpkgs)
+        while 1 do
+            local pkg = table.remove( mc.inpkgs, 1 )
+            if pkg == nil then
+                break
+            end
+            -- 处理服务器下发的包
+            --mclog("mqtt", "handle pkg", json.encode(pkg))
+            if pkg.id == CONNACK then
+                mc.stat = 1
+                mc.lping = os.time()
+                --mclog("mqtt", "GOT CONNACK")
+                for k, v in pairs(mc.topics) do
+                    --mclog("mqtt", "sub topics", json.encode(mc.topics))
+                    netc:send(packSUBSCRIBE(0, mc:genId(), mc.topics))
+                    break
+                end
+            elseif pkg.id == PUBACK then
+                --mclog("mqtt", "GOT PUBACK")
+                sys.publish(mc.ckey .. "PUBACK")
+            elseif pkg.id == SUBACK then
+                --mclog("mqtt", "GOT SUBACK")
+                sys.publish(mc.ckey .. "SUBACK")
+            elseif pkg.id == PINGRESP then
+                mc.lping = os.time()
+                --mclog("mqtt", "GOT PINGRESP", mc.lping)
+            elseif pkg.id == UNSUBACK then
+                --mclog("mqtt", "GOT UNSUBACK")
+            elseif pkg.id == DISCONNECT then
+                --mclog("mqtt", "GOT DISCONNECT")
+            elseif pkg.id == PUBLISH then
+                --mclog("mqtt", "GOT PUBLISH", pkg.topic, pkg.qos)
+                if pkg.packetId then
+                    -- 发送PUBACK
+                    --mclog("mqtt", "send back PUBACK")
+                    table.insert( mc.outpkgs, packACK(PUBACK, 0, pkg.packetId))
+                end
+                if mc.cb then
+                    --mclog("mqtt", "Callback for PUBLISH", mc.cb)
+                    mc.cb(pkg)
+                end
+            end
+        end
+    end
+    -- 处理需要上报的数据包
+    if #mc.outpkgs > 0 then
+        --mclog("mqtt", "outpkgs count", #mc.outpkgs)
+        while 1 do
+            local buff = table.remove( mc.outpkgs, 1)
+            if buff == nil then
+                break
+            end
+            --mclog("mqtt", "netc send", buff:toHex())
+            netc:send(buff)
+        end
+    end
+    -- 是否需要发心跳
+    if mc.lping > 0 and os.time() - mc.lping > mc.keepAlive * 0.75 then
+        --mclog("mqtt", "time for ping", mc.lping)
+        mc.lping = os.time()
+        netc:send(packZeroData(PINGREQ)) -- 发送心跳包
+    end
+end
+
+function mqttc:start()
+    local mc = self
+    while mc.running do
+        if socket.isReady() then
+            --mclog("mqtt", "try connect")
+            local netc = socket.tcp()
+            netc:host(mc.host)
+            netc:port(mc.port)
+            netc:on("connect", function(id, re)
+                --mclog("mqtt", "connect", id , re)
+                if re then
+                    -- 发送CONN包
+                    table.insert(mc.outpkgs, packCONNECT(mc.clientId, mc.keepAlive, mc.username, mc.password, mc.cleanSession, {topic="",payload="",qos=0,retain=0,flag=0}))
+                    sys.publish(mc.ckey)
+                end
+            end)
+            netc:on("recv", function(id, data)
+                --mclog("mqtt", "recv", id , data:sub(1, 10):toHex())
+                mc.buff = mc.buff .. data
+                while 1 do
+                    local packet, nextpos = unpack(mc.buff)
+                    mc.buff = mc.buff:sub(nextpos)
+                    if not packet then
+                        if packet > 4096 then
+                            log.warn("mqtt", "packet is too big!!!")
+                            netc:close()
+                        end
+                        break
+                    else
+                        --mclog("mqtt", "recv new pkg", json.encode(packet))
+                        table.insert( mc.inpkgs, packet)
+                        if #mc.buff < 2 then
+                            break
+                        end
+                    end
+                end
+                if #mc.inpkgs > 0 then
+                    sys.publish(mc.ckey)
+                end
+            end)
+            if netc:start() == 0 then
+                --mclog("mqtt", "start success")
+                local endTopic = "NETC_END_" .. netc:id()
+                while (netc:closed()) == 0 do
+                    mc:handle(netc)
+                    sys.waitUntil({endTopic, mc.ckey}, 30000)
+                    if not mc.running then netc:close() end
+                    --mclog("mqtt", "handle/timeout/ping", (netc:closed()))
+                end
+            end
+            mc.stat = 0
+            -- 清理socket上下文
+            mclog("mqtt", "clean up")
+            netc:clean()
+            netc:close()
+            mclog("mqtt", "wait 5s for next loop")
+            sys.wait(5*1000) -- TODO 使用级数递增进行延时
+        else
+            sys.wait(1000)
+        end
+    end
+    -- 线程退出, 只可能是用户主动shutdown
+    mclog("mqtt", self.ckey, "exit")
+end
+
+function mqttc:sub(topics)
+    table.insert(self.outpkgs, packSUBSCRIBE(0, self:genId(), topics))
+    sys.publish(self.ckey)
+    return sys.waitUntil(self.ckey .. "SUBACK", 30000)
+end
+
+function mqttc:pub(topic, qos, payload)
+    -- local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
+    table.insert(self.outpkgs, packPUBLISH(0, qos, 0, qos and self:genId() or 0, topic, payload))
+    sys.publish(self.ckey)
+    if qos then
+        return sys.waitUntil(self.ckey .. "PUBACK", 30000)
+    end
+end
+
+function mqttc:shutdown()
+    self.running = 0
+    sys.publish(self.ckey)
+end
+
+return mqtt2