Просмотр исходного кода

Merge branch 'feat/mqtt_over_ws'

zengeshuai 4 месяцев назад
Родитель
Сommit
559c2dc2db

+ 18 - 2
components/network/libemqtt/luat_lib_mqtt.c

@@ -8,7 +8,7 @@
 @usage
 -- 具体用法请查看demo
 -- 本库只支持 mqtt 3.1.1, 其他版本例如3.1 或 5 均不支持!!!
--- 只支持纯MQTT/MQTTS通信, 不支持 mqtt over websocket
+-- 现已支持 MQTT over WebSocket(ws/wss),--曾帅 2025-09-23
 
 -- 几个大前提:
 -- 本库是基于TCP链接的, 支持加密TCP和非加密TCP
@@ -439,7 +439,19 @@ static int l_mqtt_create(lua_State *L) {
 		// TODO 判断 host的长度,超过191就不行了
 	}
 
-	opts.port = luaL_checkinteger(L, 3);
+    int is_ws_url = 0;
+    if (opts.host && (0==memcmp(opts.host, "ws://", 5) || 0==memcmp(opts.host, "wss://", 6))) {
+        is_ws_url = 1;
+    }
+    if (is_ws_url) {
+        if (lua_isinteger(L, 3)) {
+            opts.port = luaL_checkinteger(L, 3);
+        } else {
+            opts.port = 0; // 由 URL 指定端口或默认端口
+        }
+    } else {
+        opts.port = luaL_checkinteger(L, 3);
+    }
 
 	// 加密相关
 	if (lua_isboolean(L, 4)){
@@ -841,6 +853,10 @@ static const rotable_Reg_t reg_mqtt[] =
 	{"STATE_MQTT",  	ROREG_INT(MQTT_STATE_MQTT)},
 	//@const STATE_READY number mqtt mqtt已连接
 	{"STATE_READY",  	ROREG_INT(MQTT_STATE_READY)},
+    /* TLS verify constants */
+    {"VERIFY_NONE",     ROREG_INT(0)},
+    {"VERIFY_OPTION",   ROREG_INT(1)},
+    {"VERIFY_REQUIRED", ROREG_INT(2)},
 	{ NULL,             ROREG_INT(0)}
 };
 

+ 6 - 0
components/network/libemqtt/luat_mqtt.h

@@ -1,6 +1,7 @@
 #ifndef LUAT_MQTT_H
 #define LUAT_MQTT_H
 #include "luat_network_adapter.h"
+#include "luat_websocket.h"
 /**
  * @defgroup luatos_MQTT  MQTT相关接口
  * @{
@@ -64,6 +65,11 @@ typedef struct{
 	int mqtt_ref;				/**<  强制引用自身避免被GC*/
 	uint16_t conn_timeout;		/**< 连接超时时间,单位秒,默认15秒*/
 	void* userdata;				/**< userdata */
+	/* MQTT over WebSocket 扩展字段 */
+	uint8_t ws_mode; 					/**< 0: TCP/MQTTS, 1: MQTT over WebSocket */
+	luat_websocket_ctrl_t ws_ctrl; 		/**< WebSocket 控制块 */
+	char ws_url[256]; 					/**< 完整 ws(s)://url 路径 */
+	char* ws_headers; 					/**< 额外 WebSocket 握手头部(以\r\n结尾) */
 }luat_mqtt_ctrl_t;
 
 typedef struct{

+ 170 - 45
components/network/libemqtt/luat_mqtt_client.c

@@ -5,6 +5,8 @@
 #include "luat_rtos.h"
 #include "luat_mem.h"
 #include "luat_mqtt.h"
+#include "luat_websocket.h"
+#include <stddef.h>
 
 #define LUAT_LOG_TAG "mqtt"
 #include "luat_log.h"
@@ -19,6 +21,8 @@
 #endif
 
 static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl);
+static void mqtt_ws_on_event(luat_websocket_ctrl_t *ws_ctrl, int arg1, int arg2);
+static int luat_mqtt_ws_send_packet(void* socket_info, const void* buf, unsigned int count);
 
 #ifdef __LUATOS__
 #include "luat_msgbus.h"
