luat_mqtt_client.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. #include "luat_base.h"
  2. #include "luat_network_adapter.h"
  3. #include "libemqtt.h"
  4. #include "luat_rtos.h"
  5. #include "luat_zbuff.h"
  6. #include "luat_malloc.h"
  7. #include "luat_mqtt.h"
  8. #define LUAT_LOG_TAG "mqtt"
  9. #include "luat_log.h"
  10. #define MQTT_DEBUG 0
  11. #if MQTT_DEBUG == 0
  12. #undef LLOGD
  13. #define LLOGD(...)
  14. #endif
  15. LUAT_RT_RET_TYPE luat_mqtt_timer_callback(LUAT_RT_CB_PARAM){
  16. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  17. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TIMER_PING, 0);
  18. }
  19. static LUAT_RT_RET_TYPE reconnect_timer_cb(LUAT_RT_CB_PARAM){
  20. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  21. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RECONNECT, 0);
  22. }
  23. int luat_mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  24. int ret = luat_mqtt_connect(mqtt_ctrl);
  25. if(ret){
  26. LLOGI("reconnect init socket ret=%d\n", ret);
  27. luat_mqtt_close_socket(mqtt_ctrl);
  28. }
  29. }
  30. int luat_mqtt_ping(luat_mqtt_ctrl_t *mqtt_ctrl) {
  31. mqtt_ping(&mqtt_ctrl->broker);
  32. return 0;
  33. }
  34. int luat_mqtt_init(luat_mqtt_ctrl_t *mqtt_ctrl, int adapter_index) {
  35. memset(mqtt_ctrl, 0, sizeof(luat_mqtt_ctrl_t));
  36. mqtt_ctrl->adapter_index = adapter_index;
  37. mqtt_ctrl->netc = network_alloc_ctrl(adapter_index);
  38. if (!mqtt_ctrl->netc){
  39. LLOGW("network_alloc_ctrl fail");
  40. return -1;
  41. }
  42. network_init_ctrl(mqtt_ctrl->netc, NULL, luat_mqtt_callback, mqtt_ctrl);
  43. mqtt_ctrl->mqtt_state = 0;
  44. mqtt_ctrl->netc->is_debug = 0;
  45. mqtt_ctrl->keepalive = 240;
  46. network_set_base_mode(mqtt_ctrl->netc, 1, 10000, 0, 0, 0, 0);
  47. network_set_local_port(mqtt_ctrl->netc, 0);
  48. if (mqtt_ctrl->reconnect){
  49. mqtt_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, mqtt_ctrl, NULL);
  50. }
  51. mqtt_ctrl->ping_timer = luat_create_rtos_timer(luat_mqtt_timer_callback, mqtt_ctrl, NULL);
  52. return 0;
  53. }
  54. int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts) {
  55. memcpy(mqtt_ctrl->host, opts->host, strlen(opts->host) + 1);
  56. mqtt_ctrl->remote_port = opts->port;
  57. if (opts->is_tls){
  58. network_init_tls(mqtt_ctrl->netc, opts->client_cert?2:0);
  59. if (opts->client_cert){
  60. network_set_client_cert(mqtt_ctrl->netc, (const unsigned char*)opts->client_cert, opts->client_cert_len,
  61. (const unsigned char*)opts->client_key, opts->client_key_len,
  62. (const unsigned char*)opts->client_password, opts->client_password_len);
  63. }
  64. } else {
  65. network_deinit_tls(mqtt_ctrl->netc);
  66. }
  67. mqtt_ctrl->broker.socket_info = mqtt_ctrl;
  68. mqtt_ctrl->broker.send = luat_mqtt_send_packet;
  69. return 0;
  70. }
  71. static void mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl){
  72. LLOGI("reconnect after %dms", mqtt_ctrl->reconnect_time);
  73. mqtt_ctrl->buffer_offset = 0;
  74. luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
  75. }
  76. void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  77. LLOGI("mqtt closing socket");
  78. if (mqtt_ctrl->netc){
  79. network_force_close_socket(mqtt_ctrl->netc);
  80. }
  81. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  82. mqtt_ctrl->mqtt_state = 0;
  83. if (mqtt_ctrl->reconnect){
  84. mqtt_reconnect(mqtt_ctrl);
  85. }
  86. }
  87. void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  88. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RELEASE, 0);
  89. if (mqtt_ctrl->ping_timer){
  90. luat_release_rtos_timer(mqtt_ctrl->ping_timer);
  91. mqtt_ctrl->ping_timer = NULL;
  92. }
  93. if (mqtt_ctrl->reconnect_timer){
  94. luat_release_rtos_timer(mqtt_ctrl->reconnect_timer);
  95. mqtt_ctrl->reconnect_timer = NULL;
  96. }
  97. if (mqtt_ctrl->netc){
  98. network_release_ctrl(mqtt_ctrl->netc);
  99. mqtt_ctrl->netc = NULL;
  100. }
  101. }
  102. static int mqtt_parse(luat_mqtt_ctrl_t *mqtt_ctrl) {
  103. LLOGD("mqtt_parse offset %d", mqtt_ctrl->buffer_offset);
  104. if (mqtt_ctrl->buffer_offset < 2) {
  105. LLOGD("wait more data");
  106. return 0;
  107. }
  108. // 判断数据长度, 前几个字节能判断出够不够读出mqtt的头
  109. char* buf = mqtt_ctrl->mqtt_packet_buffer;
  110. int num_bytes = 1;
  111. if ((buf[1] & 0x80) == 0x80) {
  112. num_bytes++;
  113. if (mqtt_ctrl->buffer_offset < 3) {
  114. LLOGD("wait more data for mqtt head");
  115. return 0;
  116. }
  117. if ((buf[2] & 0x80) == 0x80) {
  118. num_bytes ++;
  119. if (mqtt_ctrl->buffer_offset < 4) {
  120. LLOGD("wait more data for mqtt head");
  121. return 0;
  122. }
  123. if ((buf[3] & 0x80) == 0x80) {
  124. num_bytes ++;
  125. }
  126. }
  127. }
  128. // 判断数据总长, 这里rem_len只包含mqtt头部之外的数据
  129. uint16_t rem_len = mqtt_parse_rem_len(mqtt_ctrl->mqtt_packet_buffer);
  130. if (rem_len > mqtt_ctrl->buffer_offset - num_bytes - 1) {
  131. LLOGD("wait more data for mqtt head");
  132. return 0;
  133. }
  134. // 至此, mqtt包是完整的 解析类型, 处理之
  135. int ret = luat_mqtt_msg_cb(mqtt_ctrl);
  136. if (ret != 0){
  137. LLOGW("bad mqtt packet!! ret %d", ret);
  138. return -1;
  139. }
  140. // 处理完成后, 如果还有数据, 移动数据, 继续处理
  141. mqtt_ctrl->buffer_offset -= (1 + num_bytes + rem_len);
  142. memmove(mqtt_ctrl->mqtt_packet_buffer, mqtt_ctrl->mqtt_packet_buffer+1 + num_bytes + rem_len, mqtt_ctrl->buffer_offset);
  143. return 1;
  144. }
  145. int luat_mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl){
  146. // LLOGD("luat_mqtt_read_packet mqtt_ctrl->buffer_offset:%d",mqtt_ctrl->buffer_offset);
  147. int ret = -1;
  148. uint8_t *read_buff = NULL;
  149. uint32_t total_len = 0;
  150. uint32_t rx_len = 0;
  151. int result = network_rx(mqtt_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
  152. if (total_len > 0xFFF) {
  153. LLOGE("too many data wait for recv %d", total_len);
  154. luat_mqtt_close_socket(mqtt_ctrl);
  155. return -1;
  156. }
  157. if (total_len == 0) {
  158. LLOGW("rx event but NO data wait for recv");
  159. return 0;
  160. }
  161. if (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset <= 0) {
  162. LLOGE("buff is FULL, mqtt packet too big");
  163. luat_mqtt_close_socket(mqtt_ctrl);
  164. return -1;
  165. }
  166. #define MAX_READ (1024)
  167. int recv_want = 0;
  168. while (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset > 0) {
  169. if (MAX_READ > (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset)) {
  170. recv_want = MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset;
  171. }
  172. else {
  173. recv_want = MAX_READ;
  174. }
  175. // 从网络接收数据
  176. result = network_rx(mqtt_ctrl->netc, mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
  177. if (rx_len == 0 || result != 0 ) {
  178. LLOGD("rx_len %d result %d", rx_len, result);
  179. break;
  180. }
  181. // 收到数据了, 传给处理函数继续处理
  182. // 数据的长度变更, 触发传递
  183. mqtt_ctrl->buffer_offset += rx_len;
  184. LLOGD("data recv %d offset %d", rx_len, mqtt_ctrl->buffer_offset);
  185. further:
  186. result = mqtt_parse(mqtt_ctrl);
  187. if (result == 0) {
  188. // OK
  189. }else if(result == 1){
  190. if (mqtt_ctrl->buffer_offset > 0)
  191. goto further;
  192. else {
  193. continue;
  194. }
  195. }
  196. else {
  197. LLOGW("mqtt_parse ret %d, closing socket");
  198. luat_mqtt_close_socket(mqtt_ctrl);
  199. break;
  200. }
  201. }
  202. return 0;
  203. }
  204. static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
  205. rtos_msg_t msg = {0};
  206. // msg.handler = l_mqtt_callback;
  207. uint8_t msg_tp = MQTTParseMessageType(mqtt_ctrl->mqtt_packet_buffer);
  208. uint16_t msg_id = 0;
  209. uint8_t qos = 0;
  210. switch (msg_tp) {
  211. case MQTT_MSG_CONNACK: {
  212. // LLOGD("MQTT_MSG_CONNACK");
  213. if(mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
  214. LLOGW("CONACK 0x%02x",mqtt_ctrl->mqtt_packet_buffer[3]);
  215. luat_mqtt_close_socket(mqtt_ctrl);
  216. return -1;
  217. }
  218. mqtt_ctrl->mqtt_state = 1;
  219. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CONNACK, 0);
  220. break;
  221. }
  222. case MQTT_MSG_PUBLISH : {
  223. // LLOGD("MQTT_MSG_PUBLISH");
  224. const uint8_t* ptr;
  225. qos = MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer);
  226. uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  227. uint16_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  228. luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
  229. mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
  230. mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
  231. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, mqtt_msg);
  232. // 还要回复puback
  233. if (qos == 1) {
  234. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  235. mqtt_puback(&(mqtt_ctrl->broker), msg_id);
  236. }
  237. break;
  238. }
  239. case MQTT_MSG_PUBACK : {
  240. // LLOGD("MQTT_MSG_PUBACK");
  241. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBACK, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  242. break;
  243. }
  244. case MQTT_MSG_PUBREC : {
  245. msg_id = mqtt_parse_msg_id(&(mqtt_ctrl->broker));
  246. mqtt_pubrel(&(mqtt_ctrl->broker), msg_id);
  247. // LLOGD("MQTT_MSG_PUBREC");
  248. break;
  249. }
  250. case MQTT_MSG_PUBCOMP : {
  251. // LLOGD("MQTT_MSG_PUBCOMP");
  252. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBCOMP, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  253. break;
  254. }
  255. case MQTT_MSG_SUBACK : {
  256. LLOGD("MQTT_MSG_SUBACK");
  257. break;
  258. }
  259. case MQTT_MSG_UNSUBACK : {
  260. LLOGD("MQTT_MSG_UNSUBACK");
  261. break;
  262. }
  263. case MQTT_MSG_PINGRESP : {
  264. LLOGD("MQTT_MSG_PINGRESP");
  265. break;
  266. }
  267. case MQTT_MSG_DISCONNECT : {
  268. // LLOGD("MQTT_MSG_DISCONNECT");
  269. break;
  270. }
  271. default : {
  272. LLOGD("luat_mqtt_msg_cb error msg_tp:%d",msg_tp);
  273. break;
  274. }
  275. }
  276. return 0;
  277. }
  278. static const char* event2str(uint32_t id) {
  279. switch (id)
  280. {
  281. case EV_NW_RESULT_LINK:
  282. return "EV_NW_RESULT_LINK";
  283. case EV_NW_RESULT_CONNECT:
  284. return "EV_NW_RESULT_CONNECT";
  285. case EV_NW_RESULT_EVENT:
  286. return "EV_NW_RESULT_EVENT";
  287. case EV_NW_RESULT_TX:
  288. return "EV_NW_RESULT_TX";
  289. case EV_NW_RESULT_CLOSE:
  290. return "EV_NW_RESULT_CLOSE";
  291. default:
  292. return "UNKOWN";
  293. }
  294. }
  295. int32_t luat_mqtt_callback(void *data, void *param) {
  296. OS_EVENT *event = (OS_EVENT *)data;
  297. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)param;
  298. int ret = 0;
  299. // LLOGD("LINK %d ON_LINE %d EVENT %d TX_OK %d CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
  300. LLOGD("network mqtt cb %8X %s %8X",event->ID & 0x0ffffffff, event2str(event->ID & 0x0ffffffff) ,event->Param1);
  301. if (event->ID == EV_NW_RESULT_LINK){
  302. return 0; // 这里应该直接返回, 不能往下调用network_wait_event
  303. }else if(event->ID == EV_NW_RESULT_CONNECT){
  304. ret = mqtt_connect(&(mqtt_ctrl->broker));
  305. if(ret==1){
  306. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  307. }
  308. }else if(event->ID == EV_NW_RESULT_EVENT){
  309. if (event->Param1==0){
  310. ret = luat_mqtt_read_packet(mqtt_ctrl);
  311. // LLOGD("luat_mqtt_read_packet ret:%d",ret);
  312. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  313. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  314. }
  315. }else if(event->ID == EV_NW_RESULT_TX){
  316. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  317. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  318. }else if(event->ID == EV_NW_RESULT_CLOSE){
  319. }
  320. if (event->Param1){
  321. LLOGW("mqtt_callback param1 %d, closing socket", event->Param1);
  322. luat_mqtt_close_socket(mqtt_ctrl);
  323. }
  324. ret = network_wait_event(mqtt_ctrl->netc, NULL, 0, NULL);
  325. if (ret < 0){
  326. LLOGW("network_wait_event ret %d, closing socket", ret);
  327. luat_mqtt_close_socket(mqtt_ctrl);
  328. return -1;
  329. }
  330. return 0;
  331. }
  332. int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
  333. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
  334. uint32_t tx_len = 0;
  335. int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
  336. if (ret < 0)
  337. return 0;
  338. return count;
  339. }
  340. int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  341. int ret = 0;
  342. const char *hostname = mqtt_ctrl->host;
  343. uint16_t port = mqtt_ctrl->remote_port;
  344. uint16_t keepalive = mqtt_ctrl->keepalive;
  345. LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
  346. mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
  347. #ifdef LUAT_USE_LWIP
  348. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.type)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  349. #else
  350. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.is_ipv6)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  351. #endif
  352. LLOGD("network_connect ret %d", ret);
  353. if (ret < 0) {
  354. network_close(mqtt_ctrl->netc, 0);
  355. return -1;
  356. }
  357. return 0;
  358. }