luat_mqtt_client.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. #include "luat_base.h"
  2. #include "luat_network_adapter.h"
  3. #include "libemqtt.h"
  4. #include "luat_rtos.h"
  5. #include "luat_zbuff.h"
  6. #include "luat_malloc.h"
  7. #include "luat_mqtt.h"
  8. #define LUAT_LOG_TAG "mqtt"
  9. #include "luat_log.h"
  10. #define MQTT_DEBUG 0
  11. #if MQTT_DEBUG == 0
  12. #undef LLOGD
  13. #define LLOGD(...)
  14. #endif
  15. LUAT_RT_RET_TYPE luat_mqtt_timer_callback(LUAT_RT_CB_PARAM){
  16. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  17. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TIMER_PING, 0);
  18. }
  19. static LUAT_RT_RET_TYPE reconnect_timer_cb(LUAT_RT_CB_PARAM){
  20. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  21. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RECONNECT, 0);
  22. }
  23. int luat_mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  24. int ret = luat_mqtt_connect(mqtt_ctrl);
  25. if(ret){
  26. LLOGI("reconnect init socket ret=%d\n", ret);
  27. luat_mqtt_close_socket(mqtt_ctrl);
  28. }
  29. }
  30. int luat_mqtt_ping(luat_mqtt_ctrl_t *mqtt_ctrl) {
  31. mqtt_ping(&mqtt_ctrl->broker);
  32. return 0;
  33. }
  34. int luat_mqtt_init(luat_mqtt_ctrl_t *mqtt_ctrl, int adapter_index) {
  35. memset(mqtt_ctrl, 0, sizeof(luat_mqtt_ctrl_t));
  36. mqtt_ctrl->adapter_index = adapter_index;
  37. mqtt_ctrl->netc = network_alloc_ctrl(adapter_index);
  38. if (!mqtt_ctrl->netc){
  39. LLOGW("network_alloc_ctrl fail");
  40. return -1;
  41. }
  42. network_init_ctrl(mqtt_ctrl->netc, NULL, luat_mqtt_callback, mqtt_ctrl);
  43. mqtt_ctrl->mqtt_state = 0;
  44. mqtt_ctrl->netc->is_debug = 0;
  45. mqtt_ctrl->keepalive = 240;
  46. network_set_base_mode(mqtt_ctrl->netc, 1, 10000, 0, 0, 0, 0);
  47. network_set_local_port(mqtt_ctrl->netc, 0);
  48. mqtt_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, mqtt_ctrl, NULL);
  49. mqtt_ctrl->ping_timer = luat_create_rtos_timer(luat_mqtt_timer_callback, mqtt_ctrl, NULL);
  50. return 0;
  51. }
  52. int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts) {
  53. memcpy(mqtt_ctrl->host, opts->host, strlen(opts->host) + 1);
  54. mqtt_ctrl->remote_port = opts->port;
  55. if (opts->is_tls){
  56. network_init_tls(mqtt_ctrl->netc, (opts->server_cert || opts->client_cert)?2:0);
  57. if (opts->server_cert){
  58. network_set_server_cert(mqtt_ctrl->netc, (const unsigned char *)opts->server_cert, opts->server_cert_len);
  59. }
  60. if (opts->client_cert){
  61. network_set_client_cert(mqtt_ctrl->netc, (const unsigned char*)opts->client_cert, opts->client_cert_len,
  62. (const unsigned char*)opts->client_key, opts->client_key_len,
  63. (const unsigned char*)opts->client_password, opts->client_password_len);
  64. }
  65. } else {
  66. network_deinit_tls(mqtt_ctrl->netc);
  67. }
  68. mqtt_ctrl->broker.socket_info = mqtt_ctrl;
  69. mqtt_ctrl->broker.send = luat_mqtt_send_packet;
  70. return 0;
  71. }
  72. static void mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl){
  73. LLOGI("reconnect after %dms", mqtt_ctrl->reconnect_time);
  74. mqtt_ctrl->buffer_offset = 0;
  75. luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
  76. }
  77. void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  78. LLOGI("mqtt closing socket");
  79. if (mqtt_ctrl->netc){
  80. network_force_close_socket(mqtt_ctrl->netc);
  81. }
  82. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  83. mqtt_ctrl->mqtt_state = 0;
  84. if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time > 0){
  85. mqtt_reconnect(mqtt_ctrl);
  86. }
  87. }
  88. void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  89. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RELEASE, 0);
  90. if (mqtt_ctrl->ping_timer){
  91. luat_release_rtos_timer(mqtt_ctrl->ping_timer);
  92. mqtt_ctrl->ping_timer = NULL;
  93. }
  94. if (mqtt_ctrl->reconnect_timer){
  95. luat_release_rtos_timer(mqtt_ctrl->reconnect_timer);
  96. mqtt_ctrl->reconnect_timer = NULL;
  97. }
  98. if (mqtt_ctrl->netc){
  99. network_release_ctrl(mqtt_ctrl->netc);
  100. mqtt_ctrl->netc = NULL;
  101. }
  102. }
  103. static int mqtt_parse(luat_mqtt_ctrl_t *mqtt_ctrl) {
  104. LLOGD("mqtt_parse offset %d", mqtt_ctrl->buffer_offset);
  105. if (mqtt_ctrl->buffer_offset < 2) {
  106. LLOGD("wait more data");
  107. return 0;
  108. }
  109. // 判断数据长度, 前几个字节能判断出够不够读出mqtt的头
  110. char* buf = mqtt_ctrl->mqtt_packet_buffer;
  111. int num_bytes = 1;
  112. if ((buf[1] & 0x80) == 0x80) {
  113. num_bytes++;
  114. if (mqtt_ctrl->buffer_offset < 3) {
  115. LLOGD("wait more data for mqtt head");
  116. return 0;
  117. }
  118. if ((buf[2] & 0x80) == 0x80) {
  119. num_bytes ++;
  120. if (mqtt_ctrl->buffer_offset < 4) {
  121. LLOGD("wait more data for mqtt head");
  122. return 0;
  123. }
  124. if ((buf[3] & 0x80) == 0x80) {
  125. num_bytes ++;
  126. }
  127. }
  128. }
  129. // 判断数据总长, 这里rem_len只包含mqtt头部之外的数据
  130. uint16_t rem_len = mqtt_parse_rem_len(mqtt_ctrl->mqtt_packet_buffer);
  131. if (rem_len > mqtt_ctrl->buffer_offset - num_bytes - 1) {
  132. LLOGD("wait more data for mqtt head");
  133. return 0;
  134. }
  135. // 至此, mqtt包是完整的 解析类型, 处理之
  136. int ret = luat_mqtt_msg_cb(mqtt_ctrl);
  137. if (ret != 0){
  138. LLOGW("bad mqtt packet!! ret %d", ret);
  139. return -1;
  140. }
  141. // 处理完成后, 如果还有数据, 移动数据, 继续处理
  142. mqtt_ctrl->buffer_offset -= (1 + num_bytes + rem_len);
  143. memmove(mqtt_ctrl->mqtt_packet_buffer, mqtt_ctrl->mqtt_packet_buffer+1 + num_bytes + rem_len, mqtt_ctrl->buffer_offset);
  144. return 1;
  145. }
  146. int luat_mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl){
  147. // LLOGD("luat_mqtt_read_packet mqtt_ctrl->buffer_offset:%d",mqtt_ctrl->buffer_offset);
  148. int ret = -1;
  149. uint8_t *read_buff = NULL;
  150. uint32_t total_len = 0;
  151. uint32_t rx_len = 0;
  152. int result = network_rx(mqtt_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
  153. if (total_len > 0xFFF) {
  154. LLOGE("too many data wait for recv %d", total_len);
  155. luat_mqtt_close_socket(mqtt_ctrl);
  156. return -1;
  157. }
  158. if (total_len == 0) {
  159. LLOGW("rx event but NO data wait for recv");
  160. return 0;
  161. }
  162. if (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset <= 0) {
  163. LLOGE("buff is FULL, mqtt packet too big");
  164. luat_mqtt_close_socket(mqtt_ctrl);
  165. return -1;
  166. }
  167. #define MAX_READ (1024)
  168. int recv_want = 0;
  169. while (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset > 0) {
  170. if (MAX_READ > (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset)) {
  171. recv_want = MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset;
  172. }
  173. else {
  174. recv_want = MAX_READ;
  175. }
  176. // 从网络接收数据
  177. result = network_rx(mqtt_ctrl->netc, mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
  178. if (rx_len == 0 || result != 0 ) {
  179. LLOGD("rx_len %d result %d", rx_len, result);
  180. break;
  181. }
  182. // 收到数据了, 传给处理函数继续处理
  183. // 数据的长度变更, 触发传递
  184. mqtt_ctrl->buffer_offset += rx_len;
  185. LLOGD("data recv %d offset %d", rx_len, mqtt_ctrl->buffer_offset);
  186. further:
  187. result = mqtt_parse(mqtt_ctrl);
  188. if (result == 0) {
  189. // OK
  190. }else if(result == 1){
  191. if (mqtt_ctrl->buffer_offset > 0)
  192. goto further;
  193. else {
  194. continue;
  195. }
  196. }
  197. else {
  198. LLOGW("mqtt_parse ret %d, closing socket");
  199. luat_mqtt_close_socket(mqtt_ctrl);
  200. break;
  201. }
  202. }
  203. return 0;
  204. }
  205. static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
  206. rtos_msg_t msg = {0};
  207. // msg.handler = l_mqtt_callback;
  208. uint8_t msg_tp = MQTTParseMessageType(mqtt_ctrl->mqtt_packet_buffer);
  209. uint16_t msg_id = 0;
  210. uint8_t qos = 0;
  211. switch (msg_tp) {
  212. case MQTT_MSG_CONNACK: {
  213. // LLOGD("MQTT_MSG_CONNACK");
  214. if(mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
  215. LLOGW("CONACK 0x%02x",mqtt_ctrl->mqtt_packet_buffer[3]);
  216. luat_mqtt_close_socket(mqtt_ctrl);
  217. return -1;
  218. }
  219. mqtt_ctrl->mqtt_state = 1;
  220. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CONNACK, 0);
  221. break;
  222. }
  223. case MQTT_MSG_PUBLISH : {
  224. // LLOGD("MQTT_MSG_PUBLISH");
  225. const uint8_t* ptr;
  226. qos = MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer);
  227. uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  228. uint16_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  229. luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
  230. mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
  231. mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
  232. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, mqtt_msg);
  233. // 还要回复puback
  234. if (qos == 1) {
  235. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  236. mqtt_puback(&(mqtt_ctrl->broker), msg_id);
  237. }
  238. break;
  239. }
  240. case MQTT_MSG_PUBACK : {
  241. // LLOGD("MQTT_MSG_PUBACK");
  242. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBACK, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  243. break;
  244. }
  245. case MQTT_MSG_PUBREC : {
  246. msg_id = mqtt_parse_msg_id(&(mqtt_ctrl->broker));
  247. mqtt_pubrel(&(mqtt_ctrl->broker), msg_id);
  248. // LLOGD("MQTT_MSG_PUBREC");
  249. break;
  250. }
  251. case MQTT_MSG_PUBCOMP : {
  252. // LLOGD("MQTT_MSG_PUBCOMP");
  253. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBCOMP, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  254. break;
  255. }
  256. case MQTT_MSG_SUBACK : {
  257. LLOGD("MQTT_MSG_SUBACK");
  258. break;
  259. }
  260. case MQTT_MSG_UNSUBACK : {
  261. LLOGD("MQTT_MSG_UNSUBACK");
  262. break;
  263. }
  264. case MQTT_MSG_PINGRESP : {
  265. LLOGD("MQTT_MSG_PINGRESP");
  266. break;
  267. }
  268. case MQTT_MSG_DISCONNECT : {
  269. // LLOGD("MQTT_MSG_DISCONNECT");
  270. break;
  271. }
  272. default : {
  273. LLOGD("luat_mqtt_msg_cb error msg_tp:%d",msg_tp);
  274. break;
  275. }
  276. }
  277. return 0;
  278. }
  279. static const char* event2str(uint32_t id) {
  280. switch (id)
  281. {
  282. case EV_NW_RESULT_LINK:
  283. return "EV_NW_RESULT_LINK";
  284. case EV_NW_RESULT_CONNECT:
  285. return "EV_NW_RESULT_CONNECT";
  286. case EV_NW_RESULT_EVENT:
  287. return "EV_NW_RESULT_EVENT";
  288. case EV_NW_RESULT_TX:
  289. return "EV_NW_RESULT_TX";
  290. case EV_NW_RESULT_CLOSE:
  291. return "EV_NW_RESULT_CLOSE";
  292. default:
  293. return "UNKOWN";
  294. }
  295. }
  296. int32_t luat_mqtt_callback(void *data, void *param) {
  297. OS_EVENT *event = (OS_EVENT *)data;
  298. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)param;
  299. int ret = 0;
  300. // LLOGD("LINK %d ON_LINE %d EVENT %d TX_OK %d CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
  301. LLOGD("network mqtt cb %8X %s %8X",event->ID & 0x0ffffffff, event2str(event->ID & 0x0ffffffff) ,event->Param1);
  302. if (event->ID == EV_NW_RESULT_LINK){
  303. return 0; // 这里应该直接返回, 不能往下调用network_wait_event
  304. }else if(event->ID == EV_NW_RESULT_CONNECT){
  305. ret = mqtt_connect(&(mqtt_ctrl->broker));
  306. if(ret==1){
  307. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  308. }
  309. }else if(event->ID == EV_NW_RESULT_EVENT){
  310. if (event->Param1==0){
  311. ret = luat_mqtt_read_packet(mqtt_ctrl);
  312. // LLOGD("luat_mqtt_read_packet ret:%d",ret);
  313. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  314. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  315. }
  316. }else if(event->ID == EV_NW_RESULT_TX){
  317. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  318. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  319. }else if(event->ID == EV_NW_RESULT_CLOSE){
  320. }
  321. if (event->Param1){
  322. LLOGW("mqtt_callback param1 %d, closing socket", event->Param1);
  323. luat_mqtt_close_socket(mqtt_ctrl);
  324. }
  325. ret = network_wait_event(mqtt_ctrl->netc, NULL, 0, NULL);
  326. if (ret < 0){
  327. LLOGW("network_wait_event ret %d, closing socket", ret);
  328. luat_mqtt_close_socket(mqtt_ctrl);
  329. return -1;
  330. }
  331. return 0;
  332. }
  333. int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
  334. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
  335. uint32_t tx_len = 0;
  336. int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
  337. if (ret < 0)
  338. return 0;
  339. return count;
  340. }
  341. int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  342. int ret = 0;
  343. const char *hostname = mqtt_ctrl->host;
  344. uint16_t port = mqtt_ctrl->remote_port;
  345. uint16_t keepalive = mqtt_ctrl->keepalive;
  346. LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
  347. mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
  348. #ifdef LUAT_USE_LWIP
  349. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.type)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  350. #else
  351. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.is_ipv6)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  352. #endif
  353. LLOGD("network_connect ret %d", ret);
  354. if (ret < 0) {
  355. network_close(mqtt_ctrl->netc, 0);
  356. return -1;
  357. }
  358. return 0;
  359. }