@@ -125,34 +129,78 @@ int luat_mqtt_set_auto_connect(luat_mqtt_ctrl_t *mqtt_ctrl, uint8_t auto_connect
 }
 
 int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts) {
+    if (opts == NULL || opts->host == NULL) return -1;
+    /* 检测是否为 WebSocket URL */
+    if (!memcmp(opts->host, "ws://", 5) || !memcmp(opts->host, "wss://", 6)) {
+        mqtt_ctrl->ws_mode = 1;
+        memset(mqtt_ctrl->ws_url, 0, sizeof(mqtt_ctrl->ws_url));
+        memcpy(mqtt_ctrl->ws_url, opts->host, strlen(opts->host));
+        /* 初始化 WebSocket 控制块 */
+        luat_websocket_init(&mqtt_ctrl->ws_ctrl, mqtt_ctrl->adapter_index);
+        /* 强制无效IP,避免传入0.0.0.0 导致直连失败,走域名解析 */
+        network_set_ip_invaild(&mqtt_ctrl->ws_ctrl.ip_addr);
+        luat_websocket_connopts_t ws_opts = {0};
+        ws_opts.url = mqtt_ctrl->ws_url;
+        ws_opts.keepalive = 60;
+        ws_opts.use_ipv6 = opts->is_ipv6;
+        /* 透传 TLS 选项 */
+        ws_opts.verify = opts->verify;
+        ws_opts.server_cert = opts->server_cert;
+        ws_opts.server_cert_len = opts->server_cert_len;
+        ws_opts.client_cert = opts->client_cert;
+        ws_opts.client_cert_len = opts->client_cert_len;
+        ws_opts.client_key = opts->client_key;
+        ws_opts.client_key_len = opts->client_key_len;
+        ws_opts.client_password = opts->client_password;
+        ws_opts.client_password_len = opts->client_password_len;
+        luat_websocket_set_connopts(&mqtt_ctrl->ws_ctrl, &ws_opts);
+        /* 增加子协议头 */
+        static const char proto_hdr[] = "Sec-WebSocket-Protocol: mqtt\r\n";
+        char *hdr = (char*)luat_heap_malloc(sizeof(proto_hdr));
+        if (hdr) {
+            LLOGD("WebSocket header allocation successful, size: %d", sizeof(proto_hdr));
+            memcpy(hdr, proto_hdr, sizeof(proto_hdr));
+        } else {
+            LLOGW("WebSocket header allocation failed, size: %d", sizeof(proto_hdr));
+			return -1;
+        }
+        luat_websocket_set_headers(&mqtt_ctrl->ws_ctrl, hdr);
+        /* 绑定回调,切换发送函数 */
+        luat_websocket_set_cb(&mqtt_ctrl->ws_ctrl, (luat_websocket_cb_t)mqtt_ws_on_event);
+        mqtt_ctrl->broker.socket_info = mqtt_ctrl;
+        mqtt_ctrl->broker.send = luat_mqtt_ws_send_packet;
+        return 0;
+    }
+
+    /* 常规 TCP/MQTTS */
     memcpy(mqtt_ctrl->host, opts->host, strlen(opts->host) + 1);
     mqtt_ctrl->remote_port = opts->port;
-	if (opts->is_tls){
-		if (network_init_tls(mqtt_ctrl->netc, opts->verify)){
-			LLOGE("初始化tls失败");
-			return -1;
-		}
-		if (opts->server_cert){
-			if (network_set_server_cert(mqtt_ctrl->netc, (const unsigned char *)opts->server_cert, opts->server_cert_len+1)){
-				LLOGE("network_set_server_cert error");
-				return -1;
-			}
-		}
-		if (opts->client_cert){
-			if (network_set_client_cert(mqtt_ctrl->netc, (const unsigned char*)opts->client_cert, opts->client_cert_len+1,
-					(const unsigned char*)opts->client_key, opts->client_key_len+1,
-					(const unsigned char*)opts->client_password, opts->client_password_len+1)){
-				LLOGE("network_set_client_cert error");
-				return -1;
-			}
-		}
-	} else {
-		network_deinit_tls(mqtt_ctrl->netc);
-	}
+    if (opts->is_tls){
+        if (network_init_tls(mqtt_ctrl->netc, opts->verify)){
+            LLOGE("初始化tls失败");
+            return -1;
+        }
+        if (opts->server_cert){
+            if (network_set_server_cert(mqtt_ctrl->netc, (const unsigned char *)opts->server_cert, opts->server_cert_len+1)){
+                LLOGE("network_set_server_cert error");
+                return -1;
+            }
+        }
+        if (opts->client_cert){
+            if (network_set_client_cert(mqtt_ctrl->netc, (const unsigned char*)opts->client_cert, opts->client_cert_len+1,
+                    (const unsigned char*)opts->client_key, opts->client_key_len+1,
+                    (const unsigned char*)opts->client_password, opts->client_password_len+1)){
+                LLOGE("network_set_client_cert error");
+                return -1;
+            }
+        }
+    } else {
+        network_deinit_tls(mqtt_ctrl->netc);
+    }
 
-	if (opts->is_ipv6) {
-		network_connect_ipv6_domain(mqtt_ctrl->netc, 1);
-	}
+    if (opts->is_ipv6) {
+        network_connect_ipv6_domain(mqtt_ctrl->netc, 1);
+    }
 
     mqtt_ctrl->broker.socket_info = mqtt_ctrl;
     mqtt_ctrl->broker.send = luat_mqtt_send_packet;
@@ -174,10 +222,15 @@ void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
 		mqtt_ctrl->buffer_offset = 0;
 		luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
 		luat_stop_rtos_timer(mqtt_ctrl->conn_timer);
-		if (mqtt_ctrl->netc){
-			network_force_close_socket(mqtt_ctrl->netc);
-			l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_DISCONNECT, mqtt_ctrl->error_state==0?MQTT_ERROR_STATE_SOCKET:mqtt_ctrl->error_state);
-		}
+
+		if (mqtt_ctrl->ws_mode) {
+            luat_websocket_close_socket(&mqtt_ctrl->ws_ctrl);
+            l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_DISCONNECT, mqtt_ctrl->error_state==0?MQTT_ERROR_STATE_SOCKET:mqtt_ctrl->error_state);
+        } else if (mqtt_ctrl->netc){
+            network_force_close_socket(mqtt_ctrl->netc);
+            l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_DISCONNECT, mqtt_ctrl->error_state==0?MQTT_ERROR_STATE_SOCKET:mqtt_ctrl->error_state);
+        }
+
 		if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time > 0){
 			luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
 		}else{
@@ -206,10 +259,13 @@ void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
 		luat_heap_free(mqtt_ctrl->broker.will_data);
 		mqtt_ctrl->broker.will_data = NULL;
 	}
