| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- local libnet = require "libnet"
- local JT808Prot = require "JT808Prot"
- local gpsMng = require "gpsMng"
- local d1Online = false
- local d1Name = "D1_TASK"
- local tx_buff = zbuff.create(1024)
- local rx_buff = zbuff.create(1024)
- local ip, port="112.125.89.8", 40042
- local function netCB(msg)
- log.info("未处理消息", msg[1], msg[2], msg[3], msg[4])
- end
- --数据发送的消息队列
- local msgQuene = {}
- function insertMsg(data,user)
- table.insert(msgQuene,{data=data,user=user})
- return true
- end
- --终端通用应答
- function T_commonRsp(seq,id,result,cbFnc)
- local data = JT808Prot.encode(JT808Prot.T_COMMON_RSP,seq,id,result and 0 or 1)
- if not insertMsg(data,{cb=function(r) log.info("socketOutMsg.commonRsp","send result",r) cbFnc(r) end}) then
- if cbFnc then cbFnc(false) end
- end
- end
- --位置上报task
- local coLocRpt
- function createLocRptTask()
- return sys.taskInit(function()
- log.info("libgnss.isFix()", libgnss.isFix())
- while true do
- if libgnss.isFix() then
- local tLocation = libgnss.getRmc(2)
- local lat = tLocation.lat*1000000
- local lng = tLocation.lng*1000000
- local spd = tLocation.speed
- local speed = (spd*1852 - (spd*1852 %1000))/1000
- local Course = tLocation.course
- local gga = libgnss.getGga(2)
- local altitude=gga.altitude
- -- log.info("发送位置信息.locRpt", lat, lng)
- local data, seq = JT808Prot.encode(JT808Prot.T_LOC_RPT,
- 0,
- 0,
- tonumber(lat),
- tonumber(lng),
- altitude, --海拔
- speed, --速度
- Course, --方向
- gpsMng.getTime(),
- "")
- log.info(" 打包后的字符串和流水号LocRpt", data, seq)
- if insertMsg(data, { cb = function(r)
- log.info("socketOutMsg.locRpt位置上报", "send result", r)
- sys.publish("SEND_LOC_RPT_CNF", r)
- end }) then
- sys.waitUntil("SEND_LOC_RPT_CNF")
- end
- else
- log.info("定位失败,位置上报失败")
- end
- sys.wait(fskv.get("wakeLocRptFreq") * 1000)
- end
- end)
- end
- --心跳上报task
- local coHeartRpt
- function createHeartRptTask()
- return sys.taskInit(function()
- while true do
- local data, seq = JT808Prot.encode(JT808Prot.T_HEART_RPT)
- log.info(" 打包后的字符串和流水号Heart", data, seq)
- if insertMsg(data, { cb = function(r)
- log.info("发送心跳信息.heartRpt", "send result", r)
- sys.publish("SEND_HEART_RPT_CNF", r)
- end }) then
- sys.waitUntil("SEND_HEART_RPT_CNF")
- end
- sys.wait(fskv.get("heartFreq") * 1000)
- end
- end)
- end
- --平台通用应答
- local function s_commonRsp(packet)
- log.info("s_commonRsp",packet.tmnlSeq,packet.tmnlId,packet.rspResult)
- coroutine.resume(socketTask.coMonitor,'feed monitor')
- end
- --设置终端参数
- local function s_setPara(packet)
- log.info("s_setPara",packet.result,packet.msgSeq,packet.msgId)
- T_commonRsp(packet.msgSeq,packet.msgId,packet.result)
- end
- --终端控制
- local function s_control(packet)
- log.info("s_control",packet.controlCmd,packet.result,packet.msgSeq,packet.msgId)
- T_commonRsp(packet.msgSeq,packet.msgId,packet.result,function(r)
- if packet.controlCmd==JT808Prot.CONTROL_RESET then
- sys.timerStart(sys.restart,2000,"server control reset")
- end
- end)
- end
- local cmds =
- {
- [JT808Prot.S_COMMON_RSP] = s_commonRsp, --平台通用应答
- [JT808Prot.S_SET_PARA] = s_setPara, --设置终端参数
- [JT808Prot.S_CONTROL] = s_control, --终端控制
- }
- local tCache = {}
- --tcp接收数据处理函数
- function inproc(data)
- table.insert(tCache, data)
- local cacheData = table.concat(tCache) --连接字符串
- tCache = {}
- while cacheData:len() > 0 do
- local unProcData, packet = JT808Prot.decode(cacheData)
- cacheData = unProcData or ""
- if packet then
- if packet.msgId and cmds[packet.msgId] and packet.result then
- cmds[packet.msgId](packet)
- else
- log.warn("inproc", "invalid packet")
- end
- else
- if cacheData:len() > 0 then
- table.insert(tCache, 1, cacheData)
- sys.timerStart(function() tCache = {} end, 10000)
- end
- break
- end
- end
- end
- local function socketTask(ip, port)
- local netc
- local result, param, is_err
- netc = socket.create(nil, d1Name)
- -- socket.debug(netc, true)
- socket.config(netc, nil, nil, nil)
- -- socket.config(netc, nil, true)
- while true do
- -- log.info(rtos.meminfo("sys"))
- --result = libnet.waitLink(d1Name, 0, netc)
- result = libnet.connect(d1Name, 15000, netc, ip, port)
- -- result = libnet.connect(d1Name, 5000, netc, "112.125.89.8",34607)
- d1Online = result
- if result then
- log.info("服务器连上了")
- createLocRptTask()
- createHeartRptTask()
- end
- while result do
- succ, param, _, _ = socket.rx(netc, rx_buff)
- if not succ then
- log.info("服务器断开了", succ, param, ip, port)
- break
- end
- if rx_buff:used() > 0 then
- log.info("收到服务器数据,长度", rx_buff:used())
- inproc(rx_buff:toStr(0, rx_buff:used())) --读出接收数据并处理
- rx_buff:del()
- end
- while #msgQuene>0 do
- local outMsg = table.remove(msgQuene,1)
- tx_buff:write(outMsg.data)
- if tx_buff:used() > 0 then
- result, param = libnet.tx(d1Name, 15000, netc, tx_buff)
- log.info("发送结果", result)
- end
- if not result then
- log.info("发送失败了", result )
- break
- end
- if outMsg.user and outMsg.user.cb then outMsg.user.cb(result,outMsg.data,outMsg.user.para) end
- if not result then return end
- tx_buff:del()
- end
- if tx_buff:len() > 1024 then
- tx_buff:resize(1024)
- end
- if rx_buff:len() > 1024 then
- rx_buff:resize(1024)
- end
- -- 阻塞等待新的消息到来,比如服务器下发,串口接收到数据
- result, param = libnet.wait(d1Name, 1000, netc)
- if not result then
- log.info("服务器断开了", result, param)
- break
- end
- end
- d1Online = false
- libnet.close(d1Name, 5000, netc)
- tx_buff:clear(0)
- rx_buff:clear(0)
- sys.wait(1000)
- end
- end
- sysplus.taskInitEx(socketTask, d1Name, netCB, ip, port)
|