Просмотр исходного кода

update: 部分完成mqtt.lua的改造

refer: https://github.com/openLuat/LuatOS/issues/48
Wendal Chen 5 лет назад
Родитель
Сommit
12578be921
3 измененных файлов с 255 добавлено и 49 удалено
  1. 2 2
      bsp/air302/demo/aliyun/main.lua
  2. 52 34
      bsp/air302/lib/mqtt.lua
  3. 201 13
      luat/modules/luat_lib_mqttcore.c

+ 2 - 2
bsp/air302/demo/aliyun/main.lua

@@ -72,7 +72,7 @@ sys.taskInit(function()
         -- 等待底层tcp连接完成
         while not mqttc:connect(host, port) do sys.wait(15000) end
         -- 连接成功, 开始订阅
-        log.info("mqttc", "mqtt seem ok", "try subscribe", topic_req)
+        log.info("mqttc", "mqtt seem ok", "try subscribe", topic_get)
         if mqttc:subscribe(topic_get) then
             -- 订阅完成, 发布业务数据
             log.info("mqttc", "mqtt subscribe ok", "try publish")
@@ -86,7 +86,7 @@ sys.taskInit(function()
                         log.info("mqttc", "get message from server", data.payload or "nil", data.topic)
                     elseif data == "pub_msg" then -- 需要上报数据
                         log.info("mqttc", "send message to server", data, param)
-                        mqttc:publish(topic_update, "response " .. param)
+                        mqttc:publish(topic_update, "response " .. param, 1)
                     elseif data == "timeout" then -- 无交互,发个定时report也行
                         log.info("mqttc", "wait timeout, send custom report")
                         mqttc:publish(topic_update, "test publish " .. os.date() .. nbiot.imei())

+ 52 - 34
bsp/air302/lib/mqtt.lua

@@ -64,44 +64,62 @@ local packCONNECT = mqttcore.packCONNECT
 --     return mydata
 -- end
 
-local function packSUBSCRIBE(dup, packetId, topics)
-    local header = SUBSCRIBE * 16 + dup * 8 + 2
-    local data = pack.pack(">H", packetId)
-    for topic, qos in pairs(topics) do
-        data = data .. pack.pack(">Pb", topic, qos)
-    end
-    return pack.pack(">bAA", header, encodeLen(#data), data)
-end
+local packSUBSCRIBE = mqttcore.packSUBSCRIBE
 
-local function packUNSUBSCRIBE(dup, packetId, topics)
-    local header = UNSUBSCRIBE * 16 + dup * 8 + 2
-    local data = pack.pack(">H", packetId)
-    for k, topic in pairs(topics) do
-        data = data .. pack.pack(">P", topic)
-    end
-    return pack.pack(">bAA", header, encodeLen(#data), data)
-end
+-- local function packSUBSCRIBE(dup, packetId, topics)
+--     local header = SUBSCRIBE * 16 + dup * 8 + 2
+--     local data = pack.pack(">H", packetId)
+--     for topic, qos in pairs(topics) do
+--         data = data .. pack.pack(">Pb", topic, qos)
+--     end
+--     local mydata = pack.pack(">bAA", header, encodeLen(#data), data)
+--     log.info("mqtt", "true", mydata:toHex())
+--     local tdata = mqttcore.packSUBSCRIBE(dup, packetId, topics)
+--     log.info("mqtt", "false", tdata:toHex())
+--     return mydata
+-- end
 
-local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
-    local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
-    local len = 2 + #topic + #payload
-    if qos > 0 then
-        return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
-    else
-        return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
-    end
-end
+local packUNSUBSCRIBE = mqttcore.packUNSUBSCRIBE
+-- local function packUNSUBSCRIBE(dup, packetId, topics)
+--     local header = UNSUBSCRIBE * 16 + dup * 8 + 2
+--     local data = pack.pack(">H", packetId)
+--     for k, topic in pairs(topics) do
+--         data = data .. pack.pack(">P", topic)
+--     end
+--     return pack.pack(">bAA", header, encodeLen(#data), data)
+-- end
 
-local function packACK(id, dup, packetId)
-    return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
-end
+local packPUBLISH = mqttcore.packPUBLISH
 
-local function packZeroData(id, dup, qos, retain)
-    dup = dup or 0
-    qos = qos or 0
-    retain = retain or 0
-    return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
-end
+-- local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
+--     local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
+--     local len = 2 + #topic + #payload
+--     local mydata = nil
+--     if qos > 0 then
+--         mydata = pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
+--     else
+--         mydata = pack.pack(">bAPA", header, encodeLen(len), topic, payload)
+--     end
+--     local tdata = mqttcore.packPUBLISH(dup, qos, retain, packetId, topic, payload)
+--     log.info("mqtt", "true", mydata:toHex())
+--     log.info("mqtt", "false", tdata:toHex())
+--     return mydata
+-- end
+
+local packACK = mqttcore.packACK
+
+-- local function packACK(id, dup, packetId)
+--     return pack.pack(">bbH", id * 16 + dup * 8 + (id == PUBREL and 1 or 0) * 2, 0x02, packetId)
+-- end
+
+local packZeroData = mqttcore.packZeroData
+
+-- local function packZeroData(id, dup, qos, retain)
+--     dup = dup or 0
+--     qos = qos or 0
+--     retain = retain or 0
+--     return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
+-- end
 
 local function unpack(s)
     if #s < 2 then return end

+ 201 - 13
luat/modules/luat_lib_mqttcore.c

@@ -5,11 +5,13 @@
 @date    2020.07.03
 */
 #include "luat_base.h"
-#include "luat_log.h"
 #include "luat_sys.h"
 #include "luat_msgbus.h"
 #include "luat_pack.h"
 
+#define LUAT_LOG_TAG "luat.mqttcore"
+#include "luat_log.h"
+
 enum msgTypes
 {
 	CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
@@ -376,6 +378,26 @@ static int l_mqttcore_encodeUTF8(lua_State *L) {
 	return 1;
 }
 
+static void mqttcore_packXXX(lua_State *L, luaL_Buffer *buff, uint8_t header) {
+	luaL_Buffer buff2;
+	luaL_buffinitsize(L, &buff2, buff->n + 5);
+
+	// 标识 CONNECT
+	luaL_addchar(&buff2, header);
+	// 剩余长度
+    char buf[4];
+    int rc = MQTTPacket_encode(buf, buff->n);
+    luaL_addlstring(&buff2, buf, rc);
+
+	luaL_addlstring(&buff2, buff->b, buff->n);
+
+	// 清理掉
+	luaL_pushresult(buff);
+	lua_pop(L, 1);
+
+	luaL_pushresult(&buff2);
+}
+
 static int l_mqttcore_packCONNECT(lua_State *L) {
 	luaL_Buffer buff;
 	luaL_buffinit(L, &buff);
@@ -454,24 +476,185 @@ static int l_mqttcore_packCONNECT(lua_State *L) {
 
 	// 然后计算总长度,坑呀
 
-	luaL_Buffer buff2;
-	luaL_buffinitsize(L, &buff2, buff.n + 5);
+	mqttcore_packXXX(L, &buff, CONNECT * 16);
+	return 1;
+}
 
-	// 标识 CONNECT
-	luaL_addchar(&buff2, CONNECT * 16);
-	// 剩余长度
-    char buf[4];
-    int rc = MQTTPacket_encode(buf, buff.n);
-    luaL_addlstring(&buff2, buf, rc);
+//82 2F0002002A2F613159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F67657400
+//82 2D00    2A2F613159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F67657400
 
-	luaL_addlstring(&buff2, buff.b, buff.n);
+static int l_mqttcore_packSUBSCRIBE(lua_State *L) {
+	// dup, packetId, topics
+	uint8_t dup = luaL_checkinteger(L, 1);
+	uint16_t packetId = luaL_checkinteger(L, 2);
+	if (!lua_istable(L, 3)) {
+		LLOGE("args for packSUBSCRIBE must be table");
+		return 0;
+	}
 
-	// 清理掉
-	luaL_pushresult(&buff);
+	luaL_Buffer buff;
+	luaL_buffinit(L, &buff);
+
+	// 添加packetId
+	luaL_addchar(&buff, packetId >> 8);
+	luaL_addchar(&buff, packetId & 0xFF);
+
+	size_t len = 0;
+	lua_pushnil(L);
+	while (lua_next(L, 3) != 0) {
+       /* 使用 '键' (在索引 -2 处) 和 '值' (在索引 -1 处)*/
+       const char* topic = luaL_checklstring(L, -2, &len);
+	   uint8_t qos = luaL_checkinteger(L, -1);
+	   luaL_addchar(&buff, len >> 8);
+	   luaL_addchar(&buff, len & 0xFF);
+	   luaL_addlstring(&buff, topic, len);
+
+	   luaL_addchar(&buff, qos);
+
+	   lua_pop(L, 1);
+    }
 	lua_pop(L, 1);
 
+	mqttcore_packXXX(L, &buff, SUBSCRIBE * 16 + dup * 8 + 2);
+
+	return 1;
+}
+
+static int l_mqttcore_packUNSUBSCRIBE(lua_State *L) {
+	// dup, packetId, topics
+	uint8_t dup = luaL_checkinteger(L, 1);
+	uint16_t packetId = luaL_checkinteger(L, 2);
+	if (!lua_istable(L, 3)) {
+		LLOGE("args for l_mqttcore_packUNSUBSCRIBE must be table");
+		return 0;
+	}
+
+	luaL_Buffer buff;
+	luaL_buffinit(L, &buff);
+
+	// 添加packetId
+	luaL_addchar(&buff, packetId >> 8);
+	luaL_addchar(&buff, packetId & 0xFF);
+
+	size_t len = 0;
+	lua_pushnil(L);
+	while (lua_next(L, 3) != 0) {
+       /* 使用 '键' (在索引 -2 处) 和 '值' (在索引 -1 处)*/
+       const char* topic = luaL_checklstring(L, -1, &len);
+	   luaL_addchar(&buff, len >> 8);
+	   luaL_addchar(&buff, len & 0xFF);
+	   luaL_addlstring(&buff, topic, len);
+	   lua_pop(L, 1);
+    }
+	lua_pop(L, 1);
+
+	mqttcore_packXXX(L, &buff, UNSUBSCRIBE * 16 + dup * 8 + 2);
+
+	return 1;
+}
+
+/*
+local function packPUBLISH(dup, qos, retain, packetId, topic, payload)
+    local header = PUBLISH * 16 + dup * 8 + qos * 2 + retain
+    local len = 2 + #topic + #payload
+    if qos > 0 then
+        return pack.pack(">bAPHA", header, encodeLen(len + 2), topic, packetId, payload)
+    else
+        return pack.pack(">bAPA", header, encodeLen(len), topic, payload)
+    end
+end
+*/
+// 32 4D00 2D 2F 61 3159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F757064617465 0003     74657374207075626C69736820383636383138303339393231383534
+// 32 4D00 2D 2F 61 3159467559364F4331652F617A4E6849624E4E546473567759326D685A6E6F2F757365722F757064617465 0001001C 74657374207075626C69736820383636383138303339393231383534	
+static int l_mqttcore_packPUBLISH(lua_State *L) {
+	luaL_Buffer buff;
+	luaL_buffinit(L, &buff);
+
+	size_t topic_len = 0;
+	size_t payload_len = 0;
+
+	uint8_t dup = luaL_checkinteger(L, 1);
+	uint8_t qos = luaL_checkinteger(L, 2);
+	uint8_t retain = luaL_checkinteger(L, 3);
+	uint16_t packetId = luaL_checkinteger(L, 4);
+	const char* topic = luaL_checklstring(L, 5, &topic_len);
+	const char* payload = luaL_checklstring(L, 6, &payload_len);
+
+	size_t total_len = 2 + topic_len + payload_len;
+
+	// 添加头部
+	uint8_t header = PUBLISH * 16 + dup * 8 + qos * 2 + retain;
+
+	luaL_addchar(&buff, header);
+	// 添加可变长度
+	char buf[4];
+	int rc = 0;
+	if (qos > 0) {
+    	rc = MQTTPacket_encode(buf, total_len + 2);
+	}
+	else {
+		rc = MQTTPacket_encode(buf, total_len);
+	}
+    luaL_addlstring(&buff, buf, rc);
+
+	// 添加topic
+	luaL_addchar(&buff, topic_len >> 8);
+	luaL_addchar(&buff, topic_len & 0xFF);
+	luaL_addlstring(&buff, topic, topic_len);
+	
+	if (qos > 0) {
+		luaL_addchar(&buff, qos >> 8);
+		luaL_addchar(&buff, qos & 0xFF);
+	}
+
+	// 添加payload, 这里是 >A 不是 >P
+	//luaL_addchar(&buff, payload_len >> 8);
+	//luaL_addchar(&buff, payload_len & 0xFF);
+	luaL_addlstring(&buff, payload, payload_len);
+
+	luaL_pushresult(&buff);
+	return 1;
+}
+
+
+static int l_mqttcore_packACK(lua_State *L) {
+	// Id == ACK or PUBREL
+	uint8_t id = luaL_checkinteger(L, 1);
+	uint8_t dup = luaL_checkinteger(L, 2);
+	uint16_t packetId = luaL_checkinteger(L, 3);
+
+	char buff[4];
+	buff[0] = id * 16 + dup * 8 + (id == PUBREL ? 1 : 0) * 2;
+	buff[1] = 0x02;
+	buff[2] = packetId >> 8;
+	buff[3] = packetId & 0xFF;
+
+	lua_pushlstring(L, (const char*) buff, 4);
+
+	return 1;
+}
+
+/*
+local function packZeroData(id, dup, qos, retain)
+    dup = dup or 0
+    qos = qos or 0
+    retain = retain or 0
+    return pack.pack(">bb", id * 16 + dup * 8 + qos * 2 + retain, 0)
+end
+*/
+static int l_mqttcore_packZeroData(lua_State *L) {
+	// Id == ACK or PUBREL
+	uint8_t id = luaL_checkinteger(L, 1);
+	uint8_t dup = luaL_optinteger(L, 2, 0);
+	uint8_t qos = luaL_optinteger(L, 3, 0);
+	uint8_t retain = luaL_optinteger(L, 4, 0);
+
+	char buff[2];
+	buff[0] = id * 16 + dup * 8 + qos * 2 + retain;
+	buff[1] = 0;
+
+	lua_pushlstring(L, (const char*) buff, 2);
 
-	luaL_pushresult(&buff2);
 	return 1;
 }
 
@@ -481,6 +664,11 @@ static const rotable_Reg reg_mqttcore[] =
     { "encodeLen", l_mqttcore_encodeLen, 0},
     { "encodeUTF8",l_mqttcore_encodeUTF8,0},
 	{ "packCONNECT", l_mqttcore_packCONNECT,0},
+	{ "packSUBSCRIBE", l_mqttcore_packSUBSCRIBE, 0},
+	{ "packPUBLISH",	l_mqttcore_packPUBLISH,	0},
+	{ "packACK",		l_mqttcore_packACK,		0},
+	{ "packZeroData",   l_mqttcore_packZeroData,0},
+	{ "packUNSUBSCRIBE", l_mqttcore_packUNSUBSCRIBE,0},
 	{ NULL, NULL }
 };