-	if (mqtt_ctrl->netc){
-		network_release_ctrl(mqtt_ctrl->netc);
-    	mqtt_ctrl->netc = NULL;
-	}
+    if (mqtt_ctrl->ws_mode){
+        luat_websocket_release_socket(&mqtt_ctrl->ws_ctrl);
+    }
+    if (mqtt_ctrl->netc){
+        network_release_ctrl(mqtt_ctrl->netc);
+        mqtt_ctrl->netc = NULL;
+    }
 	if (mqtt_ctrl->mqtt_packet_buffer) {
 		luat_heap_free(mqtt_ctrl->mqtt_packet_buffer);
 		mqtt_ctrl->mqtt_packet_buffer = NULL;
@@ -358,11 +414,11 @@ static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
 			uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
 			uint32_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
 			luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
-			mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
-            mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
-            mqtt_msg->message_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
-            mqtt_msg->flags = mqtt_ctrl->mqtt_packet_buffer[0];
-            l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, (int)mqtt_msg);
+				mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
+	            mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
+	            mqtt_msg->message_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
+	            mqtt_msg->flags = mqtt_ctrl->mqtt_packet_buffer[0];
+	            l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, (int)mqtt_msg);
 #else
 			l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, 0);
 #endif
@@ -502,8 +558,66 @@ int32_t luat_mqtt_callback(void *data, void *param) {
     return 0;
 }
 
