mqtt.lua 17 KB


  1. --- 模块功能:MQTT客户端
  2. -- @module mqtt
  3. -- @author openLuat
  4. -- @license MIT
  5. -- @copyright openLuat
  6. -- @release 2017.10.24
  local mqtt = {}

  -- MQTT control packet type ids (stored in the upper nibble of the first byte).
  local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
  -- Default time, in milliseconds, to wait for the reply to a command.
  local CLIENT_COMMAND_TIMEOUT = 60000

  local sys = require "sys"
  local pack = _G.pack
  local string = _G.string

  -- Look the native helper module up through _G so that a firmware build
  -- without `mqttcore` does not abort the module load with "attempt to index
  -- a nil value".
  local mqttcore = _G.mqttcore
  local encodeLen = mqttcore and mqttcore.encodeLen
  --local encodeUTF8 = mqttcore.encodeUTF8
  if not encodeLen then
    -- Pure-Lua fallback: encode `len` as a variable-length quantity, 7 bits
    -- per byte, least-significant group first, with bit 7 set on every byte
    -- except the last.
    -- NOTE(review): assumed to produce the same bytes as mqttcore.encodeLen — confirm.
    encodeLen = function(len)
      local bytes = {}
      repeat
        local digit = len % 128
        len = len // 128
        if len > 0 then
          digit = digit | 0x80
        end
        bytes[#bytes + 1] = string.char(digit)
      until len <= 0
      return table.concat(bytes)
    end
  end
  -- Encode an optional string field with pack format ">P".
  -- A missing (nil/false) or zero-length value yields the empty string, i.e.
  -- the field is omitted altogether.
  local function encodeUTF8(s)
    local absent = (not s) or #s == 0
    if absent then
      return ""
    end
    return pack.pack(">P", s)
  end
  -- Build a CONNECT packet.
  -- @param clientId string client identifier
  -- @param keepAlive number keep-alive value written into the packet
  -- @param username string user name ("" when unused)
  -- @param password string password ("" when unused)
  -- @param cleanSession number 1/0 clean-session flag
  -- @param will table {flag=, qos=, retain=, topic=, payload=}
  -- @param version string "3.1" selects protocol name "MQIsdp" and level 3; anything else "MQTT" and level 4
  -- @return string the encoded packet
  local function packCONNECT(clientId, keepAlive, username, password, cleanSession, will, version)
    -- Tolerate a will table that omits qos/retain instead of raising an
    -- arithmetic-on-nil error while computing the flag byte.
    local willQos = will.qos or 0
    local willRetain = will.retain or 0

    local willTopic, willPayload
    if will.flag == 1 then
      -- When the will flag is set both will fields must be present in the
      -- payload, even if zero-length; omitting an empty one would shift every
      -- following field and corrupt the packet.
      willTopic = pack.pack(">P", will.topic or "")
      willPayload = pack.pack(">P", will.payload or "")
    else
      willTopic = encodeUTF8(will.topic)
      willPayload = encodeUTF8(will.payload)
    end

    -- Connect flags: username(128) password(64) willRetain(32) willQos(8) willFlag(4) cleanSession(2)
    local flags = (#username == 0 and 0 or 1) * 128
      + (#password == 0 and 0 or 1) * 64
      + willRetain * 32
      + willQos * 8
      + will.flag * 4
      + cleanSession * 2

    local content = pack.pack(">PbbHPAAAA",
      version == "3.1" and "MQIsdp" or "MQTT",
      version == "3.1" and 3 or 4,
      flags,
      keepAlive,
      clientId,
      willTopic,
      willPayload,
      encodeUTF8(username),
      encodeUTF8(password))
    return pack.pack(">bAA",
      CONNECT * 16,
      encodeLen(string.len(content)),
      content)
  end
  -- Build a SUBSCRIBE packet.
  -- @param dup number 0/1 duplicate flag
  -- @param packetId number packet identifier
  -- @param topics table map of topic filter -> requested qos
  -- @return string the encoded packet
  local function packSUBSCRIBE(dup, packetId, topics)
    local fixedHeader = SUBSCRIBE * 16 + dup * 8 + 2
    -- Collect the pieces and join once rather than growing a string in the loop.
    local pieces = {pack.pack(">H", packetId)}
    for filter, level in pairs(topics) do
      pieces[#pieces + 1] = pack.pack(">Pb", filter, level)
    end
    local body = table.concat(pieces)
    return pack.pack(">bAA", fixedHeader, encodeLen(#body), body)
  end
  -- Build an UNSUBSCRIBE packet.
  -- @param dup number 0/1 duplicate flag
  -- @param packetId number packet identifier
  -- @param topics table whose values are the topic filters to remove
  -- @return string the encoded packet
  local function packUNSUBSCRIBE(dup, packetId, topics)
    local fixedHeader = UNSUBSCRIBE * 16 + dup * 8 + 2
    local pieces = {pack.pack(">H", packetId)}
    for _, filter in pairs(topics) do
      pieces[#pieces + 1] = pack.pack(">P", filter)
    end
    local body = table.concat(pieces)
    return pack.pack(">bAA", fixedHeader, encodeLen(#body), body)
  end
  -- Build a PUBLISH packet.
  -- The packet identifier is only written when qos > 0, in which case the
  -- remaining length grows by its two bytes.
  -- @param dup number 0/1 duplicate flag
  -- @param qos number 0/1/2
  -- @param retain number 0/1
  -- @param packetId number packet identifier (ignored when qos is 0)
  -- @param topic string topic name
  -- @param payload string raw payload bytes
  -- @return string the encoded packet
  local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
    local fixedHeader = PUBLISH * 16 + dup * 8 + qos * 2 + retain
    -- 2 bytes of topic length prefix + topic + payload
    local remaining = 2 + #topic + #payload
    if qos <= 0 then
      return pack.pack(">bAPA", fixedHeader, encodeLen(remaining), topic, payload)
    end
    return pack.pack(">bAPHA", fixedHeader, encodeLen(remaining + 2), topic, packetId, payload)
  end
  -- Build a four-byte acknowledgement packet (type `id`) carrying `packetId`.
  -- PUBREL additionally sets bit 1 of the first byte; the second byte is the
  -- fixed remaining length of 2.
  local function packACK(id, dup, packetId)
    local reservedBits = 0
    if id == PUBREL then
      reservedBits = 2
    end
    local firstByte = id * 16 + dup * 8 + reservedBits
    return pack.pack(">bbH", firstByte, 0x02, packetId)
  end
  -- Build a two-byte packet with no body (remaining length 0).
  -- dup, qos and retain are optional and default to 0.
  local function packZeroData(id, dup, qos, retain)
    local flagBits = (dup or 0) * 8 + (qos or 0) * 2 + (retain or 0)
    return pack.pack(">bb", id * 16 + flagBits, 0)
  end
  -- Try to decode one packet from the front of `s`.
  -- @param s string received bytes, starting at a packet boundary
  -- @return packet table {id, dup, qos, retain, ...} plus, depending on the
  --         type, ackFlag/rc (CONNACK), topic/packetId/payload (PUBLISH) or
  --         packetId (all other types except PINGRESP); nil when `s` does not
  --         yet hold a complete packet
  -- @return number index in `s` of the first byte after the decoded packet
  local function unpack(s)
    if #s < 2 then return end
    log.debug("mqtt.unpack", #s, string.toHex(string.sub(s, 1, 50)))
    -- Decode the variable-length "remaining length" that starts at byte 2:
    -- 7 bits per byte, bit 7 set means another length byte follows.
    local len = 0
    local multiplier = 1
    local pos = 2
    repeat
      if pos > #s then return end -- length field itself is still incomplete
      local digit = string.byte(s, pos)
      len = len + ((digit % 128) * multiplier)
      multiplier = multiplier * 128
      pos = pos + 1
    until digit < 128
    -- `pos` now points at the first body byte; wait until the whole body is in.
    if #s < len + pos - 1 then return end
    local header = string.byte(s, 1)
    -- Use integer shifts/masks: float division would make id/dup/qos floats
    -- (e.g. 3.0), which then leak into logs and anything that formats them.
    local packet = {
      id = header >> 4,
      dup = (header >> 3) & 0x01,
      qos = (header >> 1) & 0x03,
      retain = header & 0x01,
    }
    local nextpos
    if packet.id == CONNACK then
      nextpos, packet.ackFlag, packet.rc = pack.unpack(s, "bb", pos)
    elseif packet.id == PUBLISH then
      nextpos, packet.topic = pack.unpack(s, ">P", pos)
      if packet.qos > 0 then
        nextpos, packet.packetId = pack.unpack(s, ">H", nextpos)
      end
      -- Payload is whatever remains of the body after topic (and packet id).
      packet.payload = string.sub(s, nextpos, pos + len - 1)
    elseif packet.id ~= PINGRESP then
      if len >= 2 then
        nextpos, packet.packetId = pack.unpack(s, ">H", pos)
      else
        packet.packetId = 0
      end
    end
    return packet, pos + len
  end
  -- Method table shared by every client instance.
  local mqttc = {}
  mqttc.__index = mqttc

  --- Create an MQTT client instance.
  -- @string clientId client identifier
  -- @number[opt=300] keepAlive keep-alive interval in seconds
  -- @string[opt=""] username user name; pass "" or nil when unused
  -- @string[opt=""] password password; pass "" or nil when unused
  -- @number[opt=1] cleanSession 1/0
  -- @table[opt=nil] will last-will settings, {qos=, retain=, topic=, payload=};
  --   omitted qos/retain default to 0, omitted topic/payload to ""
  -- @string[opt="3.1.1"] version MQTT protocol version
  -- @return table mqttc client instance
  -- @usage
  -- mqttc = mqtt.client("clientid-123")
  -- mqttc = mqtt.client("clientid-123",200)
  -- mqttc = mqtt.client("clientid-123",nil,"user","password")
  -- mqttc = mqtt.client("clientid-123",nil,"user","password",nil,nil,"3.1")
  function mqtt.client(clientId, keepAlive, username, password, cleanSession, will, version)
    -- Normalise the will into a private table: the caller's table is left
    -- untouched and missing fields get defaults, so building the CONNECT
    -- packet never does arithmetic on nil.
    local willCfg
    if will then
      willCfg = {
        flag = 1,
        qos = will.qos or 0,
        retain = will.retain or 0,
        topic = will.topic or "",
        payload = will.payload or "",
      }
    else
      willCfg = {flag = 0, qos = 0, retain = 0, topic = "", payload = ""}
    end

    local o = {}
    local packetId = 1
    o.clientId = clientId
    o.keepAlive = keepAlive or 300
    o.username = username or ""
    o.password = password or ""
    o.cleanSession = cleanSession or 1
    o.version = version or "3.1.1"
    o.will = willCfg
    o.commandTimeout = CLIENT_COMMAND_TIMEOUT
    o.cache = {} -- decoded packets received but not yet consumed
    o.inbuf = "" -- bytes of a not-yet-complete packet
    o.connected = false
    -- Per-client packet id generator; wraps from 65535 back to 1.
    o.getNextPacketId = function()
      packetId = packetId == 65535 and 1 or (packetId + 1)
      return packetId
    end
    o.lastOTime = 0 -- os.time() of the last successful send
    setmetatable(o, mqttc)
    return o
  end
  -- Send a PINGREQ when at least `keepAlive` seconds have passed since the
  -- last successful send. A keepAlive of 0 disables the check.
  -- @return boolean false only when a PINGREQ was due and could not be sent
  function mqttc:checkKeepAlive()
    local interval = self.keepAlive
    if interval == 0 then
      return true
    end
    local idle = os.time() - self.lastOTime
    if idle < interval then
      return true
    end
    if self:write(packZeroData(PINGREQ)) then
      return true
    end
    log.info("mqtt.client:", "pingreq send fail")
    return false
  end
  -- Send raw packet bytes over the socket.
  -- On success the send time is recorded for the keep-alive check.
  -- @return the result of self.io:send(data)
  function mqttc:write(data)
    -- Only the first 50 bytes are dumped to the debug log.
    local preview = string.sub(data, 1, 50)
    log.debug("mqtt.client:write", string.toHex(preview))
    local sent = self.io:send(data)
    if not sent then
      return sent
    end
    self.lastOTime = os.time()
    return sent
  end
  -- Wait for the next decoded packet published for this client's socket.
  -- `msg` and `msgNoResume` are accepted but not used by this function.
  -- @return true, packet   when a packet arrived within `timeout`
  -- @return false          when the keep-alive check failed or the socket is closed
  -- @return false, "timeout" when nothing arrived and the socket is still open
  function mqttc:read(timeout, msg, msgNoResume)
    if not self:checkKeepAlive() then
      log.warn("mqtt.read checkKeepAlive fail")
      return false
    end
    local ok, packet = sys.waitUntil("MQTTC_PKG_" .. tostring(self.io:id()), timeout)
    --log.info("mqtt.read", ok, packet)
    if ok then
      return true, packet
    end
    if self.io:closed() ~= 1 then
      return false, "timeout"
    end
    return false
  end
  -- Append newly received bytes to the client's input buffer, then decode and
  -- publish every complete packet it contains. Always returns true.
  local function update_resp(_self, data)
    if #data > 0 then
      _self.inbuf = (#_self.inbuf > 0) and (_self.inbuf .. data) or data
    end
    --log.debug("mqttc", "data recv to unpack", _self.inbuf:toHex())
    while true do
      local packet, nextpos = unpack(_self.inbuf)
      if not packet then
        log.info("mqttc", "data not full")
        break
      end
      log.info("mqttc", "msg unpack ok", packet.id)
      -- Drop the consumed packet and hand it to whoever waits on this socket.
      _self.inbuf = string.sub(_self.inbuf, nextpos)
      sys.publish("MQTTC_PKG_" .. tostring(_self.io:id()), packet)
      if #_self.inbuf == 0 then
        break
      end
    end
    return true
  end
  -- Wait for a packet of type `id`.
  -- The cache is searched first. Otherwise packets are read one at a time:
  -- incoming PUBLISH with qos > 0 is acknowledged (PUBACK for qos 1, PUBREC
  -- otherwise), PUBREC is answered with PUBREL and PUBREL with PUBCOMP.
  -- Packets of another type are cached, except PUBREC/PUBREL.
  -- @return true, packet on success
  -- @return false when an acknowledgement could not be sent
  -- @return false, data, param as returned by read() when reading failed
  function mqttc:waitfor(id, timeout, msg, msgNoResume)
    for slot, cached in ipairs(self.cache) do
      if cached.id == id then
        return true, table.remove(self.cache, slot)
      end
    end
    while true do
      local ok, data, param = self:read(timeout, msg, msgNoResume)
      if not ok then
        return false, data, param
      end
      local keep = true
      local kind = data.id
      if kind == PUBLISH then
        if data.qos > 0 then
          local ackId = PUBREC
          if data.qos == 1 then
            ackId = PUBACK
          end
          if not self:write(packACK(ackId, 0, data.packetId)) then
            log.info("mqtt.client:waitfor", "send publish ack failed", data.qos)
            return false
          end
        end
      elseif kind == PUBREC or kind == PUBREL then
        local isRec = kind == PUBREC
        if not self:write(packACK(isRec and PUBREL or PUBCOMP, 0, data.packetId)) then
          log.info("mqtt.client:waitfor", "send ack fail", isRec and "PUBREC" or "PUBCOMP")
          return false
        end
        -- Handshake packets are answered right away and never cached.
        keep = false
      end
      if kind == id then
        return true, data
      end
      if keep then
        table.insert(self.cache, data)
      end
    end
  end
  --- Connect to an MQTT server.
  -- @string host server address
  -- @param port string or number, server port
  -- @string[opt="tcp"] transport "tcp" or "tcp_ssl"
  -- @table[opt=nil] cert table or nil, SSL certificates; only meaningful for "tcp_ssl"
  --   (passing a table also enables SSL). Format:
  -- {
  -- caCert = "ca.crt", -- CA certificate file (Base64 X.509); when present the server certificate is verified, otherwise it is not
  -- clientCert = "client.crt", -- client certificate file (Base64 X.509), used when the server verifies the client
  -- clientKey = "client.key", -- client private key file (Base64 X.509)
  -- clientPassword = "123456", -- client certificate password [optional]
  -- }
  -- @number timeout maximum connect time.
  --   NOTE(review): this parameter is accepted but currently unused; the
  --   socket connect wait is fixed at 15000 ms and the CONNACK wait uses
  --   self.commandTimeout. Units of `timeout` are not established here.
  -- @return result true on success, false on failure
  -- @usage mqttc = mqtt.client("clientid-123", nil, nil, false); mqttc:connect("mqttserver.com", 1883, "tcp")
  function mqttc:connect(host, port, transport, cert, timeout)
    if self.connected then
      log.info("mqtt.client:connect", "has connected")
      return false
    end
    -- Release a socket left over from a previous attempt.
    if self.io then
      self.io:clean()
      self.io:close()
      self.io = nil
    end
    -- Start from empty buffers: bytes or packets left behind by an earlier,
    -- failed attempt must not be prepended to the new connection's stream.
    self.cache = {}
    self.inbuf = ""
    if transport and transport ~= "tcp" and transport ~= "tcp_ssl" then
      log.info("mqtt.client:connect", "invalid transport", transport)
      return false
    end
    self.io = socket.tcp(transport == "tcp_ssl" or type(cert) == "table", cert)
    self.io:host(host)
    self.io:port(port)
    local connect_topic = "NETC_CONNECT_" .. tostring(self.io:id())
    -- Socket "connect" event: forward the result to the waiter below.
    self.io:on("connect", function(id, re)
      log.info("mqtt", "connect result", re, re == 1 and "OK" or "FAIL")
      sys.publish(connect_topic, re == 1)
      if re == 0 then
        self.io:clean()
        self.io:close()
      end
    end)
    -- Socket "recv" event: feed the bytes to the packet decoder.
    self.io:on("recv", function(id, data)
      if not update_resp(self, data) then
        log.info("mqtt", "close connect for bad data")
        self.io:clean()
        self.io:close()
      end
    end)
    if not self.io:start() then
      self.io:clean()
      self.io:close()
      log.info("mqtt", "fail to start socket thread")
      return false
    end
    --log.info("mqtt", "wait for connect")
    local result, linked = sys.waitUntil(connect_topic, 15000)
    if not result then
      log.info("mqtt", "connect timeout")
      return false
    end
    if not linked or self.io:closed() == 1 then
      log.info("mqtt", "connect fail", result, linked, self.io:closed() == 1)
      return false
    end
    --log.info("mqtt", "send packCONNECT")
    if not self:write(packCONNECT(self.clientId, self.keepAlive, self.username, self.password, self.cleanSession, self.will, self.version)) then
      log.info("mqtt.client:connect", "send fail")
      return false
    end
    --log.info("mqtt", "waitfor CONNACK")
    local r, packet = self:waitfor(CONNACK, self.commandTimeout, nil, true)
    -- A CONNACK return code other than 0 is treated as a failed connect.
    if not r or packet.rc ~= 0 then
      log.info("mqtt.client:connect", "connack error", r and packet.rc or -1)
      return false
    end
    self.connected = true
    --log.info("mqtt", "connected!~!")
    return true
  end
  --- Subscribe to one or more topics.
  -- @param topic string or table; a string for a single topic, a table of
  --   topic -> qos for several topics; topic text is UTF-8
  -- @param[opt=0] qos number or nil; used only when topic is a string (0/1/2, default 0)
  -- @return bool true on success, false on failure
  -- @usage
  -- mqttc:subscribe("/abc", 0) -- subscribe topic "/abc" with qos = 0
  -- mqttc:subscribe({["/topic1"] = 0, ["/topic2"] = 1, ["/topic3"] = 2}) -- subscribe multi topic
  function mqttc:subscribe(topic, qos)
    if not self.connected then
      log.info("mqtt.client:subscribe", "not connected")
      return false
    end
    -- A single topic string is wrapped into the topic -> qos map form.
    local topics = topic
    if type(topic) == "string" then
      topics = {[topic] = qos or 0}
    end
    local sent = self:write(packSUBSCRIBE(0, self.getNextPacketId(), topics))
    if not sent then
      log.info("mqtt.client:subscribe", "send failed")
      return false
    end
    local acked = self:waitfor(SUBACK, self.commandTimeout, nil, true)
    if not acked then
      log.info("mqtt.client:subscribe", "wait ack failed")
      return false
    end
    return true
  end
  --- Unsubscribe from one or more topics.
  -- @param topic string or table; a string for a single topic, a list of
  --   strings for several topics; topic text is UTF-8
  -- @return bool true on success, false on failure
  -- @usage
  -- mqttc:unsubscribe("/abc") -- unsubscribe topic "/abc"
  -- mqttc:unsubscribe({"/topic1", "/topic2", "/topic3"}) -- unsubscribe multi topic
  function mqttc:unsubscribe(topic)
    if not self.connected then
      log.info("mqtt.client:unsubscribe", "not connected")
      return false
    end
    -- A single topic string is wrapped into a one-element list.
    local topics = topic
    if type(topic) == "string" then
      topics = {topic}
    end
    local sent = self:write(packUNSUBSCRIBE(0, self.getNextPacketId(), topics))
    if not sent then
      log.info("mqtt.client:unsubscribe", "send failed")
      return false
    end
    local acked = self:waitfor(UNSUBACK, self.commandTimeout, nil, true)
    if not acked then
      log.info("mqtt.client:unsubscribe", "wait ack failed")
      return false
    end
    return true
  end
  --- Publish a message.
  -- @string topic UTF-8 topic name
  -- @string payload raw payload; it is sent as given, without any re-encoding
  -- @number[opt=0] qos 0/1/2, default 0
  -- @number[opt=0] retain 0 or 1
  -- @return bool true when published (and, for qos > 0, acknowledged), false otherwise
  -- @usage
  -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  -- mqttc:connect("mqttserver.com", 1883, "tcp")
  -- mqttc:publish("/topic", "publish from luat mqtt client", 0)
  function mqttc:publish(topic, payload, qos, retain)
    if not self.connected then
      log.info("mqtt.client:publish", "not connected")
      return false
    end
    qos = qos or 0
    retain = retain or 0
    -- A fresh packet id is only consumed for qos 1/2.
    local packetId = 0
    if qos > 0 then
      packetId = self.getNextPacketId()
    end
    local sent = self:write(packPUBLISH(0, qos, retain, packetId, topic, payload))
    if not sent then
      log.info("mqtt.client:publish", "socket send failed")
      return false
    end
    if qos == 0 then
      return true
    end
    -- qos 1 completes with PUBACK; otherwise wait for PUBCOMP.
    local ackId = PUBCOMP
    if qos == 1 then
      ackId = PUBACK
    end
    local acked = self:waitfor(ackId, self.commandTimeout, nil, true)
    if not acked then
      log.warn("mqtt.client:publish", "wait ack timeout")
      return false
    end
    return true
  end
  --- Receive a published message.
  -- @number timeout receive timeout in milliseconds
  -- @string[opt=nil] msg optional value passed through to waitfor()
  -- @return result true when a PUBLISH packet was received, false otherwise
  -- @return data the received packet when result is true; otherwise error
  --   information, "timeout" when the wait timed out
  -- @return param third value returned by waitfor(), if any
  -- @usage
  -- true, packet = mqttc:receive(2000)
  -- false, error_message = mqttc:receive(2000)
  -- false, msg, para = mqttc:receive(2000)
  function mqttc:receive(timeout, msg)
    if self.connected then
      return self:waitfor(PUBLISH, timeout, msg)
    end
    log.info("mqtt.client:receive", "not connected")
    return false
  end
  --- Disconnect from the server.
  -- Sends DISCONNECT when the session is up, closes the socket and clears the
  -- client's packet cache and input buffer.
  -- @return nil
  -- @usage
  -- mqttc = mqtt.client("clientid-123", nil, nil, false)
  -- mqttc:connect("mqttserver.com", 1883, "tcp")
  -- process data
  -- mqttc:disconnect()
  function mqttc:disconnect()
    local sock = self.io
    if sock then
      if self.connected then
        self:write(packZeroData(DISCONNECT))
      end
      sock:close()
      self.io = nil
    end
    self.connected = false
    self.inbuf = ""
    self.cache = {}
  end
  454. return mqtt