airtalk_network.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. #include "csdk.h"
  2. #include "luat_airtalk.h"
  3. #include "airtalk_def.h"
  4. #include "airtalk_api.h"
  5. #include "luat_rtp.h"
  6. typedef struct
  7. {
  8. llist_head node;
  9. uint64_t remote_tamp;
  10. uint64_t local_tamp;
  11. uint32_t total_len;
  12. uint8_t amr_save_data[];
  13. }net_data_struct;
  /*
   * Payload of the airtalk RTP header extension: three 32-bit words.
   * The first two words carry a 64-bit timestamp split into halves; the
   * third word can be accessed either as a whole (fin_param) or through
   * the bit-fields that overlay it.
   * NOTE(review): the bit-field placement inside fin_param is decided by
   * the compiler/ABI - confirm it matches the peer's wire layout.
   */
  typedef struct airtalk_extern_head_data
  {
      uint32_t tamp_high; /* upper 32 bits of the timestamp */
      uint32_t tamp_low;  /* lower 32 bits of the timestamp */
      union
      {
          struct
          {
              uint32_t unused : 12;
              uint32_t encode_type : 4;
              uint32_t amr_data_len : 16; /* audio payload length in bytes */
          };
          uint32_t fin_param; /* the same word viewed as one integer */
      };
  } airtalk_extern_head_data_t;
  29. static airtalk_network_ctrl_t prv_network;
  30. //播放完成
  31. static void airtalk_full_stop(void)
  32. {
  33. net_data_struct *net_cache;
  34. luat_airtalk_speech_stop_play();
  35. luat_stop_rtos_timer(prv_network.download_check_timer);
  36. while(!llist_empty(&prv_network.download_cache_head))
  37. {
  38. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  39. llist_del(&net_cache->node);
  40. luat_heap_free(net_cache);
  41. }
  42. }
  43. static void download_check_timer(void *param)
  44. {
  45. if (prv_network.new_data_flag)
  46. {
  47. prv_network.new_data_flag = 0;
  48. return;
  49. }
  50. luat_airtalk_callback(LUAT_AIRTALK_CB_ERROR, NULL, LUAT_AIRTALL_ERR_LONG_TIME_NO_DATA);
  51. // luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_FORCE_STOP, 0, 0, 0, 0);
  52. }
  53. static void airtalk_network_task(void *param)
  54. {
  55. uint64_t tamp;
  56. airtalk_extern_head_data_t extern_data;
  57. rtp_base_head_t *remote_rtp_head = luat_heap_malloc(sizeof(rtp_base_head_t));
  58. rtp_extern_head_t *remote_rtp_extern = luat_heap_malloc(sizeof(rtp_extern_head_t));
  59. rtp_base_head_t *local_rtp_head = luat_heap_malloc(sizeof(rtp_base_head_t));
  60. rtp_extern_head_t *local_rtp_extern = luat_heap_malloc(sizeof(rtp_extern_head_t) + sizeof(airtalk_extern_head_data_t));
  61. uint32_t local_time_diff, remote_time_diff, *remote_rtp_extern_data;
  62. luat_event_t event;
  63. net_data_struct *net_cache;
  64. record_data_struct *record_cache = luat_heap_calloc(UPLOAD_CACHE_MAX, sizeof(record_data_struct));
  65. int ret = -1;
  66. uint16_t local_sn = 0;
  67. uint8_t *p;
  68. uint8_t *out = luat_heap_malloc(RECORD_DATA_MAX + 28);
  69. uint8_t sync_lost = 1;
  70. memset(local_rtp_head, 0, sizeof(rtp_base_head_t));
  71. memset(local_rtp_extern, 0, sizeof(rtp_extern_head_t) + sizeof(airtalk_extern_head_data_t));
  72. local_rtp_head->version = 2;
  73. local_rtp_head->extension = 1;
  74. local_rtp_extern->profile_id = 1;
  75. local_rtp_extern->length = sizeof(airtalk_extern_head_data_t) >> 2;
  76. INIT_LLIST_HEAD(&prv_network.download_cache_head);
  77. INIT_LLIST_HEAD(&prv_network.upload_cache_head);
  78. INIT_LLIST_HEAD(&prv_network.free_cache_head);
  79. for(int i = 0; i < UPLOAD_CACHE_MAX; i++)
  80. {
  81. llist_add(&record_cache[i].node, &prv_network.free_cache_head);
  82. }
  83. prv_network.download_check_timer = luat_create_rtos_timer(download_check_timer, NULL, NULL);
  84. if (!prv_network.download_cache_time)
  85. {
  86. prv_network.download_cache_time = 500;
  87. }
  88. if (!prv_network.download_no_data_time)
  89. {
  90. prv_network.download_no_data_time = 5000;
  91. }
  92. prv_network.record_cache_locker = luat_mutex_create();
  93. luat_mutex_unlock(prv_network.record_cache_locker);
  94. while(1){
  95. luat_rtos_event_recv(prv_network.task_handle, 0, &event, NULL, LUAT_WAIT_FOREVER);
  96. switch(event.id)
  97. {
  98. case AIRTALK_EVENT_NETWORK_DOWNLINK_DATA:
  99. if (prv_network.debug_on_off)
  100. {
  101. LUAT_DEBUG_PRINT("%d, %d", prv_network.work_mode, prv_network.is_ready);
  102. }
  103. if (LUAT_AIRTALK_SPEECH_MODE_GROUP_SPEAKER == prv_network.work_mode)
  104. {
  105. goto RX_DATA_DONE;
  106. }
  107. if (!prv_network.is_ready)
  108. {
  109. goto RX_DATA_DONE;
  110. }
  111. p = (uint8_t *)event.param1;
  112. ret = luat_unpack_rtp_head(p, event.param2, remote_rtp_head, &remote_rtp_extern_data);
  113. if (ret <= 0)
  114. {
  115. LUAT_DEBUG_PRINT("rtp head error! %d", ret);
  116. goto RX_DATA_DONE;
  117. }
  118. if (prv_network.debug_on_off)
  119. {
  120. LUAT_DEBUG_PRINT("%x, %x", remote_rtp_head->ssrc, prv_network.local_ssrc);
  121. }
  122. if (prv_network.local_ssrc == remote_rtp_head->ssrc)
  123. {
  124. goto RX_DATA_DONE;
  125. }
  126. p += ret;
  127. event.param2 -= ret;
  128. ret = luat_unpack_rtp_extern_head(p, event.param2, remote_rtp_extern, &remote_rtp_extern_data);
  129. if (ret <= 0)
  130. {
  131. LUAT_DEBUG_PRINT("rtp ext head error!, %d", ret);
  132. goto RX_DATA_DONE;
  133. }
  134. if (remote_rtp_extern->profile_id != 1)
  135. {
  136. LUAT_DEBUG_PRINT("profile id failed!");
  137. goto RX_DATA_DONE;
  138. }
  139. if (remote_rtp_extern->length != 3)
  140. {
  141. LUAT_DEBUG_PRINT("profile length failed!");
  142. goto RX_DATA_DONE;
  143. }
  144. extern_data.tamp_high = BytesGetBe32(remote_rtp_extern_data);
  145. extern_data.tamp_low = BytesGetBe32(&remote_rtp_extern_data[1]);
  146. extern_data.fin_param = BytesGetBe32(&remote_rtp_extern_data[2]);
  147. tamp = extern_data.tamp_high;
  148. tamp = (tamp << 32) + extern_data.tamp_low;
  149. p += ret;
  150. event.param2 -= ret;
  151. if (extern_data.amr_data_len != event.param2)
  152. {
  153. LUAT_DEBUG_PRINT("amr data len error %d,%d", extern_data.amr_data_len, event.param2);
  154. goto RX_DATA_DONE;
  155. }
  156. if ((uint32_t)p != event.param1 + 28)
  157. {
  158. LUAT_DEBUG_PRINT("head len error %x,%x", p, event.param1 + 28);
  159. goto RX_DATA_DONE;
  160. }
  161. if (sync_lost)
  162. {
  163. prv_network.data_sync_ok = 0;
  164. sync_lost = 0;
  165. prv_network.remote_ssrc_exsit = 0;
  166. LUAT_DEBUG_PRINT("wait fisrt rtp for sync");
  167. }
  168. if (!prv_network.remote_ssrc_exsit)
  169. {
  170. prv_network.data_sync_ok = 0;
  171. net_cache = luat_heap_malloc(sizeof(net_data_struct) + extern_data.amr_data_len);
  172. net_cache->total_len = extern_data.amr_data_len;
  173. net_cache->remote_tamp = tamp;
  174. net_cache->local_tamp = luat_mcu_tick64_ms();
  175. memcpy(net_cache->amr_save_data, p, net_cache->total_len);
  176. llist_add_tail(&net_cache->node, &prv_network.download_cache_head);
  177. prv_network.remote_ssrc = remote_rtp_head->ssrc;
  178. prv_network.remote_ssrc_exsit = 1;
  179. prv_network.new_data_flag = 1;
  180. LUAT_DEBUG_PRINT("sync start remote %llu %llu %x", net_cache->remote_tamp, net_cache->local_tamp, prv_network.remote_ssrc);
  181. goto RX_DATA_DONE;
  182. }
  183. else
  184. {
  185. if (prv_network.remote_ssrc != remote_rtp_head->ssrc)
  186. {
  187. LUAT_DEBUG_PRINT("ssrc error drop %x,%x", prv_network.remote_ssrc, remote_rtp_head->ssrc);
  188. goto RX_DATA_DONE;
  189. }
  190. }
  191. prv_network.new_data_flag = 1;
  192. if (prv_network.data_sync_ok)
  193. {
  194. luat_airtalk_speech_save_downlink_data(p, extern_data.amr_data_len);
  195. }
  196. else
  197. {
  198. net_cache = luat_heap_malloc(sizeof(net_data_struct) + extern_data.amr_data_len);
  199. net_cache->total_len = extern_data.amr_data_len;
  200. net_cache->remote_tamp = tamp;
  201. net_cache->local_tamp = luat_mcu_tick64_ms();
  202. memcpy(net_cache->amr_save_data, p, net_cache->total_len);
  203. llist_add_tail(&net_cache->node, &prv_network.download_cache_head);
  204. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  205. remote_time_diff = (uint32_t)(tamp - net_cache->remote_tamp);
  206. if (remote_time_diff >= (prv_network.download_cache_time - 20))
  207. {
  208. local_time_diff = (uint32_t)(luat_mcu_tick64_ms() - net_cache->local_tamp);
  209. if (local_time_diff >= (prv_network.download_cache_time - 20))
  210. {
  211. LUAT_DEBUG_PRINT("sync ok");
  212. prv_network.data_sync_ok = 1;
  213. while(!llist_empty(&prv_network.download_cache_head))
  214. {
  215. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  216. llist_del(&net_cache->node);
  217. luat_airtalk_speech_save_downlink_data(net_cache->amr_save_data, net_cache->total_len);
  218. luat_heap_free(net_cache);
  219. }
  220. luat_airtalk_speech_sync_ok();
  221. }
  222. else
  223. {
  224. LUAT_DEBUG_PRINT("sync failed %u, %u", remote_time_diff, local_time_diff);
  225. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  226. llist_del(&net_cache->node);
  227. luat_heap_free(net_cache);
  228. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  229. LUAT_DEBUG_PRINT("resync start remote %llu %llu", net_cache->remote_tamp, net_cache->local_tamp);
  230. }
  231. }
  232. }
  233. RX_DATA_DONE:
  234. prv_network.recv_function((uint8_t *)event.param1, event.param3);
  235. break;
  236. case AIRTALK_EVENT_NETWORK_UPLINK_DATA:
  237. if (prv_network.is_ready)
  238. {
  239. record_cache = NULL;
  240. luat_mutex_lock(prv_network.record_cache_locker);
  241. if(!llist_empty(&prv_network.upload_cache_head))
  242. {
  243. record_cache = (record_data_struct *)prv_network.upload_cache_head.next;
  244. llist_del(&record_cache->node);
  245. local_rtp_head->sn = local_sn;
  246. local_sn++;
  247. extern_data.tamp_high = (uint32_t)(record_cache->local_tamp >> 32);
  248. extern_data.tamp_low = (uint32_t)(record_cache->local_tamp & 0x00000000ffffffff);
  249. extern_data.amr_data_len = record_cache->total_len;
  250. memcpy(local_rtp_extern->data, &extern_data, sizeof(extern_data));
  251. ret = luat_pack_rtp(local_rtp_head, local_rtp_extern, record_cache->save_data, record_cache->total_len, out, RECORD_DATA_MAX + 28);
  252. llist_add_tail(&record_cache->node, &prv_network.free_cache_head);
  253. }
  254. luat_mutex_unlock(prv_network.record_cache_locker);
  255. if (!record_cache)
  256. {
  257. goto TX_DATA_DONE;
  258. }
  259. if (ret > 0)
  260. {
  261. prv_network.send_function(out, ret);
  262. }
  263. else
  264. {
  265. LUAT_DEBUG_PRINT("rtp pack error");
  266. }
  267. }
  268. else
  269. {
  270. luat_mutex_lock(prv_network.record_cache_locker);
  271. while(!llist_empty(&prv_network.upload_cache_head))
  272. {
  273. record_cache = (record_data_struct *)prv_network.upload_cache_head.next;
  274. llist_del(&record_cache->node);
  275. llist_add_tail(&record_cache->node, &prv_network.free_cache_head);
  276. }
  277. LUAT_DEBUG_PRINT("upload %d, free %d", llist_num(&prv_network.upload_cache_head), llist_num(&prv_network.free_cache_head));
  278. luat_mutex_unlock(prv_network.record_cache_locker);
  279. }
  280. TX_DATA_DONE:
  281. record_cache = NULL;
  282. break;
  283. case AIRTALK_EVENT_NETWORK_UPLINK_END:
  284. if (prv_network.is_ready)
  285. {
  286. local_rtp_head->sn = local_sn;
  287. local_sn++;
  288. extern_data.amr_data_len = 0;
  289. memcpy(local_rtp_extern->data, &extern_data, sizeof(extern_data));
  290. ret = luat_pack_rtp(local_rtp_head, local_rtp_extern, NULL, 0, out, RECORD_DATA_MAX + 28);
  291. if (ret > 0)
  292. {
  293. prv_network.send_function(out, ret);
  294. }
  295. else
  296. {
  297. LUAT_DEBUG_PRINT("rtp pack error");
  298. }
  299. }
  300. break;
  301. case AIRTALK_EVENT_NETWORK_READY_START:
  302. local_rtp_head->ssrc = prv_network.local_ssrc;
  303. luat_mutex_lock(prv_network.record_cache_locker);
  304. prv_network.is_ready = 1;
  305. while(!llist_empty(&prv_network.upload_cache_head))
  306. {
  307. record_cache = (record_data_struct *)prv_network.upload_cache_head.next;
  308. llist_del(&record_cache->node);
  309. llist_add_tail(&record_cache->node, &prv_network.free_cache_head);
  310. }
  311. LUAT_DEBUG_PRINT("upload %d, free %d", llist_num(&prv_network.upload_cache_head), llist_num(&prv_network.free_cache_head));
  312. luat_mutex_unlock(prv_network.record_cache_locker);
  313. luat_start_rtos_timer(prv_network.download_check_timer, prv_network.download_no_data_time, 1);
  314. prv_network.new_data_flag = 0;
  315. luat_airtalk_callback(LUAT_AIRTALK_CB_DATA_START, NULL, 0);
  316. break;
  317. case AIRTALK_EVENT_NETWORK_FORCE_SYNC:
  318. LUAT_DEBUG_PRINT("sync lost resync!");
  319. sync_lost = 1;
  320. luat_airtalk_callback(LUAT_AIRTALK_CB_DATA_RESYNC, NULL, 0);
  321. break;
  322. case AIRTALK_EVENT_NETWORK_FORCE_STOP:
  323. if (prv_network.is_ready)
  324. {
  325. prv_network.is_ready = 0;
  326. airtalk_full_stop();
  327. }
  328. sync_lost = 1;
  329. luat_airtalk_callback(LUAT_AIRTALK_CB_DATA_STOP, NULL, 0);
  330. break;
  331. case AIRTALK_EVENT_NETWORK_MSG:
  332. break;
  333. }
  334. }
  335. }
  336. void *luat_airtalk_net_common_init(CBDataFun_t send_function, CBDataFun_t recv_function)
  337. {
  338. prv_network.send_function = send_function;
  339. prv_network.recv_function = recv_function;
  340. luat_rtos_task_create(&prv_network.task_handle, 6 * 1024, 90, "airtalk_net", airtalk_network_task, NULL, 0);
  341. return (void *)&prv_network;
  342. }
  343. void luat_airtalk_net_param_config(uint8_t audio_data_protocl, uint32_t download_cache_time, uint32_t no_data_time)
  344. {
  345. prv_network.audio_data_protocl = audio_data_protocl;
  346. prv_network.download_cache_time = download_cache_time;
  347. prv_network.download_no_data_time = no_data_time;
  348. }
  349. void luat_airtalk_net_set_ssrc(uint32_t ssrc)
  350. {
  351. LUAT_DEBUG_PRINT("%x", ssrc);
  352. prv_network.local_ssrc = ssrc;
  353. }
  354. void luat_airtalk_net_transfer_start(uint8_t work_mode)
  355. {
  356. LUAT_DEBUG_PRINT("%d", work_mode);
  357. prv_network.work_mode = work_mode;
  358. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_READY_START, 0, 0, 0, 0);
  359. }
  360. void luat_airtalk_net_transfer_stop(void)
  361. {
  362. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_FORCE_STOP, 0, 0, 0, 0);
  363. }
  364. void luat_airtalk_net_force_sync_downlink(void)
  365. {
  366. if (prv_network.data_sync_ok)
  367. {
  368. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_FORCE_SYNC, 0, 0, 0, 0);
  369. }
  370. }
  371. void luat_airtalk_net_uplink_once(uint64_t record_time, uint8_t *data, uint32_t len)
  372. {
  373. if (!prv_network.is_ready) return;
  374. luat_mutex_lock(prv_network.record_cache_locker);
  375. if (llist_empty(&prv_network.free_cache_head))
  376. {
  377. LUAT_DEBUG_PRINT("no cache for upload!");
  378. luat_mutex_unlock(prv_network.record_cache_locker);
  379. return;
  380. }
  381. record_data_struct *cur_record_node = (record_data_struct *)prv_network.free_cache_head.next;
  382. llist_del(&cur_record_node->node);
  383. cur_record_node->local_tamp = record_time;
  384. cur_record_node->total_len = len;
  385. memcpy(cur_record_node->save_data, data, len);
  386. llist_add_tail(&cur_record_node->node, &prv_network.upload_cache_head);
  387. luat_mutex_unlock(prv_network.record_cache_locker);
  388. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_UPLINK_DATA, 0, 0, 0, 0);
  389. }
  390. void luat_airtalk_net_uplink_end(void)
  391. {
  392. if (!prv_network.is_ready) return;
  393. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_UPLINK_END, 0, 0, 0, 0);
  394. }
  395. void luat_airtalk_net_debug_switch(uint8_t on_off)
  396. {
  397. prv_network.debug_on_off = on_off;
  398. }
  399. uint8_t luat_airtalk_is_debug(void)
  400. {
  401. return prv_network.debug_on_off;
  402. }
  403. void luat_airtalk_net_init(void)
  404. {
  405. switch(prv_network.audio_data_protocl)
  406. {
  407. case LUAT_AIRTALK_PROTOCOL_MQTT:
  408. luat_airtalk_net_mqtt_init();
  409. break;
  410. }
  411. }
  412. void luat_airtalk_use_16k(uint8_t on_off)
  413. {
  414. prv_network.is_16k = on_off;
  415. }
  416. uint8_t luat_airtalk_is_16k(void)
  417. {
  418. return prv_network.is_16k;
  419. }