/* airtalk_mqtt.c */
  1. #include "csdk.h"
  2. #include "airtalk_def.h"
  3. #include "airtalk_api.h"
  4. #include "luat_airtalk.h"
  5. #include "libemqtt.h"
  6. #include "luat_mqtt.h"
  7. typedef struct
  8. {
  9. uint8_t save_data[RECORD_DATA_MAX + 28];
  10. uint8_t in_use;
  11. }airtalk_net_data_cache_t;
  12. typedef struct
  13. {
  14. airtalk_network_ctrl_t *net_ctrl;
  15. luat_mqtt_ctrl_t *mqtt_ctrl;
  16. Buffer_Struct topic;
  17. airtalk_net_data_cache_t *net_data_cache;
  18. volatile uint32_t cache_cnt;
  19. }airtalk_mqtt_ctrl_t;
  20. static airtalk_mqtt_ctrl_t prv_mqtt;
  21. static void luat_airtalk_mqtt_send_function(uint8_t *data, uint32_t len)
  22. {
  23. mqtt_publish(&(prv_mqtt.mqtt_ctrl->broker), prv_mqtt.topic.Data, data, len, 0);
  24. }
  25. static void luat_airtalk_mqtt_recv_function(uint8_t *data, uint32_t len)
  26. {
  27. if (len >= DOWNLOAD_CACHE_MAX)
  28. {
  29. LUAT_DEBUG_PRINT("mqtt data cache cnt error %d", len);
  30. }
  31. if (!prv_mqtt.net_data_cache[len].in_use)
  32. {
  33. LUAT_DEBUG_PRINT("mqtt data cache not used %d", len);
  34. }
  35. prv_mqtt.net_data_cache[len].in_use = 0;
  36. }
  37. static int airtalk_mqtt_cb(luat_mqtt_ctrl_t *mqtt_ctrl, uint16_t event)
  38. {
  39. int ret;
  40. if (!prv_mqtt.topic.Data)
  41. {
  42. return 0;
  43. }
  44. if (event != MQTT_MSG_PUBLISH)
  45. {
  46. return 0;
  47. }
  48. else
  49. {
  50. const uint8_t* ptr;
  51. uint32_t len;
  52. len = mqtt_parse_pub_topic_ptr(prv_mqtt.mqtt_ctrl->mqtt_packet_buffer, &ptr);
  53. if (!memcmp(prv_mqtt.topic.Data, ptr, len))
  54. {
  55. len = mqtt_parse_pub_msg_ptr(prv_mqtt.mqtt_ctrl->mqtt_packet_buffer, &ptr);
  56. if (len && (len <= RECORD_DATA_MAX + 28))
  57. {
  58. if (prv_mqtt.net_data_cache[prv_mqtt.cache_cnt].in_use)
  59. {
  60. LUAT_DEBUG_PRINT("mqtt data cache full!!!");
  61. }
  62. else
  63. {
  64. memcpy(prv_mqtt.net_data_cache[prv_mqtt.cache_cnt].save_data, ptr, len);
  65. prv_mqtt.net_data_cache[prv_mqtt.cache_cnt].in_use = 1;
  66. luat_rtos_event_send(prv_mqtt.net_ctrl->task_handle, AIRTALK_EVENT_NETWORK_DOWNLINK_DATA, (uint32_t)prv_mqtt.net_data_cache[prv_mqtt.cache_cnt].save_data, len, prv_mqtt.cache_cnt, 0);
  67. prv_mqtt.cache_cnt = (prv_mqtt.cache_cnt + 1) & (DOWNLOAD_CACHE_MAX - 1);
  68. return 1;
  69. }
  70. }
  71. }
  72. }
  73. return 0;
  74. }
  75. void luat_airtalk_net_mqtt_init(void)
  76. {
  77. prv_mqtt.net_data_cache = luat_heap_calloc(DOWNLOAD_CACHE_MAX, sizeof(airtalk_net_data_cache_t));
  78. prv_mqtt.net_ctrl = luat_airtalk_net_common_init(luat_airtalk_mqtt_send_function, luat_airtalk_mqtt_recv_function);
  79. }
  80. void luat_airtalk_net_set_mqtt_ctrl(void *ctrl)
  81. {
  82. prv_mqtt.mqtt_ctrl = ctrl;
  83. prv_mqtt.mqtt_ctrl->app_cb = airtalk_mqtt_cb;
  84. }
  85. void luat_airtalk_net_set_mqtt_topic(const void *data, uint32_t len)
  86. {
  87. OS_ReInitBuffer(&prv_mqtt.topic, len);
  88. OS_BufferWrite(&prv_mqtt.topic, data, len);
  89. LUAT_DEBUG_PRINT("%.*s", prv_mqtt.topic.Pos, prv_mqtt.topic.Data);
  90. }