ThingsCloud.lua 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. -- 合宙模组 LUATOS 接入 ThingsCloud 物联网平台
  2. -- 接入协议参考:ThingsCloud MQTT 接入文档 https://docs.thingscloud.xyz/guide/connect-device/mqtt.html
  3. local ThingsCloud = {}
  4. local projectKey = "" -- project_key
  5. local accessToken = "" -- access_token
  6. local typeKey = "" -- type_key
  7. local host = ""
  8. local port = 1883
  9. local apiEndpoint = "" -- api endpoint
  10. local mqttc = nil
  11. local connected = false
  12. local deviceInfo = {}
  13. local certFetchRetryMax = 5
  14. local certFetchRetryCnt = 0
  15. local SUBSCRIBE_PREFIX = {
  16. ATTRIBUTES_GET_REPONSE = "attributes/get/response/",
  17. ATTRIBUTES_PUSH = "attributes/push",
  18. COMMAND_SEND = "command/send/",
  19. COMMAND_REPLY_RESPONSE = "command/reply/response/",
  20. DATA_SET = "data/",
  21. GW_ATTRIBUTES_PUSH = "gateway/attributes/push",
  22. GW_COMMAND_SEND = "gateway/command/send"
  23. }
  24. local EVENT_TYPES = {
  25. fetch_cert = true,
  26. connect = true,
  27. attributes_report_response = true,
  28. attributes_get_response = true,
  29. attributes_push = true,
  30. command_send = true,
  31. command_reply_response = true,
  32. data_set = true,
  33. gw_attributes_push = true,
  34. gw_command_send = true
  35. }
  36. local CALLBACK = {}
  37. local QUEUE = {
  38. PUBLISH = {}
  39. }
  40. local logger = {}
  41. function logger.info(...)
  42. log.info("ThingsCloud", ...)
  43. end
  44. function ThingsCloud.on(eType, cb)
  45. if not eType or not EVENT_TYPES[eType] or type(cb) ~= "function" then
  46. return
  47. end
  48. CALLBACK[eType] = cb
  49. logger.info("on", eType)
  50. end
  51. local function cb(eType, ...)
  52. if not eType or not EVENT_TYPES[eType] or not CALLBACK[eType] then
  53. return
  54. end
  55. CALLBACK[eType](...)
  56. logger.info("event", eType, ...)
  57. end
  58. local function mqttConnect()
  59. local retryCount = 0
  60. logger.info("ThingsCloud connecting...")
  61. mqttc = mqtt.create(nil, host, port, false, {rxSize = 4096})
  62. mqttc:auth(mobile.imei(), accessToken, projectKey)
  63. mqttc:keepalive(300)
  64. mqttc:autoreconn(true, 10000)
  65. mqttc:connect()
  66. mqttc:on(function(mqtt_client, event, data, payload)
  67. if event == "conack" then
  68. connected = true
  69. logger.info("ThingsCloud connected")
  70. cb("connect", true)
  71. sys.publish("mqtt_conack")
  72. ThingsCloud.subscribe("attributes/push")
  73. ThingsCloud.subscribe("attributes/get/response/+")
  74. ThingsCloud.subscribe("command/send/+")
  75. ThingsCloud.subscribe("command/reply/response/+")
  76. elseif event == "recv" then
  77. logger.info("receive from cloud", data or nil, payload or "nil")
  78. if (data:sub(1, SUBSCRIBE_PREFIX.ATTRIBUTES_GET_REPONSE:len()) == SUBSCRIBE_PREFIX.ATTRIBUTES_GET_REPONSE) then
  79. local response = json.decode(payload)
  80. local responseId = tonumber(data:sub(SUBSCRIBE_PREFIX.ATTRIBUTES_GET_REPONSE:len() + 1))
  81. cb("attributes_get_response", response, responseId)
  82. elseif (data == SUBSCRIBE_PREFIX.ATTRIBUTES_PUSH) then
  83. local response = json.decode(payload)
  84. cb("attributes_push", response)
  85. elseif (data:sub(1, SUBSCRIBE_PREFIX.COMMAND_SEND:len()) == SUBSCRIBE_PREFIX.COMMAND_SEND) then
  86. local response = json.decode(payload)
  87. if response.method and response.params then
  88. cb("command_send", response)
  89. end
  90. elseif (data:sub(1, SUBSCRIBE_PREFIX.COMMAND_REPLY_RESPONSE:len()) ==
  91. SUBSCRIBE_PREFIX.COMMAND_REPLY_RESPONSE) then
  92. local response = json.decode(payload)
  93. local replyId = tonumber(data:sub(SUBSCRIBE_PREFIX.COMMAND_REPLY_RESPONSE:len() + 1))
  94. cb("command_reply_response", response, replyId)
  95. elseif (data:sub(1, SUBSCRIBE_PREFIX.DATA_SET:len()) == SUBSCRIBE_PREFIX.DATA_SET) then
  96. local tmp = split(data, "/")
  97. if #tmp == 3 and tmp[3] == "set" then
  98. local identifier = tmp[2]
  99. cb("data_set", payload)
  100. end
  101. elseif (data == SUBSCRIBE_PREFIX.GW_ATTRIBUTES_PUSH) then
  102. local response = json.decode(payload)
  103. cb("gw_attributes_push", response)
  104. elseif (data == SUBSCRIBE_PREFIX.GW_COMMAND_SEND) then
  105. local response = json.decode(payload)
  106. cb("gw_command_send", response)
  107. end
  108. elseif event == "sent" then
  109. log.info("mqtt", "sent", data)
  110. end
  111. end)
  112. end
  113. function ThingsCloud.disconnect()
  114. if not connected then
  115. return
  116. end
  117. mqttc:close()
  118. mqttc = nil
  119. end
  120. function ThingsCloud.connect(param)
  121. if not param.host or not param.projectKey then
  122. logger.info("host or projectKey not found")
  123. return
  124. end
  125. host = param.host
  126. projectKey = param.projectKey
  127. if param.accessToken then
  128. accessToken = param.accessToken
  129. sys.waitUntil("IP_READY", 30000)
  130. sys.taskInit(function()
  131. sys.taskInit(procConnect)
  132. end)
  133. else
  134. if not param.apiEndpoint then
  135. logger.info("apiEndpoint not found")
  136. return
  137. end
  138. apiEndpoint = param.apiEndpoint
  139. if param.typeKey ~= "" or param.typeKey ~= nil then
  140. typeKey = param.typeKey
  141. end
  142. sys.waitUntil("IP_READY", 30000)
  143. sys.taskInit(function()
  144. sys.taskInit(fetchDeviceCert)
  145. end)
  146. end
  147. end
  148. -- 一型一密,使用IMEI作为DeviceKey,领取设备证书AccessToken
  149. function fetchDeviceCert()
  150. local headers = {}
  151. headers["Project-Key"] = projectKey
  152. headers["Content-Type"] = "application/json"
  153. local url = apiEndpoint .. "/device/v1/certificate"
  154. local deviceKey = mobile.imei()
  155. local code, headers, body = http.request("POST", url, headers, json.encode({
  156. device_key = deviceKey,
  157. type_key = typeKey
  158. }), {
  159. timeout = 5000
  160. }).wait()
  161. log.info("http fetch cert:", deviceKey, code, headers, body)
  162. if code == 200 then
  163. local data = json.decode(body)
  164. if data.result == 1 then
  165. sys.taskInit(function()
  166. cb("fetch_cert", true)
  167. end)
  168. deviceInfo = data.device
  169. accessToken = deviceInfo.access_token
  170. procConnect()
  171. return
  172. end
  173. end
  174. if certFetchRetryCnt < certFetchRetryMax then
  175. -- 重试
  176. certFetchRetryCnt = certFetchRetryCnt + 1
  177. sys.wait(1000 * 10)
  178. fetchDeviceCert()
  179. else
  180. cb("fetch_cert", false)
  181. end
  182. end
  183. function procConnect()
  184. mqttConnect()
  185. sys.waitUntil("mqtt_conack")
  186. sys.taskInit(function()
  187. while true do
  188. if #QUEUE.PUBLISH > 0 then
  189. local item = table.remove(QUEUE.PUBLISH, 1)
  190. logger.info("publish", item.topic, item.data)
  191. if mqttc:publish(item.topic, item.data) then
  192. --
  193. end
  194. end
  195. sys.wait(100)
  196. end
  197. end)
  198. end
  199. function ThingsCloud.isConnected()
  200. return connected
  201. end
  202. local function insertPublishQueue(topic, data)
  203. if not connected then
  204. return
  205. end
  206. table.insert(QUEUE.PUBLISH, {
  207. topic = topic,
  208. data = data
  209. })
  210. end
  211. function ThingsCloud.subscribe(topic)
  212. if not connected then
  213. return
  214. end
  215. logger.info("subscribe", topic)
  216. mqttc:subscribe(topic)
  217. end
  218. function ThingsCloud.publish(topic, data)
  219. insertPublishQueue(topic, data)
  220. end
  221. function ThingsCloud.reportAttributes(tableData)
  222. insertPublishQueue("attributes", json.encode(tableData))
  223. sys.publish("QUEUE_PUBLISH", "ATTRIBUTES")
  224. end
  225. function ThingsCloud.getAttributes(attrsList, options)
  226. options = options or {}
  227. options.getId = options.getId or 1000
  228. local data = {
  229. keys = attrsList
  230. }
  231. if #attrsList == 0 then
  232. data = {}
  233. end
  234. insertPublishQueue("attributes/get/" .. tostring(options.getId), json.encode(data))
  235. end
  236. function ThingsCloud.reportEvent(event, options)
  237. options = options or {}
  238. options.eventId = options.eventId or 1000
  239. insertPublishQueue("event/report/" .. tostring(options.eventId), json.encode(event))
  240. end
  241. function ThingsCloud.replyCommand(commandReply, options)
  242. options = options or {}
  243. options.replyId = options.replyId or 1000
  244. insertPublishQueue("command/reply/" .. tostring(options.replyId), json.encode(commandReply))
  245. end
  246. function ThingsCloud.publishCustomTopic(topic, payload, options)
  247. insertPublishQueue(topic, payload)
  248. end
  249. function getAccessToken()
  250. return accessToken
  251. end
  252. function isGateway()
  253. if deviceInfo.conn_type == "3" then
  254. return true
  255. end
  256. return false
  257. end
  258. function split(str, sep)
  259. local sep, fields = sep or ":", {}
  260. local pattern = string.format("([^%s]+)", sep)
  261. str:gsub(pattern, function(c)
  262. fields[#fields + 1] = c
  263. end)
  264. return fields
  265. end
  266. return ThingsCloud