+/* WebSocket 回调:握手成功后发起 MQTT CONNECT;收到数据则喂入 mqtt_parse */
+static void mqtt_ws_on_event(luat_websocket_ctrl_t *ws_ctrl, int arg1, int arg2) {
+    luat_mqtt_ctrl_t *mqtt_ctrl = (luat_mqtt_ctrl_t *)((char*)ws_ctrl - offsetof(luat_mqtt_ctrl_t, ws_ctrl));
+    if (arg1 == WEBSOCKET_MSG_CONNACK) {
+        mqtt_ctrl->mqtt_state = MQTT_STATE_MQTT;
+        int ret = mqtt_connect(&(mqtt_ctrl->broker));
+        if (ret == 1) {
+            luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*3/4, 1);
+        }
+    } else if (arg1 == WEBSOCKET_MSG_PUBLISH) {
+        /* arg2 指向的是复制的帧头+payload;解析出 MQTT 负载数据区域并追加到 mqtt_packet_buffer */
+        uint8_t *frame = (uint8_t *)arg2;
+        if (!frame) return;
+        uint16_t plen = 0;
+        if ((frame[1] & 0x7F) == 126) {
+            plen = (frame[2] << 8) | frame[3];
+            frame += 4;
+        } else {
+            plen = (frame[1] & 0x7F);
+            frame += 2;
+        }
+        if (plen == 0) return;
+        if (mqtt_ctrl->rxbuff_size - mqtt_ctrl->buffer_offset < plen) {
+            LLOGW("mqtt rx buffer not enough for ws payload %d", plen);
+            return;
+        }
+        memcpy(mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, frame, plen);
+        mqtt_ctrl->buffer_offset += plen;
+        /* 循环解析 */
+        while (mqtt_parse(mqtt_ctrl) == 1) {
+            if (mqtt_ctrl->buffer_offset == 0) break;
+        }
+    } else if (arg1 == WEBSOCKET_MSG_DISCONNECT || arg1 >= WEBSOCKET_MSG_ERROR_CONN) {
+        mqtt_ctrl->error_state = MQTT_ERROR_STATE_SOCKET;
+        luat_mqtt_close_socket(mqtt_ctrl);
+    } else if (arg1 == WEBSOCKET_MSG_SENT) {
+        /* no-op */
+    } else if (arg1 == WEBSOCKET_MSG_TIMER_PING) {
+        /* WS 层心跳已在 websocket 管理,这里无需处理 */
+    }
+}
+
+/* 通过 WebSocket 发送 MQTT 报文 */
+static int luat_mqtt_ws_send_packet(void* socket_info, const void* buf, unsigned int count) {
+    luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
+    luat_websocket_pkg_t pkg = {0};
+    pkg.FIN = 1;
+    pkg.OPT_CODE = WebSocket_OP_BINARY;
+    pkg.plen = (uint16_t)count;
+    pkg.payload = (const char*)buf;
+    int ret = luat_websocket_send_frame(&mqtt_ctrl->ws_ctrl, &pkg);
+    if (ret < 0) return 0;
+    return count;
+}
+
 int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
     luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
+    if (mqtt_ctrl->ws_mode) {
+        return luat_mqtt_ws_send_packet(socket_info, buf, count);
+    }
 	uint32_t tx_len = 0;
 	int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
 	if (ret < 0) {
@@ -530,21 +644,32 @@ int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
 		return -1;
 	}
 	memset(mqtt_ctrl->mqtt_packet_buffer, 0, mqtt_ctrl->rxbuff_size+4);
-    const char *hostname = mqtt_ctrl->host;
-    uint16_t port = mqtt_ctrl->remote_port;
     uint16_t keepalive = mqtt_ctrl->keepalive;
-    LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
     mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
+    if (mqtt_ctrl->ws_mode) {
+        /* 通过 WebSocket 发起连接,完成握手后再 mqtt_connect */
+        int r = luat_websocket_connect(&mqtt_ctrl->ws_ctrl);
+        if (r < 0) {
+            return -1;
+        }
+        mqtt_ctrl->mqtt_state = MQTT_STATE_SCONNECT;
+        return 0;
+    } 
+	else {
+	const char *hostname = mqtt_ctrl->host;
+	uint16_t port = mqtt_ctrl->remote_port;
+	LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
 	ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), NULL, port, 0) < 0;
 	LLOGD("network_connect ret %d", ret);
 	if (ret < 0) {
-        network_close(mqtt_ctrl->netc, 0);
-        return -1;
-    }
+		network_close(mqtt_ctrl->netc, 0);
+		return -1;
+	}
 	mqtt_ctrl->mqtt_state = MQTT_STATE_SCONNECT;
 	// 启动连接超时定时器
 	luat_start_rtos_timer(mqtt_ctrl->conn_timer, mqtt_ctrl->conn_timeout * 1000, 0);
