luat_mqtt_client.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. #include "luat_base.h"
  2. #include "luat_network_adapter.h"
  3. #include "libemqtt.h"
  4. #include "luat_rtos.h"
  5. #include "luat_zbuff.h"
  6. #include "luat_malloc.h"
  7. #include "luat_mqtt.h"
  8. #define LUAT_LOG_TAG "mqtt"
  9. #include "luat_log.h"
  10. #define MQTT_DEBUG 0
  11. #if MQTT_DEBUG == 0
  12. #undef LLOGD
  13. #define LLOGD(...)
  14. #endif
  15. LUAT_RT_RET_TYPE luat_mqtt_timer_callback(LUAT_RT_CB_PARAM){
  16. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  17. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TIMER_PING, 0);
  18. }
  19. static LUAT_RT_RET_TYPE reconnect_timer_cb(LUAT_RT_CB_PARAM){
  20. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  21. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RECONNECT, 0);
  22. }
  23. int luat_mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  24. int ret = luat_mqtt_connect(mqtt_ctrl);
  25. if(ret){
  26. LLOGI("reconnect init socket ret=%d\n", ret);
  27. luat_mqtt_close_socket(mqtt_ctrl);
  28. }
  29. }
  30. int luat_mqtt_ping(luat_mqtt_ctrl_t *mqtt_ctrl) {
  31. mqtt_ping(&mqtt_ctrl->broker);
  32. return 0;
  33. }
  34. int luat_mqtt_init(luat_mqtt_ctrl_t *mqtt_ctrl, int adapter_index) {
  35. memset(mqtt_ctrl, 0, sizeof(luat_mqtt_ctrl_t));
  36. mqtt_ctrl->adapter_index = adapter_index;
  37. mqtt_ctrl->netc = network_alloc_ctrl(adapter_index);
  38. if (!mqtt_ctrl->netc){
  39. LLOGW("network_alloc_ctrl fail");
  40. return -1;
  41. }
  42. network_init_ctrl(mqtt_ctrl->netc, NULL, luat_mqtt_callback, mqtt_ctrl);
  43. mqtt_ctrl->mqtt_state = 0;
  44. mqtt_ctrl->netc->is_debug = 0;
  45. mqtt_ctrl->keepalive = 240;
  46. network_set_base_mode(mqtt_ctrl->netc, 1, 10000, 0, 0, 0, 0);
  47. network_set_local_port(mqtt_ctrl->netc, 0);
  48. mqtt_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, mqtt_ctrl, NULL);
  49. mqtt_ctrl->ping_timer = luat_create_rtos_timer(luat_mqtt_timer_callback, mqtt_ctrl, NULL);
  50. return 0;
  51. }
  52. int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts) {
  53. memcpy(mqtt_ctrl->host, opts->host, strlen(opts->host) + 1);
  54. mqtt_ctrl->remote_port = opts->port;
  55. if (opts->is_tls){
  56. network_init_tls(mqtt_ctrl->netc, (opts->server_cert || opts->client_cert)?2:0);
  57. if (opts->server_cert){
  58. network_set_server_cert(mqtt_ctrl->netc, (const unsigned char *)opts->server_cert, opts->server_cert_len+1);
  59. }
  60. if (opts->client_cert){
  61. network_set_client_cert(mqtt_ctrl->netc, (const unsigned char*)opts->client_cert, opts->client_cert_len+1,
  62. (const unsigned char*)opts->client_key, opts->client_key_len+1,
  63. (const unsigned char*)opts->client_password, opts->client_password_len+1);
  64. }
  65. } else {
  66. network_deinit_tls(mqtt_ctrl->netc);
  67. }
  68. mqtt_ctrl->broker.socket_info = mqtt_ctrl;
  69. mqtt_ctrl->broker.send = luat_mqtt_send_packet;
  70. return 0;
  71. }
  72. static void mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl){
  73. LLOGI("reconnect after %dms", mqtt_ctrl->reconnect_time);
  74. mqtt_ctrl->buffer_offset = 0;
  75. luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
  76. }
  77. void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  78. LLOGI("mqtt closing socket");
  79. if (mqtt_ctrl->netc){
  80. network_force_close_socket(mqtt_ctrl->netc);
  81. }
  82. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  83. mqtt_ctrl->mqtt_state = 0;
  84. if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time > 0){
  85. mqtt_reconnect(mqtt_ctrl);
  86. }
  87. }
  88. void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  89. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RELEASE, 0);
  90. if (mqtt_ctrl->ping_timer){
  91. luat_release_rtos_timer(mqtt_ctrl->ping_timer);
  92. mqtt_ctrl->ping_timer = NULL;
  93. }
  94. if (mqtt_ctrl->reconnect_timer){
  95. luat_release_rtos_timer(mqtt_ctrl->reconnect_timer);
  96. mqtt_ctrl->reconnect_timer = NULL;
  97. }
  98. if (mqtt_ctrl->broker.will_data) {
  99. mqtt_ctrl->broker.will_len = 0;
  100. luat_heap_free(mqtt_ctrl->broker.will_data);
  101. mqtt_ctrl->broker.will_data = NULL;
  102. }
  103. if (mqtt_ctrl->netc){
  104. network_release_ctrl(mqtt_ctrl->netc);
  105. mqtt_ctrl->netc = NULL;
  106. }
  107. }
  108. static int mqtt_parse(luat_mqtt_ctrl_t *mqtt_ctrl) {
  109. LLOGD("mqtt_parse offset %d", mqtt_ctrl->buffer_offset);
  110. if (mqtt_ctrl->buffer_offset < 2) {
  111. LLOGD("wait more data");
  112. return 0;
  113. }
  114. // 判断数据长度, 前几个字节能判断出够不够读出mqtt的头
  115. char* buf = mqtt_ctrl->mqtt_packet_buffer;
  116. int num_bytes = 1;
  117. if ((buf[1] & 0x80) == 0x80) {
  118. num_bytes++;
  119. if (mqtt_ctrl->buffer_offset < 3) {
  120. LLOGD("wait more data for mqtt head");
  121. return 0;
  122. }
  123. if ((buf[2] & 0x80) == 0x80) {
  124. num_bytes ++;
  125. if (mqtt_ctrl->buffer_offset < 4) {
  126. LLOGD("wait more data for mqtt head");
  127. return 0;
  128. }
  129. if ((buf[3] & 0x80) == 0x80) {
  130. num_bytes ++;
  131. }
  132. }
  133. }
  134. // 判断数据总长, 这里rem_len只包含mqtt头部之外的数据
  135. uint16_t rem_len = mqtt_parse_rem_len(mqtt_ctrl->mqtt_packet_buffer);
  136. if (rem_len > mqtt_ctrl->buffer_offset - num_bytes - 1) {
  137. LLOGD("wait more data for mqtt head");
  138. return 0;
  139. }
  140. // 至此, mqtt包是完整的 解析类型, 处理之
  141. int ret = luat_mqtt_msg_cb(mqtt_ctrl);
  142. if (ret != 0){
  143. LLOGW("bad mqtt packet!! ret %d", ret);
  144. return -1;
  145. }
  146. // 处理完成后, 如果还有数据, 移动数据, 继续处理
  147. mqtt_ctrl->buffer_offset -= (1 + num_bytes + rem_len);
  148. memmove(mqtt_ctrl->mqtt_packet_buffer, mqtt_ctrl->mqtt_packet_buffer+1 + num_bytes + rem_len, mqtt_ctrl->buffer_offset);
  149. return 1;
  150. }
  151. int luat_mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl){
  152. // LLOGD("luat_mqtt_read_packet mqtt_ctrl->buffer_offset:%d",mqtt_ctrl->buffer_offset);
  153. int ret = -1;
  154. uint8_t *read_buff = NULL;
  155. uint32_t total_len = 0;
  156. uint32_t rx_len = 0;
  157. int result = network_rx(mqtt_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
  158. if (total_len > 0xFFF) {
  159. LLOGE("too many data wait for recv %d", total_len);
  160. luat_mqtt_close_socket(mqtt_ctrl);
  161. return -1;
  162. }
  163. if (total_len == 0) {
  164. LLOGW("rx event but NO data wait for recv");
  165. return 0;
  166. }
  167. if (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset <= 0) {
  168. LLOGE("buff is FULL, mqtt packet too big");
  169. luat_mqtt_close_socket(mqtt_ctrl);
  170. return -1;
  171. }
  172. #define MAX_READ (1024)
  173. int recv_want = 0;
  174. while (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset > 0) {
  175. if (MAX_READ > (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset)) {
  176. recv_want = MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset;
  177. }
  178. else {
  179. recv_want = MAX_READ;
  180. }
  181. // 从网络接收数据
  182. result = network_rx(mqtt_ctrl->netc, mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
  183. if (rx_len == 0 || result != 0 ) {
  184. LLOGD("rx_len %d result %d", rx_len, result);
  185. break;
  186. }
  187. // 收到数据了, 传给处理函数继续处理
  188. // 数据的长度变更, 触发传递
  189. mqtt_ctrl->buffer_offset += rx_len;
  190. LLOGD("data recv %d offset %d", rx_len, mqtt_ctrl->buffer_offset);
  191. further:
  192. result = mqtt_parse(mqtt_ctrl);
  193. if (result == 0) {
  194. // OK
  195. }else if(result == 1){
  196. if (mqtt_ctrl->buffer_offset > 0)
  197. goto further;
  198. else {
  199. continue;
  200. }
  201. }
  202. else {
  203. LLOGW("mqtt_parse ret %d, closing socket");
  204. luat_mqtt_close_socket(mqtt_ctrl);
  205. break;
  206. }
  207. }
  208. return 0;
  209. }
  210. static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
  211. rtos_msg_t msg = {0};
  212. // msg.handler = l_mqtt_callback;
  213. uint8_t msg_tp = MQTTParseMessageType(mqtt_ctrl->mqtt_packet_buffer);
  214. uint16_t msg_id = 0;
  215. uint8_t qos = 0;
  216. switch (msg_tp) {
  217. case MQTT_MSG_CONNACK: {
  218. // LLOGD("MQTT_MSG_CONNACK");
  219. if(mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
  220. LLOGW("CONACK 0x%02x",mqtt_ctrl->mqtt_packet_buffer[3]);
  221. luat_mqtt_close_socket(mqtt_ctrl);
  222. return -1;
  223. }
  224. mqtt_ctrl->mqtt_state = 1;
  225. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CONNACK, 0);
  226. break;
  227. }
  228. case MQTT_MSG_PUBLISH : {
  229. // LLOGD("MQTT_MSG_PUBLISH");
  230. const uint8_t* ptr;
  231. qos = MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer);
  232. uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  233. uint16_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  234. luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
  235. mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
  236. mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
  237. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, mqtt_msg);
  238. // 还要回复puback
  239. if (qos == 1) {
  240. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  241. mqtt_puback(&(mqtt_ctrl->broker), msg_id);
  242. }
  243. break;
  244. }
  245. case MQTT_MSG_PUBACK : {
  246. // LLOGD("MQTT_MSG_PUBACK");
  247. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBACK, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  248. break;
  249. }
  250. case MQTT_MSG_PUBREC : {
  251. msg_id = mqtt_parse_msg_id(&(mqtt_ctrl->broker));
  252. mqtt_pubrel(&(mqtt_ctrl->broker), msg_id);
  253. // LLOGD("MQTT_MSG_PUBREC");
  254. break;
  255. }
  256. case MQTT_MSG_PUBCOMP : {
  257. // LLOGD("MQTT_MSG_PUBCOMP");
  258. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBCOMP, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
  259. break;
  260. }
  261. case MQTT_MSG_SUBACK : {
  262. LLOGD("MQTT_MSG_SUBACK");
  263. break;
  264. }
  265. case MQTT_MSG_UNSUBACK : {
  266. LLOGD("MQTT_MSG_UNSUBACK");
  267. break;
  268. }
  269. case MQTT_MSG_PINGRESP : {
  270. LLOGD("MQTT_MSG_PINGRESP");
  271. break;
  272. }
  273. case MQTT_MSG_DISCONNECT : {
  274. // LLOGD("MQTT_MSG_DISCONNECT");
  275. break;
  276. }
  277. default : {
  278. LLOGD("luat_mqtt_msg_cb error msg_tp:%d",msg_tp);
  279. break;
  280. }
  281. }
  282. return 0;
  283. }
  284. static const char* event2str(uint32_t id) {
  285. switch (id)
  286. {
  287. case EV_NW_RESULT_LINK:
  288. return "EV_NW_RESULT_LINK";
  289. case EV_NW_RESULT_CONNECT:
  290. return "EV_NW_RESULT_CONNECT";
  291. case EV_NW_RESULT_EVENT:
  292. return "EV_NW_RESULT_EVENT";
  293. case EV_NW_RESULT_TX:
  294. return "EV_NW_RESULT_TX";
  295. case EV_NW_RESULT_CLOSE:
  296. return "EV_NW_RESULT_CLOSE";
  297. default:
  298. return "UNKOWN";
  299. }
  300. }
  301. int32_t luat_mqtt_callback(void *data, void *param) {
  302. OS_EVENT *event = (OS_EVENT *)data;
  303. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)param;
  304. int ret = 0;
  305. // LLOGD("LINK %d ON_LINE %d EVENT %d TX_OK %d CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
  306. LLOGD("network mqtt cb %8X %s %8X",event->ID & 0x0ffffffff, event2str(event->ID & 0x0ffffffff) ,event->Param1);
  307. if (event->ID == EV_NW_RESULT_LINK){
  308. return 0; // 这里应该直接返回, 不能往下调用network_wait_event
  309. }else if(event->ID == EV_NW_RESULT_CONNECT){
  310. ret = mqtt_connect(&(mqtt_ctrl->broker));
  311. if(ret==1){
  312. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  313. }
  314. }else if(event->ID == EV_NW_RESULT_EVENT){
  315. if (event->Param1==0){
  316. ret = luat_mqtt_read_packet(mqtt_ctrl);
  317. // LLOGD("luat_mqtt_read_packet ret:%d",ret);
  318. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  319. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  320. }
  321. }else if(event->ID == EV_NW_RESULT_TX){
  322. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  323. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  324. }else if(event->ID == EV_NW_RESULT_CLOSE){
  325. }
  326. if (event->Param1){
  327. LLOGW("mqtt_callback param1 %d, closing socket", event->Param1);
  328. luat_mqtt_close_socket(mqtt_ctrl);
  329. }
  330. ret = network_wait_event(mqtt_ctrl->netc, NULL, 0, NULL);
  331. if (ret < 0){
  332. LLOGW("network_wait_event ret %d, closing socket", ret);
  333. luat_mqtt_close_socket(mqtt_ctrl);
  334. return -1;
  335. }
  336. return 0;
  337. }
  338. int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
  339. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
  340. uint32_t tx_len = 0;
  341. int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
  342. if (ret < 0)
  343. return 0;
  344. return count;
  345. }
  346. int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  347. int ret = 0;
  348. const char *hostname = mqtt_ctrl->host;
  349. uint16_t port = mqtt_ctrl->remote_port;
  350. uint16_t keepalive = mqtt_ctrl->keepalive;
  351. LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
  352. mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
  353. #ifdef LUAT_USE_LWIP
  354. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.type)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  355. #else
  356. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.is_ipv6)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
  357. #endif
  358. LLOGD("network_connect ret %d", ret);
  359. if (ret < 0) {
  360. network_close(mqtt_ctrl->netc, 0);
  361. return -1;
  362. }
  363. return 0;
  364. }
  365. int luat_mqtt_set_will(luat_mqtt_ctrl_t *mqtt_ctrl, const char* topic,
  366. const char* payload, size_t payload_len,
  367. uint8_t qos, size_t retain) {
  368. if (mqtt_ctrl == NULL || mqtt_ctrl->netc == NULL)
  369. return -1;
  370. return mqtt_set_will(&mqtt_ctrl->broker, topic, payload, payload_len, qos, retain);
  371. }