mqtt.lua 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. --- 模块功能:MQTT客户端
  2. -- @module mqtt
  3. -- @author openLuat
  4. -- @license MIT
  5. -- @copyright openLuat
  6. -- @release 2017.10.24
  7. local mqtt = {}
  8. -- MQTT 指令id
  9. local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
  10. local CLIENT_COMMAND_TIMEOUT = 60000
  11. sys = require("sys")
  12. local pack = _G.pack
  13. local string = _G.string
  14. local encodeLen = mqttcore.encodeLen
  15. --local encodeUTF8 = mqttcore.encodeUTF8
  16. -- local function encodeLen(len)
  17. -- local s = ""
  18. -- local digit
  19. -- repeat
  20. -- digit = len % 128
  21. -- len = (len - digit) / 128
  22. -- if len > 0 then
  23. -- --digit = bit.bor(digit, 0x80)
  24. -- digit = digit | 0x80
  25. -- end
  26. -- s = s .. string.char(digit)
  27. -- until (len <= 0)
  28. -- return s
  29. -- end
  30. local encodeUTF8 = mqttcore.encodeUTF8
  31. -- local function encodeUTF8(s)
  32. -- if not s or #s == 0 then
  33. -- return ""
  34. -- else
  35. -- return pack.pack(">P", s)
  36. -- end
  37. -- end
  38. local packCONNECT = mqttcore.packCONNECT
  39. -- local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
  40. -- local content = pack.pack(">PbbHPAAAA",
  41. -- version == "3.1" and "MQIsdp" or "MQTT",
  42. -- version == "3.1" and 3 or 4,
  43. -- (#username == 0 and 0 or 1) * 128 + (#password == 0 and 0 or 1) * 64 + will.retain * 32 + will.qos * 8 + will.flag * 4 + cleanSession * 2,
  44. -- keepAlive,
  45. -- clientId,
  46. -- encodeUTF8(will.topic),
  47. -- encodeUTF8(will.payload),
  48. -- encodeUTF8(username),
  49. -- encodeUTF8(password))
  50. -- local mydata = pack.pack(">bAA",
  51. -- CONNECT * 16,
  52. -- encodeLen(string.len(content)),
  53. -- content)
  54. -- local tdata = mqttcore.packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
  55. -- log.info("mqtt", "true", mydata:toHex())
  56. -- log.info("mqtt", "false", tdata:toHex())
  57. -- return mydata
  58. -- end
  59. local packSUBSCRIBE = mqttcore.packSUBSCRIBE
  60. -- local function packSUBSCRIBE(dup, packetId, topics)
  61. -- local header = SUBSCRIBE * 16 + dup * 8 + 2
  62. -- local data = pack.pack(">H", packetId)
  63. -- for topic, qos in pairs(topics) do
  64. -- data = data .. pack.pack(">Pb", topic, qos)
  65. -- end
  66. -- local mydata = pack.pack(">bAA", header, encodeLen(#data), data)
  67. -- log.info("mqtt", "true", mydata:toHex())
  68. -- local tdata = mqttcore.packSUBSCRIBE(dup, packetId, topics)
  69. -- log.info("mqtt", "false", tdata:toHex())
  70. -- return mydata
  71. -- end
  72. local packUNSUBSCRIBE = mqttcore.packUNSUBSCRIBE
  73. -- local function packUNSUBSCRIBE(dup, packetId, topics)
  74. -- local header = UNSUBSCRIBE * 16 + dup * 8 + 2
  75. -- local data = pack.pack(">H", packetId)
  76. -- for k, topic in pairs(topics) do
  77. -- data = data .. pack.pack(">P", topic)
  78. -- end
  79. -- return pack.pack(">bAA", header, encodeLen(#data), data)
  80. -- end
  81. local packPUBLISH = mqttcore.packPUBLISH
  82. -- local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
  83. -- local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
  84. -- local len = 2 + #topic + #payload
  85. -- local mydata = nil
  86. -- if qos > 0 then
  87. -- mydata = pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
  88. -- else
  89. -- mydata = pack.pack(">bAPA", header, encodeLen(len), topic, payload)
  90. -- end
  91. -- local tdata = mqttcore.packPUBLISH(dup, qos, retain, packetId, topic, payload)
  92. -- log.info("mqtt", "true", mydata:toHex())
  93. -- log.info("mqtt", "false", tdata:toHex())
  94. -- return mydata
  95. -- end
  96. local packACK = mqttcore.packACK
  97. -- local function packACK(id, dup, packetId)
  98. -- return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
  99. -- end
  100. local packZeroData = mqttcore.packZeroData
  101. -- local function packZeroData(id, dup, qos, retain)
  102. -- dup = dup or 0
  103. -- qos = qos or 0
  104. -- retain = retain or 0
  105. -- return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
  106. -- end
  107. local function unpack(s)
  108. if #s < 2 then return end
  109. log.debug("mqtt.unpack", #s, string.toHex(string.sub(s, 1, 50)))
  110. -- read remaining length
  111. local len = 0
  112. local multiplier = 1
  113. local pos = 2
  114. repeat
  115. if pos > #s then return end
  116. local digit = string.byte(s, pos)
  117. len = len + ((digit % 128) * multiplier)
  118. multiplier = multiplier * 128
  119. pos = pos + 1
  120. until digit < 128
  121. if #s < len + pos - 1 then return end
  122. local header = string.byte(s, 1)
  123. --local packet = {id = (header - (header % 16)) / 16, dup = ((header % 16) - ((header % 16) % 8)) / 8, qos = bit.band(header, 0x06) / 2, retain = bit.band(header, 0x01)}
  124. local packet = {id = (header - (header % 16)) >> 4, dup = ((header % 16) - ((header % 16) % 8)) >> 3, qos = (header & 0x06) >> 1, retain = (header & 0x01)}
  125. local nextpos
  126. if packet.id == CONNACK then
  127. nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
  128. elseif packet.id == PUBLISH then
  129. nextpos, packet.topic = pack.unpack(s, ">P", pos)
  130. if packet.qos > 0 then
  131. nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
  132. end
  133. packet.payload = string.sub(s, nextpos, pos + len - 1)
  134. elseif packet.id ~= PINGRESP then
  135. if len >= 2 then
  136. nextpos, packet.packetId = pack.unpack(s, ">H", pos)
  137. else
  138. packet.packetId = 0
  139. end
  140. end
  141. return packet, pos + len
  142. end
  143. local mqttc = {}
  144. mqttc.__index = mqttc
  145. --- 创建一个mqtt client实例
  146. -- @string clientId
  147. -- @number[opt=300] keepAlive 心跳间隔(单位为秒),默认300秒
  148. -- @string[opt=""] username 用户名,用户名为空配置为""或者nil
  149. -- @string[opt=""] password 密码,密码为空配置为""或者nil
  150. -- @number[opt=1] cleanSession 1/0
  151. -- @table[opt=nil] will 遗嘱参数,格式为{qos=, retain=, topic=, payload=}
  152. -- @string[opt="3.1.1"] version MQTT版本号
  153. -- @return table mqttc client实例
  154. -- @usage
  155. -- mqttc = mqtt.client("clientid-123")
  156. -- mqttc = mqtt.client("clientid-123",200)
  157. -- mqttc = mqtt.client("clientid-123",nil,"user","password")
  158. -- mqttc = mqtt.client("clientid-123",nil,"user","password",nil,nil,"3.1")
  159. function mqtt.client(clientId, keepAlive, username, password, cleanSession, will, version)
  160. local o = {}
  161. local packetId = 1
  162. if will then
  163. will.flag = 1
  164. else
  165. will = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
  166. end
  167. o.clientId = clientId
  168. o.keepAlive = keepAlive or 300
  169. o.username = username or ""
  170. o.password = password or ""
  171. o.cleanSession = cleanSession or 1
  172. o.version = version or "3.1.1"
  173. o.will = will
  174. o.commandTimeout = CLIENT_COMMAND_TIMEOUT
  175. o.cache = {}-- 接收到的mqtt数据包缓冲
  176. o.inbuf = "" -- 未完成的数据缓冲
  177. o.connected = false
  178. o.getNextPacketId = function()
  179. packetId = packetId == 65535 and 1 or (packetId + 1)
  180. return packetId
  181. end
  182. o.lastOTime = 0
  183. o.pkgs = {}
  184. setmetatable(o, mqttc)
  185. return o
  186. end
  187. -- 检测是否需要发送心跳包
  188. function mqttc:checkKeepAlive()
  189. if self.keepAlive == 0 then return true end
  190. if os.time() - self.lastOTime >= self.keepAlive then
  191. if not self:write(packZeroData(PINGREQ)) then
  192. log.info("mqtt.client:", "pingreq send fail")
  193. return false
  194. end
  195. end
  196. return true
  197. end
  198. -- 发送mqtt数据
  199. function mqttc:write(data)
  200. log.debug("mqtt.client:write", string.toHex(string.sub(data, 1, 50)))
  201. local r = self.io:send(data)
  202. if r then self.lastOTime = os.time() end
  203. return r
  204. end
  205. -- 接收mqtt数据包
  206. function mqttc:read(timeout, msg, msgNoResume)
  207. if not self:checkKeepAlive() then
  208. log.warn("mqtt.read checkKeepAlive fail")
  209. return false
  210. end
  211. local topic = "MQTTC_PKG_" .. tostring(self.io:id())
  212. local result, data, param = sys.waitUntil({topic, msg}, timeout)
  213. --log.debug("mqtt.read", result, data, param)
  214. if result then -- 收到topic消息
  215. local pkg = table.remove(self.pkgs, 1)
  216. if pkg ~= nil then
  217. --log.debug("mqtt", "get packet", pkg.id, pkg.packetId)
  218. return true, pkg
  219. end
  220. --log.debug("mqtt", "get sys.msg", msg, data)
  221. return false, msg, data
  222. else
  223. if self.io:closed() == 1 then
  224. return false
  225. else
  226. return false, "timeout"
  227. end
  228. end
  229. end
  230. local function update_resp(_self, data)
  231. if #data > 0 then
  232. if #_self.inbuf > 0 then
  233. _self.inbuf = _self.inbuf .. data
  234. else
  235. _self.inbuf = data
  236. end
  237. end
  238. --log.debug("mqttc", "data recv to unpack", _self.inbuf:toHex())
  239. local packet, nextpos = unpack(_self.inbuf)
  240. if packet then
  241. log.info("mqttc", "msg unpack ok", packet.id)
  242. _self.inbuf = string.sub(_self.inbuf, nextpos)
  243. table.insert(_self.pkgs, packet)
  244. sys.publish("MQTTC_PKG_" .. tostring(_self.io:id()))
  245. if #_self.inbuf > 0 then
  246. update_resp(_self, "")
  247. end
  248. else
  249. log.info("mqttc", "data not full")
  250. end
  251. return true
  252. end
  253. -- 等待接收指定的mqtt消息
  254. function mqttc:waitfor(id, timeout, msg, msgNoResume)
  255. for index, packet in ipairs(self.cache) do
  256. if packet.id == id then
  257. return true, table.remove(self.cache, index)
  258. end
  259. end
  260. while true do
  261. local insertCache = true
  262. local r, data, param = self:read(timeout, msg, msgNoResume)
  263. if r then
  264. if data.id == PUBLISH then
  265. if data.qos > 0 then
  266. if not self:write(packACK(data.qos == 1 and PUBACK or PUBREC, 0, data.packetId)) then
  267. log.info("mqtt.client:waitfor", "send publish ack failed", data.qos)
  268. return false
  269. end
  270. end
  271. elseif data.id == PUBREC or data.id == PUBREL then
  272. if not self:write(packACK(data.id == PUBREC and PUBREL or PUBCOMP, 0, data.packetId)) then
  273. log.info("mqtt.client:waitfor", "send ack fail", data.id == PUBREC and "PUBREC" or "PUBCOMP")
  274. return false
  275. end
  276. insertCache = false
  277. end
  278. if data.id == id then
  279. return true, data
  280. end
  281. if insertCache then table.insert(self.cache, data) end
  282. else
  283. return false, data, param
  284. end
  285. end
  286. end
  287. --- 连接mqtt服务器
  288. -- @string host 服务器地址
  289. -- @param port string或者number类型,服务器端口
  290. -- @string[opt="tcp"] transport "tcp"或者"tcp_ssl"
  291. -- @table[opt=nil] cert,table或者nil类型,ssl证书,当transport为"tcp_ssl"时,此参数才有意义。cert格式如下:
  292. -- {
  293. -- caCert = "ca.crt", --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
  294. -- clientCert = "client.crt", --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
  295. -- clientKey = "client.key", --客户端私钥文件(Base64编码 X.509格式)
  296. -- clientPassword = "123456", --客户端证书文件密码[可选]
  297. -- }
  298. -- @number timeout, 链接服务器最长超时时间
  299. -- @return result true表示成功,false或者nil表示失败
  300. -- @usage mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp", 5)
  301. function mqttc:connect(host, port, transport, cert, timeout)
  302. if self.connected then
  303. log.info("mqtt.client:connect", "has connected")
  304. return false
  305. end
  306. if self.io then
  307. self.io:close()
  308. self.io = nil
  309. end
  310. if transport and transport ~= "tcp" and transport ~= "tcp_ssl" then
  311. log.info("mqtt.client:connect", "invalid transport", transport)
  312. return false
  313. end
  314. self.io = socket.tcp(transport == "tcp_ssl" or type(cert) == "table", cert)
  315. if not self.io:connect(host, port, timeout) then
  316. log.info("mqtt.client:connect", "connect host fail")
  317. return false
  318. end
  319. if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will, self.version)) then
  320. log.info("mqtt.client:connect", "send fail")
  321. return false
  322. end
  323. local r, packet = self:waitfor(CONNACK, self.commandTimeout, nil, true)
  324. -- if not r or packet.rc ~= 0 then
  325. -- log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
  326. -- return false,packet.rc
  327. -- end
  328. if (not r) or (not packet) or packet.rc ~= 0 then
  329. log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
  330. return false, packet and packet.rc or -1
  331. end
  332. self.connected = true
  333. return true
  334. end
  335. --- 订阅主题
  336. -- @param topic,string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码
  337. -- @param[opt=0] qos,number或者nil,topic为一个主题时,qos为number类型(0/1/2,默认0);topic为多个主题时,qos为nil
  338. -- @return bool true表示成功,false或者nil表示失败
  339. -- @usage
  340. -- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
  341. -- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
  342. function mqttc:subscribe(topic, qos)
  343. if not self.connected then
  344. log.info("mqtt.client:subscribe", "not connected")
  345. return false
  346. end
  347. local topics
  348. if type(topic) == "string" then
  349. topics = {[topic] = qos and qos or 0}
  350. else
  351. topics = topic
  352. end
  353. if not self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics)) then
  354. log.info("mqtt.client:subscribe", "send failed")
  355. return false
  356. end
  357. local r, packet = self:waitfor(SUBACK, self.commandTimeout, nil, true)
  358. if not r then
  359. log.info("mqtt.client:subscribe", "wait ack failed")
  360. return false
  361. end
  362. if not (packet.grantedQos and packet.grantedQos~="" and not packet.grantedQos:match(string.char(0x80))) then
  363. log.info("mqtt.client:subscribe", "suback grant qos error", packet.grantedQos)
  364. return false
  365. end
  366. return true
  367. end
  368. --- 取消订阅主题
  369. -- @param topic,string或者table类型,一个主题时为string类型,多个主题时为table类型,主题内容为UTF8编码
  370. -- @return bool true表示成功,false或者nil表示失败
  371. -- @usage
  372. -- mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
  373. -- mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
  374. function mqttc:unsubscribe(topic)
  375. if not self.connected then
  376. log.info("mqtt.client:unsubscribe", "not connected")
  377. return false
  378. end
  379. local topics
  380. if type(topic) == "string" then
  381. topics = {topic}
  382. else
  383. topics = topic
  384. end
  385. if not self:write(packUNSUBSCRIBE(0, self.getNextPacketId(), topics)) then
  386. log.info("mqtt.client:unsubscribe", "send failed")
  387. return false
  388. end
  389. if not self:waitfor(UNSUBACK, self.commandTimeout, nil, true) then
  390. log.info("mqtt.client:unsubscribe", "wait ack failed")
  391. return false
  392. end
  393. return true
  394. end
  395. --- 发布一条消息
  396. -- @string topic UTF8编码的字符串
  397. -- @string payload 用户自己控制payload的编码,mqtt.lua不会对payload做任何编码转换
  398. -- @number[opt=0] qos 0/1/2, default 0
  399. -- @number[opt=0] retain 0或者1
  400. -- @return bool 发布成功返回true,失败返回false
  401. -- @usage
  402. -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  403. -- mqttc:connect("mqttserver.com", 1883, "tcp")
  404. -- mqttc:publish("/topic", "publish from luat mqtt client", 0)
  405. function mqttc:publish(topic, payload, qos, retain)
  406. if not self.connected then
  407. log.info("mqtt.client:publish", "not connected")
  408. return false
  409. end
  410. qos = qos or 0
  411. retain = retain or 0
  412. if not self:write(packPUBLISH(0, qos, retain, qos > 0 and self.getNextPacketId() or 0, topic, payload)) then
  413. log.info("mqtt.client:publish", "socket send failed")
  414. return false
  415. end
  416. if qos == 0 then return true end
  417. if not self:waitfor(qos == 1 and PUBACK or PUBCOMP, self.commandTimeout, nil, true) then
  418. log.warn("mqtt.client:publish", "wait ack timeout")
  419. return false
  420. end
  421. return true
  422. end
  423. --- 接收消息
  424. -- @number timeout 接收超时时间,单位毫秒
  425. -- @string[opt=nil] msg 可选参数,控制socket所在的线程退出recv阻塞状态
  426. -- @return result 数据接收结果,true表示成功,false表示失败
  427. -- @return data
  428. -- 如果result为true,表示服务器发过来的mqtt包
  429. --
  430. -- 如果result为false,超时失败,data为"timeout"
  431. -- 如果result为false,msg控制退出,data为msg的字符串
  432. -- 如果result为false,socket连接被动断开控制退出,data为"CLOSED"
  433. -- 如果result为false,PDP断开连接控制退出,data为"IP_ERROR_IND"
  434. --
  435. -- 如果result为false,mqtt不处于连接状态,data为nil
  436. -- 如果result为false,收到了PUBLISH报文,发送PUBACK或者PUBREC报文失败,data为nil
  437. -- 如果result为false,收到了PUBREC报文,发送PUBREL报文失败,data为nil
  438. -- 如果result为false,收到了PUBREL报文,发送PUBCOMP报文失败,data为nil
  439. -- 如果result为false,发送PINGREQ报文失败,data为nil
  440. -- @return param 如果是msg控制退出,param的值是msg的参数;其余情况无意义,为nil
  441. -- @usage
  442. -- true, packet = mqttc:receive(2000)
  443. -- false, error_message = mqttc:receive(2000)
  444. -- false, msg, para = mqttc:receive(2000,"APP_SEND_DATA")
  445. function mqttc:receive(timeout, msg)
  446. if not self.connected then
  447. log.info("mqtt.client:receive", "not connected")
  448. return false
  449. end
  450. return self:waitfor(PUBLISH, timeout, msg)
  451. end
  452. --- 断开与服务器的连接
  453. -- @return nil
  454. -- @usage
  455. -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  456. -- mqttc:connect("mqttserver.com", 1883, "tcp")
  457. -- process data
  458. -- mqttc:disconnect()
  459. function mqttc:disconnect()
  460. if self.io then
  461. if self.connected then self:write(packZeroData(DISCONNECT)) end
  462. self.io:close()
  463. self.io = nil
  464. end
  465. self.cache = {}
  466. self.inbuf = ""
  467. self.connected = false
  468. end
  469. return mqtt