websocket_sender.lua 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. --[[
  2. @module websocket_sender
  3. @summary WebSocket client数据发送应用功能模块
  4. @version 1.0
  5. @date 2025.08.25
  6. @author 陈媛媛
  7. @usage
  8. 本文件为WebSocket client 数据发送应用功能模块,核心业务逻辑为:
  9. 1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func)订阅"SEND_DATA_REQ"消息,将其他应用模块需要发送的数据存储到队列send_queue中;
  10. 2、websocket sender task接收"CONNECT_OK"、"SEND_REQ"、"SEND_OK"三种类型的"WEBSOCKET_EVENT"消息,遍历队列send_queue,逐条发送数据到server;
  11. 3、websocket sender task接收"DISCONNECTED"类型的"WEBSOCKET_EVENT"消息,丢弃掉队列send_queue中未发送的数据;
  12. 4、任何一条数据无论发送成功还是失败,只要这条数据有回调函数,都会通过回调函数通知数据发送方;
  13. 本文件的对外接口有1个:
  14. 1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func):订阅"SEND_DATA_REQ"消息;
  15. 其他应用模块如果需要发送数据,直接sys.publish这个消息即可,将需要发送的数据以及回调函数和回调参数一起publish出去;
  16. 本demo项目中uart_app.lua和timer_app.lua中publish了这个消息;
  17. ]]
  18. local websocket_sender = {}
  19. --[[
  20. 数据发送队列,数据结构为:
  21. {
  22. [1] = {data="data1", cb={func=callback_function1, para=callback_para1}},
  23. [2] = {data="data2", cb={func=callback_function2, para=callback_para2}},
  24. [3] = {data="data3", cb={func=callback_function3, para=callback_para3}},
  25. }
  26. data的内容为要发送的数据,string类型,必须存在;
  27. cb.func的内容为数据发送结果的用户回调函数,可以不存在;
  28. cb.para的内容为数据发送结果的用户回调函数的回调参数,可以不存在;
  29. ]]
  30. local send_queue = {}
  31. -- WebSocket client的任务名前缀
  32. websocket_sender.TASK_NAME_PREFIX = "websocket_"
  33. -- websocket_client_sender的任务名
  34. websocket_sender.TASK_NAME = websocket_sender.TASK_NAME_PREFIX.."sender"
  35. -- "SEND_DATA_REQ"消息的处理函数
  36. local function send_data_req_proc_func(tag, data, cb)
  37. -- 确保data是字符串类型
  38. local data_str = tostring(data)
  39. -- 检查是否是"echo"命令
  40. if data_str == '"echo"' then
  41. log.info("WebSocket发送处理", "收到echo命令,发送数据")
  42. -- 创建JSON格式的echo响应
  43. local response = json.encode({
  44. action = "echo",
  45. msg = os.date("%a %Y-%m-%d %H:%M:%S") -- %a表示星期几缩写
  46. })
  47. -- 将echo响应插入到发送队列send_queue中
  48. table.insert(send_queue, {data=response, cb=cb})
  49. log.info("准备发送数据到服务器,长度", #response)
  50. log.info("原始数据:", response)
  51. else
  52. -- 根据tag类型输出日志
  53. if tag == "timer" then
  54. -- 对于timer数据,修改日志为"发送心跳"
  55. log.info("发送心跳", "长度", #data_str)
  56. log.info("原始数据:", data_str)
  57. table.insert(send_queue, {data=data_str, cb=cb})
  58. else
  59. -- 其他数据(如uart)
  60. log.info("准备发送数据到服务器,长度", #data_str)
  61. log.info("原始数据:", data_str)
  62. log.info("UART发送到服务器的数据包类型", type(data_str))
  63. log.info("转发普通数据")
  64. table.insert(send_queue, {data=data_str, cb=cb})
  65. end
  66. end
  67. -- 发送消息通知 websocket sender task,有新数据等待发送
  68. sysplus.sendMsg(websocket_sender.TASK_NAME, "WEBSOCKET_EVENT", "SEND_REQ")
  69. end
  70. -- 按照顺序发送send_queue中的数据
  71. -- 如果调用send接口成功,则返回当前正在发送的数据项
  72. -- 如果调用send接口失败,通知回调函数发送失败后,继续发送下一条数据
  73. local function send_item(ws_client)
  74. local item
  75. -- 如果发送队列中有数据等待发送
  76. while #send_queue > 0 do
  77. -- 取出来第一条数据赋值给item
  78. -- 同时从队列send_queue中删除这一条数据
  79. item = table.remove(send_queue, 1)
  80. -- 检查WebSocket连接状态
  81. if not ws_client or not ws_client:ready() then
  82. log.warn("WebSocket发送处理", "WebSocket连接未就绪,无法发送")
  83. -- 如果当前发送的数据有用户回调函数,则执行用户回调函数
  84. if item.cb and item.cb.func then
  85. item.cb.func(false, item.cb.para)
  86. end
  87. -- 触发重连
  88. sysplus.sendMsg(websocket_sender.TASK_NAME, "WEBSOCKET_EVENT", "DISCONNECTED")
  89. return nil
  90. end
  91. -- send数据
  92. -- result表示调用send接口的同步结果,返回值有以下几种:
  93. -- 如果失败,返回false
  94. -- 如果成功,返回true
  95. result = ws_client:send(item.data)
  96. -- send接口调用成功
  97. if result then
  98. -- 根据数据内容修改日志输出
  99. if item.data:match("^%d+$") then -- 如果数据是纯数字(来自timer)
  100. log.info("wbs_sender", "发送心跳成功", "长度", #item.data)
  101. else
  102. log.info("wbs_sender", "发送成功", "长度", #item.data)
  103. end
  104. -- 由于sent事件可能不会触发,我们直接认为发送成功
  105. if item.cb and item.cb.func then
  106. item.cb.func(true, item.cb.para)
  107. end
  108. -- 发送成功,通知网络环境检测看门狗功能模块进行喂狗
  109. -- 使用来自定时器的数据作为心跳
  110. if item.data:match("^%d+$") then -- 如果数据是纯数字(来自timer)
  111. sys.publish("FEED_NETWORK_WATCHDOG")
  112. end
  113. return item
  114. -- send接口调用失败
  115. else
  116. log.warn("WebSocket发送处理", "数据发送失败")
  117. -- 如果当前发送的数据有用户回调函数,则执行用户回调函数
  118. if item.cb and item.cb.func then
  119. item.cb.func(false, item.cb.para)
  120. end
  121. -- 触发重连
  122. sysplus.sendMsg(websocket_sender.TASK_NAME, "WEBSOCKET_EVENT", "DISCONNECTED")
  123. return nil
  124. end
  125. end
  126. return nil
  127. end
  128. -- websocket client sender的任务处理函数
  129. local function websocket_client_sender_task_func()
  130. local ws_client
  131. local send_item_obj
  132. local result, msg
  133. while true do
  134. -- 等待"WEBSOCKET_EVENT"消息
  135. msg = sysplus.waitMsg(websocket_sender.TASK_NAME, "WEBSOCKET_EVENT")
  136. log.info("WebSocket发送任务等待消息", msg[2], msg[3])
  137. -- WebSocket连接成功
  138. -- msg[3]表示WebSocket client对象
  139. if msg[2] == "CONNECT_OK" then
  140. ws_client = msg[3]
  141. log.info("WebSocket发送任务", "WebSocket连接成功")
  142. -- 发送send_queue中的所有数据
  143. while #send_queue > 0 do
  144. send_item_obj = send_item(ws_client)
  145. if not send_item_obj then
  146. break
  147. end
  148. end
  149. -- WebSocket send数据请求
  150. elseif msg[2] == "SEND_REQ" then
  151. log.info("WebSocket发送任务", "收到发送请求")
  152. -- 如果WebSocket client对象存在
  153. if ws_client then
  154. send_item_obj = send_item(ws_client)
  155. end
  156. -- WebSocket send数据成功
  157. elseif msg[2] == "SEND_OK" then
  158. log.info("WebSocket发送任务", "数据发送成功")
  159. -- 继续发送send_queue中的数据
  160. send_item_obj = send_item(ws_client)
  161. -- WebSocket断开连接
  162. elseif msg[2] == "DISCONNECTED" then
  163. log.info("WebSocket发送任务", "WebSocket连接断开")
  164. -- 清空WebSocket client对象
  165. ws_client = nil
  166. -- 如果发送队列中有数据等待发送
  167. while #send_queue > 0 do
  168. -- 取出来第一条数据赋值给send_item_obj
  169. -- 同时从队列send_queue中删除这一条数据
  170. send_item_obj = table.remove(send_queue, 1)
  171. -- 如果当前发送的数据有用户回调函数,则执行用户回调函数
  172. if send_item_obj.cb and send_item_obj.cb.func then
  173. send_item_obj.cb.func(false, send_item_obj.cb.para)
  174. end
  175. end
  176. -- 当前没有正在等待发送结果的发送项
  177. send_item_obj = nil
  178. end
  179. end
  180. end
  181. -- 订阅"SEND_DATA_REQ"消息;
  182. sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func)
  183. --创建并且启动一个task
  184. sysplus.taskInitEx(websocket_client_sender_task_func, websocket_sender.TASK_NAME)
  185. return websocket_sender