Просмотр исходного кода

fix: mqtt.lua的pub_msg没反应 https://gitee.com/openLuat/LuatOS/issues/I1TRE7
add: sys的订阅功能支持多topic

Wendal Chen 5 лет назад
Родитель
Сommit
080c85adbc
4 измененных файлов с 56 добавлено и 16 удалено
  1. 12 4
      bsp/air302/lib/mqtt.lua
  2. 16 4
      bsp/air302/lib/sys.lua
  3. 12 4
      bsp/air640w/lib/mqtt.lua
  4. 16 4
      bsp/air640w/lib/sys.lua

+ 12 - 4
bsp/air302/lib/mqtt.lua

@@ -208,6 +208,7 @@ function mqtt.client(clientId, keepAlive, username, password, cleanSession, will
         return packetId
     end
     o.lastOTime = 0
+    o.pkgs = {}
     
     setmetatable(o, mqttc)
     
@@ -242,10 +243,16 @@ function mqttc:read(timeout, msg, msgNoResume)
     end
 
     local topic = "MQTTC_PKG_" .. tostring(self.io:id())
-    local result, data = sys.waitUntil(topic, timeout)
-    --log.info("mqtt.read", result, data)
+    local result, data, param = sys.waitUntil({topic, msg}, timeout)
+    --log.debug("mqtt.read", result, data, param)
     if result then -- 收到topic消息
-        return true, data
+        local pkg = table.remove(self.pkgs, 1)
+        if pkg ~= nil then
+            --log.debug("mqtt", "get packet", pkg.id, pkg.packetId)
+            return true, pkg
+        end
+        --log.debug("mqtt", "get sys.msg", msg, data)
+        return false, msg, data
     else
         if self.io:closed() == 1 then
             return false
@@ -268,7 +275,8 @@ local function update_resp(_self, data)
     if packet then
         log.info("mqttc", "msg unpack ok", packet.id)
         _self.inbuf = string.sub(_self.inbuf, nextpos)
-        sys.publish("MQTTC_PKG_" .. tostring(_self.io:id()), packet)
+        table.insert(_self.pkgs, packet)
+        sys.publish("MQTTC_PKG_" .. tostring(_self.io:id()))
         if #_self.inbuf > 0 then
             update_resp(_self, "")
         end

+ 16 - 4
bsp/air302/lib/sys.lua

@@ -259,8 +259,14 @@ local messageQueue = {}
 -- @param callback 消息回调处理
 -- @usage subscribe("NET_STATUS_IND", callback)
 function sys.subscribe(id, callback)
-    if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
-        log.warn("warning: sys.subscribe invalid parameter", id, callback)
+    --if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
+    --    log.warn("warning: sys.subscribe invalid parameter", id, callback)
+    --    return
+    --end
+    --log.debug("sys", "subscribe", id, callback)
+    if type(id) == "table" then
+        -- 支持多topic订阅
+        for _, v in pairs(id) do sys.subscribe(v, callback) end
         return
     end
     if not subscribers[id] then subscribers[id] = {} end
@@ -271,8 +277,14 @@ end
 -- @param callback 消息回调处理
 -- @usage unsubscribe("NET_STATUS_IND", callback)
 function sys.unsubscribe(id, callback)
-    if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
-        log.warn("warning: sys.unsubscribe invalid parameter", id, callback)
+    --if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
+    --    log.warn("warning: sys.unsubscribe invalid parameter", id, callback)
+    --    return
+    --end
+    --log.debug("sys", "unsubscribe", id, callback)
+    if type(id) == "table" then
+        -- 支持多topic订阅
+        for _, v in pairs(id) do sys.unsubscribe(v, callback) end
         return
     end
     if subscribers[id] then subscribers[id][callback] = nil end

+ 12 - 4
bsp/air640w/lib/mqtt.lua

@@ -208,6 +208,7 @@ function mqtt.client(clientId, keepAlive, username, password, cleanSession, will
         return packetId
     end
     o.lastOTime = 0
+    o.pkgs = {}
     
     setmetatable(o, mqttc)
     
@@ -242,10 +243,16 @@ function mqttc:read(timeout, msg, msgNoResume)
     end
 
     local topic = "MQTTC_PKG_" .. tostring(self.io:id())
-    local result, data = sys.waitUntil(topic, timeout)
-    --log.info("mqtt.read", result, data)
+    local result, data, param = sys.waitUntil({topic, msg}, timeout)
+    --log.debug("mqtt.read", result, data, param)
     if result then -- 收到topic消息
-        return true, data
+        local pkg = table.remove(self.pkgs, 1)
+        if pkg ~= nil then
+            --log.debug("mqtt", "get packet", pkg.id, pkg.packetId)
+            return true, pkg
+        end
+        --log.debug("mqtt", "get sys.msg", msg, data)
+        return false, msg, data
     else
         if self.io:closed() == 1 then
             return false
@@ -268,7 +275,8 @@ local function update_resp(_self, data)
     if packet then
         log.info("mqttc", "msg unpack ok", packet.id)
         _self.inbuf = string.sub(_self.inbuf, nextpos)
-        sys.publish("MQTTC_PKG_" .. tostring(_self.io:id()), packet)
+        table.insert(_self.pkgs, packet)
+        sys.publish("MQTTC_PKG_" .. tostring(_self.io:id()))
         if #_self.inbuf > 0 then
             update_resp(_self, "")
         end

+ 16 - 4
bsp/air640w/lib/sys.lua

@@ -259,8 +259,14 @@ local messageQueue = {}
 -- @param callback 消息回调处理
 -- @usage subscribe("NET_STATUS_IND", callback)
 function sys.subscribe(id, callback)
-    if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
-        log.warn("warning: sys.subscribe invalid parameter", id, callback)
+    --if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
+    --    log.warn("warning: sys.subscribe invalid parameter", id, callback)
+    --    return
+    --end
+    --log.debug("sys", "subscribe", id, callback)
+    if type(id) == "table" then
+        -- 支持多topic订阅
+        for _, v in pairs(id) do sys.subscribe(v, callback) end
         return
     end
     if not subscribers[id] then subscribers[id] = {} end
@@ -271,8 +277,14 @@ end
 -- @param callback 消息回调处理
 -- @usage unsubscribe("NET_STATUS_IND", callback)
 function sys.unsubscribe(id, callback)
-    if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
-        log.warn("warning: sys.unsubscribe invalid parameter", id, callback)
+    --if not id or type(id) == "boolean" or (type(callback) ~= "function" and type(callback) ~= "thread") then
+    --    log.warn("warning: sys.unsubscribe invalid parameter", id, callback)
+    --    return
+    --end
+    --log.debug("sys", "unsubscribe", id, callback)
+    if type(id) == "table" then
+        -- 支持多topic订阅
+        for _, v in pairs(id) do sys.unsubscribe(v, callback) end
         return
     end
     if subscribers[id] then subscribers[id][callback] = nil end