airtalk_dev_ctrl.lua 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  -- Module-level state shared by the MQTT callback and the airtalk task.
  local g_state = SP_T_NO_READY -- device state
  local g_mqttc = nil -- MQTT client
  local g_local_id -- ID of this device
  local g_remote_id -- ID of the peer device
  -- Lua pattern for control-downlink topics. It is assigned in
  -- airtalk_mqtt_task and read in mqtt_cb; declaring it here keeps it a
  -- file-local upvalue instead of an accidental global.
  local g_dl_topic
  local g_s_topic -- topic used for the talk session
  local g_s_mode -- talk mode
  local g_dev_list -- talk device list
  --- Callback handed to sys.taskInitEx by airtalk_mqtt_init.
  -- Logs the first four fields of the message under the label
  -- "未处理消息" (unhandled message).
  -- @param msg message table
  local function task_cb(msg)
    local f1, f2, f3, f4 = msg[1], msg[2], msg[3], msg[4]
    log.info("未处理消息", f1, f2, f3, f4)
  end
  --- Event callback registered through airtalk.on(); it only logs.
  -- @param event event value passed by the airtalk library
  -- @param param event parameter passed by the airtalk library
  local function airtalk_event_cb(event, param)
    log.info(
      "airtalk event",
      event,
      param
    )
  end
  --- Publish the authentication request (command 0001).
  -- Does nothing unless the device is still in SP_T_NO_READY.
  local function auth()
    if g_state ~= SP_T_NO_READY then
      return
    end
    local request = { ["key"] = PRODUCT_KEY, ["device_type"] = 1 }
    g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/0001", json.encode(request))
  end
  --- Publish the talk heartbeat (command 0005).
  -- Does nothing unless a talk session is active (SP_T_CONNECTED).
  local function heart()
    if g_state ~= SP_T_CONNECTED then
      return
    end
    local beat = { ["from"] = g_local_id, ["to"] = g_remote_id }
    g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/0005", json.encode(beat))
  end
  --- Enter the talking state on the topic currently held in g_s_topic.
  -- @param mode   talk mode, stored in g_s_mode and passed to airtalk.speech
  -- @param ssrc   value forwarded to airtalk.set_ssrc
  -- @param sample value forwarded as the third argument of airtalk.speech
  local function speech_on(mode, ssrc, sample)
    g_state, g_s_mode = SP_T_CONNECTED, mode

    -- Subscribe to the session topic, then point the airtalk library at it.
    g_mqttc:subscribe(g_s_topic)
    airtalk.set_topic(g_s_topic)
    airtalk.set_ssrc(ssrc)
    airtalk.speech(true, mode, sample)

    -- Tell the task the session is up and start the periodic heartbeat.
    sys.sendMsg(AIRTALK_TASK_NAME, MSG_SPEECH_ON_IND, true)
    local heartbeat_period_ms = 150000
    sys.timerLoopStart(heart, heartbeat_period_ms)
  end
  --- Leave the talking state and return to SP_T_IDLE.
  -- The session topic is unsubscribed and cleared only when a session was
  -- actually connected. The heartbeat timer is stopped in every case.
  local function speech_off()
    local was_connected = (g_state == SP_T_CONNECTED)
    if was_connected then
      g_mqttc:unsubscribe(g_s_topic)
      airtalk.speech(false)
      g_s_topic = nil
    end

    g_state = SP_T_IDLE
    sys.timerStopAll(heart)
  end
  --- Handle one control message received on the downlink topic.
  -- @param cmd   four-character command code captured from the topic
  -- @param topic full topic string (not used by this function)
  -- @param obj   decoded JSON payload; nil when decoding failed
  local function analyze_v1(cmd, topic, obj)
    -- Replies to the talk heartbeat (8005) and end-of-talk (8004) need no handling.
    if cmd == "8005" or cmd == "8004" then
      return
    end

    -- The payload comes from the network. Anything that is not a JSON
    -- object is treated like a decode failure so the obj[...] lookups
    -- below can never raise.
    if type(obj) ~= "table" then
      obj = nil
    end

    if cmd == "8003" then -- reply to our own talk request
      if g_state ~= SP_T_CONNECTING then -- we never asked for a talk
        log.error("state", g_state, "need", SP_T_CONNECTING)
        return
      end
      if obj and obj["result"] == SUCC and g_s_topic == obj["topic"] then
        -- Everything matches: start talking.
        speech_on(airtalk.MODE_PERSON, obj["ssrc"], obj["audio_code"] == "amr-nb" and 8000 or 16000)
        -- Return here. Falling through to the reset below would wipe the
        -- topic and state that speech_on has just set up.
        return
      end
      -- Request rejected or malformed: report failure and go back to idle.
      sys.sendMsg(AIRTALK_TASK_NAME, MSG_SPEECH_ON_IND, false)
      g_s_topic = nil
      g_state = SP_T_IDLE
      return
    end

    local new_obj = nil

    if cmd == "0102" then -- incoming call from a peer
      local req_topic = obj and obj["topic"]
      if req_topic and obj["ssrc"] and obj["audio_code"] then
        if g_state ~= SP_T_IDLE then
          -- Only an idle device may accept a call.
          log.error("state", g_state, "need", SP_T_IDLE)
          new_obj = {["result"] = "failed", ["topic"] = req_topic, ["info"] = "device is busy"}
        else
          local from = type(req_topic) == "string" and string.match(req_topic, "audio/.*/(.*)/.*")
          if from then
            log.info("remote id ", from)
            g_s_topic = req_topic
            g_remote_id = from
            new_obj = {["result"] = SUCC, ["topic"] = req_topic, ["info"] = ""}
            speech_on(airtalk.MODE_PERSON, obj["ssrc"], obj["audio_code"] == "amr-nb" and 8000 or 16000)
          else
            new_obj = {["result"] = "failed", ["topic"] = req_topic, ["info"] = "topic error"}
          end
        end
      else
        -- obj may be nil here, so the topic is taken from the guarded lookup.
        new_obj = {["result"] = "failed", ["topic"] = req_topic, ["info"] = "json info error"}
      end
      g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/8102", json.encode(new_obj))
      return
    end

    if cmd == "0103" then -- peer hung up
      if g_state == SP_T_IDLE then
        new_obj = {["result"] = "failed", ["info"] = "no speech"}
      else
        new_obj = {["result"] = SUCC, ["info"] = ""}
        speech_off()
        sys.sendMsg(AIRTALK_TASK_NAME, MSG_SPEECH_OFF_IND, true)
      end
      g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/8103", json.encode(new_obj))
      return
    end

    if cmd == "0101" then -- device list pushed by the server
      if obj then
        g_dev_list = obj["dev_list"]
        new_obj = {["result"] = SUCC, ["info"] = ""}
      else
        new_obj = {["result"] = "failed", ["info"] = "json info error"}
      end
      g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/8101", json.encode(new_obj))
      return
    end

    if cmd == "8001" then -- reply to the authentication request
      if obj and obj["result"] == SUCC then
        -- Authenticated: ask for the device list (command 0002).
        g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/0002", "")
      else
        -- obj or obj.info may be missing; never concatenate a nil.
        local info = obj and obj["info"]
        sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, false, "鉴权失败" .. tostring(info or ""))
      end
      return
    end

    if cmd == "8002" then -- reply to the device list request
      if obj and obj["result"] == SUCC then
        -- Only after this reply are the network services considered ready.
        g_dev_list = obj["dev_list"]
        g_state = SP_T_IDLE
        sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, true) -- login flow complete
      else
        sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, false, "更新设备列表失败")
      end
      return
    end
  end
  --- MQTT event callback registered with g_mqttc:on().
  -- @param mqttc   client object passed by the mqtt library (not used; the
  --                file-level g_mqttc is used instead)
  -- @param event   event name: "conack", "suback", "recv", "sent",
  --                "disconnect" or "error"
  -- @param topic   event-dependent value; for "suback" a falsy value is
  --                treated as a failed subscription
  -- @param payload message body for "recv"
  local function mqtt_cb(mqttc, event, topic, payload)
    log.info(event, topic)

    if event == "conack" then
      -- MQTT is connected: notify the task, then subscribe to the single
      -- control-downlink wildcard topic.
      sys.sendMsg(AIRTALK_TASK_NAME, MSG_CONNECT_ON_IND)
      g_mqttc:subscribe("ctrl/downlink/" .. g_local_id .. "/#")

    elseif event == "suback" then
      if g_state == SP_T_NO_READY then
        if topic then
          auth()
        else
          sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, false, "订阅失败" .. "ctrl/downlink/" .. g_local_id .. "/#")
        end
      elseif g_state == SP_T_CONNECTED and not topic then
        -- Subscribing to the session topic failed: tear the session down.
        speech_off()
        g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/0004", json.encode({["to"] = g_remote_id}))
        sys.sendMsg(AIRTALK_TASK_NAME, MSG_SPEECH_OFF_IND, true)
      end

    elseif event == "recv" then
      -- Only topics matching the control-downlink pattern are dispatched.
      local cmd = string.match(topic, g_dl_topic)
      if cmd then
        local decoded = json.decode(payload)
        analyze_v1(cmd, topic, decoded)
      end

    elseif event == "disconnect" then
      speech_off()
      sys.sendMsg(AIRTALK_TASK_NAME, MSG_CONNECT_OFF_IND)
      sys.timerStopAll(auth)
      g_state = SP_T_NO_READY
    end
    -- "sent" and "error" events are intentionally ignored.
  end
  --- Main airtalk task: create and connect the MQTT client, run the login
  -- flow, then serve talk requests until the connection drops. Never returns.
  function airtalk_mqtt_task()
    local msg, online

    -- g_local_id could also be set by the application instead of using the IMEI.
    g_local_id = mobile.imei()
    g_dl_topic = "ctrl/downlink/" .. g_local_id .. "/(%w%w%w%w)"

    -- next_auth is not defined anywhere in this file. Only arm the timer
    -- when some other module really provides it; otherwise a loop timer
    -- with a nil callback would be started.
    if type(next_auth) == "function" then
      sys.timerLoopStart(next_auth, 900000)
    end

    g_mqttc = mqtt.create(nil, "mqtt.airtalk.luatos.com", 1883, false, {rxSize = 32768})
    airtalk.config(airtalk.PROTOCOL_MQTT, g_mqttc, 200) -- buffer at least 200 ms before playback
    airtalk.on(airtalk_event_cb)
    airtalk.start()

    g_mqttc:auth(g_local_id, g_local_id, mobile.muid()) -- g_local_id is required, the rest is optional
    g_mqttc:keepalive(240)
    g_mqttc:autoreconn(true, 15000) -- automatic reconnect
    g_mqttc:debug(false)
    g_mqttc:on(mqtt_cb)
    log.info("设备信息", g_local_id, mobile.muid())
    -- The client reconnects by itself unless it is closed explicitly.
    g_mqttc:connect()

    online = false
    while true do
      -- Wait until the server connection is up.
      sys.waitMsg(AIRTALK_TASK_NAME, MSG_CONNECT_ON_IND)
      log.info("connected")

      -- Login phase: the whole flow should take less than 30 seconds.
      while not online do
        msg = sys.waitMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, 30000)
        if type(msg) == "table" then
          online = msg[2]
          if online then
            sys.timerLoopStart(auth, 3600000) -- authenticated: try again every 60 minutes
          else
            log.info(msg[3])
            sys.timerLoopStart(auth, 300000) -- failed: retry every 5 minutes
          end
        else
          auth() -- no answer within 30 seconds: authenticate again
        end
      end
      log.info("对讲管理平台已连接")

      -- Service phase: handle requests until the connection is lost.
      while online do
        msg = sys.waitMsg(AIRTALK_TASK_NAME)
        if type(msg) == "table" and type(msg[1]) == "number" then
          local id = msg[1]
          if id == MSG_PERSON_SPEECH_REQ then
            local remote = msg[2]
            local remote_ok = type(remote) == "string" or type(remote) == "number"
            if g_state == SP_T_IDLE then
              if remote_ok then
                g_state = SP_T_CONNECTING
                g_remote_id = remote
                g_s_topic = "audio/" .. g_local_id .. "/" .. g_remote_id .. "/" .. mcu.tick()
                -- Ask the server to set up a talk on this topic (command 0003).
                g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/0003", json.encode({["topic"] = g_s_topic}))
              else
                -- A missing remote id would raise on concatenation and kill the task.
                log.error("speech request ignored, invalid remote id", type(remote))
              end
            end
            -- Requests arriving while not idle are ignored.
          elseif id == MSG_SPEECH_STOP_REQ then
            if g_state == SP_T_CONNECTING or g_state == SP_T_CONNECTED then
              speech_off()
              g_mqttc:publish("ctrl/uplink/" .. g_local_id .. "/0004", json.encode({["to"] = g_remote_id}))
            end
          elseif id == MSG_SPEECH_ON_IND then
            -- Nothing to do here; the indication is consumed so that it
            -- does not end up in task_cb.
          elseif id == MSG_CONNECT_OFF_IND then
            log.info("connect", msg[2])
            -- mqtt_cb sends this message without a second field, so this
            -- becomes nil and ends the service loop.
            online = msg[2]
          end
        else
          -- Never index msg unless it really is a table.
          log.info(type(msg), type(msg) == "table" and type(msg[1]) or nil)
        end
        msg = nil
      end
      online = false
    end
  end
  --- Start the airtalk task.
  -- Registers airtalk_mqtt_task under AIRTALK_TASK_NAME, with task_cb as
  -- the third argument of sys.taskInitEx.
  function airtalk_mqtt_init()
    sys.taskInitEx(
      airtalk_mqtt_task,
      AIRTALK_TASK_NAME,
      task_cb
    )
  end