luat_lib_mqttcore.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. /*
  2. @module libmqtt
  3. @summary mqtt协议处理
  4. @version 1.0
  5. @date 2020.07.03
  6. */
  7. #include "luat_base.h"
  8. #include "luat_log.h"
  9. #include "luat_sys.h"
  10. #include "luat_msgbus.h"
  11. #include "luat_pack.h"
  /* Numeric codes of the MQTT control packet types. The same values are used
   * as indices into packet_names[]. */
  enum msgTypes
  {
      CONNECT = 1,
      CONNACK,        /* 2 */
      PUBLISH,        /* 3 */
      PUBACK,         /* 4 */
      PUBREC,         /* 5 */
      PUBREL,         /* 6 */
      PUBCOMP,        /* 7 */
      SUBSCRIBE,      /* 8 */
      SUBACK,         /* 9 */
      UNSUBSCRIBE,    /* 10 */
      UNSUBACK,       /* 11 */
      PINGREQ,        /* 12 */
      PINGRESP,       /* 13 */
      DISCONNECT,     /* 14 */
      AUTH            /* 15 */
  };
  /* Printable names of the MQTT packet types, indexed by packet code.
   * Index 0 is the reserved (unused) code. */
  static const char *packet_names[] =
  {
      "RESERVED",     /* 0 */
      "CONNECT",      /* 1 */
      "CONNACK",      /* 2 */
      "PUBLISH",      /* 3 */
      "PUBACK",       /* 4 */
      "PUBREC",       /* 5 */
      "PUBREL",       /* 6 */
      "PUBCOMP",      /* 7 */
      "SUBSCRIBE",    /* 8 */
      "SUBACK",       /* 9 */
      "UNSUBSCRIBE",  /* 10 */
      "UNSUBACK",     /* 11 */
      "PINGREQ",      /* 12 */
      "PINGRESP",     /* 13 */
      "DISCONNECT",   /* 14 */
      "AUTH"          /* 15 */
  };

  /* Externally linked pointer to the name table above. */
  const char** MQTTClient_packet_names = packet_names;
  25. /**
  26. * Converts an MQTT packet code into its name
  27. * @param ptype packet code
  28. * @return the corresponding string, or "UNKNOWN"
  29. */
  30. const char* MQTTPacket_name(int ptype)
  31. {
  32. return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
  33. }
  34. /**
  35. * Array of functions to build packets, indexed according to packet code
  36. */
  37. // pf new_packets[] =
  38. // {
  39. // NULL, /**< reserved */
  40. // NULL, /**< MQTTPacket_connect*/
  41. // MQTTPacket_connack, /**< CONNACK */
  42. // MQTTPacket_publish, /**< PUBLISH */
  43. // MQTTPacket_ack, /**< PUBACK */
  44. // MQTTPacket_ack, /**< PUBREC */
  45. // MQTTPacket_ack, /**< PUBREL */
  46. // MQTTPacket_ack, /**< PUBCOMP */
  47. // NULL, /**< MQTTPacket_subscribe*/
  48. // MQTTPacket_suback, /**< SUBACK */
  49. // NULL, /**< MQTTPacket_unsubscribe*/
  50. // MQTTPacket_unsuback, /**< UNSUBACK */
  51. // MQTTPacket_header_only, /**< PINGREQ */
  52. // MQTTPacket_header_only, /**< PINGRESP */
  53. // MQTTPacket_ack, /**< DISCONNECT */
  54. // MQTTPacket_ack /**< AUTH */
  55. // };
  /**
   * Encodes a length as an MQTT variable-length integer: seven payload bits
   * per byte, least-significant group first, with the top bit of a byte set
   * when another byte follows.
   * @param buf destination for the encoded bytes; may be NULL, in which case
   *            nothing is stored and only the byte count is computed
   * @param length the value to encode
   * @return the number of bytes the encoding occupies
   *
   * No bound is applied here: the caller must make sure buf is large enough
   * for the value being encoded (one byte per started group of 7 bits).
   */
  int MQTTPacket_encode(char* buf, size_t length)
  {
      int count = 0;

      for (;;)
      {
          unsigned char digit = (unsigned char)(length & 0x7F);

          length >>= 7;
          if (length != 0)
          {
              digit |= 0x80;  /* continuation bit: more groups follow */
          }
          if (buf != NULL)
          {
              buf[count] = (char)digit;
          }
          count++;

          if (length == 0)
          {
              break;
          }
      }

      return count;
  }
  81. // /**
  82. // * Calculates an integer from two bytes read from the input buffer
  83. // * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  84. // * @return the integer value calculated
  85. // */
  86. // int readInt(char** pptr)
  87. // {
  88. // char* ptr = *pptr;
  89. // int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
  90. // *pptr += 2;
  91. // return len;
  92. // }
  93. // /**
  94. // * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
  95. // * a length delimited string. So it reads the two byte length then the data according to
  96. // * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
  97. // * by an incorrect length.
  98. // * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  99. // * @param enddata pointer to the end of the buffer not to be read beyond
  100. // * @param len returns the calculated value of the length bytes read
  101. // * @return an allocated C string holding the characters read, or NULL if the length read would
  102. // * have caused an overrun.
  103. // *
  104. // */
  105. // static char* readUTFlen(char** pptr, char* enddata, int* len)
  106. // {
  107. // char* string = NULL;
  108. // //FUNC_ENTRY;
  109. // if (enddata - (*pptr) > 1) /* enough length to read the integer? */
  110. // {
  111. // *len = readInt(pptr);
  112. // if (&(*pptr)[*len] <= enddata)
  113. // {
  114. // if ((string = malloc(*len+1)) == NULL)
  115. // goto exit;
  116. // memcpy(string, *pptr, *len);
  117. // string[*len] = '\0';
  118. // *pptr += *len;
  119. // }
  120. // }
  121. // exit:
  122. // //FUNC_EXIT;
  123. // return string;
  124. // }
  125. // /**
  126. // * Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
  127. // * a length delimited string. So it reads the two byte length then the data according to
  128. // * that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
  129. // * by an incorrect length.
  130. // * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  131. // * @param enddata pointer to the end of the buffer not to be read beyond
  132. // * @return an allocated C string holding the characters read, or NULL if the length read would
  133. // * have caused an overrun.
  134. // */
  135. // char* readUTF(char** pptr, char* enddata)
  136. // {
  137. // int len;
  138. // return readUTFlen(pptr, enddata, &len);
  139. // }
  140. // /**
  141. // * Reads one character from the input buffer.
  142. // * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  143. // * @return the character read
  144. // */
  145. // unsigned char readChar(char** pptr)
  146. // {
  147. // unsigned char c = **pptr;
  148. // (*pptr)++;
  149. // return c;
  150. // }
  151. // /**
  152. // * Writes one character to an output buffer.
  153. // * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  154. // * @param c the character to write
  155. // */
  156. // void writeChar(char** pptr, char c)
  157. // {
  158. // **pptr = c;
  159. // (*pptr)++;
  160. // }
  161. // /**
  162. // * Writes an integer as 2 bytes to an output buffer.
  163. // * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  164. // * @param anInt the integer to write
  165. // */
  166. // void writeInt(char** pptr, int anInt)
  167. // {
  168. // **pptr = (char)(anInt / 256);
  169. // (*pptr)++;
  170. // **pptr = (char)(anInt % 256);
  171. // (*pptr)++;
  172. // }
  173. // /**
  174. // * Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
  175. // * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  176. // * @param string the C string to write
  177. // */
  178. // void writeUTF(char** pptr, const char* string)
  179. // {
  180. // size_t len = strlen(string);
  181. // writeInt(pptr, (int)len);
  182. // memcpy(*pptr, string, len);
  183. // *pptr += len;
  184. // }
  185. // /**
  186. // * Writes length delimited data to an output buffer
  187. // * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  188. // * @param data the data to write
  189. // * @param datalen the length of the data to write
  190. // */
  191. // void writeData(char** pptr, const void* data, int datalen)
  192. // {
  193. // writeInt(pptr, datalen);
  194. // memcpy(*pptr, data, datalen);
  195. // *pptr += datalen;
  196. // }
  197. // /**
  198. // * Function used in the new packets table to create packets which have only a header.
  199. // * @param MQTTVersion the version of MQTT
  200. // * @param aHeader the MQTT header byte
  201. // * @param data the rest of the packet
  202. // * @param datalen the length of the rest of the packet
  203. // * @return pointer to the packet structure
  204. // */
  205. // void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
  206. // {
  207. // static unsigned char header = 0;
  208. // header = aHeader;
  209. // return &header;
  210. // }
  211. // /**
  212. // * Writes an integer as 4 bytes to an output buffer.
  213. // * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  214. // * @param anInt the integer to write
  215. // */
  216. // void writeInt4(char** pptr, int anInt)
  217. // {
  218. // **pptr = (char)(anInt / 16777216);
  219. // (*pptr)++;
  220. // anInt %= 16777216;
  221. // **pptr = (char)(anInt / 65536);
  222. // (*pptr)++;
  223. // anInt %= 65536;
  224. // **pptr = (char)(anInt / 256);
  225. // (*pptr)++;
  226. // **pptr = (char)(anInt % 256);
  227. // (*pptr)++;
  228. // }
  229. // /**
  230. // * Calculates an integer from four bytes read from the input buffer
  231. // * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  232. // * @return the integer value calculated
  233. // */
  234. // int readInt4(char** pptr)
  235. // {
  236. // unsigned char* ptr = (unsigned char*)*pptr;
  237. // int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
  238. // *pptr += 4;
  239. // return value;
  240. // }
  241. // void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
  242. // {
  243. // writeInt(pptr, lenstring.len);
  244. // memcpy(*pptr, lenstring.data, lenstring.len);
  245. // *pptr += lenstring.len;
  246. // }
  247. // int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
  248. // {
  249. // int len = 0;
  250. // /* the first two bytes are the length of the string */
  251. // if (enddata - (*pptr) > 1) /* enough length to read the integer? */
  252. // {
  253. // lenstring->len = readInt(pptr); /* increments pptr to point past length */
  254. // if (&(*pptr)[lenstring->len] <= enddata)
  255. // {
  256. // lenstring->data = (char*)*pptr;
  257. // *pptr += lenstring->len;
  258. // len = 2 + lenstring->len;
  259. // }
  260. // }
  261. // return len;
  262. // }
  263. // /*
  264. // if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
  265. // len = 1;
  266. // else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
  267. // len = 2;
  268. // else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
  269. // len = 3;
  270. // else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
  271. // len = 4;
  272. // */
  273. // int MQTTPacket_VBIlen(int rem_len)
  274. // {
  275. // int rc = 0;
  276. // if (rem_len < 128)
  277. // rc = 1;
  278. // else if (rem_len < 16384)
  279. // rc = 2;
  280. // else if (rem_len < 2097152)
  281. // rc = 3;
  282. // else
  283. // rc = 4;
  284. // return rc;
  285. // }
  286. // /**
  287. // * Decodes the message length according to the MQTT algorithm
  288. // * @param getcharfn pointer to function to read the next character from the data source
  289. // * @param value the decoded length returned
  290. // * @return the number of bytes read from the socket
  291. // */
  292. // int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), unsigned int* value)
  293. // {
  294. // char c;
  295. // int multiplier = 1;
  296. // int len = 0;
  297. // #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
  298. // *value = 0;
  299. // do
  300. // {
  301. // int rc = MQTTPACKET_READ_ERROR;
  302. // if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
  303. // {
  304. // rc = MQTTPACKET_READ_ERROR; /* bad data */
  305. // goto exit;
  306. // }
  307. // rc = (*getcharfn)(&c, 1);
  308. // if (rc != 1)
  309. // goto exit;
  310. // *value += (c & 127) * multiplier;
  311. // multiplier *= 128;
  312. // } while ((c & 128) != 0);
  313. // exit:
  314. // return len;
  315. // }
  316. static int l_mqttcore_encodeLen(lua_State *L) {
  317. size_t len = 0;
  318. char buff[4];
  319. len = luaL_checkinteger(L, 1);
  320. int rc = MQTTPacket_encode(buff, len);
  321. lua_pushlstring(L, (const char*)buff, rc);
  322. return 1;
  323. }
  324. static void _add_mqtt_str(luaL_Buffer *buff, const char* str, size_t len) {
  325. if (len == 0) return;
  326. luaL_addchar(buff, len / 256);
  327. luaL_addchar(buff, len % 256);
  328. luaL_addlstring(buff, str, len);
  329. }
  330. static int l_mqttcore_encodeUTF8(lua_State *L) {
  331. if(!lua_isstring(L, 1) || 0 == lua_rawlen(L, 1)) {
  332. lua_pushlstring(L, "", 0);
  333. return 1;
  334. }
  335. luaL_Buffer buff;
  336. luaL_buffinit(L, &buff);
  337. size_t len = 0;
  338. const char* str = lua_tolstring(L, 1, &len);
  339. _add_mqtt_str(&buff, str, len);
  340. luaL_pushresult(&buff);
  341. return 1;
  342. }
  343. static int l_mqttcore_packCONNECT(lua_State *L) {
  344. luaL_Buffer buff;
  345. luaL_buffinit(L, &buff);
  346. // 把参数取一下
  347. // 1 2 3 4 5 6 7
  348. // clientId, keepAlive, username, password, cleanSession, will, version
  349. const char* clientId = luaL_checkstring(L, 1);
  350. int keepAlive = luaL_optinteger(L, 2, 240);
  351. const char* username = luaL_optstring(L, 3, "");
  352. const char* password = luaL_optstring(L, 4, "");
  353. int cleanSession = luaL_optinteger(L, 5, 1);
  354. // 处理will
  355. // topic payload retain qos flag
  356. lua_pushstring(L, "topic");
  357. lua_gettable(L, 6);
  358. const char* will_topic = luaL_checkstring(L, -1);
  359. lua_pop(L, 1);
  360. lua_pushstring(L, "payload");
  361. lua_gettable(L, 6);
  362. const char* will_payload = luaL_checkstring(L, -1);
  363. lua_pop(L, 1);
  364. lua_pushstring(L, "retain");
  365. lua_gettable(L, 6);
  366. uint8_t will_retain = luaL_checkinteger(L, -1);
  367. lua_pop(L, 1);
  368. lua_pushstring(L, "qos");
  369. lua_gettable(L, 6);
  370. uint8_t will_qos = luaL_checkinteger(L, -1);
  371. lua_pop(L, 1);
  372. lua_pushstring(L, "flag");
  373. lua_gettable(L, 6);
  374. uint8_t will_flag = luaL_checkinteger(L, -1);
  375. lua_pop(L, 1);
  376. // ----- 结束处理will
  377. // 添加固定头 MQTT
  378. luaL_addlstring(&buff, "\0\4MQTT", 6);
  379. // 版本号 4
  380. luaL_addchar(&buff, 4);
  381. // flags
  382. uint8_t flags = 0;
  383. if (strlen(username) > 0) flags += 128;
  384. if (strlen(password) > 0) flags += 64;
  385. if (will_retain) flags += 32;
  386. if (will_qos) flags += will_qos*8;
  387. if (will_flag) flags += 4;
  388. if (cleanSession) flags += 2;
  389. luaL_addchar(&buff, flags);
  390. // keepalive
  391. luaL_addchar(&buff, keepAlive / 256);
  392. luaL_addchar(&buff, keepAlive % 256);
  393. // client id
  394. _add_mqtt_str(&buff, clientId, strlen(clientId));
  395. // will_topic
  396. _add_mqtt_str(&buff, will_topic, strlen(will_topic));
  397. // will_topic
  398. _add_mqtt_str(&buff, will_payload, strlen(will_payload));
  399. // username and password
  400. _add_mqtt_str(&buff, username, strlen(username));
  401. _add_mqtt_str(&buff, password, strlen(password));
  402. // 然后计算总长度,坑呀
  403. luaL_Buffer buff2;
  404. luaL_buffinitsize(L, &buff2, buff.n + 5);
  405. // 标识 CONNECT
  406. luaL_addchar(&buff2, CONNECT * 16);
  407. // 剩余长度
  408. char buf[4];
  409. int rc = MQTTPacket_encode(buf, buff.n);
  410. luaL_addlstring(&buff2, buf, rc);
  411. luaL_addlstring(&buff2, buff.b, buff.n);
  412. // 清理掉
  413. luaL_pushresult(&buff);
  414. lua_pop(L, 1);
  415. luaL_pushresult(&buff2);
  416. return 1;
  417. }
  418. #include "rotable.h"
  419. static const rotable_Reg reg_mqttcore[] =
  420. {
  421. { "encodeLen", l_mqttcore_encodeLen, 0},
  422. { "encodeUTF8",l_mqttcore_encodeUTF8,0},
  423. { "packCONNECT", l_mqttcore_packCONNECT,0},
  424. { NULL, NULL }
  425. };
  426. LUAMOD_API int luaopen_mqttcore( lua_State *L ) {
  427. rotable_newlib(L, reg_mqttcore);
  428. return 1;
  429. }