luat_mqtt_client.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. #include "luat_base.h"
  2. #include "luat_network_adapter.h"
  3. #include "libemqtt.h"
  4. #include "luat_rtos.h"
  5. #include "luat_zbuff.h"
  6. #include "luat_malloc.h"
  7. #include "luat_mqtt.h"
  8. #define LUAT_LOG_TAG "mqtt"
  9. #include "luat_log.h"
  10. #define MQTT_DEBUG 0
  11. #if MQTT_DEBUG == 0
  12. #undef LLOGD
  13. #define LLOGD(...)
  14. #endif
  15. static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl);
  16. LUAT_RT_RET_TYPE luat_mqtt_timer_callback(LUAT_RT_CB_PARAM){
  17. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  18. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TIMER_PING, 0);
  19. }
  20. static LUAT_RT_RET_TYPE reconnect_timer_cb(LUAT_RT_CB_PARAM){
  21. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  22. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RECONNECT, 0);
  23. }
  24. int luat_mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  25. int ret = luat_mqtt_connect(mqtt_ctrl);
  26. if(ret){
  27. LLOGI("reconnect init socket ret=%d\n", ret);
  28. luat_mqtt_close_socket(mqtt_ctrl);
  29. }
  30. return ret;
  31. }
  32. int luat_mqtt_ping(luat_mqtt_ctrl_t *mqtt_ctrl) {
  33. mqtt_ping(&mqtt_ctrl->broker);
  34. return 0;
  35. }
  36. int luat_mqtt_init(luat_mqtt_ctrl_t *mqtt_ctrl, int adapter_index) {
  37. memset(mqtt_ctrl, 0, sizeof(luat_mqtt_ctrl_t));
  38. mqtt_ctrl->adapter_index = adapter_index;
  39. mqtt_ctrl->netc = network_alloc_ctrl(adapter_index);
  40. if (!mqtt_ctrl->netc){
  41. LLOGW("network_alloc_ctrl fail");
  42. return -1;
  43. }
  44. network_init_ctrl(mqtt_ctrl->netc, NULL, luat_mqtt_callback, mqtt_ctrl);
  45. mqtt_ctrl->mqtt_state = 0;
  46. mqtt_ctrl->netc->is_debug = 0;
  47. mqtt_ctrl->keepalive = 240;
  48. network_set_base_mode(mqtt_ctrl->netc, 1, 10000, 0, 0, 0, 0);
  49. network_set_local_port(mqtt_ctrl->netc, 0);
  50. mqtt_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, mqtt_ctrl, NULL);
  51. mqtt_ctrl->ping_timer = luat_create_rtos_timer(luat_mqtt_timer_callback, mqtt_ctrl, NULL);
  52. return 0;
  53. }
  54. int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts) {
  55. memcpy(mqtt_ctrl->host, opts->host, strlen(opts->host) + 1);
  56. mqtt_ctrl->remote_port = opts->port;
  57. if (opts->is_tls){
  58. network_init_tls(mqtt_ctrl->netc, (opts->server_cert || opts->client_cert)?2:0);
  59. if (opts->server_cert){
  60. network_set_server_cert(mqtt_ctrl->netc, (const unsigned char *)opts->server_cert, opts->server_cert_len+1);
  61. }
  62. if (opts->client_cert){
  63. network_set_client_cert(mqtt_ctrl->netc, (const unsigned char*)opts->client_cert, opts->client_cert_len+1,
  64. (const unsigned char*)opts->client_key, opts->client_key_len+1,
  65. (const unsigned char*)opts->client_password, opts->client_password_len+1);
  66. }
  67. } else {
  68. network_deinit_tls(mqtt_ctrl->netc);
  69. }
  70. if (opts->is_ipv6) {
  71. network_connect_ipv6_domain(mqtt_ctrl->netc, 1);
  72. }
  73. mqtt_ctrl->broker.socket_info = mqtt_ctrl;
  74. mqtt_ctrl->broker.send = luat_mqtt_send_packet;
  75. return 0;
  76. }
  77. static void mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl){
  78. LLOGI("reconnect after %dms", mqtt_ctrl->reconnect_time);
  79. mqtt_ctrl->buffer_offset = 0;
  80. luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
  81. }
  82. void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  83. LLOGI("mqtt closing socket");
  84. if (mqtt_ctrl->netc){
  85. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_DISCONNECT, 0);
  86. network_force_close_socket(mqtt_ctrl->netc);
  87. }
  88. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  89. mqtt_ctrl->mqtt_state = 0;
  90. if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time > 0){
  91. mqtt_reconnect(mqtt_ctrl);
  92. }
  93. }
  94. void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  95. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RELEASE, 0);
  96. if (mqtt_ctrl->ping_timer){
  97. luat_release_rtos_timer(mqtt_ctrl->ping_timer);
  98. mqtt_ctrl->ping_timer = NULL;
  99. }
  100. if (mqtt_ctrl->reconnect_timer){
  101. luat_release_rtos_timer(mqtt_ctrl->reconnect_timer);
  102. mqtt_ctrl->reconnect_timer = NULL;
  103. }
  104. if (mqtt_ctrl->broker.will_data) {
  105. mqtt_ctrl->broker.will_len = 0;
  106. luat_heap_free(mqtt_ctrl->broker.will_data);
  107. mqtt_ctrl->broker.will_data = NULL;
  108. }
  109. if (mqtt_ctrl->netc){
  110. network_release_ctrl(mqtt_ctrl->netc);
  111. mqtt_ctrl->netc = NULL;
  112. }
  113. }
  114. static int mqtt_parse(luat_mqtt_ctrl_t *mqtt_ctrl) {
  115. LLOGD("mqtt_parse offset %d", mqtt_ctrl->buffer_offset);
  116. if (mqtt_ctrl->buffer_offset < 2) {
  117. LLOGD("wait more data");
  118. return 0;
  119. }
  120. // 判断数据长度, 前几个字节能判断出够不够读出mqtt的头
  121. char* buf = (char*)mqtt_ctrl->mqtt_packet_buffer;
  122. int num_bytes = 1;
  123. if ((buf[1] & 0x80) == 0x80) {
  124. num_bytes++;
  125. if (mqtt_ctrl->buffer_offset < 3) {
  126. LLOGD("wait more data for mqtt head");
  127. return 0;
  128. }
  129. if ((buf[2] & 0x80) == 0x80) {
  130. num_bytes ++;
  131. if (mqtt_ctrl->buffer_offset < 4) {
  132. LLOGD("wait more data for mqtt head");
  133. return 0;
  134. }
  135. if ((buf[3] & 0x80) == 0x80) {
  136. num_bytes ++;
  137. }
  138. }
  139. }
  140. // 判断数据总长, 这里rem_len只包含mqtt头部之外的数据
  141. uint16_t rem_len = mqtt_parse_rem_len(mqtt_ctrl->mqtt_packet_buffer);
  142. if (rem_len > mqtt_ctrl->buffer_offset - num_bytes - 1) {
  143. LLOGD("wait more data for mqtt head");
  144. return 0;
  145. }
  146. // 至此, mqtt包是完整的 解析类型, 处理之
  147. int ret = luat_mqtt_msg_cb(mqtt_ctrl);
  148. if (ret != 0){
  149. LLOGW("bad mqtt packet!! ret %d", ret);
  150. return -1;
  151. }
  152. // 处理完成后, 如果还有数据, 移动数据, 继续处理
  153. mqtt_ctrl->buffer_offset -= (1 + num_bytes + rem_len);
  154. memmove(mqtt_ctrl->mqtt_packet_buffer, mqtt_ctrl->mqtt_packet_buffer+1 + num_bytes + rem_len, mqtt_ctrl->buffer_offset);
  155. return 1;
  156. }
  157. int luat_mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl){
  158. // LLOGD("luat_mqtt_read_packet mqtt_ctrl->buffer_offset:%d",mqtt_ctrl->buffer_offset);
  159. int ret = -1;
  160. uint8_t *read_buff = NULL;
  161. uint32_t total_len = 0;
  162. uint32_t rx_len = 0;
  163. int result = network_rx(mqtt_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
  164. if (total_len > 0xFFF) {
  165. LLOGE("too many data wait for recv %d", total_len);
  166. luat_mqtt_close_socket(mqtt_ctrl);
  167. return -1;
  168. }
  169. if (total_len == 0) {
  170. LLOGW("rx event but NO data wait for recv");
  171. return 0;
  172. }
  173. if (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset <= 0) {
  174. LLOGE("buff is FULL, mqtt packet too big");
  175. luat_mqtt_close_socket(mqtt_ctrl);
  176. return -1;
  177. }
  178. #define MAX_READ (1024)
  179. int recv_want = 0;
  180. while (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset > 0) {
  181. if (MAX_READ > (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset)) {
  182. recv_want = MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset;
  183. }
  184. else {
  185. recv_want = MAX_READ;
  186. }
  187. // 从网络接收数据
  188. result = network_rx(mqtt_ctrl->netc, mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
  189. if (rx_len == 0 || result != 0 ) {
  190. LLOGD("rx_len %d result %d", rx_len, result);
  191. break;
  192. }
  193. // 收到数据了, 传给处理函数继续处理
  194. // 数据的长度变更, 触发传递
  195. mqtt_ctrl->buffer_offset += rx_len;
  196. LLOGD("data recv %d offset %d", rx_len, mqtt_ctrl->buffer_offset);
  197. further:
  198. result = mqtt_parse(mqtt_ctrl);
  199. if (result == 0) {
  200. // OK
  201. }else if(result == 1){
  202. if (mqtt_ctrl->buffer_offset > 0)
  203. goto further;
  204. else {
  205. continue;
  206. }
  207. }
  208. else {
  209. LLOGW("mqtt_parse ret %d, closing socket");
  210. luat_mqtt_close_socket(mqtt_ctrl);
  211. break;
  212. }
  213. }
  214. return 0;
  215. }
  216. static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
  217. rtos_msg_t msg = {0};
  218. // msg.handler = l_mqtt_callback;
  219. uint8_t msg_tp = MQTTParseMessageType(mqtt_ctrl->mqtt_packet_buffer);
  220. uint16_t msg_id = 0;
  221. uint8_t qos = 0;
  222. switch (msg_tp) {
  223. case MQTT_MSG_CONNACK: {
  224. // LLOGD("MQTT_MSG_CONNACK");
  225. if(mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
  226. LLOGW("CONACK 0x%02x",mqtt_ctrl->mqtt_packet_buffer[3]);
  227. luat_mqtt_close_socket(mqtt_ctrl);
  228. return -1;
  229. }
  230. mqtt_ctrl->mqtt_state = 1;
  231. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CONNACK, 0);
  232. break;
  233. }
  234. case MQTT_MSG_PUBLISH : {
  235. // LLOGD("MQTT_MSG_PUBLISH");
  236. const uint8_t* ptr;
  237. qos = MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer);
  238. uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  239. uint16_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  240. luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
  241. mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
  242. mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
  243. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, mqtt_msg);
  244. // 还要回复puback
  245. if (qos == 1) {
  246. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  247. mqtt_puback(&(mqtt_ctrl->broker), msg_id);
  248. }
  249. break;
  250. }
  251. case MQTT_MSG_PUBACK : {
  252. // LLOGD("MQTT_MSG_PUBACK");
  253. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBACK, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  254. break;
  255. }
  256. case MQTT_MSG_PUBREC : {
  257. msg_id = mqtt_parse_msg_id(&(mqtt_ctrl->broker));
  258. mqtt_pubrel(&(mqtt_ctrl->broker), msg_id);
  259. // LLOGD("MQTT_MSG_PUBREC");
  260. break;
  261. }
  262. case MQTT_MSG_PUBCOMP : {
  263. // LLOGD("MQTT_MSG_PUBCOMP");
  264. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBCOMP, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  265. break;
  266. }
  267. case MQTT_MSG_SUBACK : {
  268. LLOGD("MQTT_MSG_SUBACK");
  269. break;
  270. }
  271. case MQTT_MSG_UNSUBACK : {
  272. LLOGD("MQTT_MSG_UNSUBACK");
  273. break;
  274. }
  275. case MQTT_MSG_PINGRESP : {
  276. LLOGD("MQTT_MSG_PINGRESP");
  277. break;
  278. }
  279. case MQTT_MSG_DISCONNECT : {
  280. // LLOGD("MQTT_MSG_DISCONNECT");
  281. break;
  282. }
  283. default : {
  284. LLOGD("luat_mqtt_msg_cb error msg_tp:%d",msg_tp);
  285. break;
  286. }
  287. }
  288. return 0;
  289. }
  290. static const char* event2str(uint32_t id) {
  291. switch (id)
  292. {
  293. case EV_NW_RESULT_LINK:
  294. return "EV_NW_RESULT_LINK";
  295. case EV_NW_RESULT_CONNECT:
  296. return "EV_NW_RESULT_CONNECT";
  297. case EV_NW_RESULT_EVENT:
  298. return "EV_NW_RESULT_EVENT";
  299. case EV_NW_RESULT_TX:
  300. return "EV_NW_RESULT_TX";
  301. case EV_NW_RESULT_CLOSE:
  302. return "EV_NW_RESULT_CLOSE";
  303. default:
  304. return "UNKOWN";
  305. }
  306. }
  307. int32_t luat_mqtt_callback(void *data, void *param) {
  308. OS_EVENT *event = (OS_EVENT *)data;
  309. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)param;
  310. int ret = 0;
  311. // LLOGD("LINK %d ON_LINE %d EVENT %d TX_OK %d CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
  312. LLOGD("network mqtt cb %8X %s %8X",event->ID & 0x0ffffffff, event2str(event->ID & 0x0ffffffff) ,event->Param1);
  313. if (event->ID == EV_NW_RESULT_LINK){
  314. return 0; // 这里应该直接返回, 不能往下调用network_wait_event
  315. }else if(event->ID == EV_NW_RESULT_CONNECT){
  316. ret = mqtt_connect(&(mqtt_ctrl->broker));
  317. if(ret==1){
  318. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  319. }
  320. }else if(event->ID == EV_NW_RESULT_EVENT){
  321. if (event->Param1==0){
  322. ret = luat_mqtt_read_packet(mqtt_ctrl);
  323. // LLOGD("luat_mqtt_read_packet ret:%d",ret);
  324. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  325. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  326. }
  327. }else if(event->ID == EV_NW_RESULT_TX){
  328. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  329. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  330. }else if(event->ID == EV_NW_RESULT_CLOSE){
  331. }
  332. if (event->Param1){
  333. LLOGW("mqtt_callback param1 %d, closing socket", event->Param1);
  334. luat_mqtt_close_socket(mqtt_ctrl);
  335. }
  336. ret = network_wait_event(mqtt_ctrl->netc, NULL, 0, NULL);
  337. if (ret < 0){
  338. LLOGW("network_wait_event ret %d, closing socket", ret);
  339. luat_mqtt_close_socket(mqtt_ctrl);
  340. return -1;
  341. }
  342. return 0;
  343. }
  344. int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
  345. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
  346. uint32_t tx_len = 0;
  347. int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
  348. if (ret < 0)
  349. return 0;
  350. return count;
  351. }
  352. int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  353. int ret = 0;
  354. const char *hostname = mqtt_ctrl->host;
  355. uint16_t port = mqtt_ctrl->remote_port;
  356. uint16_t keepalive = mqtt_ctrl->keepalive;
  357. LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
  358. mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
  359. #ifdef LUAT_USE_LWIP
  360. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), ip_addr_isany_val(mqtt_ctrl->ip_addr)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  361. #else
  362. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.is_ipv6)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  363. #endif
  364. LLOGD("network_connect ret %d", ret);
  365. if (ret < 0) {
  366. network_close(mqtt_ctrl->netc, 0);
  367. return -1;
  368. }
  369. return 0;
  370. }
  371. int luat_mqtt_set_will(luat_mqtt_ctrl_t *mqtt_ctrl, const char* topic,
  372. const char* payload, size_t payload_len,
  373. uint8_t qos, size_t retain) {
  374. if (mqtt_ctrl == NULL || mqtt_ctrl->netc == NULL)
  375. return -1;
  376. return mqtt_set_will(&mqtt_ctrl->broker, topic, payload, payload_len, qos, retain);
  377. }