libemqtt.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. /*
  2. * This file is part of libemqtt.
  3. *
  4. * libemqtt is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Lesser General Public License as published by
  6. * the Free Software Foundation, either version 3 of the License, or
  7. * (at your option) any later version.
  8. *
  9. * libemqtt is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with libemqtt. If not, see <http://www.gnu.org/licenses/>.
  16. */
  17. /*
  18. *
  19. * Created by Filipe Varela on 09/10/16.
  20. * Copyright 2009 Caixa Mágica Software. All rights reserved.
  21. *
  22. * Fork developed by Vicente Ruiz Rodríguez
  23. * Copyright 2012 Vicente Ruiz Rodríguez <vruiz2.0@gmail.com>. All rights reserved.
  24. *
  25. */
  26. #include <string.h>
  27. #include <libemqtt.h>
  28. #include "luat_base.h"
  29. #include "luat_mem.h"
  /* Flag bits OR'd into the first byte of a packet's fixed header. */
  #define MQTT_RETAIN_FLAG    1
  #define MQTT_QOS0_FLAG      (0 << 1)
  #define MQTT_QOS1_FLAG      (1 << 1)
  #define MQTT_QOS2_FLAG      (2 << 1)
  #define MQTT_DUP_FLAG       (1 << 3)

  /* Bits of the CONNECT "connect flags" byte. */
  #define MQTT_CLEAN_SESSION  (1 << 1)
  #define MQTT_WILL_FLAG      (1 << 2)
  #define MQTT_WILL_RETAIN    (1 << 5)
  #define MQTT_PASSWORD_FLAG  (1 << 6)
  #define MQTT_USERNAME_FLAG  (1 << 7)
  40. #define LUAT_LOG_TAG "mqtt"
  41. #include "luat_log.h"
  /**
   * Count how many bytes the "remaining length" field of a packet occupies.
   *
   * Starting at buf[1], each byte with its top bit (0x80) set announces one
   * more length byte; the count is capped at 4.
   *
   * @param buf  start of the packet (buf[0] is the fixed-header flags byte)
   * @return     number of remaining-length bytes, 1..4
   */
  uint8_t mqtt_num_rem_len_bytes(const uint8_t* buf) {
      uint8_t count = 1;

      /* buf[count] is the last length byte counted so far. */
      while (count < 4 && (buf[count] & 0x80) != 0) {
          count++;
      }
      return count;
  }
  /**
   * Decode the variable-length "remaining length" field of a packet.
   *
   * Each length byte contributes its low 7 bits (least significant group
   * first); the top bit signals that another length byte follows. At most
   * 4 length bytes are consumed, so a malformed header whose bytes all carry
   * the continuation bit can no longer make the parser run past the field.
   *
   * @param buf  start of the packet (buf[0] is the fixed-header flags byte)
   * @return     decoded length. The return type is 16 bits wide, so values
   *             above 65535 are truncated to their low 16 bits.
   */
  uint16_t mqtt_parse_rem_len(const uint8_t* buf) {
      const uint8_t* field = buf + 1; // skip "flags" byte in fixed header
      uint32_t value = 0;
      uint32_t multiplier = 1;

      for (int i = 0; i < 4; i++) {
          uint8_t digit = field[i];
          value += (uint32_t)(digit & 127) * multiplier;
          if ((digit & 128) == 0) {
              break;
          }
          multiplier *= 128;
      }
      return (uint16_t)value;
  }
  70. uint16_t mqtt_parse_msg_id(const uint8_t* buf) {
  71. uint8_t type = MQTTParseMessageType(buf);
  72. uint8_t qos = MQTTParseMessageQos(buf);
  73. uint16_t id = 0;
  74. //printf("mqtt_parse_msg_id\n");
  75. if(type >= MQTT_MSG_PUBLISH && type <= MQTT_MSG_UNSUBACK) {
  76. if(type == MQTT_MSG_PUBLISH) {
  77. if(qos != 0) {
  78. // fixed header length + Topic (UTF encoded)
  79. // = 1 for "flags" byte + rlb for length bytes + topic size
  80. uint8_t rlb = mqtt_num_rem_len_bytes(buf);
  81. uint8_t offset = *(buf+1+rlb)<<8; // topic UTF MSB
  82. offset |= *(buf+1+rlb+1); // topic UTF LSB
  83. offset += (1+rlb+2); // fixed header + topic size
  84. id = *(buf+offset)<<8; // id MSB
  85. id |= *(buf+offset+1); // id LSB
  86. }
  87. } else {
  88. // fixed header length
  89. // 1 for "flags" byte + rlb for length bytes
  90. uint8_t rlb = mqtt_num_rem_len_bytes(buf);
  91. id = *(buf+1+rlb)<<8; // id MSB
  92. id |= *(buf+1+rlb+1); // id LSB
  93. }
  94. }
  95. return id;
  96. }
  /**
   * Copy the topic of a PUBLISH packet into a caller-supplied buffer.
   *
   * The topic is located with mqtt_parse_pub_topic_ptr(). No terminator is
   * appended and the destination size is not checked here, so the caller must
   * provide room for the returned number of bytes.
   *
   * @param buf    start of the packet
   * @param topic  destination buffer
   * @return       topic length in bytes (0 when nothing was copied)
   */
  uint16_t mqtt_parse_pub_topic(const uint8_t* buf, uint8_t* topic) {
      const uint8_t* src;
      uint16_t n = mqtt_parse_pub_topic_ptr(buf, &src);

      if (n == 0 || src == NULL) {
          return n;
      }
      memcpy(topic, src, n);
      return n;
  }
  106. uint16_t mqtt_parse_pub_topic_ptr(const uint8_t* buf, const uint8_t **topic_ptr) {
  107. uint16_t len = 0;
  108. //printf("mqtt_parse_pub_topic_ptr\n");
  109. if(MQTTParseMessageType(buf) == MQTT_MSG_PUBLISH) {
  110. // fixed header length = 1 for "flags" byte + rlb for length bytes
  111. uint8_t rlb = mqtt_num_rem_len_bytes(buf);
  112. len = *(buf+1+rlb)<<8; // MSB of topic UTF
  113. len |= *(buf+1+rlb+1); // LSB of topic UTF
  114. // start of topic = add 1 for "flags", rlb for remaining length, 2 for UTF
  115. *topic_ptr = (buf + (1+rlb+2));
  116. } else {
  117. *topic_ptr = NULL;
  118. }
  119. return len;
  120. }
  /**
   * Copy the payload of a PUBLISH packet into a caller-supplied buffer.
   *
   * The payload is located with mqtt_parse_pub_msg_ptr(). The destination size
   * is not checked here, so the caller must provide room for the returned
   * number of bytes.
   *
   * @param buf  start of the packet
   * @param msg  destination buffer
   * @return     payload length in bytes (0 when nothing was copied)
   */
  uint16_t mqtt_parse_publish_msg(const uint8_t* buf, uint8_t* msg) {
      const uint8_t* src;
      uint16_t n = mqtt_parse_pub_msg_ptr(buf, &src);

      if (n == 0 || src == NULL) {
          return n;
      }
      memcpy(msg, src, n);
      return n;
  }
  130. uint16_t mqtt_parse_pub_msg_ptr(const uint8_t* buf, const uint8_t **msg_ptr) {
  131. uint16_t len = 0;
  132. //printf("mqtt_parse_pub_msg_ptr\n");
  133. if(MQTTParseMessageType(buf) == MQTT_MSG_PUBLISH) {
  134. // message starts at
  135. // fixed header length + Topic (UTF encoded) + msg id (if QoS>0)
  136. uint8_t rlb = mqtt_num_rem_len_bytes(buf);
  137. uint8_t offset = (*(buf+1+rlb))<<8; // topic UTF MSB
  138. offset |= *(buf+1+rlb+1); // topic UTF LSB
  139. offset += (1+rlb+2); // fixed header + topic size
  140. if(MQTTParseMessageQos(buf)) {
  141. offset += 2; // add two bytes of msg id
  142. }
  143. *msg_ptr = (buf + offset);
  144. // offset is now pointing to start of message
  145. // length of the message is remaining length - variable header
  146. // variable header is offset - fixed header
  147. // fixed header is 1 + rlb
  148. // so, lom = remlen - (offset - (1+rlb))
  149. len = mqtt_parse_rem_len(buf) - (offset-(rlb+1));
  150. } else {
  151. *msg_ptr = NULL;
  152. }
  153. return len;
  154. }
  155. void mqtt_init(mqtt_broker_handle_t* broker, const char* clientid) {
  156. // Connection options
  157. broker->alive = 300; // 300 seconds = 5 minutes
  158. broker->seq = 1; // Sequency for message indetifiers
  159. // Client options
  160. memset(broker->clientid, 0, sizeof(broker->clientid));
  161. memset(broker->username, 0, sizeof(broker->username));
  162. memset(broker->password, 0, sizeof(broker->password));
  163. if(clientid) {
  164. memcpy(broker->clientid, clientid, strlen(clientid));
  165. }
  166. // Will topic
  167. broker->clean_session = 1;
  168. }
  169. void mqtt_init_auth(mqtt_broker_handle_t* broker, const char* username, const char* password) {
  170. if(username && username[0] != '\0')
  171. memcpy(broker->username, username, strlen(username)+1);
  172. if(password && password[0] != '\0')
  173. memcpy(broker->password, password, strlen(password)+1);
  174. }
  175. void mqtt_set_alive(mqtt_broker_handle_t* broker, uint16_t alive) {
  176. broker->alive = alive;
  177. }
  178. int mqtt_connect(mqtt_broker_handle_t* broker)
  179. {
  180. uint8_t flags = 0x00;
  181. uint16_t clientidlen = strlen(broker->clientid);
  182. uint16_t usernamelen = strlen(broker->username);
  183. uint16_t passwordlen = strlen(broker->password);
  184. uint16_t payload_len = clientidlen + 2;
  185. // Preparing the flags
  186. if(usernamelen) {
  187. payload_len += usernamelen + 2;
  188. flags |= MQTT_USERNAME_FLAG;
  189. }
  190. if(passwordlen) {
  191. payload_len += passwordlen + 2;
  192. flags |= MQTT_PASSWORD_FLAG;
  193. }
  194. if(broker->clean_session) {
  195. flags |= MQTT_CLEAN_SESSION;
  196. }
  197. if (broker->will_len > 0) {
  198. payload_len += broker->will_len;
  199. flags |= MQTT_WILL_FLAG;
  200. if (broker->will_retain)
  201. flags |= MQTT_WILL_RETAIN;
  202. if (broker->will_qos) {
  203. flags |= (broker->will_qos << 3);
  204. }
  205. }
  206. // Variable header
  207. uint8_t var_header[] = {
  208. 0x00,0x04,0x4d,0x51,0x54,0x54, // Protocol name: MQTT
  209. 0x04, // Protocol version 3.1.1
  210. flags, // Connect flags
  211. broker->alive>>8, broker->alive&0xFF, // Keep alive
  212. };
  213. // Fixed header
  214. uint8_t fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length
  215. uint8_t remainLen = sizeof(var_header)+payload_len;
  216. if (remainLen > 127) {
  217. fixedHeaderSize++; // add an additional byte for Remaining Length
  218. }
  219. uint8_t fixed_header[3];
  220. // Message Type
  221. fixed_header[0] = MQTT_MSG_CONNECT;
  222. // Remaining Length
  223. if (remainLen <= 127) {
  224. fixed_header[1] = remainLen;
  225. } else {
  226. // first byte is remainder (mod) of 128, then set the MSB to indicate more bytes
  227. fixed_header[1] = remainLen % 128;
  228. fixed_header[1] = fixed_header[1] | 0x80;
  229. // second byte is number of 128s
  230. fixed_header[2] = remainLen / 128;
  231. }
  232. uint16_t offset = 0;
  233. uint32_t packet_size = fixedHeaderSize+sizeof(var_header)+payload_len;
  234. uint8_t *packet = luat_heap_malloc(packet_size);
  235. if (packet == NULL) {
  236. LLOGE("out of memory when malloc connect packet");
  237. return -2;
  238. }
  239. memset(packet, 0, packet_size);
  240. memcpy(packet, fixed_header, fixedHeaderSize);
  241. offset += fixedHeaderSize;
  242. memcpy(packet+offset, var_header, sizeof(var_header));
  243. offset += sizeof(var_header);
  244. // Client ID - UTF encoded
  245. packet[offset++] = clientidlen>>8;
  246. packet[offset++] = clientidlen&0xFF;
  247. if (clientidlen)
  248. memcpy(packet+offset, broker->clientid, clientidlen);
  249. offset += clientidlen;
  250. if (broker->will_len) {
  251. memcpy(packet+offset, broker->will_data, broker->will_len);
  252. offset += broker->will_len;
  253. }
  254. if(usernamelen) {
  255. // Username - UTF encoded
  256. packet[offset++] = usernamelen>>8;
  257. packet[offset++] = usernamelen&0xFF;
  258. memcpy(packet+offset, broker->username, usernamelen);
  259. offset += usernamelen;
  260. }
  261. if(passwordlen) {
  262. // Password - UTF encoded
  263. packet[offset++] = passwordlen>>8;
  264. packet[offset++] = passwordlen&0xFF;
  265. memcpy(packet+offset, broker->password, passwordlen);
  266. offset += passwordlen;
  267. }
  268. // Send the packet
  269. if(broker->send(broker->socket_info, packet, packet_size) < packet_size) {
  270. luat_heap_free(packet);
  271. return -1;
  272. }
  273. luat_heap_free(packet);
  274. return 1;
  275. }
  276. int mqtt_disconnect(mqtt_broker_handle_t* broker) {
  277. static const uint8_t packet[] = {
  278. MQTT_MSG_DISCONNECT, // Message Type, DUP flag, QoS level, Retain
  279. 0x00 // Remaining length
  280. };
  281. // Send the packet
  282. if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
  283. return -1;
  284. }
  285. return 1;
  286. }
  287. int mqtt_ping(mqtt_broker_handle_t* broker) {
  288. static const uint8_t packet[] = {
  289. MQTT_MSG_PINGREQ, // Message Type, DUP flag, QoS level, Retain
  290. 0x00 // Remaining length
  291. };
  292. // Send the packet
  293. if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
  294. return -1;
  295. }
  296. return 1;
  297. }
  298. int mqtt_publish(mqtt_broker_handle_t* broker, const char* topic, const char* msg, uint32_t msg_len, uint8_t retain) {
  299. return mqtt_publish_with_qos(broker, topic, msg, msg_len, retain, 0, NULL);
  300. }
  301. int mqtt_publish_with_qos(mqtt_broker_handle_t* broker, const char* topic, const char* msg, uint32_t msg_len, uint8_t retain, uint8_t qos, uint16_t* message_id) {
  302. uint16_t topiclen = strlen(topic);
  303. uint32_t msglen = msg_len;
  304. // uint32_t tem_len;
  305. uint8_t qos_flag = MQTT_QOS0_FLAG;
  306. uint8_t qos_size = 0; // No QoS included
  307. if(qos == 1) {
  308. qos_size = 2; // 2 bytes for QoS
  309. qos_flag = MQTT_QOS1_FLAG;
  310. }
  311. else if(qos == 2) {
  312. qos_size = 2; // 2 bytes for QoS
  313. qos_flag = MQTT_QOS2_FLAG;
  314. }
  315. // Variable header
  316. size_t var_header_len = topiclen+2+qos_size;
  317. uint8_t *var_header = luat_heap_malloc(var_header_len); // Topic size (2 bytes), utf-encoded topic
  318. if (var_header == NULL) {
  319. LLOGE("out of memory when malloc publish var_header");
  320. return -1;
  321. }
  322. memset(var_header, 0, var_header_len);
  323. var_header[0] = topiclen>>8;
  324. var_header[1] = topiclen&0xFF;
  325. memcpy(var_header+2, topic, topiclen);
  326. if(qos_size) {
  327. var_header[topiclen+2] = broker->seq>>8;
  328. var_header[topiclen+3] = broker->seq&0xFF;
  329. if(message_id) { // Returning message id
  330. *message_id = broker->seq;
  331. }
  332. broker->seq++;
  333. }
  334. // Fixed header
  335. uint32_t remainLen = var_header_len+msglen;
  336. uint8_t buf[4] = {0};
  337. int rc = 0;
  338. uint32_t length = remainLen;
  339. do
  340. {
  341. char d = length % 128;
  342. length /= 128;
  343. /* if there are more digits to encode, set the top bit of this digit */
  344. if (length > 0)
  345. d |= 0x80;
  346. buf[rc++] = d;
  347. } while (length > 0);
  348. size_t fixed_header_len = rc + 1;
  349. uint8_t fixed_header[8];
  350. // Message Type, DUP flag, QoS level, Retain
  351. fixed_header[0] = MQTT_MSG_PUBLISH | qos_flag;
  352. if(retain) {
  353. fixed_header[0] |= MQTT_RETAIN_FLAG;
  354. }
  355. memcpy(fixed_header+1, buf, rc);
  356. #define SMALL_PUB (1400)
  357. uint8_t header_size = fixed_header_len+var_header_len;
  358. uint32_t total_size = header_size + msg_len;
  359. int ret = 0;
  360. uint8_t *packet = luat_heap_malloc(total_size <= SMALL_PUB ? total_size : SMALL_PUB);
  361. if (packet == NULL) {
  362. luat_heap_free(var_header);
  363. LLOGE("out of memory when malloc publish packet");
  364. return -1;
  365. }
  366. memset(packet, 0, header_size);
  367. memcpy(packet, fixed_header, fixed_header_len);
  368. memcpy(packet+fixed_header_len, var_header, var_header_len);
  369. //memcpy(packet+sizeof(fixed_header)+sizeof(var_header), msg, msglen);
  370. // Send the packet
  371. if (total_size <= SMALL_PUB) { // 针对小包的情况进行优化, 减少TCP交互
  372. memcpy(packet + header_size, msg, msg_len);
  373. ret = broker->send(broker->socket_info, packet, total_size);
  374. if(ret < 0 || ret < total_size) {
  375. luat_heap_free(packet);
  376. luat_heap_free(var_header);
  377. return -1;
  378. }
  379. }
  380. else {
  381. ret = broker->send(broker->socket_info, packet, header_size);
  382. //LLOGD("publish packet header %d ret %d", sizeof(packet), ret);
  383. if(ret < 0 || ret < header_size) {
  384. luat_heap_free(packet);
  385. luat_heap_free(var_header);
  386. return -1;
  387. }
  388. ret = broker->send(broker->socket_info, msg, msg_len);
  389. //LLOGD("publish packet body %d ret %d", msg_len, ret);
  390. if(ret < 0 || ret < msg_len) {
  391. luat_heap_free(packet);
  392. luat_heap_free(var_header);
  393. return -1;
  394. }
  395. }
  396. luat_heap_free(packet);
  397. luat_heap_free(var_header);
  398. return 1;
  399. }
  400. int mqtt_pubrel(mqtt_broker_handle_t* broker, uint16_t message_id) {
  401. uint8_t packet[] = {
  402. MQTT_MSG_PUBREL | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain
  403. 0x02, // Remaining length
  404. message_id>>8,
  405. message_id&0xFF
  406. };
  407. // Send the packet
  408. if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
  409. return -1;
  410. }
  411. return 1;
  412. }
  413. int mqtt_puback(mqtt_broker_handle_t* broker, uint16_t message_id) {
  414. uint8_t packet[] = {
  415. MQTT_MSG_PUBACK | MQTT_QOS0_FLAG, // Message Type, DUP flag, QoS level, Retain
  416. 0x02, // Remaining length
  417. message_id>>8,
  418. message_id&0xFF
  419. };
  420. // Send the packet
  421. if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
  422. return -1;
  423. }
  424. return 1;
  425. }
  426. int mqtt_pubrec(mqtt_broker_handle_t* broker, uint16_t message_id) {
  427. uint8_t packet[] = {
  428. MQTT_MSG_PUBREC | MQTT_QOS0_FLAG, // Message Type, DUP flag, QoS level, Retain
  429. 0x02, // Remaining length
  430. message_id>>8,
  431. message_id&0xFF
  432. };
  433. // Send the packet
  434. if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
  435. return -1;
  436. }
  437. return 1;
  438. }
  439. int mqtt_pubcomp(mqtt_broker_handle_t* broker, uint16_t message_id) {
  440. uint8_t packet[] = {
  441. MQTT_MSG_PUBCOMP | MQTT_QOS0_FLAG, // Message Type, DUP flag, QoS level, Retain
  442. 0x02, // Remaining length
  443. message_id>>8,
  444. message_id&0xFF
  445. };
  446. // Send the packet
  447. if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
  448. return -1;
  449. }
  450. return 1;
  451. }
  452. int mqtt_subscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id, uint8_t qos) {
  453. uint16_t topiclen = strlen(topic);
  454. if (qos>2) qos=0;
  455. // Variable header
  456. uint8_t var_header[2]; // Message ID
  457. var_header[0] = broker->seq>>8;
  458. var_header[1] = broker->seq&0xFF;
  459. if(message_id) { // Returning message id
  460. *message_id = broker->seq;
  461. }
  462. broker->seq++;
  463. // utf topic
  464. size_t utf_topic_len = topiclen + 3;
  465. uint8_t *utf_topic = luat_heap_malloc(utf_topic_len); // Topic size (2 bytes), utf-encoded topic, QoS byte
  466. if (utf_topic == NULL) {
  467. LLOGE("out of memory when malloc subscribe utf_topic");
  468. return -1;
  469. }
  470. memset(utf_topic, 0, utf_topic_len);
  471. utf_topic[0] = topiclen>>8;
  472. utf_topic[1] = topiclen&0xFF;
  473. memcpy(utf_topic+2, topic, topiclen);
  474. utf_topic[topiclen+2] |= qos;
  475. // Fixed header
  476. uint8_t fixed_header[] = {
  477. MQTT_MSG_SUBSCRIBE | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain
  478. sizeof(var_header)+utf_topic_len
  479. };
  480. size_t packet_len = sizeof(var_header)+sizeof(fixed_header)+utf_topic_len;
  481. uint8_t *packet = luat_heap_malloc(packet_len);
  482. if (packet == NULL) {
  483. LLOGE("out of memory when malloc subscribe packet");
  484. return -1;
  485. }
  486. memset(packet, 0, packet_len);
  487. memcpy(packet, fixed_header, sizeof(fixed_header));
  488. memcpy(packet+sizeof(fixed_header), var_header, sizeof(var_header));
  489. memcpy(packet+sizeof(fixed_header)+sizeof(var_header), utf_topic, utf_topic_len);
  490. // Send the packet
  491. if(broker->send(broker->socket_info, packet, packet_len) < packet_len) {
  492. luat_heap_free(utf_topic);
  493. luat_heap_free(packet);
  494. return -1;
  495. }
  496. luat_heap_free(utf_topic);
  497. luat_heap_free(packet);
  498. return 1;
  499. }
  500. int mqtt_unsubscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id) {
  501. uint16_t topiclen = strlen(topic);
  502. // Variable header
  503. uint8_t var_header[2]; // Message ID
  504. var_header[0] = broker->seq>>8;
  505. var_header[1] = broker->seq&0xFF;
  506. if(message_id) { // Returning message id
  507. *message_id = broker->seq;
  508. }
  509. broker->seq++;
  510. // utf topic
  511. size_t utf_topic_len = topiclen + 2;
  512. uint8_t *utf_topic = luat_heap_malloc(utf_topic_len); // Topic size (2 bytes), utf-encoded topic
  513. if (utf_topic == NULL) {
  514. LLOGE("out of memory when malloc subscribe utf_topic");
  515. return -1;
  516. }
  517. memset(utf_topic, 0, utf_topic_len);
  518. utf_topic[0] = topiclen>>8;
  519. utf_topic[1] = topiclen&0xFF;
  520. memcpy(utf_topic+2, topic, topiclen);
  521. // Fixed header
  522. uint8_t fixed_header[] = {
  523. MQTT_MSG_UNSUBSCRIBE | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain
  524. sizeof(var_header)+utf_topic_len
  525. };
  526. size_t packet_len = sizeof(var_header)+sizeof(fixed_header)+utf_topic_len;
  527. uint8_t packet[1024];
  528. memset(packet, 0, packet_len);
  529. memcpy(packet, fixed_header, sizeof(fixed_header));
  530. memcpy(packet+sizeof(fixed_header), var_header, sizeof(var_header));
  531. memcpy(packet+sizeof(fixed_header)+sizeof(var_header), utf_topic, utf_topic_len);
  532. // Send the packet
  533. if(broker->send(broker->socket_info, packet, packet_len) < packet_len) {
  534. luat_heap_free(utf_topic);
  535. return -1;
  536. }
  537. luat_heap_free(utf_topic);
  538. return 1;
  539. }
  540. int mqtt_set_will(mqtt_broker_handle_t* broker, const char* topic,
  541. const char* payload, size_t payload_len,
  542. uint8_t qos, size_t retain) {
  543. if (broker == NULL)
  544. return -1;
  545. //LLOGD("will %s %.*s %d %d", topic, payload_len, payload, qos, retain);
  546. // 如果之前有数据, 那就释放掉
  547. if (broker->will_data != NULL) {
  548. broker->will_len = 0;
  549. luat_heap_free(broker->will_data);
  550. broker->will_data = NULL;
  551. }
  552. if (topic == NULL) {
  553. LLOGI("will topic is NULL");
  554. return 0;
  555. }
  556. size_t topic_len = strlen(topic);
  557. broker->will_data = luat_heap_malloc(topic_len + 2 + payload_len + 2);
  558. if (broker->will_data == NULL) {
  559. return -2;
  560. }
  561. broker->will_data[0] = (uint8_t)(topic_len >> 8);
  562. broker->will_data[1] = (uint8_t)(topic_len & 0xFF);
  563. memcpy(broker->will_data + 2, topic, topic_len);
  564. broker->will_data[2 + topic_len] = (uint8_t)(payload_len >> 8);
  565. broker->will_data[2 + topic_len + 1] = (uint8_t)(payload_len & 0xFF);
  566. if (payload_len)
  567. memcpy(broker->will_data + 2 + topic_len + 2, payload, payload_len);
  568. broker->will_qos = qos > 2 ? 0 : qos;
  569. broker->will_retain = retain;
  570. broker->will_len = topic_len + 2 + payload_len + 2;
  571. //LLOGD("will len %d", broker->will_len);
  572. return 0;
  573. }