  -- socket_demo.lua (5.8 KB)
  local libnet = require("libnet")
  local JT808Prot = require("JT808Prot")
  local gpsMng = require("gpsMng")

  -- Name under which the socket task is registered; also passed to every
  -- libnet call and to socket.create in this file.
  local d1Name = "D1_TASK"
  -- Last known connection state, written by the socket task.
  local d1Online = false

  -- Server endpoint handed to the socket task when it is started.
  local ip = "112.125.89.8"
  local port = 40042

  -- Transmit / receive scratch buffers (1024 bytes each) shared by the socket task.
  local tx_buff = zbuff.create(1024)
  local rx_buff = zbuff.create(1024)
  --- Callback registered together with the socket task via sysplus.taskInitEx.
  -- It only logs the first four fields of the message it receives.
  -- NOTE(review): presumably invoked for task messages nobody else consumed — confirm.
  -- @param msg array-like table; entries 1..4 are logged as-is
  local function netCB(msg)
    local first, second, third, fourth = msg[1], msg[2], msg[3], msg[4]
    log.info("未处理消息", first, second, third, fourth)
  end
  -- Outgoing message queue (FIFO); drained by the socket task.
  local msgQuene = {}

  --- Append one outgoing message to the send queue.
  -- @param data payload to transmit
  -- @param user optional table of per-message extras (e.g. a `cb` completion callback)
  -- @return always true
  function insertMsg(data, user)
    local entry = { data = data, user = user }
    msgQuene[#msgQuene + 1] = entry
    return true
  end
  --- Terminal general response: encode a T_COMMON_RSP packet and queue it.
  -- @param seq    sequence number of the message being answered
  -- @param id     message id of the message being answered
  -- @param result truthy is encoded as 0, falsy as 1
  -- @param cbFnc  optional function(r) called with the send result; it is
  --               called with false if the message could not be queued
  function T_commonRsp(seq, id, result, cbFnc)
    local data = JT808Prot.encode(JT808Prot.T_COMMON_RSP, seq, id, result and 0 or 1)

    local function onSent(r)
      log.info("socketOutMsg.commonRsp", "send result", r)
      -- cbFnc is optional (some callers pass none), so guard before calling it.
      if cbFnc then cbFnc(r) end
    end

    if not insertMsg(data, { cb = onSent }) then
      if cbFnc then cbFnc(false) end
    end
  end
  -- Position-report task.
  -- NOTE(review): coLocRpt is declared here but never assigned in the visible code.
  local coLocRpt

  --- Start a task that periodically queues a JT808 location report.
  -- Each round: if GNSS has a fix, read RMC/GGA data, encode a T_LOC_RPT packet,
  -- queue it and wait for its send confirmation; then sleep for the period
  -- stored under fskv key "wakeLocRptFreq" (seconds).
  -- @return whatever sys.taskInit returns
  function createLocRptTask()
    -- Fallback period in seconds, used when "wakeLocRptFreq" is missing or not a
    -- positive number. NOTE(review): 30 is an arbitrary default — confirm.
    local DEFAULT_PERIOD_S = 30

    return sys.taskInit(function()
      log.info("libgnss.isFix()", libgnss.isFix())
      while true do
        local rmc = libgnss.isFix() and libgnss.getRmc(2)
        if rmc and rmc.lat and rmc.lng then
          local lat = rmc.lat * 1000000
          local lng = rmc.lng * 1000000
          -- Whole part of speed * 1.852; presumably knots -> km/h — TODO confirm units.
          local metres = (rmc.speed or 0) * 1852
          local speed = (metres - (metres % 1000)) / 1000
          local gga = libgnss.getGga(2)
          -- A missing GGA sentence must not abort the report.
          local altitude = gga and gga.altitude
          local data, seq = JT808Prot.encode(JT808Prot.T_LOC_RPT,
            0,
            0,
            lat,
            lng,
            altitude,   -- altitude
            speed,      -- speed
            rmc.course, -- heading
            gpsMng.getTime(),
            "")
          log.info(" 打包后的字符串和流水号LocRpt", data, seq)
          local queued = insertMsg(data, { cb = function(r)
            log.info("socketOutMsg.locRpt位置上报", "send result", r)
            sys.publish("SEND_LOC_RPT_CNF", r)
          end })
          if queued then
            -- Block until the sender reports the outcome of this message.
            sys.waitUntil("SEND_LOC_RPT_CNF")
          end
        else
          log.info("定位失败,位置上报失败")
        end

        -- An unset or invalid period must not kill the task.
        local period = tonumber(fskv.get("wakeLocRptFreq"))
        if not period or period <= 0 then
          period = DEFAULT_PERIOD_S
        end
        sys.wait(period * 1000)
      end
    end)
  end
  -- Heartbeat-report task.
  -- NOTE(review): coHeartRpt is declared here but never assigned in the visible code.
  local coHeartRpt

  --- Start a task that periodically queues a JT808 heartbeat.
  -- Each round encodes a T_HEART_RPT packet, queues it, waits for its send
  -- confirmation, then sleeps for the period stored under fskv key "heartFreq"
  -- (seconds).
  -- @return whatever sys.taskInit returns
  function createHeartRptTask()
    -- Fallback period in seconds, used when "heartFreq" is missing or not a
    -- positive number. NOTE(review): 60 is an arbitrary default — confirm.
    local DEFAULT_PERIOD_S = 60

    return sys.taskInit(function()
      while true do
        local data, seq = JT808Prot.encode(JT808Prot.T_HEART_RPT)
        log.info(" 打包后的字符串和流水号Heart", data, seq)
        local queued = insertMsg(data, { cb = function(r)
          log.info("发送心跳信息.heartRpt", "send result", r)
          sys.publish("SEND_HEART_RPT_CNF", r)
        end })
        if queued then
          -- Block until the sender reports the outcome of this message.
          sys.waitUntil("SEND_HEART_RPT_CNF")
        end

        -- An unset or invalid period must not kill the task.
        local period = tonumber(fskv.get("heartFreq"))
        if not period or period <= 0 then
          period = DEFAULT_PERIOD_S
        end
        sys.wait(period * 1000)
      end
    end)
  end
  --- Handler for the platform general response.
  -- Logs the acknowledged sequence number, message id and result, then resumes
  -- a monitor coroutine if one is published as `socketTask.coMonitor`.
  -- No local named `socketTask` is in scope at this point in the file, so the
  -- name is looked up as a global; when that global is absent (or is not a table
  -- carrying a coroutine) the resume step is skipped instead of raising.
  -- @param packet decoded packet; reads tmnlSeq, tmnlId and rspResult
  local function s_commonRsp(packet)
    log.info("s_commonRsp", packet.tmnlSeq, packet.tmnlId, packet.rspResult)
    local owner = rawget(_G, "socketTask")
    local monitor = type(owner) == "table" and owner.coMonitor or nil
    if type(monitor) == "thread" then
      coroutine.resume(monitor, 'feed monitor')
    end
  end
  --- Handler for "set terminal parameters".
  -- Logs the request and answers it with a terminal general response carrying
  -- the packet's own result.
  -- @param packet decoded packet; reads result, msgSeq and msgId
  local function s_setPara(packet)
    local outcome, seq, id = packet.result, packet.msgSeq, packet.msgId
    log.info("s_setPara", outcome, seq, id)
    T_commonRsp(seq, id, outcome)
  end
  --- Handler for "terminal control".
  -- Answers with a terminal general response; once that response's completion
  -- callback runs and the command is CONTROL_RESET, a restart is scheduled
  -- 2000 ms later (the send result itself is not inspected).
  -- @param packet decoded packet; reads controlCmd, result, msgSeq and msgId
  local function s_control(packet)
    log.info("s_control", packet.controlCmd, packet.result, packet.msgSeq, packet.msgId)

    local function afterReply(r)
      if packet.controlCmd ~= JT808Prot.CONTROL_RESET then
        return
      end
      sys.timerStart(sys.restart, 2000, "server control reset")
    end

    T_commonRsp(packet.msgSeq, packet.msgId, packet.result, afterReply)
  end
  -- Dispatch table: platform message id -> handler function.
  local cmds = {}
  cmds[JT808Prot.S_COMMON_RSP] = s_commonRsp -- platform general response
  cmds[JT808Prot.S_SET_PARA] = s_setPara     -- set terminal parameters
  cmds[JT808Prot.S_CONTROL] = s_control      -- terminal control
  -- Holds the undecoded tail of the previous receive (at most one fragment).
  local tCache = {}

  -- Timer callback: discard a partial frame that was never completed.
  -- Kept as one named function so the same timer can be stopped and restarted,
  -- instead of stacking a new anonymous timer for every fragment.
  local function dropStaleFragment()
    tCache = {}
  end

  --- Process data received over TCP.
  -- Prepends any cached fragment, then repeatedly decodes packets and dispatches
  -- each one through `cmds`. An undecodable remainder is cached for the next
  -- call and dropped if it is still pending 10 s later.
  -- @param data newly received bytes (string)
  function inproc(data)
    tCache[#tCache + 1] = data
    local pending = table.concat(tCache) -- join cached fragment and new data
    tCache = {}
    -- The cached fragment has been consumed; cancel its expiry so an old timer
    -- cannot wipe a fragment cached later.
    sys.timerStop(dropStaleFragment)

    while #pending > 0 do
      local rest, packet = JT808Prot.decode(pending)
      pending = rest or ""
      if not packet then
        if #pending > 0 then
          tCache[1] = pending
          sys.timerStart(dropStaleFragment, 10000)
        end
        break
      end

      local handler = packet.msgId and cmds[packet.msgId]
      if handler and packet.result then
        handler(packet)
      else
        log.warn("inproc", "invalid packet")
      end
    end
  end
  --- Connection task: connect, pump receive/send traffic, reconnect on failure.
  -- Runs forever. Per connection it reads incoming data into rx_buff and feeds
  -- it to inproc, drains msgQuene through tx_buff, and waits up to 1000 ms for
  -- the next event. On any failure the socket is closed and a new connection
  -- attempt starts after 1000 ms.
  -- @param ip   server address
  -- @param port server port
  local function socketTask(ip, port)
    local netc = socket.create(nil, d1Name)
    local result, param
    -- The reporter tasks loop forever, so they are started once only; starting
    -- them on every reconnect would multiply the reports.
    local reportersStarted = false

    -- socket.debug(netc, true)
    socket.config(netc, nil, nil, nil)

    while true do
      result = libnet.connect(d1Name, 15000, netc, ip, port)
      d1Online = result
      if result then
        log.info("服务器连上了")
        if not reportersStarted then
          reportersStarted = true
          createLocRptTask()
          createHeartRptTask()
        end
      end

      while result do
        local succ
        succ, param = socket.rx(netc, rx_buff)
        if not succ then
          log.info("服务器断开了", succ, param, ip, port)
          break
        end
        if rx_buff:used() > 0 then
          log.info("收到服务器数据,长度", rx_buff:used())
          inproc(rx_buff:toStr(0, rx_buff:used())) -- read out received data and process it
          rx_buff:del()
        end

        while #msgQuene > 0 do
          local outMsg = table.remove(msgQuene, 1)
          tx_buff:write(outMsg.data)
          if tx_buff:used() > 0 then
            result, param = libnet.tx(d1Name, 15000, netc, tx_buff)
            log.info("发送结果", result)
          end
          -- Always empty the buffer so a failed payload is not re-sent glued to
          -- the next message.
          tx_buff:del()
          if not result then
            log.info("发送失败了", result)
          end
          -- Report the outcome on failure too: the reporter tasks block in
          -- sys.waitUntil until this callback publishes their confirmation.
          if outMsg.user and outMsg.user.cb then
            outMsg.user.cb(result, outMsg.data, outMsg.user.para)
          end
          if not result then
            break
          end
        end
        -- A failed send ends this connection so the outer loop reconnects.
        if not result then
          break
        end

        if tx_buff:len() > 1024 then
          tx_buff:resize(1024)
        end
        if rx_buff:len() > 1024 then
          rx_buff:resize(1024)
        end

        -- Block until something new arrives (e.g. data pushed by the server).
        result, param = libnet.wait(d1Name, 1000, netc)
        if not result then
          log.info("服务器断开了", result, param)
          break
        end
      end

      d1Online = false
      libnet.close(d1Name, 5000, netc)
      tx_buff:clear(0)
      rx_buff:clear(0)
      sys.wait(1000)
    end
  end

  -- Launch the connection task under the name d1Name with netCB as its callback.
  sysplus.taskInitEx(socketTask, d1Name, netCB, ip, port)