airtalk_demo_mqtt.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. #if 0
  2. #include "csdk.h"
  3. #include "airtalk_def.h"
  4. #include "airtalk_api.h"
  5. #include "luat_airtalk.h"
  6. #include "libemqtt.h"
  7. #include "luat_mqtt.h"
  8. typedef struct
  9. {
  10. llist_head node;
  11. uint64_t remote_tamp;
  12. uint64_t local_tamp;
  13. uint32_t total_len;
  14. uint8_t amr_save_data[];
  15. }net_data_struct;
  16. typedef struct
  17. {
  18. luat_mqtt_ctrl_t *mqtt_ctrl;
  19. luat_rtos_task_handle mqtt_task_handle;
  20. luat_rtos_timer_t download_check_timer;
  21. Buffer_Struct uplink;
  22. llist_head download_cache_head; //下行数据接收缓存队列
  23. uint32_t download_cache_time;
  24. Buffer_Struct topic;
  25. char self_id[15];
  26. uint8_t data_sync_ok;
  27. uint8_t uplink_ready;
  28. uint8_t speech_on;
  29. uint8_t is_16k;
  30. uint8_t debug_on_off;
  31. }demo_mqtt_ctrl_t;
  32. static demo_mqtt_ctrl_t prv_demo_mqtt;
  33. //播放完成
  34. static void end_broadcast_play(void)
  35. {
  36. net_data_struct *net_cache;
  37. luat_airtalk_speech_stop_play();
  38. luat_stop_rtos_timer(prv_demo_mqtt.download_check_timer);
  39. while(!llist_empty(&prv_demo_mqtt.download_cache_head))
  40. {
  41. net_cache = (net_data_struct *)prv_demo_mqtt.download_cache_head.next;
  42. llist_del(&net_cache->node);
  43. luat_heap_free(net_cache);
  44. }
  45. prv_demo_mqtt.speech_on = 0;
  46. }
  47. static void download_check_timer(void *param)
  48. {
  49. luat_rtos_event_send(prv_demo_mqtt.mqtt_task_handle, AIRTALK_EVENT_MQTT_FORCE_STOP, 0, 0, 0, 0);
  50. }
  51. static void airtalk_mqtt_cb(luat_mqtt_ctrl_t *mqtt_ctrl, uint16_t event)
  52. {
  53. int ret;
  54. if (event != MQTT_MSG_PUBLISH)
  55. {
  56. luat_rtos_event_send(prv_demo_mqtt.mqtt_task_handle, AIRTALK_EVENT_MQTT_MSG, event, 0, 0, 0);
  57. }
  58. else
  59. {
  60. const uint8_t* ptr;
  61. uint32_t len;
  62. uint8_t *topic = NULL;
  63. uint8_t *payload = NULL;
  64. len = mqtt_parse_pub_topic_ptr(prv_demo_mqtt.mqtt_ctrl->mqtt_packet_buffer, &ptr);
  65. topic = luat_heap_calloc(len + 1, 1);
  66. memcpy(topic, ptr, len);
  67. len = mqtt_parse_pub_msg_ptr(prv_demo_mqtt.mqtt_ctrl->mqtt_packet_buffer, &ptr);
  68. if (len)
  69. {
  70. payload = luat_heap_malloc(len);
  71. memcpy(payload, ptr, len);
  72. }
  73. luat_rtos_event_send(prv_demo_mqtt.mqtt_task_handle, AIRTALK_EVENT_MQTT_DOWNLINK_DATA, (uint32_t)topic, (uint32_t)payload, len, 0);
  74. }
  75. return;
  76. }
  77. static void airtalk_demo_mqtt_task(void *param)
  78. {
  79. uint64_t tamp;
  80. uint32_t local_time_diff, remote_time_diff;
  81. luat_event_t event;
  82. net_data_struct *net_cache;
  83. int ret = -1;
  84. uint8_t *p;
  85. char *packet_id;
  86. int i;
  87. uint16_t msgid = 0;
  88. char remote_client[16] = {0};
  89. INIT_LLIST_HEAD(&prv_demo_mqtt.download_cache_head);
  90. prv_demo_mqtt.download_check_timer = luat_create_rtos_timer(download_check_timer, NULL, NULL);
  91. if (!prv_demo_mqtt.download_cache_time)
  92. {
  93. prv_demo_mqtt.download_cache_time = 500;
  94. }
  95. if (!prv_demo_mqtt.topic.Data)
  96. {
  97. OS_BufferWrite(&prv_demo_mqtt.topic, "speech_demo/all", 16);
  98. }
  99. LUAT_DEBUG_PRINT("device id, %.*s topic %s", sizeof(prv_demo_mqtt.self_id), prv_demo_mqtt.self_id, prv_demo_mqtt.topic.Data);
  100. while(1){
  101. luat_rtos_event_recv(prv_demo_mqtt.mqtt_task_handle, 0, &event, NULL, LUAT_WAIT_FOREVER);
  102. switch(event.id)
  103. {
  104. case AIRTALK_EVENT_MQTT_DOWNLINK_DATA:
  105. if (memcmp(prv_demo_mqtt.topic.Data, (char *)event.param1, prv_demo_mqtt.topic.Pos - 1))
  106. {
  107. LUAT_DEBUG_PRINT("topic %s", (char *)event.param1);
  108. }
  109. else
  110. {
  111. packet_id = (char *)event.param2;
  112. p = (uint8_t *)event.param2;
  113. if (packet_id[15] > 1)
  114. {
  115. goto RX_DATA_DONE;
  116. }
  117. if (!memcmp(packet_id, prv_demo_mqtt.self_id, 15))
  118. {
  119. #ifdef SELF_TEST
  120. #else
  121. goto RX_DATA_DONE;
  122. #endif
  123. }
  124. if (packet_id[15])
  125. {
  126. if (prv_demo_mqtt.speech_on)
  127. {
  128. remote_client[0] = 0;
  129. end_broadcast_play();
  130. goto RX_DATA_DONE;
  131. }
  132. else
  133. {
  134. LUAT_DEBUG_PRINT("speech already stop!");
  135. goto RX_DATA_DONE;
  136. }
  137. }
  138. memcpy(&tamp, p + 16, 8);
  139. if (!remote_client[0])
  140. {
  141. prv_demo_mqtt.speech_on = 1;
  142. prv_demo_mqtt.data_sync_ok = 0;
  143. net_cache = luat_heap_malloc(sizeof(net_data_struct) + event.param3);
  144. net_cache->total_len = event.param3 - 24;
  145. net_cache->remote_tamp = tamp;
  146. net_cache->local_tamp = luat_mcu_tick64_ms();
  147. memcpy(net_cache->amr_save_data, p + 24, net_cache->total_len);
  148. llist_add_tail(&net_cache->node, &prv_demo_mqtt.download_cache_head);
  149. memcpy(remote_client, packet_id, 15);
  150. LUAT_DEBUG_PRINT("sync start remote %s %llu %llu", remote_client, net_cache->remote_tamp, net_cache->local_tamp);
  151. }
  152. else
  153. {
  154. if (memcmp(remote_client, packet_id, 15))
  155. {
  156. goto RX_DATA_DONE;
  157. }
  158. }
  159. {
  160. event.param3 -= 24; //data_len
  161. p += 24; //data
  162. if (prv_demo_mqtt.data_sync_ok)
  163. {
  164. luat_airtalk_speech_save_downlink_data(p, event.param3);
  165. }
  166. else
  167. {
  168. net_cache = luat_heap_malloc(sizeof(net_data_struct) + event.param3);
  169. net_cache->total_len = event.param3;
  170. net_cache->remote_tamp = tamp;
  171. net_cache->local_tamp = luat_mcu_tick64_ms();
  172. memcpy(net_cache->amr_save_data, p, net_cache->total_len);
  173. llist_add_tail(&net_cache->node, &prv_demo_mqtt.download_cache_head);
  174. net_cache = (net_data_struct *)prv_demo_mqtt.download_cache_head.next;
  175. remote_time_diff = (uint32_t)(tamp - net_cache->remote_tamp);
  176. if (remote_time_diff >= (prv_demo_mqtt.download_cache_time - 20))
  177. {
  178. local_time_diff = (uint32_t)(luat_mcu_tick64_ms() - net_cache->local_tamp);
  179. if (local_time_diff >= (prv_demo_mqtt.download_cache_time - 20))
  180. {
  181. LUAT_DEBUG_PRINT("sync ok");
  182. prv_demo_mqtt.data_sync_ok = 1;
  183. while(!llist_empty(&prv_demo_mqtt.download_cache_head))
  184. {
  185. net_cache = (net_data_struct *)prv_demo_mqtt.download_cache_head.next;
  186. llist_del(&net_cache->node);
  187. luat_airtalk_speech_save_downlink_data(net_cache->amr_save_data, net_cache->total_len);
  188. luat_heap_free(net_cache);
  189. }
  190. luat_airtalk_speech_sync_ok();
  191. }
  192. else
  193. {
  194. LUAT_DEBUG_PRINT("sync failed %u, %u", remote_time_diff, local_time_diff);
  195. net_cache = (net_data_struct *)prv_demo_mqtt.download_cache_head.next;
  196. llist_del(&net_cache->node);
  197. luat_heap_free(net_cache);
  198. net_cache = (net_data_struct *)prv_demo_mqtt.download_cache_head.next;
  199. LUAT_DEBUG_PRINT("resync start remote %s %llu %llu", remote_client, net_cache->remote_tamp, net_cache->local_tamp);
  200. }
  201. }
  202. }
  203. luat_airtalk_speech_start_play(prv_demo_mqtt.is_16k);
  204. luat_start_rtos_timer(prv_demo_mqtt.download_check_timer, 3000, 0);
  205. }
  206. }
  207. RX_DATA_DONE:
  208. luat_heap_free((char *)event.param1);
  209. luat_heap_free((char *)event.param2);
  210. break;
  211. case AIRTALK_EVENT_MQTT_UPLINK_DATA:
  212. if (prv_demo_mqtt.uplink_ready)
  213. {
  214. mqtt_publish(&(prv_demo_mqtt.mqtt_ctrl->broker), (char *)prv_demo_mqtt.topic.Data, prv_demo_mqtt.uplink.Data, prv_demo_mqtt.uplink.Pos, 0);
  215. }
  216. break;
  217. case AIRTALK_EVENT_MQTT_UPLINK_END:
  218. if (prv_demo_mqtt.uplink_ready)
  219. {
  220. prv_demo_mqtt.uplink.Pos = 16;
  221. prv_demo_mqtt.uplink.Data[15] = 1;
  222. mqtt_publish(&(prv_demo_mqtt.mqtt_ctrl->broker), (char *)prv_demo_mqtt.topic.Data, prv_demo_mqtt.uplink.Data, prv_demo_mqtt.uplink.Pos, 0);
  223. }
  224. break;
  225. case AIRTALK_EVENT_MQTT_FORCE_SYNC:
  226. LUAT_DEBUG_PRINT("sync lost resync!");
  227. remote_client[0] = 0;
  228. break;
  229. case AIRTALK_EVENT_MQTT_FORCE_STOP:
  230. LUAT_DEBUG_PRINT("broadcast long time no data!");
  231. remote_client[0] = 0;
  232. end_broadcast_play();
  233. break;
  234. case AIRTALK_EVENT_MQTT_MSG:
  235. switch(event.param1)
  236. {
  237. case MQTT_MSG_TCP_TX_DONE:
  238. //如果用QOS0发送,可以作为发送成功的初步判断依据
  239. break;
  240. case MQTT_MSG_CONNACK:
  241. if(prv_demo_mqtt.mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
  242. LUAT_DEBUG_PRINT("CONACK 0x%02x",prv_demo_mqtt.mqtt_ctrl->mqtt_packet_buffer[3]);
  243. prv_demo_mqtt.mqtt_ctrl->error_state = prv_demo_mqtt.mqtt_ctrl->mqtt_packet_buffer[3];
  244. luat_mqtt_close_socket(prv_demo_mqtt.mqtt_ctrl);
  245. break;
  246. }
  247. mqtt_subscribe(&(prv_demo_mqtt.mqtt_ctrl->broker), (char *)prv_demo_mqtt.topic.Data, &msgid, 0);
  248. msgid++;
  249. break;
  250. case MQTT_MSG_SUBACK:
  251. if(prv_demo_mqtt.mqtt_ctrl->mqtt_packet_buffer[4] > 0x02){
  252. LUAT_DEBUG_PRINT("SUBACK 0x%02x",prv_demo_mqtt.mqtt_ctrl->mqtt_packet_buffer[4]);
  253. luat_mqtt_close_socket(prv_demo_mqtt.mqtt_ctrl);
  254. break;
  255. }
  256. LUAT_DEBUG_PRINT("mqtt_subscribe ok");
  257. OS_ReInitBuffer(&prv_demo_mqtt.uplink, 1024);
  258. OS_BufferWrite(&prv_demo_mqtt.uplink, prv_demo_mqtt.self_id, 15);
  259. prv_demo_mqtt.data_sync_ok = 0;
  260. prv_demo_mqtt.uplink_ready = 1;
  261. luat_airtalk_callback(LUAT_AIRTALK_CB_ON_LINE_IDLE, NULL, 0);
  262. break;
  263. case MQTT_MSG_DISCONNECT:
  264. LUAT_DEBUG_PRINT("airtalk_mqtt_cb mqtt disconnect");
  265. prv_demo_mqtt.uplink_ready = 0;
  266. end_broadcast_play();
  267. luat_airtalk_callback(LUAT_AIRTALK_CB_OFF_LINE, NULL, 0);
  268. break;
  269. case MQTT_MSG_TIMER_PING:
  270. break;
  271. case MQTT_MSG_RECONNECT:
  272. break;
  273. case MQTT_MSG_CLOSE :
  274. prv_demo_mqtt.uplink_ready = 0;
  275. luat_airtalk_callback(LUAT_AIRTALK_CB_OFF_LINE, NULL, 0);
  276. break;
  277. }
  278. break;
  279. }
  280. }
  281. }
  282. void luat_airtalk_net_demo_mqtt_init(uint8_t is_16k)
  283. {
  284. prv_demo_mqtt.is_16k = is_16k;
  285. OS_InitBuffer(&prv_demo_mqtt.uplink, 1024);
  286. luat_rtos_task_create(&prv_demo_mqtt.mqtt_task_handle, 8 * 1024, 90, "airtalk_mqtt", airtalk_demo_mqtt_task, NULL, 0);
  287. }
  288. void luat_airtalk_net_param_config(uint32_t download_cache_time)
  289. {
  290. prv_demo_mqtt.download_cache_time = download_cache_time;
  291. }
  292. void luat_airtalk_net_uplink_start(void)
  293. {
  294. prv_demo_mqtt.data_sync_ok = 0;
  295. }
  296. void luat_airtalk_net_force_sync_downlink(void)
  297. {
  298. if (prv_demo_mqtt.data_sync_ok)
  299. {
  300. luat_rtos_event_send(prv_demo_mqtt.mqtt_task_handle, AIRTALK_EVENT_MQTT_FORCE_SYNC, 0, 0, 0, 0);
  301. }
  302. }
  303. void luat_airtalk_net_save_uplink_head(uint64_t record_time)
  304. {
  305. if (!prv_demo_mqtt.uplink_ready) return;
  306. prv_demo_mqtt.uplink.Pos = 16;
  307. if (record_time)
  308. {
  309. prv_demo_mqtt.uplink.Data[15] = 0;
  310. OS_BufferWrite(&prv_demo_mqtt.uplink, &record_time, 8);
  311. }
  312. else
  313. {
  314. prv_demo_mqtt.uplink.Data[15] = 1;
  315. }
  316. }
  317. void luat_airtalk_net_save_uplink_data(uint8_t *data, uint32_t len)
  318. {
  319. if (!prv_demo_mqtt.uplink_ready) return;
  320. OS_BufferWrite(&prv_demo_mqtt.uplink, data, len);
  321. }
  322. void luat_airtalk_net_uplink_once(void)
  323. {
  324. if (!prv_demo_mqtt.uplink_ready) return;
  325. luat_rtos_event_send(prv_demo_mqtt.mqtt_task_handle, AIRTALK_EVENT_MQTT_UPLINK_DATA, 0, 0, 0, 0);
  326. }
  327. void luat_airtalk_net_uplink_end(void)
  328. {
  329. if (!prv_demo_mqtt.uplink_ready) return;
  330. luat_rtos_event_send(prv_demo_mqtt.mqtt_task_handle, AIRTALK_EVENT_MQTT_UPLINK_END, 0, 0, 0, 0);
  331. }
  332. int luat_airtalk_net_set_device_id(char *id, uint32_t len)
  333. {
  334. if (prv_demo_mqtt.uplink_ready) return -ERROR_DEVICE_BUSY;
  335. if (len > 15) len = 15;
  336. memset(prv_demo_mqtt.self_id, 0, sizeof(prv_demo_mqtt.self_id));
  337. memcpy(prv_demo_mqtt.self_id, id, len);
  338. return 0;
  339. }
  340. void luat_airtalk_net_set_mqtt_ctrl(void *ctrl)
  341. {
  342. prv_demo_mqtt.mqtt_ctrl = ctrl;
  343. prv_demo_mqtt.mqtt_ctrl->app_cb = airtalk_mqtt_cb;
  344. }
  345. void luat_airtalk_net_set_mqtt_topic(const void *data, uint32_t len)
  346. {
  347. OS_ReInitBuffer(&prv_demo_mqtt.topic, len);
  348. OS_BufferWrite(&prv_demo_mqtt.topic, data, len);
  349. }
  350. void luat_airtalk_net_debug_switch(uint8_t on_off)
  351. {
  352. prv_demo_mqtt.debug_on_off = on_off;
  353. }
  354. #endif