|
|
@@ -12,7 +12,7 @@ local mqtt = {}
|
|
|
local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
|
|
|
local CLIENT_COMMAND_TIMEOUT = 60000
|
|
|
|
|
|
-local sys = require "sys"
|
|
|
+sys = require("sys")
|
|
|
local pack = _G.pack
|
|
|
local string = _G.string
|
|
|
local encodeLen = mqttcore.encodeLen
|
|
|
@@ -124,12 +124,12 @@ local packZeroData = mqttcore.packZeroData
|
|
|
local function unpack(s)
|
|
|
if #s < 2 then return end
|
|
|
log.debug("mqtt.unpack", #s, string.toHex(string.sub(s, 1, 50)))
|
|
|
-
|
|
|
+
|
|
|
-- read remaining length
|
|
|
local len = 0
|
|
|
local multiplier = 1
|
|
|
local pos = 2
|
|
|
-
|
|
|
+
|
|
|
repeat
|
|
|
if pos > #s then return end
|
|
|
local digit = string.byte(s, pos)
|
|
|
@@ -137,15 +137,15 @@ local function unpack(s)
|
|
|
multiplier = multiplier * 128
|
|
|
pos = pos + 1
|
|
|
until digit < 128
|
|
|
-
|
|
|
+
|
|
|
if #s < len + pos - 1 then return end
|
|
|
-
|
|
|
+
|
|
|
local header = string.byte(s, 1)
|
|
|
-
|
|
|
+
|
|
|
--local packet = {id = (header - (header % 16)) / 16, dup = ((header % 16) - ((header % 16) % 8)) / 8, qos = bit.band(header, 0x06) / 2, retain = bit.band(header, 0x01)}
|
|
|
local packet = {id = (header - (header % 16)) >> 4, dup = ((header % 16) - ((header % 16) % 8)) >> 3, qos = (header & 0x06) >> 1, retain = (header & 0x01)}
|
|
|
local nextpos
|
|
|
-
|
|
|
+
|
|
|
if packet.id == CONNACK then
|
|
|
nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
|
|
|
elseif packet.id == PUBLISH then
|
|
|
@@ -161,7 +161,7 @@ local function unpack(s)
|
|
|
packet.packetId = 0
|
|
|
end
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
return packet, pos + len
|
|
|
end
|
|
|
|
|
|
@@ -185,13 +185,13 @@ mqttc.__index = mqttc
|
|
|
function mqtt.client(clientId, keepAlive, username, password, cleanSession, will, version)
|
|
|
local o = {}
|
|
|
local packetId = 1
|
|
|
-
|
|
|
+
|
|
|
if will then
|
|
|
will.flag = 1
|
|
|
else
|
|
|
will = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
o.clientId = clientId
|
|
|
o.keepAlive = keepAlive or 300
|
|
|
o.username = username or ""
|
|
|
@@ -209,9 +209,9 @@ function mqtt.client(clientId, keepAlive, username, password, cleanSession, will
|
|
|
end
|
|
|
o.lastOTime = 0
|
|
|
o.pkgs = {}
|
|
|
-
|
|
|
+
|
|
|
setmetatable(o, mqttc)
|
|
|
-
|
|
|
+
|
|
|
return o
|
|
|
end
|
|
|
|
|
|
@@ -294,7 +294,7 @@ function mqttc:waitfor(id, timeout, msg, msgNoResume)
|
|
|
return true, table.remove(self.cache, index)
|
|
|
end
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
while true do
|
|
|
local insertCache = true
|
|
|
local r, data, param = self:read(timeout, msg, msgNoResume)
|
|
|
@@ -313,7 +313,7 @@ function mqttc:waitfor(id, timeout, msg, msgNoResume)
|
|
|
end
|
|
|
insertCache = false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if data.id == id then
|
|
|
return true, data
|
|
|
end
|
|
|
@@ -343,29 +343,29 @@ function mqttc:connect(host, port, transport, cert, timeout)
|
|
|
log.info("mqtt.client:connect", "has connected")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if self.io then
|
|
|
self.io:close()
|
|
|
self.io = nil
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if transport and transport ~= "tcp" and transport ~= "tcp_ssl" then
|
|
|
log.info("mqtt.client:connect", "invalid transport", transport)
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
self.io = socket.tcp(transport == "tcp_ssl" or type(cert) == "table", cert)
|
|
|
-
|
|
|
+
|
|
|
if not self.io:connect(host, port, timeout) then
|
|
|
log.info("mqtt.client:connect", "connect host fail")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will, self.version)) then
|
|
|
log.info("mqtt.client:connect", "send fail")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
local r, packet = self:waitfor(CONNACK, self.commandTimeout, nil, true)
|
|
|
-- if not r or packet.rc ~= 0 then
|
|
|
-- log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
|
|
|
@@ -375,9 +375,9 @@ function mqttc:connect(host, port, transport, cert, timeout)
|
|
|
log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
|
|
|
return false, packet and packet.rc or -1
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
self.connected = true
|
|
|
-
|
|
|
+
|
|
|
return true
|
|
|
end
|
|
|
|
|
|
@@ -393,30 +393,30 @@ function mqttc:subscribe(topic, qos)
|
|
|
log.info("mqtt.client:subscribe", "not connected")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
local topics
|
|
|
if type(topic) == "string" then
|
|
|
topics = {[topic] = qos and qos or 0}
|
|
|
else
|
|
|
topics = topic
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if not self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics)) then
|
|
|
log.info("mqtt.client:subscribe", "send failed")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
local r, packet = self:waitfor(SUBACK, self.commandTimeout, nil, true)
|
|
|
if not r then
|
|
|
log.info("mqtt.client:subscribe", "wait ack failed")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if not (packet.grantedQos and packet.grantedQos~="" and not packet.grantedQos:match(string.char(0x80))) then
|
|
|
log.info("mqtt.client:subscribe", "suback grant qos error", packet.grantedQos)
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
return true
|
|
|
end
|
|
|
|
|
|
@@ -431,24 +431,24 @@ function mqttc:unsubscribe(topic)
|
|
|
log.info("mqtt.client:unsubscribe", "not connected")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
local topics
|
|
|
if type(topic) == "string" then
|
|
|
topics = {topic}
|
|
|
else
|
|
|
topics = topic
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if not self:write(packUNSUBSCRIBE(0, self.getNextPacketId(), topics)) then
|
|
|
log.info("mqtt.client:unsubscribe", "send failed")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if not self:waitfor(UNSUBACK, self.commandTimeout, nil, true) then
|
|
|
log.info("mqtt.client:unsubscribe", "wait ack failed")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
return true
|
|
|
end
|
|
|
|
|
|
@@ -467,22 +467,22 @@ function mqttc:publish(topic, payload, qos, retain)
|
|
|
log.info("mqtt.client:publish", "not connected")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
qos = qos or 0
|
|
|
retain = retain or 0
|
|
|
-
|
|
|
+
|
|
|
if not self:write(packPUBLISH(0, qos, retain, qos > 0 and self.getNextPacketId() or 0, topic, payload)) then
|
|
|
log.info("mqtt.client:publish", "socket send failed")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
if qos == 0 then return true end
|
|
|
-
|
|
|
+
|
|
|
if not self:waitfor(qos == 1 and PUBACK or PUBCOMP, self.commandTimeout, nil, true) then
|
|
|
log.warn("mqtt.client:publish", "wait ack timeout")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
return true
|
|
|
end
|
|
|
|
|
|
@@ -490,7 +490,7 @@ end
|
|
|
-- @number timeout 接收超时时间,单位毫秒
|
|
|
-- @string[opt=nil] msg 可选参数,控制socket所在的线程退出recv阻塞状态
|
|
|
-- @return result 数据接收结果,true表示成功,false表示失败
|
|
|
--- @return data
|
|
|
+-- @return data
|
|
|
-- 如果result为true,表示服务器发过来的mqtt包
|
|
|
--
|
|
|
-- 如果result为false,超时失败,data为"timeout"
|
|
|
@@ -513,7 +513,7 @@ function mqttc:receive(timeout, msg)
|
|
|
log.info("mqtt.client:receive", "not connected")
|
|
|
return false
|
|
|
end
|
|
|
-
|
|
|
+
|
|
|
return self:waitfor(PUBLISH, timeout, msg)
|
|
|
end
|
|
|
|