-    return 0;
+        return 0;
+    }
 }
 
 int luat_mqtt_set_will(luat_mqtt_ctrl_t *mqtt_ctrl, const char* topic, 

+ 31 - 10
components/network/websocket/luat_websocket.c

@@ -41,6 +41,10 @@ int l_luat_websocket_msg_cb(luat_websocket_ctrl_t *websocket_ctrl, int arg1, int
 static int luat_websocket_msg_cb(luat_websocket_ctrl_t *websocket_ctrl, int arg1, int arg2){
 #ifdef __LUATOS__
     l_luat_websocket_msg_cb(websocket_ctrl,arg1,arg2);
+    if (websocket_ctrl->websocket_cb){
+        luat_websocket_cb_t websocket_cb = websocket_ctrl->websocket_cb;
+        websocket_cb(websocket_ctrl, arg1,arg2);
+    }
 #else
 	if (websocket_ctrl->websocket_cb){
 		luat_websocket_cb_t websocket_cb = websocket_ctrl->websocket_cb;
@@ -277,16 +281,33 @@ int luat_websocket_set_connopts(luat_websocket_ctrl_t *websocket_ctrl, luat_webs
 	// memcpy(websocket_ctrl->uri, uri, strlen(uri) + 1);
 	LLOGD("host %s port %d uri %s", websocket_ctrl->host, port, websocket_ctrl->uri);
 
-	if (is_tls)
-	{
-		if (network_init_tls(websocket_ctrl->netc, 0)){
-			return -1;
-		}
-	}
-	else
-	{
-		network_deinit_tls(websocket_ctrl->netc);
-	}
+    if (is_tls)
+    {
+        /* 支持 0(不校验)/1(可选校验)/2(严格校验) */
+        uint8_t verify = opts->verify;
+        if (network_init_tls(websocket_ctrl->netc, verify)){
+            return -1;
+        }
+        if (opts->server_cert && opts->server_cert_len > 0) {
+            if (network_set_server_cert(websocket_ctrl->netc, (const unsigned char *)opts->server_cert, opts->server_cert_len + 1)){
+                LLOGE("network_set_server_cert error");
+                return -1;
+            }
+        }
+        if (opts->client_cert && opts->client_key) {
+            if (network_set_client_cert(websocket_ctrl->netc,
+                    (const unsigned char*)opts->client_cert, opts->client_cert_len + 1,
+                    (const unsigned char*)opts->client_key, opts->client_key_len + 1,
+                    (const unsigned char*)opts->client_password, opts->client_password_len + 1)){
+                LLOGE("network_set_client_cert error");
+                return -1;
+            }
+        }
+    }
+    else
+    {
+        network_deinit_tls(websocket_ctrl->netc);
+    }
 	
 	if (opts->keepalive > 0) {
 		websocket_ctrl->keepalive = opts->keepalive;

+ 10 - 0
components/network/websocket/luat_websocket.h

@@ -57,6 +57,16 @@ typedef struct luat_websocket_connopts
 	const char *url;
 	uint16_t keepalive;
 	uint8_t use_ipv6;
+    /* TLS options for wss:// */
+    uint8_t verify;                 /* 0:不校验 1:可选 2:严格校验 */
+    const char* server_cert;        /* CA/根证书数据 */
+    size_t server_cert_len;
+    const char* client_cert;        /* 客户端证书 */
+    size_t client_cert_len;
+    const char* client_key;         /* 客户端私钥 */
+    size_t client_key_len;
+    const char* client_password;    /* 私钥口令 */
+    size_t client_password_len;
 } luat_websocket_connopts_t;
 
 typedef struct luat_websocket_pkg