airtalk_dev_ctrl.lua 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. local g_state = SP_T_NO_READY --device状态
  2. local g_mqttc = nil --mqtt客户端
  3. local g_local_id --本机ID
  4. local g_remote_id --对端ID
  5. local g_s_type --对讲的模式,字符串形式的
  6. local g_s_topic --对讲用的topic
  7. local g_s_mode --对讲的模式
  8. local g_dev_list --对讲列表
  9. local function auth()
  10. if g_state == SP_T_NO_READY then
  11. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/0001", json.encode({["key"] = PRODUCT_KEY, ["device_type"] = 1}))
  12. end
  13. end
  14. local function heart()
  15. if g_state == SP_T_CONNECTED then
  16. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/0005", json.encode({["from"] = g_local_id, ["to"] = g_remote_id}))
  17. end
  18. end
  19. local function wait_speech_to()
  20. log.info("主动请求对讲超时无应答")
  21. speech_off(true, false)
  22. end
  23. --对讲开始,topic,ssrc,采样率(8K或者16K)这3个参数都有了之后就能进行对讲了,可以通过其他协议传入
  24. local function speech_on(ssrc, sample)
  25. g_state = SP_T_CONNECTED
  26. g_mqttc:subscribe(g_s_topic)
  27. airtalk.set_topic(g_s_topic)
  28. airtalk.set_ssrc(ssrc)
  29. log.info("对讲模式", g_s_mode)
  30. airtalk.speech(true, g_s_mode, sample)
  31. sys.sendMsg(AIRTALK_TASK_NAME, MSG_SPEECH_ON_IND, true)
  32. sys.timerLoopStart(heart, 150000)
  33. sys.timerStopAll(wait_speech_to)
  34. log.info("对讲接通,可以说话了")
  35. end
  36. --对讲结束
  37. local function speech_off(need_upload, need_ind)
  38. if g_state == SP_T_CONNECTED then
  39. g_mqttc:unsubscribe(g_s_topic)
  40. airtalk.speech(false)
  41. g_s_topic = nil
  42. end
  43. g_state = SP_T_IDLE
  44. sys.timerStopAll(auth)
  45. sys.timerStopAll(heart)
  46. sys.timerStopAll(wait_speech_to)
  47. log.info("对讲断开了")
  48. if need_upload then
  49. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/0004", json.encode({["to"] = g_remote_id}))
  50. end
  51. if need_ind then
  52. sys.sendMsg(AIRTALK_TASK_NAME, MSG_SPEECH_OFF_IND, true)
  53. end
  54. end
  55. local function analyze_v1(cmd, topic, obj)
  56. if cmd == "8005" or cmd == "8004" then -- 对讲心跳保持和结束对讲的应答不做处理
  57. return
  58. end
  59. if cmd == "8003" then -- 请求对讲应答
  60. if g_state ~= SP_T_CONNECTING then --没有发起对讲请求
  61. log.error("state", g_state, "need", SP_T_CONNECTING)
  62. return
  63. else
  64. if obj and obj["result"] == SUCC and g_s_topic == obj["topic"]then --完全正确,开始对讲
  65. speech_on(obj["ssrc"], obj["audio_code"] == "amr-nb" and 8000 or 16000)
  66. return
  67. else
  68. log.info(obj["result"], obj["topic"], g_s_topic)
  69. sys.sendMsg(AIRTALK_TASK_NAME, MSG_SPEECH_ON_IND, false) --有异常,无法对讲
  70. end
  71. end
  72. g_s_topic = nil
  73. g_state = SP_T_IDLE
  74. return
  75. end
  76. local new_obj = nil
  77. if cmd == "0102" then -- 对端打过来
  78. if obj and obj["topic"] and obj["ssrc"] and obj["audio_code"] and obj["type"] then
  79. if g_state ~= SP_T_IDLE then -- 空闲状态下才可以进入对讲状态
  80. log.error("state", g_state, "need", SP_T_IDLE)
  81. new_obj = {["result"] = "failed", ["topic"] = obj["topic"], ["info"] = "device is busy"}
  82. else
  83. if obj["type"] == "one-on-one" then -- 1对1对讲
  84. local from = string.match(obj["topic"], "audio/.*/(.*)/.*")
  85. if from then
  86. log.info("remote id ", from)
  87. g_s_topic = obj["topic"]
  88. g_remote_id = from
  89. new_obj = {["result"] = SUCC, ["topic"] = obj["topic"], ["info"] = ""}
  90. g_s_type = "one-on-one"
  91. g_s_mode = airtalk.MODE_PERSON
  92. speech_on(obj["ssrc"], obj["audio_code"] == "amr-nb" and 8000 or 16000)
  93. else
  94. new_obj = {["result"] = "failed", ["topic"] = obj["topic"], ["info"] = "topic error"}
  95. end
  96. elseif obj["type"] == "broadcast" then -- 1对多对讲
  97. g_s_topic = obj["topic"]
  98. new_obj = {["result"] = SUCC, ["topic"] = obj["topic"], ["info"] = ""}
  99. g_s_mode = airtalk.MODE_GROUP_LISTENER
  100. g_s_type = "broadcast"
  101. speech_on(obj["ssrc"], obj["audio_code"] == "amr-nb" and 8000 or 16000)
  102. end
  103. end
  104. else
  105. new_obj = {["result"] = "failed", ["topic"] = obj["topic"], ["info"] = "json info error"}
  106. end
  107. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/8102", json.encode(new_obj))
  108. return
  109. end
  110. if cmd == "0103" then --对端挂断
  111. if g_state == SP_T_IDLE then
  112. new_obj = {["result"] = "failed", ["info"] = "no speech"}
  113. else
  114. if obj and obj["type"] == g_s_type then
  115. new_obj = {["result"] = SUCC, ["info"] = ""}
  116. speech_off(false, true)
  117. else
  118. new_obj = {["result"] = "failed", ["info"] = "type mismatch"}
  119. end
  120. end
  121. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/8103", json.encode(new_obj))
  122. return
  123. end
  124. if cmd == "0101" then --更新设备列表
  125. if obj then
  126. g_dev_list = obj["dev_list"]
  127. -- for i=1,#g_dev_list do
  128. -- log.info(g_dev_list[i]["id"],g_dev_list[i]["name"])
  129. -- end
  130. new_obj = {["result"] = SUCC, ["info"] = ""}
  131. else
  132. new_obj = {["result"] = "failed", ["info"] = "json info error"}
  133. end
  134. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/8101", json.encode(new_obj))
  135. return
  136. end
  137. if cmd == "8001" then
  138. if obj and obj["result"] == SUCC then
  139. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/0002","") -- 更新列表
  140. else
  141. sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, false, "鉴权失败" .. obj["info"])
  142. end
  143. return
  144. end
  145. if cmd == "8002" then
  146. if obj and obj["result"] == SUCC then --收到设备列表更新应答,才能认为相关网络服务准备好了
  147. g_dev_list = obj["dev_list"]
  148. -- for i=1,#g_dev_list do
  149. -- log.info(g_dev_list[i]["id"],g_dev_list[i]["name"])
  150. -- end
  151. g_state = SP_T_IDLE
  152. sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, true) --完整登录流程结束
  153. else
  154. sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, false, "更新设备列表失败")
  155. end
  156. return
  157. end
  158. end
  159. local function mqtt_cb(mqttc, event, topic, payload)
  160. log.info(event, topic)
  161. local msg,data,obj
  162. if event == "conack" then
  163. sys.sendMsg(AIRTALK_TASK_NAME, MSG_CONNECT_ON_IND) --mqtt连上了,开始自定义的鉴权流程
  164. g_mqttc:subscribe("ctrl/downlink/" .. g_local_id .. "/#")--单主题订阅
  165. elseif event == "suback" then
  166. if g_state == SP_T_NO_READY then
  167. if topic then
  168. auth()
  169. else
  170. sys.sendMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, false, "订阅失败" .. "ctrl/downlink/" .. g_local_id .. "/#")
  171. end
  172. elseif g_state == SP_T_CONNECTED then
  173. if not topic then
  174. speech_off(false, true)
  175. end
  176. end
  177. elseif event == "recv" then
  178. local result = string.match(topic, g_dl_topic)
  179. if result then
  180. local obj,res,err = json.decode(payload)
  181. analyze_v1(result, topic, obj)
  182. end
  183. result = nil
  184. data = nil
  185. obj = nil
  186. elseif event == "sent" then
  187. -- log.info("mqtt", "sent", "pkgid", data)
  188. elseif event == "disconnect" then
  189. speech_off(false, true)
  190. g_state = SP_T_NO_READY
  191. elseif event == "error" then
  192. end
  193. end
  194. local function task_cb(msg)
  195. if msg[1] == MSG_SPEECH_CONNECT_TO then
  196. speech_off(true,false)
  197. else
  198. log.info("未处理消息", msg[1], msg[2], msg[3], msg[4])
  199. end
  200. end
  201. local function airtalk_event_cb(event, param)
  202. log.info("airtalk event", event, param)
  203. if event == airtalk.EVENT_ERROR then
  204. if param == airtalk.ERROR_NO_DATA then
  205. log.error("长时间没有收到音频数据")
  206. speech_off(true, true)
  207. end
  208. end
  209. end
  210. local function airtalk_mqtt_task()
  211. local msg,data,obj,online,num,res
  212. --g_local_id也可以自己设置
  213. g_local_id = mobile.imei()
  214. g_dl_topic = "ctrl/downlink/" .. g_local_id .. "/(%w%w%w%w)"
  215. sys.timerLoopStart(next_auth, 900000)
  216. g_mqttc = mqtt.create(nil, "mqtt.airtalk.luatos.com", 1883, false, {rxSize = 32768})
  217. airtalk.config(airtalk.PROTOCOL_MQTT, g_mqttc, 200) -- 缓冲至少200ms播放
  218. airtalk.on(airtalk_event_cb)
  219. airtalk.start()
  220. g_mqttc:auth(g_local_id,g_local_id,mobile.muid()) -- g_local_id必填,其余选填
  221. g_mqttc:keepalive(240) -- 默认值240s
  222. g_mqttc:autoreconn(true, 15000) -- 自动重连机制
  223. g_mqttc:debug(false)
  224. g_mqttc:on(mqtt_cb)
  225. log.info("设备信息", g_local_id, mobile.muid())
  226. -- mqttc自动处理重连, 除非自行关闭
  227. g_mqttc:connect()
  228. online = false
  229. while true do
  230. msg = sys.waitMsg(AIRTALK_TASK_NAME, MSG_CONNECT_ON_IND) --等服务器连上
  231. log.info("connected")
  232. while not online do
  233. msg = sys.waitMsg(AIRTALK_TASK_NAME, MSG_AUTH_IND, 30000) --登录流程不应该超过30秒
  234. if type(msg) == 'table' then
  235. online = msg[2]
  236. if online then
  237. sys.timerLoopStart(auth, 3600000) --鉴权通过则60分钟后尝试重新鉴权
  238. else
  239. log.info(msg[3])
  240. sys.timerLoopStart(auth, 300000) --5分钟后重新鉴权
  241. end
  242. else
  243. auth() --30秒鉴权无效后重新鉴权
  244. end
  245. end
  246. log.info("对讲管理平台已连接")
  247. while online do
  248. msg = sys.waitMsg(AIRTALK_TASK_NAME)
  249. if type(msg) == 'table' and type(msg[1]) == "number" then
  250. if msg[1] == MSG_PERSON_SPEECH_TEST_START then
  251. if g_state ~= SP_T_IDLE then
  252. log.info("正在对讲无法开始")
  253. else
  254. log.info("测试一下主动1对1对讲功能,找一个有效的IMEI")
  255. for i=1,#g_dev_list do
  256. res = string.match(g_dev_list[i]["id"], "(%w%w%w%w%w%w%w%w%w%w%w%w%w%w%w)")
  257. if res and res ~= g_local_id then
  258. break
  259. end
  260. end
  261. if res then
  262. log.info("向", res, "主动发起对讲")
  263. g_state = SP_T_CONNECTING
  264. g_remote_id = res
  265. g_s_mode = airtalk.MODE_PERSON
  266. g_s_type = "one-on-one"
  267. g_s_topic = "audio/" .. g_local_id .. "/" .. g_remote_id .. "/" .. (string.sub(tostring(mcu.ticks()), -4, -1))
  268. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/0003", json.encode({["topic"] = g_s_topic, ["type"] = g_s_type}))
  269. sys.timerStart(wait_speech_to, 15000)
  270. else
  271. log.info("找不到有效的设备ID")
  272. end
  273. end
  274. elseif msg[1] == MSG_GROUP_SPEECH_TEST_START then
  275. if g_state ~= SP_T_IDLE then
  276. log.info("正在对讲无法开始")
  277. else
  278. log.info("测试一下1对多对讲功能")
  279. g_remote_id = "all"
  280. g_state = SP_T_CONNECTING
  281. g_s_mode = airtalk.MODE_GROUP_SPEAKER
  282. g_s_type = "broadcast"
  283. g_s_topic = "audio/" .. g_local_id .. "/all/" .. (string.sub(tostring(mcu.ticks()), -4, -1))
  284. g_mqttc:publish("ctrl/uplink/" .. g_local_id .."/0003", json.encode({["topic"] = g_s_topic, ["type"] = g_s_type}))
  285. sys.timerStart(wait_speech_to, 15000)
  286. end
  287. elseif msg[1] == MSG_SPEECH_STOP_TEST_END then
  288. if g_state ~= SP_T_CONNECTING and g_state ~= SP_T_CONNECTED then
  289. log.info("没有对讲", g_state)
  290. else
  291. log.info("主动断开对讲")
  292. speech_off(true, false)
  293. end
  294. elseif msg[1] == MSG_SPEECH_ON_IND then
  295. if msg[2] then
  296. log.info("对讲接通")
  297. else
  298. log.info("对讲断开")
  299. end
  300. elseif msg[1] == MSG_CONNECT_OFF_IND then
  301. log.info("connect", msg[2])
  302. online = msg[2]
  303. end
  304. obj = nil
  305. else
  306. log.info(type(msg), type(msg[1]))
  307. end
  308. msg = nil
  309. end
  310. online = false
  311. end
  312. end
  313. function airtalk_mqtt_init()
  314. sys.taskInitEx(airtalk_mqtt_task, AIRTALK_TASK_NAME, task_cb)
  315. end