mqtt2.lua 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. --[[
  2. 异步MQTT客户端
  3. 1. 自动重连
  4. 2. 异步收发信息
  5. 暂不支持的特性:
  6. 1. qos 2的消息不被支持,以后也不会添加
  7. 2. 不支持取消订阅(也许会添加,也许不会)
  8. 用法请参考demo
  9. ]]
  10. -- MQTT 指令id
  11. local CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT = 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
  12. local CLIENT_COMMAND_TIMEOUT = 60000
  13. local packCONNECT = mqttcore.packCONNECT
  14. local packPUBLISH = mqttcore.packPUBLISH
  15. local packSUBSCRIBE = mqttcore.packSUBSCRIBE
  16. local packACK = mqttcore.packACK
  17. local packZeroData = mqttcore.packZeroData
  18. local mclog = log.debug
  19. local unpack = mqttcore.unpack
  20. local mqtt2 = {}
  21. local mqttc = {}
  22. mqttc.__index = mqttc
  23. function mqtt2.new(clientId, keepAlive, username, password, cleanSession, host, port, topics, cb, ckey)
  24. local c = {
  25. clientId = clientId,
  26. keepAlive = keepAlive or 300,
  27. username = username or "",
  28. password = password or "",
  29. cleanSession = cleanSession == nil and 1 or 0,
  30. host = host,
  31. port = port,
  32. lping = 0,
  33. stat = 0, -- 状态 0, 未连接, 1 已连接成功
  34. nextid = 0, -- pkg的ID
  35. running = false,
  36. inpkgs = {},
  37. outpkgs = {},
  38. buff = "",
  39. topics = topics or {},
  40. cb = cb,
  41. ckey = ckey or ""
  42. }
  43. if c.ckey == "" then c.ckey = "mqtt_" .. tostring(c) end
  44. --mclog("mqtt", "MQTT Client Key", c.ckey)
  45. setmetatable(c, mqttc)
  46. return c
  47. end
  48. -- 内部方法, 用于获取下一个pkg的id
  49. function mqttc:genId()
  50. self.nextid = self.nextid == 65535 and 1 or (self.nextid + 1)
  51. --mclog("mqtt", "next packet id", self.nextid)
  52. return self.nextid
  53. end
  54. -- 内部方法,用于处理待处理的数据包
  55. function mqttc:handle(netc)
  56. local mc = self
  57. -- 先处理服务器下发的数据包
  58. if #mc.inpkgs > 0 then
  59. --mclog("mqtt", "inpkgs count", #mc.inpkgs)
  60. while 1 do
  61. local pkg = table.remove( mc.inpkgs, 1 )
  62. if pkg == nil then
  63. break
  64. end
  65. -- 处理服务器下发的包
  66. --mclog("mqtt", "handle pkg", json.encode(pkg))
  67. if pkg.id == CONNACK then
  68. mc.stat = 1
  69. mc.lping = os.time()
  70. --mclog("mqtt", "GOT CONNACK")
  71. for k, v in pairs(mc.topics) do
  72. --mclog("mqtt", "sub topics", json.encode(mc.topics))
  73. netc:send(packSUBSCRIBE(0, mc:genId(), mc.topics))
  74. break
  75. end
  76. elseif pkg.id == PUBACK then
  77. --mclog("mqtt", "GOT PUBACK")
  78. sys.publish(mc.ckey .. "PUBACK")
  79. elseif pkg.id == SUBACK then
  80. --mclog("mqtt", "GOT SUBACK")
  81. sys.publish(mc.ckey .. "SUBACK")
  82. elseif pkg.id == PINGRESP then
  83. mc.lping = os.time()
  84. --mclog("mqtt", "GOT PINGRESP", mc.lping)
  85. elseif pkg.id == UNSUBACK then
  86. --mclog("mqtt", "GOT UNSUBACK")
  87. elseif pkg.id == DISCONNECT then
  88. --mclog("mqtt", "GOT DISCONNECT")
  89. elseif pkg.id == PUBLISH then
  90. --mclog("mqtt", "GOT PUBLISH", pkg.topic, pkg.qos)
  91. if pkg.qos > 0 then
  92. -- 发送PUBACK
  93. --mclog("mqtt", "send back PUBACK")
  94. table.insert( mc.outpkgs, packACK(PUBACK, 0, pkg.packetId))
  95. end
  96. if mc.cb then
  97. --mclog("mqtt", "Callback for PUBLISH", mc.cb)
  98. mc.cb(pkg)
  99. end
  100. end
  101. end
  102. end
  103. -- 处理需要上报的数据包
  104. if #mc.outpkgs > 0 then
  105. --mclog("mqtt", "outpkgs count", #mc.outpkgs)
  106. while 1 do
  107. local buff = table.remove( mc.outpkgs, 1)
  108. if buff == nil then
  109. break
  110. end
  111. --mclog("mqtt", "netc send", buff:toHex())
  112. netc:send(buff)
  113. end
  114. end
  115. -- 是否需要发心跳
  116. if mc.lping > 0 and os.time() - mc.lping > mc.keepAlive * 0.75 then
  117. --mclog("mqtt", "time for ping", mc.lping)
  118. mc.lping = os.time()
  119. netc:send(packZeroData(PINGREQ)) -- 发送心跳包
  120. end
  121. end
  122. -- 启动mqtt task, 要么在task里面执行, 要么新建一个task执行本方法
  123. function mqttc:run()
  124. local mc = self
  125. mc.running = true
  126. while mc.running do
  127. if socket.isReady() then
  128. -- 先复位全部临时对象
  129. mc.buff = ""
  130. mc.inpkgs = {}
  131. mc.outpkgs = {}
  132. -- 建立socket对象
  133. --mclog("mqtt", "try connect")
  134. local netc = socket.tcp()
  135. netc:host(mc.host)
  136. netc:port(mc.port)
  137. netc:on("connect", function(id, re)
  138. --mclog("mqtt", "connect", id , re)
  139. if re then
  140. -- 发送CONN包
  141. table.insert(mc.outpkgs, packCONNECT(mc.clientId, mc.keepAlive, mc.username, mc.password, mc.cleanSession, {topic="",payload="",qos=0,retain=0,flag=0}))
  142. sys.publish(mc.ckey)
  143. end
  144. end)
  145. netc:on("recv", function(id, data)
  146. --mclog("mqtt", "recv", id , data:sub(1, 10):toHex())
  147. mc.buff = mc.buff .. data
  148. while 1 do
  149. local packet, nextpos = unpack(mc.buff)
  150. if not packet then
  151. if #mc.buff > 4096 then
  152. log.warn("mqtt", "packet is too big!!!")
  153. netc:close()
  154. end
  155. break
  156. else
  157. mc.buff = mc.buff:sub(nextpos)
  158. --mclog("mqtt", "recv new pkg", json.encode(packet))
  159. table.insert( mc.inpkgs, packet)
  160. if #mc.buff < 2 then
  161. break
  162. end
  163. end
  164. end
  165. if #mc.inpkgs > 0 then
  166. sys.publish(mc.ckey)
  167. end
  168. end)
  169. if netc:start() == 0 then
  170. --mclog("mqtt", "start success")
  171. local endTopic = "NETC_END_" .. netc:id()
  172. while (netc:closed()) == 0 do
  173. mc:handle(netc)
  174. sys.waitUntil({endTopic, mc.ckey}, 30000)
  175. if not mc.running then netc:close() end
  176. --mclog("mqtt", "handle/timeout/ping", (netc:closed()))
  177. end
  178. end
  179. -- 清理socket上下文
  180. mclog("mqtt", "clean up")
  181. netc:clean()
  182. netc:close()
  183. -- 将所有状态复位
  184. mc.stat = 0
  185. mclog("mqtt", "wait 5s for next loop")
  186. sys.wait(5*1000) -- TODO 使用级数递增进行延时
  187. else
  188. sys.wait(1000)
  189. end
  190. end
  191. -- 线程退出, 只可能是用户主动shutdown
  192. mclog("mqtt", self.ckey, "exit")
  193. end
  194. -- 订阅topic, table形式
  195. function mqttc:sub(topics)
  196. table.insert(self.outpkgs, packSUBSCRIBE(0, self:genId(), topics))
  197. sys.publish(self.ckey)
  198. return sys.waitUntil(self.ckey .. "SUBACK", 30000)
  199. end
  200. -- 上报数据
  201. function mqttc:pub(topic, qos, payload)
  202. -- local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
  203. table.insert(self.outpkgs, packPUBLISH(0, qos, 0, qos > 0 and self:genId() or 0, topic, payload))
  204. sys.publish(self.ckey)
  205. if qos > 0 then
  206. return sys.waitUntil(self.ckey .. "PUBACK", 30000)
  207. end
  208. end
  209. function mqttc:shutdown()
  210. self.running = false
  211. sys.publish(self.ckey)
  212. end
  213. return mqtt2