mqtts_ca_main.lua 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. --[[
  2. @module mqtts_ca_main
  3. @summary mqtts ca client 主应用功能模块
  4. @version 1.0
  5. @date 2025.07.28
  6. @author 朱天华
  7. @usage
  8. 本文件为mqtts ca client 主应用功能模块,核心业务逻辑为:
  9. 1、创建一个mqtts ca client,连接server;
  10. 2、处理连接/订阅/取消订阅/异常逻辑,出现异常后执行重连动作;
  11. 3、调用mqtts_ca_receiver的外部接口mqtts_ca_receiver.proc,对接收到的publish数据进行处理;
  12. 4、调用sysplus.sendMsg接口,发送"CONNECT OK"、"PUBLISH OK"和"DISCONNECTED"三种类型的"MQTT_EVENT"消息到mqtts_ca_sender的task,控制publish数据发送逻辑;
  13. 5、收到MQTT心跳应答后,执行sys.publish("FEED_NETWORK_WATCHDOG") 对网络环境检测看门狗功能模块进行喂狗;
  14. 本文件没有对外接口,直接在main.lua中require "mqtts_ca_main"就可以加载运行;
  15. ]]
  16. -- 加载sntp时间同步应用功能模块(ca证书校验的mqtt ssl需要时间同步功能)
  17. require "sntp_app"
  18. -- 加载mqtts ca client数据接收功能模块
  19. local mqtts_ca_receiver = require "mqtts_ca_receiver"
  20. -- 加载mqtts ca client数据发送功能模块
  21. local mqtts_ca_sender = require "mqtts_ca_sender"
  22. -- mqtts ca服务器地址和端口
  23. -- 这里使用的地址和端口,仅能用作测试用途,不可商用,说不定哪一天就关闭了
  24. -- 用户开发项目时,替换为自己的商用服务器地址和端口
  25. local SERVER_ADDR = "airlbs.openluat.com"
  26. local SERVER_PORT = 8883
  27. -- mqtts_ca_main的任务名
  28. local TASK_NAME = mqtts_ca_sender.TASK_NAME_PREFIX.."main"
  29. -- mqtt主题的前缀:IMEI号
  30. local TOPIC_PREFIX = mobile.imei()
  31. -- mqtts ca client的事件回调函数
  32. local function mqtts_ca_client_event_cbfunc(mqtt_client, event, data, payload, metas)
  33. log.info("mqtts_ca_client_event_cbfunc", mqtt_client, event, data, payload, json.encode(metas))
  34. -- mqtt连接成功
  35. if event == "conack" then
  36. sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "CONNECT", true)
  37. -- 订阅单主题
  38. -- 第二个参数表示qos,取值范围为0,1,2,如果不设置,默认为0
  39. if not mqtt_client:subscribe(TOPIC_PREFIX .. "/down") then
  40. sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", false, -1)
  41. end
  42. -- 订阅多主题,如果有需要,打开注释
  43. -- 表中的每一个订阅主题的格式为[topic]=qos
  44. -- if not mqtt_client:subscribe(
  45. -- {
  46. -- [(TOPIC_PREFIX .. "/data"]=0,
  47. -- [(TOPIC_PREFIX .. "/cmd"]=1
  48. -- }
  49. -- ) then
  50. -- sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", false, -1)
  51. -- end
  52. -- 订阅结果
  53. -- data:订阅应答结果,true为成功,false为失败
  54. -- payload:number类型;成功时表示qos,取值范围为0,1,2;失败时表示失败码,一般是0x80
  55. elseif event == "suback" then
  56. -- 发送消息通知 mqtts ca main task
  57. sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", data, payload)
  58. -- 取消订阅成功
  59. elseif event == "unsuback" then
  60. -- 发送消息通知 mqtts ca main task
  61. sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "UNSUBSCRIBE", true)
  62. -- 接收到服务器下发的publish数据
  63. -- data:string类型,表示topic
  64. -- payload:string类型,表示payload
  65. -- metas:table类型,数据内容如下
  66. -- {
  67. -- qos: number类型,取值范围0,1,2
  68. -- retain:number类型,取值范围0,1
  69. -- dup:number类型,取值范围0,1
  70. -- message_id: number类型
  71. -- }
  72. elseif event == "recv" then
  73. -- 对接收到的publish数据处理
  74. mqtts_ca_receiver.proc(data, payload, metas)
  75. -- 发送成功publish数据
  76. -- data:number类型,表示message id
  77. elseif event == "sent" then
  78. -- 发送消息通知 mqtts ca sender task
  79. sysplus.sendMsg(mqtts_ca_sender.TASK_NAME, "MQTT_EVENT", "PUBLISH_OK", data)
  80. -- 服务器断开mqtt连接
  81. elseif event == "disconnect" then
  82. -- 发送消息通知 mqtts ca main task
  83. sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "DISCONNECTED", false)
  84. -- 收到服务器的心跳应答
  85. elseif event == "pong" then
  86. -- 接收到数据,通知网络环境检测看门狗功能模块进行喂狗
  87. sys.publish("FEED_NETWORK_WATCHDOG")
  88. -- 严重异常,本地会主动断开连接
  89. -- data:string类型,表示具体的异常,有以下几种:
  90. -- "connect":tcp连接失败
  91. -- "tx":数据发送失败
  92. -- "conack":mqtt connect后,服务器应答CONNACK鉴权失败,失败码为payload(number类型)
  93. -- "other":其他异常
  94. elseif event == "error" then
  95. if data == "connect" or data == "conack" then
  96. -- 发送消息通知 mqtts ca main task,连接失败
  97. sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "CONNECT", false)
  98. elseif data == "other" or data == "tx" then
  99. -- 发送消息通知 mqtts ca main task,出现异常
  100. sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "ERROR")
  101. end
  102. end
  103. end
  104. -- mqtts ca main task 的任务处理函数
  105. local function mqtts_ca_client_main_task_func()
  106. local mqtt_client
  107. local result, msg, para
  108. -- 用来验证server证书是否合法的ca证书文件为airlbs_parent_ca.crt
  109. -- 此ca证书的有效期截止到2030年5月6日
  110. -- 将这个ca证书文件的内容读取出来,赋值给server_ca_cert
  111. -- 注意:此处的ca证书文件仅用来验证airlbs.openluat.com:8883端口的server证书
  112. -- baidu网站的server证书有效期截止到2026年8月10日
  113. -- 在有效期之前,baidu会更换server证书,如果server证书更换后,此处验证使用的baidu_parent_ca.crt也可能需要更换
  114. -- 使用电脑上的网页浏览器访问https://www.baidu.com,可以实时看到baidu的server证书以及baidu_parent_ca.crt
  115. -- 如果你使用的是自己的server,要替换为自己server证书对应的ca证书文件
  116. local server_ca_cert = io.readFile("/luadb/airlbs_parent_ca.crt")
  117. while true do
  118. -- 如果当前时间点设置的默认网卡还没有连接成功,一直在这里循环等待
  119. while not socket.adapter(socket.dft()) do
  120. log.warn("mqtts_ca_client_main_task_func", "wait IP_READY", socket.dft())
  121. -- 在此处阻塞等待默认网卡连接成功的消息"IP_READY"
  122. -- 或者等待1秒超时退出阻塞等待状态;
  123. -- 注意:此处的1000毫秒超时不要修改的更长;
  124. -- 因为当使用exnetif.set_priority_order配置多个网卡连接外网的优先级时,会隐式的修改默认使用的网卡
  125. -- 当exnetif.set_priority_order的调用时序和此处的socket.adapter(socket.dft())判断时序有可能不匹配
  126. -- 此处的1秒,能够保证,即使时序不匹配,也能1秒钟退出阻塞状态,再去判断socket.adapter(socket.dft())
  127. sys.waitUntil("IP_READY", 1000)
  128. end
  129. -- 检测到了IP_READY消息
  130. log.info("mqtts_ca_client_main_task_func", "recv IP_READY", socket.dft())
  131. -- 清空此task绑定的消息队列中的未处理的消息
  132. sysplus.cleanMsg(TASK_NAME)
  133. -- 创建mqtt client对象,ssl连接,单向证书校验
  134. -- client仅单向校验server的证书,server不校验client的证书和密钥文件
  135. -- 如果做证书校验,需要特别注意以下几点:
  136. -- 1、证书校验前,设备端必须同步为正确的时间,因为校验过程中会检查ca证书以及server证书中的有效期是否合法;本demo中的sntp_app.lua会同步时间;
  137. -- 2、任何证书都有有效期,无论是ca证书还是server证书,必须在有效期截止之前,及时更换证书,延长有效期,否则证书校验会失败;
  138. -- 3、如果要更换ca证书,需要在设备端远程升级,必须保证ca证书失效之前升级成功,否则校验失败,就无法连接server;
  139. -- 综上所述,证书校验虽然安全,可以验证身份,但是后续维护成本比较高;除非有需要,否则可以不配置证书校验功能;
  140. -- 另外,如果使用https://netlab.luatos.com/创建的TCP SSL Server,使用的server证书有可能过了有效期;
  141. -- 如果过了有效期,使用本文件无法连接成功tcp ssl ca server,遇到这种问题,可以在main.lua中打开socket.sslLog(3),观察Luatools的日志,如果出现类似于下面的日志
  142. -- expires on : 2020-12-27 15:46:55
  143. -- 表示证书有效期截止到2020-12-27 15:46:55,明显就是证书已经过了有效期
  144. -- 遇到这种情况,可以反馈给合宙的技术人员;或者不再使用netlab server测试,使用你自己的tcp ssl server来测试,只要保证你的server证书合法就行
  145. mqtt_client = mqtt.create(nil, SERVER_ADDR, SERVER_PORT, {server_cert = server_ca_cert})
  146. -- 如果创建mqtt client对象失败
  147. if not mqtt_client then
  148. log.error("mqtts_ca_client_main_task_func", "mqtt.create error")
  149. goto EXCEPTION_PROC
  150. end
  151. -- 配置mqtt client对象的client id,username,password和clean session标志
  152. result = mqtt_client:auth(mobile.imei(), "", "", true)
  153. -- 如果配置失败
  154. if not result then
  155. log.error("mqtts_ca_client_main_task_func", "mqtt_client:auth error")
  156. goto EXCEPTION_PROC
  157. end
  158. -- 注册mqtt client对象的事件回调函数
  159. mqtt_client:on(mqtts_ca_client_event_cbfunc)
  160. -- 设置mqtt keepalive时间为120秒
  161. -- 如果没有设置,内核固件中默认为180秒
  162. -- 有需要的话,可以打开注释
  163. -- mqtt_client:keepalive(120)
  164. -- 设置遗嘱消息,有需要的话,可以打开注释
  165. -- mqtt_client:will(TOPIC_PREFIX .. "/status", "offline")
  166. -- 配置开启debug信息,有需要的话,可以打开注释
  167. -- mqtt_client:debug(true)
  168. -- socket.sslLog(3)
  169. -- 连接server
  170. result = mqtt_client:connect()
  171. -- 如果连接server失败
  172. if not result then
  173. log.error("mqtts_ca_client_main_task_func", "mqtt_client:connect error")
  174. goto EXCEPTION_PROC
  175. end
  176. -- 连接、断开连接、订阅、取消订阅、异常等各种事件的处理调度逻辑
  177. while true do
  178. -- 等待"MQTT_EVENT"消息
  179. msg = sysplus.waitMsg(TASK_NAME, "MQTT_EVENT")
  180. log.info("mqtts_ca_client_main_task_func waitMsg", msg[2], msg[3], msg[4])
  181. -- connect连接结果
  182. -- msg[3]表示连接结果,true为连接成功,false为连接失败
  183. if msg[2] == "CONNECT" then
  184. -- mqtt连接成功
  185. if msg[3] then
  186. log.info("mqtts_ca_client_main_task_func", "connect success")
  187. -- 通知数据发送应用模块,MQTT连接成功
  188. sysplus.sendMsg(mqtts_ca_sender.TASK_NAME, "MQTT_EVENT", "CONNECT_OK", mqtt_client)
  189. -- mqtt连接失败
  190. else
  191. log.info("mqtts_ca_client_main_task_func", "connect error")
  192. -- 退出循环,发起重连
  193. break
  194. end
  195. -- subscribe订阅结果
  196. -- msg[3]表示订阅结果,true为订阅成功,false为订阅失败
  197. elseif msg[2] == "SUBSCRIBE" then
  198. -- 订阅成功
  199. if msg[3] then
  200. log.info("mqtts_ca_client_main_task_func", "subscribe success", "qos: "..(msg[4] or "nil"))
  201. -- 订阅失败
  202. else
  203. log.error("mqtts_ca_client_main_task_func", "subscribe error", "code", msg[4])
  204. -- 主动断开mqtt client连接
  205. mqtt_client:disconnect()
  206. -- 发送disconnect之后,此处延时1秒,给数据发送预留一点儿时间,发送到服务器;
  207. -- 即使1秒的时间不足以发送给服务器也没关系;对服务器来说,mqtt客户端只是没有优雅的断开,不影响什么实质功能;
  208. sys.wait(1000)
  209. break
  210. end
  211. -- unsubscribe取消订阅成功
  212. elseif msg[2] == "UNSUBSCRIBE" then
  213. log.info("mqtts_ca_client_main_task_func", "unsubscribe success")
  214. -- 需要主动关闭mqtt连接
  215. -- 用户需要主动关闭mqtt连接时,可以调用sysplus.sendMsg(TASK_NAME, "MQTT_EVENT", "CLOSE")
  216. elseif msg[2] == "CLOSE" then
  217. -- 主动断开mqtt client连接
  218. mqtt_client:disconnect()
  219. -- 发送disconnect之后,此处延时1秒,给数据发送预留一点儿时间,发送到服务器;
  220. -- 即使1秒的时间不足以发送给服务器也没关系;对服务器来说,mqtt客户端只是没有优雅的断开,不影响什么实质功能;
  221. sys.wait(1000)
  222. break
  223. -- 被动关闭了mqtt连接
  224. -- 被网络或者服务器断开了连接
  225. elseif msg[2] == "DISCONNECTED" then
  226. break
  227. -- 出现了其他异常
  228. elseif msg[2] == "ERROR" then
  229. break
  230. end
  231. end
  232. -- 出现异常
  233. ::EXCEPTION_PROC::
  234. -- 清空此task绑定的消息队列中的未处理的消息
  235. sysplus.cleanMsg(TASK_NAME)
  236. -- 通知mqtts ca sender数据发送应用模块的task,MQTT连接已经断开
  237. sysplus.sendMsg(mqtts_ca_sender.TASK_NAME, "MQTT_EVENT", "DISCONNECTED")
  238. -- 如果存在mqtt client对象
  239. if mqtt_client then
  240. -- 关闭mqtt client,并且释放mqtt client对象
  241. mqtt_client:close()
  242. mqtt_client = nil
  243. end
  244. -- 5秒后跳转到循环体开始位置,自动发起重连
  245. sys.wait(5000)
  246. end
  247. end
  248. --创建并且启动一个task
  249. --运行这个task的处理函数mqtts_ca_client_main_task_func
  250. sysplus.taskInitEx(mqtts_ca_client_main_task_func, TASK_NAME)