luat_lib_mqttcore.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. /*
  2. @module libmqtt
  3. @summary mqtt协议处理,供mqtt.lua使用
  4. @version 1.0
  5. @date 2020.07.03
  6. */
  7. #include "luat_base.h"
  8. #include "luat_sys.h"
  9. #include "luat_msgbus.h"
  10. #include "luat_pack.h"
  11. #define LUAT_LOG_TAG "mqttcore"
  12. #include "luat_log.h"
  13. enum msgTypes
  14. {
  15. CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
  16. PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
  17. PINGREQ, PINGRESP, DISCONNECT, AUTH
  18. };
  19. // static const char *packet_names[] =
  20. // {
  21. // "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
  22. // "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
  23. // "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
  24. // };
  25. // static const char** MQTTClient_packet_names = packet_names;
  26. /**
  27. * Converts an MQTT packet code into its name
  28. * @param ptype packet code
  29. * @return the corresponding string, or "UNKNOWN"
  30. */
  31. // static const char* MQTTPacket_name(int ptype)
  32. // {
  33. // return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
  34. // }
  35. /**
  36. * Encodes the message length according to the MQTT algorithm
  37. * @param buf the buffer into which the encoded data is written
  38. * @param length the length to be encoded
  39. * @return the number of bytes written to buffer
  40. */
  41. static int MQTTPacket_encode(char* buf, size_t length)
  42. {
  43. int rc = 0;
  44. //FUNC_ENTRY;
  45. do
  46. {
  47. char d = length % 128;
  48. length /= 128;
  49. /* if there are more digits to encode, set the top bit of this digit */
  50. if (length > 0)
  51. d |= 0x80;
  52. if (buf)
  53. buf[rc++] = d;
  54. else
  55. rc++;
  56. } while (length > 0);
  57. //FUNC_EXIT_RC(rc);
  58. return rc;
  59. }
  60. static int l_mqttcore_encodeLen(lua_State *L) {
  61. size_t len = 0;
  62. char buff[4];
  63. len = luaL_checkinteger(L, 1);
  64. int rc = MQTTPacket_encode(buff, len);
  65. lua_pushlstring(L, (const char*)buff, rc);
  66. return 1;
  67. }
  68. static void _add_mqtt_str(luaL_Buffer *buff, const char* str, size_t len) {
  69. if (len == 0) return;
  70. luaL_addchar(buff, len / 256);
  71. luaL_addchar(buff, len % 256);
  72. luaL_addlstring(buff, str, len);
  73. }
  74. static int l_mqttcore_encodeUTF8(lua_State *L) {
  75. if(!lua_isstring(L, 1) || 0 == lua_rawlen(L, 1)) {
  76. lua_pushlstring(L, "", 0);
  77. return 1;
  78. }
  79. luaL_Buffer buff;
  80. luaL_buffinit(L, &buff);
  81. size_t len = 0;
  82. const char* str = lua_tolstring(L, 1, &len);
  83. _add_mqtt_str(&buff, str, len);
  84. luaL_pushresult(&buff);
  85. return 1;
  86. }
  87. static void mqttcore_packXXX(lua_State *L, luaL_Buffer *buff, uint8_t header) {
  88. luaL_Buffer buff2;
  89. luaL_buffinitsize(L, &buff2, buff->n + 5);
  90. // 标识 CONNECT
  91. luaL_addchar(&buff2, header);
  92. // 剩余长度
  93. char buf[4];
  94. int rc = MQTTPacket_encode(buf, buff->n);
  95. luaL_addlstring(&buff2, buf, rc);
  96. luaL_addlstring(&buff2, buff->b, buff->n);
  97. // 清理掉
  98. luaL_pushresult(buff);
  99. lua_pop(L, 1);
  100. luaL_pushresult(&buff2);
  101. }
  102. static int l_mqttcore_packCONNECT(lua_State *L) {
  103. luaL_Buffer buff;
  104. luaL_buffinit(L, &buff);
  105. // 把参数取一下
  106. // 1 2 3 4 5 6 7
  107. // clientId, keepAlive, username, password, cleanSession, will, version
  108. const char* clientId = luaL_checkstring(L, 1);
  109. int keepAlive = luaL_optinteger(L, 2, 240);
  110. const char* username = luaL_optstring(L, 3, "");
  111. const char* password = luaL_optstring(L, 4, "");
  112. int cleanSession = luaL_optinteger(L, 5, 1);
  113. cleanSession = 1; // 暂时强制清除
  114. // 处理will
  115. // topic payload retain qos flag
  116. lua_pushstring(L, "topic");
  117. lua_gettable(L, 6);
  118. const char* will_topic = luaL_checkstring(L, -1);
  119. lua_pop(L, 1);
  120. lua_pushstring(L, "payload");
  121. lua_gettable(L, 6);
  122. const char* will_payload = luaL_checkstring(L, -1);
  123. lua_pop(L, 1);
  124. lua_pushstring(L, "retain");
  125. lua_gettable(L, 6);
  126. uint8_t will_retain = luaL_checkinteger(L, -1);
  127. lua_pop(L, 1);
  128. lua_pushstring(L, "qos");
  129. lua_gettable(L, 6);
  130. uint8_t will_qos = luaL_checkinteger(L, -1);
  131. lua_pop(L, 1);
  132. lua_pushstring(L, "flag");
  133. lua_gettable(L, 6);
  134. uint8_t will_flag = luaL_checkinteger(L, -1);
  135. lua_pop(L, 1);
  136. // ----- 结束处理will
  137. // 添加固定头 MQTT
  138. luaL_addlstring(&buff, "\0\4MQTT", 6);
  139. // 版本号 4
  140. luaL_addchar(&buff, 4);
  141. // flags
  142. uint8_t flags = 0;
  143. if (strlen(username) > 0) flags += 128;
  144. if (strlen(password) > 0) flags += 64;
  145. if (will_retain) flags += 32;
  146. if (will_qos) flags += will_qos*8;
  147. if (will_flag) flags += 4;
  148. if (cleanSession) flags += 2;
  149. luaL_addchar(&buff, flags);
  150. // keepalive
  151. luaL_addchar(&buff, keepAlive / 256);
  152. luaL_addchar(&buff, keepAlive % 256);
  153. // client id
  154. _add_mqtt_str(&buff, clientId, strlen(clientId));
  155. // will_topic
  156. _add_mqtt_str(&buff, will_topic, strlen(will_topic));
  157. // will_topic
  158. _add_mqtt_str(&buff, will_payload, strlen(will_payload));
  159. // username and password
  160. _add_mqtt_str(&buff, username, strlen(username));
  161. _add_mqtt_str(&buff, password, strlen(password));
  162. // 然后计算总长度,坑呀
  163. mqttcore_packXXX(L, &buff, CONNECT * 16);
  164. return 1;
  165. }
  166. //82 2F0002002A2F613159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F67657400
  167. //82 2D00 2A2F613159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F67657400
  168. static int l_mqttcore_packSUBSCRIBE(lua_State *L) {
  169. // dup, packetId, topics
  170. uint8_t dup = luaL_checkinteger(L, 1);
  171. uint16_t packetId = luaL_checkinteger(L, 2);
  172. if (!lua_istable(L, 3)) {
  173. LLOGE("args for packSUBSCRIBE must be table");
  174. return 0;
  175. }
  176. luaL_Buffer buff;
  177. luaL_buffinit(L, &buff);
  178. // 添加packetId
  179. luaL_addchar(&buff, packetId >> 8);
  180. luaL_addchar(&buff, packetId & 0xFF);
  181. size_t len = 0;
  182. lua_pushnil(L);
  183. while (lua_next(L, 3) != 0) {
  184. /* 使用 '键' (在索引 -2 处) 和 '值' (在索引 -1 处)*/
  185. if (lua_isstring(L, -2) && lua_isnumber(L, -1)) {
  186. const char* topic = luaL_checklstring(L, -2, &len);
  187. uint8_t qos = luaL_checkinteger(L, -1);
  188. luaL_addchar(&buff, len >> 8);
  189. luaL_addchar(&buff, len & 0xFF);
  190. luaL_addlstring(&buff, topic, len);
  191. luaL_addchar(&buff, qos);
  192. }
  193. lua_pop(L, 1);
  194. }
  195. lua_pop(L, 1);
  196. mqttcore_packXXX(L, &buff, SUBSCRIBE * 16 + dup * 8 + 2);
  197. return 1;
  198. }
  199. static int l_mqttcore_packUNSUBSCRIBE(lua_State *L) {
  200. // dup, packetId, topics
  201. uint8_t dup = luaL_checkinteger(L, 1);
  202. uint16_t packetId = luaL_checkinteger(L, 2);
  203. if (!lua_istable(L, 3)) {
  204. LLOGE("args for l_mqttcore_packUNSUBSCRIBE must be table");
  205. return 0;
  206. }
  207. luaL_Buffer buff;
  208. luaL_buffinit(L, &buff);
  209. // 添加packetId
  210. luaL_addchar(&buff, packetId >> 8);
  211. luaL_addchar(&buff, packetId & 0xFF);
  212. size_t len = 0;
  213. lua_pushnil(L);
  214. while (lua_next(L, 3) != 0) {
  215. /* 使用 '键' (在索引 -2 处) 和 '值' (在索引 -1 处)*/
  216. const char* topic = luaL_checklstring(L, -1, &len);
  217. luaL_addchar(&buff, len >> 8);
  218. luaL_addchar(&buff, len & 0xFF);
  219. luaL_addlstring(&buff, topic, len);
  220. lua_pop(L, 1);
  221. }
  222. lua_pop(L, 1);
  223. mqttcore_packXXX(L, &buff, UNSUBSCRIBE * 16 + dup * 8 + 2);
  224. return 1;
  225. }
  226. /*
  227. local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
  228. local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
  229. local len = 2 + #topic + #payload
  230. if qos > 0 then
  231. return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
  232. else
  233. return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
  234. end
  235. end
  236. */
  237. // 32 4D00 2D 2F 61 3159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F757064617465 0003 74657374207075626C69736820383636383138303339393231383534
  238. // 32 4D00 2D 2F 61 3159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F757064617465 0001001C 74657374207075626C69736820383636383138303339393231383534
  239. static int l_mqttcore_packPUBLISH(lua_State *L) {
  240. luaL_Buffer buff;
  241. luaL_buffinit(L, &buff);
  242. size_t topic_len = 0;
  243. size_t payload_len = 0;
  244. uint8_t dup = luaL_checkinteger(L, 1);
  245. uint8_t qos = luaL_checkinteger(L, 2);
  246. uint8_t retain = luaL_checkinteger(L, 3);
  247. //uint16_t packetId = luaL_checkinteger(L, 4);
  248. const char* topic = luaL_checklstring(L, 5, &topic_len);
  249. const char* payload = luaL_checklstring(L, 6, &payload_len);
  250. size_t total_len = 2 + topic_len + payload_len;
  251. // 添加头部
  252. uint8_t header = PUBLISH * 16 + dup * 8 + qos * 2 + retain;
  253. luaL_addchar(&buff, header);
  254. // 添加可变长度
  255. char buf[4];
  256. int rc = 0;
  257. if (qos > 0) {
  258. rc = MQTTPacket_encode(buf, total_len + 2);
  259. }
  260. else {
  261. rc = MQTTPacket_encode(buf, total_len);
  262. }
  263. luaL_addlstring(&buff, buf, rc);
  264. // 添加topic
  265. luaL_addchar(&buff, topic_len >> 8);
  266. luaL_addchar(&buff, topic_len & 0xFF);
  267. luaL_addlstring(&buff, topic, topic_len);
  268. if (qos > 0) {
  269. luaL_addchar(&buff, qos >> 8);
  270. luaL_addchar(&buff, qos & 0xFF);
  271. }
  272. // 添加payload, 这里是 >A 不是 >P
  273. //luaL_addchar(&buff, payload_len >> 8);
  274. //luaL_addchar(&buff, payload_len & 0xFF);
  275. luaL_addlstring(&buff, payload, payload_len);
  276. luaL_pushresult(&buff);
  277. return 1;
  278. }
  279. static int l_mqttcore_packACK(lua_State *L) {
  280. // Id == ACK or PUBREL
  281. uint8_t id = luaL_checkinteger(L, 1);
  282. uint8_t dup = luaL_checkinteger(L, 2);
  283. uint16_t packetId = luaL_checkinteger(L, 3);
  284. char buff[4];
  285. buff[0] = id * 16 + dup * 8 + (id == PUBREL ? 1 : 0) * 2;
  286. buff[1] = 0x02;
  287. buff[2] = packetId >> 8;
  288. buff[3] = packetId & 0xFF;
  289. lua_pushlstring(L, (const char*) buff, 4);
  290. return 1;
  291. }
  292. /*
  293. local function packZeroData(id, dup, qos, retain)
  294. dup = dup or 0
  295. qos = qos or 0
  296. retain = retain or 0
  297. return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
  298. end
  299. */
  300. static int l_mqttcore_packZeroData(lua_State *L) {
  301. // Id == ACK or PUBREL
  302. uint8_t id = luaL_checkinteger(L, 1);
  303. uint8_t dup = luaL_optinteger(L, 2, 0);
  304. uint8_t qos = luaL_optinteger(L, 3, 0);
  305. uint8_t retain = luaL_optinteger(L, 4, 0);
  306. char buff[2];
  307. buff[0] = id * 16 + dup * 8 + qos * 2 + retain;
  308. buff[1] = 0;
  309. lua_pushlstring(L, (const char*) buff, 2);
  310. return 1;
  311. }
  312. static size_t _mqtt_unpack_P(lua_State *L, char* ptr) {
  313. size_t len = (0xFF & ptr[0]) * 256 + (0xFF & ptr[1]);
  314. //LLOGD("_mqtt_unpack_P %02X %02X len %d", (0xFF & ptr[0]), (0xFF & ptr[1]), len);
  315. lua_pushlstring(L, ptr+2, len);
  316. return len;
  317. }
  318. static int l_mqttcore_unpack(lua_State *L) {
  319. size_t slen = 0;
  320. size_t nextpos = 0;
  321. uint32_t packetId = 0;
  322. char* data = (char*)luaL_checklstring(L, 1, &slen);
  323. if (slen < 2) {
  324. return 0;
  325. }
  326. //LLOGD("unpack first 2 byte %02X %02X", data[0] & 0xFF, data[1] & 0xFF);
  327. // 首先, 获取package的长度
  328. size_t dlen = 0;
  329. size_t poffset = 1;
  330. size_t multiplier = 1;
  331. for (; poffset < 4; poffset++)
  332. {
  333. if (slen <= poffset) {
  334. //LLOGD("unpack, slen=%d poffset=%d, execpt more data", slen, poffset);
  335. return 0;
  336. }
  337. dlen += (data[poffset] & 0x7F) * multiplier;
  338. multiplier *= 128;
  339. //LLOGD("unpack dlen current %d", dlen);
  340. if((data[poffset] & 0x80) == 0) {
  341. break;
  342. }
  343. }
  344. //LLOGD("unpack, poffset %d dlen %d act %d", poffset, dlen, slen);
  345. if (poffset + dlen > slen) {
  346. //LLOGD("unpack, wait more data");
  347. return 0;
  348. }
  349. // 然后解析第0个字节,header的数据
  350. uint8_t header = data[0] & 0xFF;
  351. // local packet = {id = (header - (header % 16)) >> 4,
  352. // dup = ((header % 16) - ((header % 16) % 8)) >> 3,
  353. // qos = (header & 0x06) >> 1,
  354. // retain = (header & 0x01)}
  355. int id = (header - (header % 16)) >> 4;
  356. int dup = ((header % 16) - ((header % 16) % 8)) >> 3;
  357. int qos = (header & 0x06) >> 1;
  358. int retain = (header & 0x01);
  359. //LLOGD("unpack id %d dup %d qos %d retain %d", id, dup, qos, retain);
  360. lua_createtable(L, 0, 7);
  361. lua_pushliteral(L, "id");
  362. lua_pushinteger(L, id);
  363. lua_settable(L, -3);
  364. lua_pushliteral(L, "dup");
  365. lua_pushinteger(L, dup);
  366. lua_settable(L, -3);
  367. lua_pushliteral(L, "qos");
  368. lua_pushinteger(L, qos);
  369. lua_settable(L, -3);
  370. lua_pushliteral(L, "retain");
  371. lua_pushinteger(L, retain);
  372. lua_settable(L, -3);
  373. nextpos = poffset+1;
  374. switch(id) {
  375. case CONNACK:
  376. lua_pushliteral(L, "ackFlag");
  377. lua_pushinteger(L, 0xFF & data[nextpos++]);
  378. lua_settable(L, -3);
  379. lua_pushliteral(L, "rc");
  380. lua_pushinteger(L, 0xFF & data[nextpos++]);
  381. lua_settable(L, -3);
  382. break;
  383. case PUBLISH:
  384. lua_pushliteral(L, "topic");
  385. nextpos += _mqtt_unpack_P(L, data + nextpos) + 2;
  386. //LLOGD("nextpos %d after topic", nextpos);
  387. lua_settable(L, -3);
  388. if (qos > 0) {
  389. lua_pushliteral(L, "packetId");
  390. packetId = 0xFF & data[nextpos++];
  391. packetId = packetId * 256;
  392. packetId = packetId + (0xFF & data[nextpos++]);
  393. lua_pushinteger(L, packetId);
  394. lua_settable(L, -3);
  395. //LLOGD("nextpos %d after packetId", nextpos);
  396. }
  397. //LLOGD("nextpos %d before payload", nextpos);
  398. lua_pushliteral(L, "payload");
  399. //LLOGD("payload strlen=%d dlen %d poffset %d nextpos %d", dlen+poffset+1 - nextpos, dlen, poffset, nextpos);
  400. lua_pushlstring(L, data+nextpos, dlen+poffset+1 - nextpos);
  401. lua_settable(L, -3);
  402. break;
  403. case PINGRESP:
  404. if (dlen) {
  405. lua_pushliteral(L, "packetId");
  406. packetId = 0xFF & data[nextpos++];
  407. packetId = packetId * 256;
  408. packetId = packetId + (0xFF & data[nextpos++]);
  409. lua_pushinteger(L, packetId);
  410. lua_settable(L, -3);
  411. }
  412. break;
  413. }
  414. lua_pushinteger(L, poffset+dlen+2);// Lua的索引从1开始,所以需要额外加1
  415. return 2;
  416. }
  417. #include "rotable2.h"
  418. static const rotable_Reg_t reg_mqttcore[] =
  419. {
  420. { "encodeLen", ROREG_FUNC(l_mqttcore_encodeLen)},
  421. { "encodeUTF8", ROREG_FUNC(l_mqttcore_encodeUTF8)},
  422. { "packCONNECT", ROREG_FUNC(l_mqttcore_packCONNECT)},
  423. { "packSUBSCRIBE", ROREG_FUNC(l_mqttcore_packSUBSCRIBE)},
  424. { "packPUBLISH", ROREG_FUNC(l_mqttcore_packPUBLISH)},
  425. { "packACK", ROREG_FUNC(l_mqttcore_packACK)},
  426. { "packZeroData", ROREG_FUNC(l_mqttcore_packZeroData)},
  427. { "packUNSUBSCRIBE",ROREG_FUNC(l_mqttcore_packUNSUBSCRIBE)},
  428. { "unpack", ROREG_FUNC(l_mqttcore_unpack)},
  429. { NULL, ROREG_INT(0) }
  430. };
  431. LUAMOD_API int luaopen_mqttcore( lua_State *L ) {
  432. luat_newlib2(L, reg_mqttcore);
  433. return 1;
  434. }