| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- air_jt808 = require("air_gt808")
- local moduleName = "建栋服务器"
- local taskName = "auxTask"
- local libfota = require "libfota"
- local auxServer = {}
- local simID = nil
- local connectOk = false
- local isWaitMsg, isResponseOK, isResponseNeedCheck, connectOKFlag, waitAck
- local lastTxMsgID, lastTxMsgSn, lastRxMsgID, lastRxData, localMsgID
- local responseQueue, dataQueue
- local logSwitch = true
- local libnet = require("libnet")
- local function logF(...)
- if logSwitch then
- log.info(moduleName, ...)
- end
- end
- -- 处理未识别的网络消息
- local function netCB(msg)
- logF("unprocessed message", msg[1], msg[2], msg[3], msg[4])
- end
- -- 客户端定位数据发送
- function auxServer.dataSend(data)
- local tTime = os.date("!*t", os.time())
- if tTime.year < 2024 then
- logF("时间未校准,放弃本次数据上传")
- return false, 0
- end
- if #dataQueue > 200 then
- local item = table.remove(dataQueue, 1)
- item:free()
- end
- if not data then
- local item = zbuff.create(200)
- item:copy(nil, common.monitorRecord())
- table.insert(dataQueue, item)
- else
- table.insert(dataQueue, data)
- end
- if isWaitMsg and connectOKFlag then
- sys_send(taskName, socket.EVENT, 0)
- end
- return true, 0
- end
- -- 客户端连接状态
- function auxServer.isConnected()
- return connectOKFlag
- end
- -- 客户端注册应答解析
- local function monitorRxReg(inData)
- logF("客户端注册应答解析", inData:toHex())
- local lastMsgSn, procResult, authCode = air_jt808.analyzeRegResopnse(inData)
- logF("rx reg response", lastTxMsgSn, lastMsgSn, procResult, authCode:toHex())
- if (lastMsgSn == lastTxMsgSn) and (procResult == 0) and (authCode ~= nil) then
- log.info("设置鉴权码", fskv.set("authCode2", authCode))
- -- logF("当前鉴权码",fskv.get("authCode2"),fskv.get("authCode2"):toHex())
- isResponseOK = true
- else
- isResponseOK = false
- end
- logF("isresponseok", isResponseOK)
- end
- -- 客户端通用应答解析
- local function monitorRxResponse(inData)
- local lastMsgSn, lastMsgID, procResult = air_jt808.analyzeMonitorResopnse(inData)
- if isResponseNeedCheck then
- logF("rx normal response", string.format("%04x", lastTxMsgID), string.format("%04x", lastMsgID), lastTxMsgSn,
- lastMsgSn, procResult)
- if (lastTxMsgID == lastMsgID) and (lastMsgSn == lastTxMsgSn) and ((procResult == 0) or (procResult == 4)) then
- logF("rx response ok")
- isResponseNeedCheck = false
- isResponseOK = true
- else
- isResponseOK = false
- end
- end
- end
- local function monitorTrackIngResponse(inData, msgSn)
- logF("track data", inData:toHex(), msgSn)
- if #inData ~= 6 then
- return
- end
- -- local interval, timeout = tonumber(inData:sub(1, 2), 2), tonumber(inData:sub(3), 2)
- local _, interval, timeout = pack.unpack(inData, ">hI")
- logF("interval", interval, timeout)
- if not interval or not timeout or interval <= 0 and timeout < 0 then
- return
- end
- local pTxBuf = ""
- localMsgID = (localMsgID + 1) % 65536
- pTxBuf = air_jt808.air_jt808Head(0x0001, localMsgID, simID, pTxBuf)
- table.insert(responseQueue, pTxBuf)
- common.setfastUpload(interval, timeout)
- end
- local function monitorDlTrans(inData, msgSn)
- local result, ret, data = true, nil, nil
- if #inData > 0 then
- -- 解析indata从第2个位置开始的json数据
- local flag = string.sub(inData, 1, 1)
- if string.byte(flag) == 0xF1 then
- local data, ret = json.decode(inData:sub(2))
- log.info("data", result)
- if ret == 1 then
- fskv.set("phoneList", data)
- for k, v in pairs(data) do
- if v and v.name and v.num then
- logF("联系人列表", v, string.fromHex(v.name), v.num)
- end
- end
- else
- result = false
- end
- else
- result = false
- end
- else
- result = false
- end
- localMsgID = (localMsgID + 1) % 65536
- local pTxBuf = air_jt808.makeTransResponseBody(result and "OK#" or "ERROR#")
- pTxBuf = air_jt808.air_jt808Head(0x0900, localMsgID, simID, pTxBuf)
- table.insert(responseQueue, pTxBuf)
- end
- -- 注册命令解析函数
- local monitorRxFun = {
- [0x8001] = monitorRxResponse,
- [0x8100] = monitorRxReg,
- [0x8202] = monitorTrackIngResponse,
- [0x8900] = monitorDlTrans
- }
- -- 命令消息分发
- local function msgDispatch(param)
- logF("dispatch", param)
- for index, msg in pairs(param) do
- logF("msg", msg.msgID, msg.body:toHex())
- if monitorRxFun[msg.msgID] then
- monitorRxFun[msg.msgID](msg.body, msg.msgSn)
- end
- end
- end
- -- 重启设备
- local function reBootDevice(ret)
- if ret == 0 then
- pm.reboot()
- end
- end
- local function auxServerTask(d1Name)
- log.info("auxServer in")
- local result, data, queue, rxBuff, param, param2, ip, port, succ, netc, authCode, pTxBuf
- local firstWaitNetReady = true
- local nowStatus = "WAIT_NET_READY"
- local retryTimes = 0
- local recvTimeout = 0
- local ipv6Valid = false
- localMsgID = 0
- dataQueue = {}
- rxBuff = zbuff.create(1024)
- netc = socket.create(nil, d1Name)
- socket.debug(netc, true)
- while true do
- ::CONTINUE::
- if nowStatus == "WAIT_NET_READY" then
- responseQueue = {}
- lastRxData = ""
- sysplus.cleanMsg(d1Name)
- connectOKFlag = false
- if not firstWaitNetReady then
- retryTimes = retryTimes + 1
- libnet.close(d1Name, 5000, netc)
- socket.release(netc)
- if retryTimes > 3 then -- 重试3次,进入FLY_MODE
- nowStatus = "FLY_MODE"
- goto CONTINUE
- end
- math.randomseed(os.time()) -- 设置随机数种子
- sys.wait(math.random(10, 20) * 1000)
- end
- while not netWork.isReady() do -- 等待网络就绪
- sys.wait(1000)
- end
- if firstWaitNetReady then -- 第一次或者进出飞行模式后等待网络就绪, 触发下远程升级
- socket.sntp()
- firstWaitNetReady = false
- libfota.request(reBootDevice)
- sys.timerLoopStart(libfota.request, 3600000, reBootDevice)
- end
- nowStatus = "REQ_SERVER"
- goto CONTINUE
- end
- if nowStatus == "REQ_SERVER" then
- sys.wait(3000)
- local _0, _1, _2, ipv6 = socket.localIP()
- if ipv6 and #ipv6 > 0 and ipv6:sub(1, 2) ~= "FE" then
- ipv6Valid = true
- else
- ipv6Valid = false
- end
- netc = socket.create(nil, d1Name)
- if _G.IPV6_UDP_VER and ipv6Valid then
- socket.config(netc, 12398, true)
- else
- socket.config(netc, nil, false)
- end
- local code, headers, body = http.request("GET", string.format(
- "https://gps.openluat.com/iot/getip?clientid=%s&p=%s", mobile.imei():sub(3, 14), _G.PRODUCT_VER)).wait()
- log.info("连接ip 请求结果", code, headers, body)
- if (code == 200 or code == 206) and body then
- local data, result = json.decode(body)
- if result == 1 and type(data) == "table" and data.msg == "ok" then
- if _G.IPV6_UDP_VER then
- if ipv6Valid and data.ipv6 and data.udp then
- log.info(
- "获取到IPV6地址,且服务器返回了IPV6地址,使用IPV6 UDP方式连接服务器")
- ip = data.ipv6
- port = data.udp
- else
- if not ipv6Valid then
- log.info("设备没有分配到IPV6地址,使用IPV4 TCP方式连接服务器")
- end
- if not data.ipv6 or not data.udp then
- log.info(
- "服务器没有返回IPV6地址或UDP端口,使用IPV4 TCP方式连接服务器")
- end
- if data.ipv4 and data.tcp then
- log.info("使用IPV4 TCP方式连接服务器")
- ip = data.ipv4
- port = data.tcp
- else
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- end
- else
- if data.ipv4 and data.tcp then
- log.info("使用IPV4 TCP方式连接服务器")
- ip = data.ipv4
- port = data.tcp
- else
- log.info("服务器没有返回IPV4地址,使用IPV4 TCP方式连接服务器")
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- end
- else
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- else
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- simID = mobile.imei():sub(3, 14)
- simID = api.StrToBin(simID)
- result = libnet.connect(d1Name, 30000, netc, ip, port)
- if not result then
- nowStatus = "WAIT_NET_READY"
- else
- nowStatus = "REG_DEVICE"
- end
- goto CONTINUE
- end
- if nowStatus == "REG_DEVICE" then
- authCode = fskv.get("authCode2")
- if authCode and #authCode > 0 then
- nowStatus = "AUTH_DEVICE"
- goto CONTINUE
- end
- localMsgID = (localMsgID + 1) % 65536
- lastTxMsgSn = localMsgID
- -- pTxBuf = air_jt808.makeRegMsg(0, 0, "Airm2m", string.fromHex(string.rep("0", 36) .. _G.PRODUCT_VER), api.StrToBin(mobile.imei():sub(1, 14)), 0, string.char(0x00))
- pTxBuf = air_jt808.makeRegMsg(0, 0, "Airm2m", "Air8000", api.StrToBin(mobile.imei():sub(1, 14)), 0,
- string.char(0x00))
- pTxBuf = air_jt808.air_jt808Head(0x0100, localMsgID, simID, pTxBuf)
- result, param = libnet.tx(d1Name, 15000, netc, pTxBuf)
- if not result then
- logF("注册发送异常")
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- logF("注册发送成功", pTxBuf:toHex())
- result, param = libnet.wait(d1Name, 30000, netc)
- if not result then
- logF("注册数据等待异常", result, param)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- if not ipv6Valid and IPV6_UDP_VER then
- if not param then
- logF("注册数据等待超时", result, param)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- end
- succ, param = socket.rx(netc, rxBuff)
- if not succ then
- logF("服务器断开了")
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- -- 如果服务器有下发数据, used()就必然大于0, 进行处理
- if rxBuff:used() > 0 then
- data = rxBuff:query()
- rxBuff:del()
- isResponseOK = false
- lastRxData, queue = air_jt808.rxAnalyze(lastRxData .. data)
- msgDispatch(queue)
- if not isResponseOK then
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- nowStatus = "AUTH_DEVICE"
- logF("注册成功")
- end
- goto CONTINUE
- end
- if nowStatus == "AUTH_DEVICE" then
- if mreport then
- mreport.send()
- end
- localMsgID = (localMsgID + 1) % 65536
- lastTxMsgSn = localMsgID
- pTxBuf = air_jt808.authPackage(lastTxMsgSn, simID, fskv.get("authCode2"))
- lastTxMsgID = 0x0102
- result, param = libnet.tx(d1Name, 15000, netc, pTxBuf)
- if not result then
- logF("鉴权发送失败")
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- logF("鉴权发送成功")
- result, param = libnet.wait(d1Name, 30000, netc)
- if not result then
- logF("鉴权数据等待异常", result, param)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- if not ipv6Valid and IPV6_UDP_VER then
- if not param then
- logF("鉴权 超时未收到响应报文")
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- end
- succ, param = socket.rx(netc, rxBuff)
- if not succ then
- logF("数据读取出现异常 4", succ, param, ip, port)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- if rxBuff:used() > 0 then
- data = rxBuff:query() -- 获取数据
- rxBuff:del()
- logF("鉴权data", data:toHex())
- isResponseOK = false
- isResponseNeedCheck = true
- lastRxData, queue = air_jt808.rxAnalyze(lastRxData .. data)
- msgDispatch(queue)
- if not isResponseOK then
- logF("鉴权失败")
- logF("lastRxData = ", lastRxData)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- logF("鉴权成功")
- nowStatus = "MSG_LISTEN"
- retryTimes = 0
- end
- goto CONTINUE
- end
- if nowStatus == "MSG_LISTEN" then
- connectOKFlag = true
- logF("开始监听")
- while true do
- waitAck = false
- while #responseQueue > 0 do
- -- 应答区有数据
- pTxBuf = responseQueue[1]
- result, param = libnet.tx(d1Name, 15000, netc, pTxBuf)
- if not result then
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- table.remove(responseQueue, 1)
- logF("response packet", api.BinToHex(pTxBuf, " "))
- end
- if #dataQueue > 0 then
- if mreport then
- mreport.send()
- end
- local item = dataQueue[1]
- localMsgID = (localMsgID + 1) % 65536
- lastTxMsgSn = localMsgID
- pTxBuf = air_jt808.positionPackage(lastTxMsgSn, simID, item:query())
- lastTxMsgID = 0x0200
- logF("data packet", #pTxBuf, api.BinToHex(pTxBuf, " "))
- result, param = libnet.tx(d1Name, 15000, netc, pTxBuf)
- if not result then
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- item:free()
- table.remove(dataQueue, 1)
- waitAck = true
- end
- isWaitMsg = true
- if waitAck then
- result, param = libnet.wait(d1Name, 60000, netc)
- else
- result, param = libnet.wait(d1Name, 300000, netc)
- end
- isWaitMsg = false
- logF("wait", result, param, param2)
- if not result then
- logF("服务器断开了 5", result, param)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- succ, param = socket.rx(netc, rxBuff)
- if not succ then
- logF("服务器断开了 6", succ, param, ip, port)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- -- 如果服务器有下发数据, used()就必然大于0, 进行处理
- if rxBuff:used() > 0 then
- logF("socket", "收到服务器数据,长度", rxBuff:used())
- data = rxBuff:query()
- rxBuff:del()
- logF("rxData", data:toHex())
- lastRxData, queue = air_jt808.rxAnalyze(lastRxData .. data)
- msgDispatch(queue)
- end
- -- 其他task给服务器发消息
- local _, msg = sys.waitUntil("OTHER_FILE_SENDMSG")
- log.info("其他task或者文件发过来的要发给日志服务器的消息", msg)
- local d1Name = taskName
- local pTxBuf = string.char(0xF0)..msg
- localMsgID = 0xF0 --透传
- pTxBuf = air_jt808.air_jt808Head(0x0900, localMsgID, simID, pTxBuf)
- logF("实际其他task发送给服务器的数据是", pTxBuf:toHex())
- -- local result, param = libnet.tx(d1Name, 15000, netCB, pTxBuf)
- local result, full,result = socket.tx(netc, pTxBuf)
- -- if not result then
- -- logF("没发出去") --我也不知道为啥显示是没发送出去,但是建栋那边收到了,先屏蔽这玩意
- -- nowStatus = "WAIT_NET_READY"
- -- -- goto CONTINUE
- -- else
- -- logF("发送成功")
- -- table.insert(responseQueue, pTxBuf)
- -- end
- -- 循环尾部, 继续下一轮循环
- end
- goto CONTINUE
- end
- if nowStatus == "FLY_MODE" then
- firstWaitNetReady = true
- netWork.setFlyMode(true)
- sys.waitUntil("EXIT_FLYMODE", 10 * 60 * 1000)
- netWork.setFlyMode(false)
- nowStatus = "WAIT_NET_READY"
- goto CONTINUE
- end
- end
- end
- sysplus.taskInitEx(auxServerTask, taskName, netCB, taskName)
- sys.timerLoopStart(function()
- local data = {
- type= "http_test",
- test_mode = "GET方法,无请求头、body以及额外的附加数据",
- test_end= "失败",
- fail_res = "download time out",
- imei= mobile.imei(),
- }
- local data = json.encode(data)
- sys.publish("OTHER_FILE_SENDMSG", data)
- end, 60 * 1000)
|