| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523 |
- /*
- @module libmqtt
- @summary mqtt协议处理,供mqtt.lua使用
- @version 1.0
- @date 2020.07.03
- */
- #include "luat_base.h"
- #include "luat_sys.h"
- #include "luat_msgbus.h"
- #include "luat_pack.h"
- #define LUAT_LOG_TAG "mqttcore"
- #include "luat_log.h"
- enum msgTypes
- {
- CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
- PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
- PINGREQ, PINGRESP, DISCONNECT, AUTH
- };
- // static const char *packet_names[] =
- // {
- // "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
- // "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
- // "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
- // };
- // static const char** MQTTClient_packet_names = packet_names;
- /**
- * Converts an MQTT packet code into its name
- * @param ptype packet code
- * @return the corresponding string, or "UNKNOWN"
- */
- // static const char* MQTTPacket_name(int ptype)
- // {
- // return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
- // }
- /**
- * Encodes the message length according to the MQTT algorithm
- * @param buf the buffer into which the encoded data is written
- * @param length the length to be encoded
- * @return the number of bytes written to buffer
- */
- static int MQTTPacket_encode(char* buf, size_t length)
- {
- int rc = 0;
- //FUNC_ENTRY;
- do
- {
- char d = length % 128;
- length /= 128;
- /* if there are more digits to encode, set the top bit of this digit */
- if (length > 0)
- d |= 0x80;
- if (buf)
- buf[rc++] = d;
- else
- rc++;
- } while (length > 0);
- //FUNC_EXIT_RC(rc);
- return rc;
- }
- static int l_mqttcore_encodeLen(lua_State *L) {
- size_t len = 0;
- char buff[4];
- len = luaL_checkinteger(L, 1);
- int rc = MQTTPacket_encode(buff, len);
- lua_pushlstring(L, (const char*)buff, rc);
- return 1;
- }
- static void _add_mqtt_str(luaL_Buffer *buff, const char* str, size_t len) {
- if (len == 0) return;
- luaL_addchar(buff, len / 256);
- luaL_addchar(buff, len % 256);
- luaL_addlstring(buff, str, len);
- }
- static int l_mqttcore_encodeUTF8(lua_State *L) {
- if(!lua_isstring(L, 1) || 0 == lua_rawlen(L, 1)) {
- lua_pushlstring(L, "", 0);
- return 1;
- }
- luaL_Buffer buff;
- luaL_buffinit(L, &buff);
- size_t len = 0;
- const char* str = lua_tolstring(L, 1, &len);
- _add_mqtt_str(&buff, str, len);
- luaL_pushresult(&buff);
-
- return 1;
- }
- static void mqttcore_packXXX(lua_State *L, luaL_Buffer *buff, uint8_t header) {
- luaL_Buffer buff2;
- luaL_buffinitsize(L, &buff2, buff->n + 5);
- // 标识 CONNECT
- luaL_addchar(&buff2, header);
- // 剩余长度
- char buf[4];
- int rc = MQTTPacket_encode(buf, buff->n);
- luaL_addlstring(&buff2, buf, rc);
- luaL_addlstring(&buff2, buff->b, buff->n);
- // 清理掉
- luaL_pushresult(buff);
- lua_pop(L, 1);
- luaL_pushresult(&buff2);
- }
- static int l_mqttcore_packCONNECT(lua_State *L) {
- luaL_Buffer buff;
- luaL_buffinit(L, &buff);
- // 把参数取一下
- // 1 2 3 4 5 6 7
- // clientId, keepAlive, username, password, cleanSession, will, version
- const char* clientId = luaL_checkstring(L, 1);
- int keepAlive = luaL_optinteger(L, 2, 240);
- const char* username = luaL_optstring(L, 3, "");
- const char* password = luaL_optstring(L, 4, "");
- int cleanSession = luaL_optinteger(L, 5, 1);
- cleanSession = 1; // 暂时强制清除
- // 处理will
- // topic payload retain qos flag
- lua_pushstring(L, "topic");
- lua_gettable(L, 6);
- const char* will_topic = luaL_checkstring(L, -1);
- lua_pop(L, 1);
- lua_pushstring(L, "payload");
- lua_gettable(L, 6);
- const char* will_payload = luaL_checkstring(L, -1);
- lua_pop(L, 1);
- lua_pushstring(L, "retain");
- lua_gettable(L, 6);
- uint8_t will_retain = luaL_checkinteger(L, -1);
- lua_pop(L, 1);
- lua_pushstring(L, "qos");
- lua_gettable(L, 6);
- uint8_t will_qos = luaL_checkinteger(L, -1);
- lua_pop(L, 1);
- lua_pushstring(L, "flag");
- lua_gettable(L, 6);
- uint8_t will_flag = luaL_checkinteger(L, -1);
- lua_pop(L, 1);
- // ----- 结束处理will
- // 添加固定头 MQTT
- luaL_addlstring(&buff, "\0\4MQTT", 6);
- // 版本号 4
- luaL_addchar(&buff, 4);
- // flags
- uint8_t flags = 0;
- if (strlen(username) > 0) flags += 128;
- if (strlen(password) > 0) flags += 64;
- if (will_retain) flags += 32;
- if (will_qos) flags += will_qos*8;
- if (will_flag) flags += 4;
- if (cleanSession) flags += 2;
- luaL_addchar(&buff, flags);
- // keepalive
- luaL_addchar(&buff, keepAlive / 256);
- luaL_addchar(&buff, keepAlive % 256);
- // client id
- _add_mqtt_str(&buff, clientId, strlen(clientId));
- // will_topic
- _add_mqtt_str(&buff, will_topic, strlen(will_topic));
- // will_topic
- _add_mqtt_str(&buff, will_payload, strlen(will_payload));
- // username and password
- _add_mqtt_str(&buff, username, strlen(username));
- _add_mqtt_str(&buff, password, strlen(password));
- // 然后计算总长度,坑呀
- mqttcore_packXXX(L, &buff, CONNECT * 16);
- return 1;
- }
- //82 2F0002002A2F613159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F67657400
- //82 2D00 2A2F613159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F67657400
- static int l_mqttcore_packSUBSCRIBE(lua_State *L) {
- // dup, packetId, topics
- uint8_t dup = luaL_checkinteger(L, 1);
- uint16_t packetId = luaL_checkinteger(L, 2);
- if (!lua_istable(L, 3)) {
- LLOGE("args for packSUBSCRIBE must be table");
- return 0;
- }
- luaL_Buffer buff;
- luaL_buffinit(L, &buff);
- // 添加packetId
- luaL_addchar(&buff, packetId >> 8);
- luaL_addchar(&buff, packetId & 0xFF);
- size_t len = 0;
- lua_pushnil(L);
- while (lua_next(L, 3) != 0) {
- /* 使用 '键' (在索引 -2 处) 和 '值' (在索引 -1 处)*/
- if (lua_isstring(L, -2) && lua_isnumber(L, -1)) {
- const char* topic = luaL_checklstring(L, -2, &len);
- uint8_t qos = luaL_checkinteger(L, -1);
- luaL_addchar(&buff, len >> 8);
- luaL_addchar(&buff, len & 0xFF);
- luaL_addlstring(&buff, topic, len);
- luaL_addchar(&buff, qos);
- }
- lua_pop(L, 1);
- }
- lua_pop(L, 1);
- mqttcore_packXXX(L, &buff, SUBSCRIBE * 16 + dup * 8 + 2);
- return 1;
- }
- static int l_mqttcore_packUNSUBSCRIBE(lua_State *L) {
- // dup, packetId, topics
- uint8_t dup = luaL_checkinteger(L, 1);
- uint16_t packetId = luaL_checkinteger(L, 2);
- if (!lua_istable(L, 3)) {
- LLOGE("args for l_mqttcore_packUNSUBSCRIBE must be table");
- return 0;
- }
- luaL_Buffer buff;
- luaL_buffinit(L, &buff);
- // 添加packetId
- luaL_addchar(&buff, packetId >> 8);
- luaL_addchar(&buff, packetId & 0xFF);
- size_t len = 0;
- lua_pushnil(L);
- while (lua_next(L, 3) != 0) {
- /* 使用 '键' (在索引 -2 处) 和 '值' (在索引 -1 处)*/
- const char* topic = luaL_checklstring(L, -1, &len);
- luaL_addchar(&buff, len >> 8);
- luaL_addchar(&buff, len & 0xFF);
- luaL_addlstring(&buff, topic, len);
- lua_pop(L, 1);
- }
- lua_pop(L, 1);
- mqttcore_packXXX(L, &buff, UNSUBSCRIBE * 16 + dup * 8 + 2);
- return 1;
- }
- /*
- local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
- local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
- local len = 2 + #topic + #payload
- if qos > 0 then
- return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
- else
- return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
- end
- end
- */
- // 32 4D00 2D 2F 61 3159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F757064617465 0003 74657374207075626C69736820383636383138303339393231383534
- // 32 4D00 2D 2F 61 3159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F757064617465 0001001C 74657374207075626C69736820383636383138303339393231383534
- static int l_mqttcore_packPUBLISH(lua_State *L) {
- luaL_Buffer buff;
- luaL_buffinit(L, &buff);
- size_t topic_len = 0;
- size_t payload_len = 0;
- uint8_t dup = luaL_checkinteger(L, 1);
- uint8_t qos = luaL_checkinteger(L, 2);
- uint8_t retain = luaL_checkinteger(L, 3);
- //uint16_t packetId = luaL_checkinteger(L, 4);
- const char* topic = luaL_checklstring(L, 5, &topic_len);
- const char* payload = luaL_checklstring(L, 6, &payload_len);
- size_t total_len = 2 + topic_len + payload_len;
- // 添加头部
- uint8_t header = PUBLISH * 16 + dup * 8 + qos * 2 + retain;
- luaL_addchar(&buff, header);
- // 添加可变长度
- char buf[4];
- int rc = 0;
- if (qos > 0) {
- rc = MQTTPacket_encode(buf, total_len + 2);
- }
- else {
- rc = MQTTPacket_encode(buf, total_len);
- }
- luaL_addlstring(&buff, buf, rc);
- // 添加topic
- luaL_addchar(&buff, topic_len >> 8);
- luaL_addchar(&buff, topic_len & 0xFF);
- luaL_addlstring(&buff, topic, topic_len);
-
- if (qos > 0) {
- luaL_addchar(&buff, qos >> 8);
- luaL_addchar(&buff, qos & 0xFF);
- }
- // 添加payload, 这里是 >A 不是 >P
- //luaL_addchar(&buff, payload_len >> 8);
- //luaL_addchar(&buff, payload_len & 0xFF);
- luaL_addlstring(&buff, payload, payload_len);
- luaL_pushresult(&buff);
- return 1;
- }
- static int l_mqttcore_packACK(lua_State *L) {
- // Id == ACK or PUBREL
- uint8_t id = luaL_checkinteger(L, 1);
- uint8_t dup = luaL_checkinteger(L, 2);
- uint16_t packetId = luaL_checkinteger(L, 3);
- char buff[4];
- buff[0] = id * 16 + dup * 8 + (id == PUBREL ? 1 : 0) * 2;
- buff[1] = 0x02;
- buff[2] = packetId >> 8;
- buff[3] = packetId & 0xFF;
- lua_pushlstring(L, (const char*) buff, 4);
- return 1;
- }
- /*
- local function packZeroData(id, dup, qos, retain)
- dup = dup or 0
- qos = qos or 0
- retain = retain or 0
- return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
- end
- */
- static int l_mqttcore_packZeroData(lua_State *L) {
- // Id == ACK or PUBREL
- uint8_t id = luaL_checkinteger(L, 1);
- uint8_t dup = luaL_optinteger(L, 2, 0);
- uint8_t qos = luaL_optinteger(L, 3, 0);
- uint8_t retain = luaL_optinteger(L, 4, 0);
- char buff[2];
- buff[0] = id * 16 + dup * 8 + qos * 2 + retain;
- buff[1] = 0;
- lua_pushlstring(L, (const char*) buff, 2);
- return 1;
- }
- static size_t _mqtt_unpack_P(lua_State *L, char* ptr) {
- size_t len = (0xFF & ptr[0]) * 256 + (0xFF & ptr[1]);
- //LLOGD("_mqtt_unpack_P %02X %02X len %d", (0xFF & ptr[0]), (0xFF & ptr[1]), len);
- lua_pushlstring(L, ptr+2, len);
- return len;
- }
- static int l_mqttcore_unpack(lua_State *L) {
- size_t slen = 0;
- size_t nextpos = 0;
- uint32_t packetId = 0;
- char* data = (char*)luaL_checklstring(L, 1, &slen);
- if (slen < 2) {
- return 0;
- }
- //LLOGD("unpack first 2 byte %02X %02X", data[0] & 0xFF, data[1] & 0xFF);
- // 首先, 获取package的长度
- size_t dlen = 0;
- size_t poffset = 1;
- size_t multiplier = 1;
- for (; poffset < 4; poffset++)
- {
- if (slen <= poffset) {
- //LLOGD("unpack, slen=%d poffset=%d, execpt more data", slen, poffset);
- return 0;
- }
- dlen += (data[poffset] & 0x7F) * multiplier;
- multiplier *= 128;
- //LLOGD("unpack dlen current %d", dlen);
- if((data[poffset] & 0x80) == 0) {
- break;
- }
- }
- //LLOGD("unpack, poffset %d dlen %d act %d", poffset, dlen, slen);
- if (poffset + dlen > slen) {
- //LLOGD("unpack, wait more data");
- return 0;
- }
-
- // 然后解析第0个字节,header的数据
- uint8_t header = data[0] & 0xFF;
- // local packet = {id = (header - (header % 16)) >> 4,
- // dup = ((header % 16) - ((header % 16) % 8)) >> 3,
- // qos = (header & 0x06) >> 1,
- // retain = (header & 0x01)}
- int id = (header - (header % 16)) >> 4;
- int dup = ((header % 16) - ((header % 16) % 8)) >> 3;
- int qos = (header & 0x06) >> 1;
- int retain = (header & 0x01);
- //LLOGD("unpack id %d dup %d qos %d retain %d", id, dup, qos, retain);
- lua_createtable(L, 0, 7);
- lua_pushliteral(L, "id");
- lua_pushinteger(L, id);
- lua_settable(L, -3);
- lua_pushliteral(L, "dup");
- lua_pushinteger(L, dup);
- lua_settable(L, -3);
-
- lua_pushliteral(L, "qos");
- lua_pushinteger(L, qos);
- lua_settable(L, -3);
-
- lua_pushliteral(L, "retain");
- lua_pushinteger(L, retain);
- lua_settable(L, -3);
- nextpos = poffset+1;
- switch(id) {
- case CONNACK:
- lua_pushliteral(L, "ackFlag");
- lua_pushinteger(L, 0xFF & data[nextpos++]);
- lua_settable(L, -3);
- lua_pushliteral(L, "rc");
- lua_pushinteger(L, 0xFF & data[nextpos++]);
- lua_settable(L, -3);
- break;
- case PUBLISH:
- lua_pushliteral(L, "topic");
- nextpos += _mqtt_unpack_P(L, data + nextpos) + 2;
- //LLOGD("nextpos %d after topic", nextpos);
- lua_settable(L, -3);
- if (qos > 0) {
- lua_pushliteral(L, "packetId");
- packetId = 0xFF & data[nextpos++];
- packetId = packetId * 256;
- packetId = packetId + (0xFF & data[nextpos++]);
- lua_pushinteger(L, packetId);
- lua_settable(L, -3);
- //LLOGD("nextpos %d after packetId", nextpos);
- }
- //LLOGD("nextpos %d before payload", nextpos);
- lua_pushliteral(L, "payload");
- //LLOGD("payload strlen=%d dlen %d poffset %d nextpos %d", dlen+poffset+1 - nextpos, dlen, poffset, nextpos);
- lua_pushlstring(L, data+nextpos, dlen+poffset+1 - nextpos);
- lua_settable(L, -3);
- break;
- case PINGRESP:
- if (dlen) {
- lua_pushliteral(L, "packetId");
- packetId = 0xFF & data[nextpos++];
- packetId = packetId * 256;
- packetId = packetId + (0xFF & data[nextpos++]);
- lua_pushinteger(L, packetId);
- lua_settable(L, -3);
- }
- break;
- }
- lua_pushinteger(L, poffset+dlen+2);// Lua的索引从1开始,所以需要额外加1
- return 2;
- }
- #include "rotable2.h"
- static const rotable_Reg_t reg_mqttcore[] =
- {
- { "encodeLen", ROREG_FUNC(l_mqttcore_encodeLen)},
- { "encodeUTF8", ROREG_FUNC(l_mqttcore_encodeUTF8)},
- { "packCONNECT", ROREG_FUNC(l_mqttcore_packCONNECT)},
- { "packSUBSCRIBE", ROREG_FUNC(l_mqttcore_packSUBSCRIBE)},
- { "packPUBLISH", ROREG_FUNC(l_mqttcore_packPUBLISH)},
- { "packACK", ROREG_FUNC(l_mqttcore_packACK)},
- { "packZeroData", ROREG_FUNC(l_mqttcore_packZeroData)},
- { "packUNSUBSCRIBE",ROREG_FUNC(l_mqttcore_packUNSUBSCRIBE)},
- { "unpack", ROREG_FUNC(l_mqttcore_unpack)},
- { NULL, ROREG_INT(0) }
- };
- LUAMOD_API int luaopen_mqttcore( lua_State *L ) {
- luat_newlib2(L, reg_mqttcore);
- return 1;
- }
|