/* airtalk_network.c */
  1. #include "csdk.h"
  2. #include "luat_airtalk.h"
  3. #include "airtalk_def.h"
  4. #include "airtalk_api.h"
  5. #include "luat_rtp.h"
  6. typedef struct
  7. {
  8. llist_head node;
  9. uint64_t remote_tamp;
  10. uint64_t local_tamp;
  11. uint32_t total_len;
  12. uint8_t amr_save_data[];
  13. }net_data_struct;
  /**
   * Payload of the RTP extension header exchanged by airtalk peers:
   * a 64-bit timestamp split into two 32-bit words, followed by one
   * 32-bit parameter word that can be read whole (fin_param) or by field.
   */
  typedef struct
  {
      uint32_t tamp_high;  /* upper 32 bits of the timestamp */
      uint32_t tamp_low;   /* lower 32 bits of the timestamp */
      union
      {
          struct
          {
              uint32_t amr_data_len : 16;  /* length of the audio payload that follows the headers */
              uint32_t encode_type : 4;
              uint32_t unused : 12;
          };
          uint32_t fin_param;  /* the three bit-fields above viewed as one word */
      };
  } airtalk_extern_head_data_t;
  29. static airtalk_network_ctrl_t prv_network;
  30. //播放完成
  31. static void airtalk_full_stop(void)
  32. {
  33. net_data_struct *net_cache;
  34. luat_airtalk_speech_stop_play();
  35. luat_stop_rtos_timer(prv_network.download_check_timer);
  36. while(!llist_empty(&prv_network.download_cache_head))
  37. {
  38. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  39. llist_del(&net_cache->node);
  40. luat_heap_free(net_cache);
  41. }
  42. }
  43. static void download_check_timer(void *param)
  44. {
  45. LUAT_DEBUG_PRINT("broadcast long time no data!");
  46. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_FORCE_STOP, 0, 0, 0, 0);
  47. }
  48. static void airtalk_network_task(void *param)
  49. {
  50. uint64_t tamp;
  51. airtalk_extern_head_data_t extern_data;
  52. rtp_base_head_t *remote_rtp_head = luat_heap_malloc(sizeof(rtp_base_head_t));
  53. rtp_extern_head_t *remote_rtp_extern = luat_heap_malloc(sizeof(rtp_extern_head_t));
  54. rtp_base_head_t *local_rtp_head = luat_heap_malloc(sizeof(rtp_base_head_t));
  55. rtp_extern_head_t *local_rtp_extern = luat_heap_malloc(sizeof(rtp_extern_head_t) + sizeof(airtalk_extern_head_data_t));
  56. uint32_t local_time_diff, remote_time_diff, *remote_rtp_extern_data;
  57. luat_event_t event;
  58. net_data_struct *net_cache;
  59. record_data_struct *record_cache = luat_heap_calloc(UPLOAD_CACHE_MAX, sizeof(record_data_struct));
  60. int ret = -1;
  61. uint16_t local_sn = 0;
  62. uint8_t *p;
  63. uint8_t *out = luat_heap_malloc(RECORD_DATA_MAX + 28);
  64. uint8_t sync_lost = 1;
  65. memset(local_rtp_head, 0, sizeof(rtp_base_head_t));
  66. memset(local_rtp_extern, 0, sizeof(rtp_extern_head_t) + sizeof(airtalk_extern_head_data_t));
  67. local_rtp_head->version = 2;
  68. local_rtp_head->extension = 1;
  69. local_rtp_extern->profile_id = 1;
  70. local_rtp_extern->length = sizeof(airtalk_extern_head_data_t) >> 2;
  71. INIT_LLIST_HEAD(&prv_network.download_cache_head);
  72. INIT_LLIST_HEAD(&prv_network.upload_cache_head);
  73. INIT_LLIST_HEAD(&prv_network.free_cache_head);
  74. for(int i = 0; i < UPLOAD_CACHE_MAX; i++)
  75. {
  76. llist_add(&record_cache[i].node, &prv_network.free_cache_head);
  77. }
  78. prv_network.download_check_timer = luat_create_rtos_timer(download_check_timer, NULL, NULL);
  79. if (!prv_network.download_cache_time)
  80. {
  81. prv_network.download_cache_time = 500;
  82. }
  83. prv_network.record_cache_locker = luat_mutex_create();
  84. while(1){
  85. luat_rtos_event_recv(prv_network.task_handle, 0, &event, NULL, LUAT_WAIT_FOREVER);
  86. switch(event.id)
  87. {
  88. case AIRTALK_EVENT_NETWORK_DOWNLINK_DATA:
  89. p = (uint8_t *)event.param1;
  90. ret = luat_unpack_rtp_head(p, event.param2, remote_rtp_head, &remote_rtp_extern_data);
  91. if (ret <= 0)
  92. {
  93. goto RX_DATA_DONE;
  94. }
  95. p += ret;
  96. event.param2 -= ret;
  97. ret = luat_unpack_rtp_extern_head(p, event.param2, remote_rtp_extern, &remote_rtp_extern_data);
  98. if (ret <= 0)
  99. {
  100. goto RX_DATA_DONE;
  101. }
  102. if (remote_rtp_extern->profile_id != 1)
  103. {
  104. LUAT_DEBUG_PRINT("profile id failed!");
  105. goto RX_DATA_DONE;
  106. }
  107. if (remote_rtp_extern->length != 3)
  108. {
  109. LUAT_DEBUG_PRINT("profile length failed!");
  110. goto RX_DATA_DONE;
  111. }
  112. extern_data.tamp_high = BytesGetBe32(remote_rtp_extern_data);
  113. extern_data.tamp_low = BytesGetBe32(&remote_rtp_extern_data[1]);
  114. extern_data.fin_param = BytesGetBe32(&remote_rtp_extern_data[2]);
  115. tamp = extern_data.tamp_high;
  116. tamp = (tamp << 32) + extern_data.tamp_low;
  117. p += ret;
  118. event.param2 -= ret;
  119. if (extern_data.amr_data_len != event.param2)
  120. {
  121. LUAT_DEBUG_PRINT("amr data len error %d,%d", extern_data.amr_data_len, event.param2);
  122. goto RX_DATA_DONE;
  123. }
  124. if ((uint32_t)p != event.param1 + 28)
  125. {
  126. LUAT_DEBUG_PRINT("head len error %x,%x", p, event.param1 + 28);
  127. goto RX_DATA_DONE;
  128. }
  129. if (sync_lost)
  130. {
  131. prv_network.data_sync_ok = 0;
  132. sync_lost = 0;
  133. prv_network.remote_ssrc_exsit = 0;
  134. LUAT_DEBUG_PRINT("wait fisrt rtp for sync");
  135. }
  136. if (!prv_network.remote_ssrc_exsit)
  137. {
  138. prv_network.data_sync_ok = 0;
  139. net_cache = luat_heap_malloc(sizeof(net_data_struct) + event.param3);
  140. net_cache->total_len = extern_data.amr_data_len;
  141. net_cache->remote_tamp = tamp;
  142. net_cache->local_tamp = luat_mcu_tick64_ms();
  143. memcpy(net_cache->amr_save_data, p, net_cache->total_len);
  144. llist_add_tail(&net_cache->node, &prv_network.download_cache_head);
  145. prv_network.remote_ssrc = remote_rtp_head->ssrc;
  146. LUAT_DEBUG_PRINT("sync start remote %llu %llu", net_cache->remote_tamp, net_cache->local_tamp);
  147. }
  148. else
  149. {
  150. if (prv_network.remote_ssrc != remote_rtp_head->ssrc)
  151. {
  152. LUAT_DEBUG_PRINT("ssrc error drop %x,%x", prv_network.remote_ssrc, remote_rtp_head->ssrc);
  153. goto RX_DATA_DONE;
  154. }
  155. }
  156. if (prv_network.data_sync_ok)
  157. {
  158. luat_airtalk_speech_save_downlink_data(p, extern_data.amr_data_len);
  159. }
  160. else
  161. {
  162. net_cache = luat_heap_malloc(sizeof(net_data_struct) + event.param3);
  163. net_cache->total_len = extern_data.amr_data_len;
  164. net_cache->remote_tamp = tamp;
  165. net_cache->local_tamp = luat_mcu_tick64_ms();
  166. memcpy(net_cache->amr_save_data, p, net_cache->total_len);
  167. llist_add_tail(&net_cache->node, &prv_network.download_cache_head);
  168. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  169. remote_time_diff = (uint32_t)(tamp - net_cache->remote_tamp);
  170. if (remote_time_diff >= (prv_network.download_cache_time - 20))
  171. {
  172. local_time_diff = (uint32_t)(luat_mcu_tick64_ms() - net_cache->local_tamp);
  173. if (local_time_diff >= (prv_network.download_cache_time - 20))
  174. {
  175. LUAT_DEBUG_PRINT("sync ok");
  176. prv_network.data_sync_ok = 1;
  177. while(!llist_empty(&prv_network.download_cache_head))
  178. {
  179. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  180. llist_del(&net_cache->node);
  181. luat_airtalk_speech_save_downlink_data(net_cache->amr_save_data, net_cache->total_len);
  182. luat_heap_free(net_cache);
  183. }
  184. luat_airtalk_speech_sync_ok();
  185. }
  186. else
  187. {
  188. LUAT_DEBUG_PRINT("sync failed %u, %u", remote_time_diff, local_time_diff);
  189. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  190. llist_del(&net_cache->node);
  191. luat_heap_free(net_cache);
  192. net_cache = (net_data_struct *)prv_network.download_cache_head.next;
  193. LUAT_DEBUG_PRINT("resync start remote %llu %llu", net_cache->remote_tamp, net_cache->local_tamp);
  194. }
  195. }
  196. }
  197. luat_airtalk_speech_start_play(prv_network.is_16k);
  198. luat_start_rtos_timer(prv_network.download_check_timer, 3000, 0);
  199. RX_DATA_DONE:
  200. prv_network.recv_function((uint8_t *)event.param1, event.param3);
  201. break;
  202. case AIRTALK_EVENT_NETWORK_UPLINK_DATA:
  203. if (prv_network.is_ready)
  204. {
  205. TX_DATA_START:
  206. record_cache = NULL;
  207. luat_rtos_mutex_lock(prv_network.record_cache_locker, LUAT_WAIT_FOREVER);
  208. if(!llist_empty(&prv_network.upload_cache_head))
  209. {
  210. record_cache = (record_data_struct *)prv_network.upload_cache_head.next;
  211. llist_del(&record_cache->node);
  212. local_rtp_head->sn = local_sn;
  213. local_sn++;
  214. extern_data.tamp_high = (uint32_t)(record_cache->local_tamp >> 32);
  215. extern_data.tamp_low = (uint32_t)(record_cache->local_tamp & 0x00000000ffffffff);
  216. extern_data.amr_data_len = record_cache->total_len;
  217. memcpy(local_rtp_extern->data, &extern_data, sizeof(extern_data));
  218. ret = luat_pack_rtp(local_rtp_head, local_rtp_extern, record_cache->save_data, record_cache->total_len, out, RECORD_DATA_MAX + 28);
  219. llist_add_tail(&record_cache->node, &prv_network.free_cache_head);
  220. }
  221. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  222. if (!record_cache)
  223. {
  224. goto TX_DATA_DONE;
  225. }
  226. if (ret > 0)
  227. {
  228. prv_network.send_function(out, ret);
  229. }
  230. else
  231. {
  232. LUAT_DEBUG_PRINT("rtp pack error");
  233. }
  234. }
  235. else
  236. {
  237. luat_rtos_mutex_lock(prv_network.record_cache_locker, LUAT_WAIT_FOREVER);
  238. while(!llist_empty(&prv_network.upload_cache_head))
  239. {
  240. record_cache = (record_data_struct *)prv_network.upload_cache_head.next;
  241. llist_del(&record_cache->node);
  242. llist_add_tail(&record_cache->node, &prv_network.free_cache_head);
  243. }
  244. LUAT_DEBUG_PRINT("upload %d, free %d", llist_num(&prv_network.upload_cache_head), llist_num(&prv_network.free_cache_head));
  245. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  246. }
  247. TX_DATA_DONE:
  248. record_cache = NULL;
  249. break;
  250. case AIRTALK_EVENT_NETWORK_UPLINK_END:
  251. if (prv_network.is_ready)
  252. {
  253. local_rtp_head->sn = local_sn;
  254. local_sn++;
  255. extern_data.amr_data_len = 0;
  256. memcpy(local_rtp_extern->data, &extern_data, sizeof(extern_data));
  257. ret = luat_pack_rtp(local_rtp_head, local_rtp_extern, NULL, 0, out, RECORD_DATA_MAX + 28);
  258. if (ret > 0)
  259. {
  260. prv_network.send_function(out, ret);
  261. }
  262. else
  263. {
  264. LUAT_DEBUG_PRINT("rtp pack error");
  265. }
  266. }
  267. break;
  268. case AIRTALK_EVENT_NETWORK_READY_START:
  269. local_rtp_head->ssrc = prv_network.local_ssrc;
  270. luat_rtos_mutex_lock(prv_network.record_cache_locker, LUAT_WAIT_FOREVER);
  271. prv_network.is_ready = 1;
  272. while(!llist_empty(&prv_network.upload_cache_head))
  273. {
  274. record_cache = (record_data_struct *)prv_network.upload_cache_head.next;
  275. llist_del(&record_cache->node);
  276. llist_add_tail(&record_cache->node, &prv_network.free_cache_head);
  277. }
  278. LUAT_DEBUG_PRINT("upload %d, free %d", llist_num(&prv_network.upload_cache_head), llist_num(&prv_network.free_cache_head));
  279. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  280. break;
  281. case AIRTALK_EVENT_NETWORK_FORCE_SYNC:
  282. LUAT_DEBUG_PRINT("sync lost resync!");
  283. sync_lost = 1;
  284. break;
  285. case AIRTALK_EVENT_NETWORK_FORCE_STOP:
  286. if (prv_network.is_ready)
  287. {
  288. sync_lost = 1;
  289. prv_network.is_ready = 0;
  290. airtalk_full_stop();
  291. }
  292. break;
  293. case AIRTALK_EVENT_NETWORK_MSG:
  294. break;
  295. }
  296. }
  297. }
  298. void *luat_airtalk_net_common_init(CBDataFun_t send_function, CBDataFun_t recv_function)
  299. {
  300. prv_network.send_function = send_function;
  301. prv_network.recv_function = recv_function;
  302. luat_rtos_task_create(&prv_network.task_handle, 6 * 1024, 90, "airtalk_net", airtalk_network_task, NULL, 0);
  303. return (void *)&prv_network;
  304. }
  305. void luat_airtalk_net_param_config(uint8_t audio_data_protocl, uint32_t download_cache_time)
  306. {
  307. prv_network.audio_data_protocl = audio_data_protocl;
  308. prv_network.download_cache_time = download_cache_time;
  309. }
  310. void luat_airtalk_net_set_ssrc(uint32_t ssrc)
  311. {
  312. prv_network.local_ssrc = ssrc;
  313. }
  314. void luat_airtalk_net_transfer_start(uint8_t work_mode)
  315. {
  316. prv_network.work_mode = work_mode;
  317. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_READY_START, 0, 0, 0, 0);
  318. }
  319. void luat_airtalk_net_transfer_stop(void)
  320. {
  321. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_FORCE_STOP, 0, 0, 0, 0);
  322. }
  323. void luat_airtalk_net_force_sync_downlink(void)
  324. {
  325. if (prv_network.data_sync_ok)
  326. {
  327. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_FORCE_SYNC, 0, 0, 0, 0);
  328. }
  329. }
  330. void luat_airtalk_net_save_uplink_head(uint64_t record_time)
  331. {
  332. if (!prv_network.is_ready) return;
  333. luat_rtos_mutex_lock(prv_network.record_cache_locker, LUAT_WAIT_FOREVER);
  334. if (!prv_network.cur_record_node)
  335. {
  336. if (llist_empty(&prv_network.free_cache_head))
  337. {
  338. LUAT_DEBUG_PRINT("no cache for upload!");
  339. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  340. return;
  341. }
  342. prv_network.cur_record_node = (record_data_struct *)prv_network.free_cache_head.next;
  343. llist_del(&prv_network.cur_record_node->node);
  344. }
  345. prv_network.cur_record_node->local_tamp = record_time;
  346. prv_network.cur_record_node->total_len = 0;
  347. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  348. }
  349. void luat_airtalk_net_save_uplink_data(uint8_t *data, uint32_t len)
  350. {
  351. if (!prv_network.is_ready) return;
  352. luat_rtos_mutex_lock(prv_network.record_cache_locker, LUAT_WAIT_FOREVER);
  353. if (!prv_network.cur_record_node)
  354. {
  355. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  356. LUAT_DEBUG_PRINT("no head!");
  357. return;
  358. }
  359. if (prv_network.cur_record_node->total_len + len <= RECORD_DATA_MAX)
  360. {
  361. memcpy(prv_network.cur_record_node->save_data + prv_network.cur_record_node->total_len, data, len);
  362. prv_network.cur_record_node->total_len += len;
  363. }
  364. else
  365. {
  366. LUAT_DEBUG_PRINT("no mem!");
  367. }
  368. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  369. }
  370. void luat_airtalk_net_uplink_once(void)
  371. {
  372. if (!prv_network.is_ready) return;
  373. luat_rtos_mutex_lock(prv_network.record_cache_locker, LUAT_WAIT_FOREVER);
  374. if (!prv_network.cur_record_node)
  375. {
  376. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  377. LUAT_DEBUG_PRINT("no head!");
  378. return;
  379. }
  380. llist_add_tail(&prv_network.cur_record_node->node, &prv_network.upload_cache_head);
  381. prv_network.cur_record_node = NULL;
  382. luat_rtos_mutex_unlock(prv_network.record_cache_locker);
  383. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_UPLINK_DATA, 0, 0, 0, 0);
  384. }
  385. void luat_airtalk_net_uplink_end(void)
  386. {
  387. if (!prv_network.is_ready) return;
  388. luat_rtos_event_send(prv_network.task_handle, AIRTALK_EVENT_NETWORK_UPLINK_END, 0, 0, 0, 0);
  389. }
  390. void luat_airtalk_net_debug_switch(uint8_t on_off)
  391. {
  392. prv_network.debug_on_off = on_off;
  393. }
  394. void luat_airtalk_net_init(void)
  395. {
  396. switch(prv_network.audio_data_protocl)
  397. {
  398. case LUAT_AIRTALK_PROTOCOL_MQTT:
  399. luat_airtalk_net_mqtt_init();
  400. break;
  401. }
  402. }
  403. void luat_airtalk_use_16k(uint8_t on_off)
  404. {
  405. prv_network.is_16k = on_off;
  406. }
  407. uint8_t luat_airtalk_is_16k(void)
  408. {
  409. return prv_network.is_16k;
  410. }