|
|
@@ -73,38 +73,40 @@ local mqtt2 = {}
|
|
|
local mqttc = {}
|
|
|
mqttc.__index = mqttc
|
|
|
|
|
|
-function mqtt2.new(clientId, keepAlive, username, password, cleanSession, host, port, topics, cb)
|
|
|
+function mqtt2.new(clientId, keepAlive, username, password, cleanSession, host, port, topics, cb, ckey)
|
|
|
local c = {
|
|
|
clientId = clientId,
|
|
|
keepAlive = keepAlive or 300,
|
|
|
username = username or "",
|
|
|
password = password or "",
|
|
|
- cleanSession = cleanSession,
|
|
|
+ cleanSession = cleanSession == nil and 1 or 0,
|
|
|
host = host,
|
|
|
port = port,
|
|
|
lping = 0,
|
|
|
stat = 0, -- 状态 0, 未连接, 1 已连接成功
|
|
|
nextid = 0, -- pkg的ID
|
|
|
- running = true,
|
|
|
- ckey = "",
|
|
|
+ running = false,
|
|
|
inpkgs = {},
|
|
|
outpkgs = {},
|
|
|
buff = "",
|
|
|
topics = topics or {},
|
|
|
- cb = cb
|
|
|
+ cb = cb,
|
|
|
+ ckey = ckey or ""
|
|
|
}
|
|
|
- c.ckey = "mqtt_" .. tostring(c)
|
|
|
+ if c.ckey == "" then c.ckey = "mqtt_" .. tostring(c) end
|
|
|
--mclog("mqtt", "MQTT Client Key", c.ckey)
|
|
|
setmetatable(c, mqttc)
|
|
|
return c
|
|
|
end
|
|
|
|
|
|
+-- 内部方法, 用于获取下一个pkg的id
|
|
|
function mqttc:genId()
|
|
|
self.nextid = self.nextid == 65535 and 1 or (self.nextid + 1)
|
|
|
--mclog("mqtt", "next packet id", self.nextid)
|
|
|
return self.nextid
|
|
|
end
|
|
|
|
|
|
+-- 内部方法,用于处理待处理的数据包
|
|
|
function mqttc:handle(netc)
|
|
|
local mc = self
|
|
|
-- 先处理服务器下发的数据包
|
|
|
@@ -173,10 +175,17 @@ function mqttc:handle(netc)
|
|
|
end
|
|
|
end
|
|
|
|
|
|
-function mqttc:start()
|
|
|
+-- 启动mqtt task, 要么在task里面执行, 要么新建一个task执行本方法
|
|
|
+function mqttc:run()
|
|
|
local mc = self
|
|
|
+ mc.running = true
|
|
|
while mc.running do
|
|
|
if socket.isReady() then
|
|
|
+ -- 先复位全部临时对象
|
|
|
+ mc.buff = ""
|
|
|
+ mc.inpkgs = {}
|
|
|
+ mc.outpkgs = {}
|
|
|
+ -- 建立socket对象
|
|
|
--mclog("mqtt", "try connect")
|
|
|
local netc = socket.tcp()
|
|
|
netc:host(mc.host)
|
|
|
@@ -223,11 +232,13 @@ function mqttc:start()
|
|
|
--mclog("mqtt", "handle/timeout/ping", (netc:closed()))
|
|
|
end
|
|
|
end
|
|
|
- mc.stat = 0
|
|
|
-- 清理socket上下文
|
|
|
mclog("mqtt", "clean up")
|
|
|
netc:clean()
|
|
|
netc:close()
|
|
|
+ -- 将所有状态复位
|
|
|
+ mc.stat = 0
|
|
|
+
|
|
|
mclog("mqtt", "wait 5s for next loop")
|
|
|
sys.wait(5*1000) -- TODO 使用级数递增进行延时
|
|
|
else
|
|
|
@@ -238,12 +249,14 @@ function mqttc:start()
|
|
|
mclog("mqtt", self.ckey, "exit")
|
|
|
end
|
|
|
|
|
|
+-- 订阅topic, table形式
|
|
|
function mqttc:sub(topics)
|
|
|
table.insert(self.outpkgs, packSUBSCRIBE(0, self:genId(), topics))
|
|
|
sys.publish(self.ckey)
|
|
|
return sys.waitUntil(self.ckey .. "SUBACK", 30000)
|
|
|
end
|
|
|
|
|
|
+-- 上报数据
|
|
|
function mqttc:pub(topic, qos, payload)
|
|
|
-- local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
|
|
|
table.insert(self.outpkgs, packPUBLISH(0, qos, 0, qos and self:genId() or 0, topic, payload))
|