Wendal Chen 3 лет назад
Родитель
Сommit
e61b217d2d

+ 63 - 4
components/network/libemqtt/libemqtt.c

@@ -27,6 +27,8 @@
 
 #include <string.h>
 #include <libemqtt.h>
+#include "luat_base.h"
+#include "luat_malloc.h"
 
 #define MQTT_DUP_FLAG     (1<<3)
 #define MQTT_QOS0_FLAG    (0<<1)
@@ -213,6 +215,15 @@ int mqtt_connect(mqtt_broker_handle_t* broker)
 	if(broker->clean_session) {
 		flags |= MQTT_CLEAN_SESSION;
 	}
+	if (broker->will_len > 0) {
+		payload_len += broker->will_len;
+		flags |= MQTT_WILL_FLAG;
+		if (broker->will_retain)
+			flags |= MQTT_WILL_RETAIN;
+		if (broker->will_qos) {
+			flags |= (broker->will_qos << 3);
+		}
+	}
 
 	// Variable header
 	uint8_t var_header[] = {
@@ -246,7 +257,12 @@ int mqtt_connect(mqtt_broker_handle_t* broker)
     }
 
 	uint16_t offset = 0;
-	uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+payload_len];
+	uint32_t packet_size = sizeof(fixed_header)+sizeof(var_header)+payload_len;
+	uint8_t *packet = luat_heap_malloc(packet_size);
+	if (packet == NULL) {
+		LLOGE("out of memory when malloc connect packet");
+		return -2;
+	}
 	memset(packet, 0, sizeof(packet));
 	memcpy(packet, fixed_header, sizeof(fixed_header));
 	offset += sizeof(fixed_header);
@@ -259,6 +275,12 @@ int mqtt_connect(mqtt_broker_handle_t* broker)
 		memcpy(packet+offset, broker->clientid, clientidlen);
 	offset += clientidlen;
 
+	
+	if (broker->will_len) {
+		memcpy(packet+offset, broker->will_data, broker->will_len);
+		offset += broker->will_len;
+	}
+
 	if(usernamelen) {
 		// Username - UTF encoded
 		packet[offset++] = usernamelen>>8;
@@ -276,10 +298,11 @@ int mqtt_connect(mqtt_broker_handle_t* broker)
 	}
 
 	// Send the packet
-	if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
+	if(broker->send(broker->socket_info, packet, packet_size) < packet_size) {
+		luat_heap_free(packet);
 		return -1;
 	}
-
+	luat_heap_free(packet);
 	return 1;
 }
 
@@ -504,4 +527,40 @@ int mqtt_unsubscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t*
 	}
 
 	return 1;
-}
+}
+
+int mqtt_set_will(mqtt_broker_handle_t* broker, const char* topic, 
+						const char* payload, size_t payload_len, 
+						uint8_t qos, size_t retain) {
+	if (broker == NULL)
+		return -1;
+	//LLOGD("will %s %.*s %d %d", topic, payload_len, payload, qos, retain);
+	// 如果之前有数据, 那就释放掉
+	if (broker->will_data != NULL) {
+		broker->will_len = 0;
+		luat_heap_free(broker->will_data);
+		broker->will_data = NULL;
+	}
+	if (topic == NULL || payload == NULL || payload_len == 0) {
+		LLOGI("will topic/payload is NULL");
+		return 0;
+	}
+	size_t topic_len = strlen(topic);
+	broker->will_data = luat_heap_malloc(topic_len + 2 + payload_len + 2);
+	if (broker->will_data == NULL) {
+		return -2;
+	}
+	broker->will_data[0] = (uint8_t)(topic_len >> 8);
+	broker->will_data[1] = (uint8_t)(topic_len & 0xFF);
+	memcpy(broker->will_data + 2, topic, topic_len);
+	
+	broker->will_data[2  + topic_len] = (uint8_t)(payload_len >> 8);
+	broker->will_data[2  + topic_len + 1] = (uint8_t)(payload_len & 0xFF);
+	memcpy(broker->will_data + 2 + topic_len + 2, payload, payload_len);
+
+	broker->will_qos = qos > 2 ? 0 : qos;
+	broker->will_retain = retain;
+	broker->will_len = topic_len + 2 + payload_len + 2;
+	//LLOGD("will len %d", broker->will_len);
+	return 0;
+}

+ 9 - 3
components/network/libemqtt/libemqtt.h

@@ -161,13 +161,15 @@ typedef struct {
 	// Auth fields
 	char username[MQTT_CONF_USERNAME_LENGTH];
 	char password[MQTT_CONF_PASSWORD_LENGTH];
+	// Management fields
+	uint16_t seq;
+	uint16_t alive;
 	// Will topic
 	uint8_t will_retain;
 	uint8_t will_qos;
 	uint8_t clean_session;
-	// Management fields
-	uint16_t seq;
-	uint16_t alive;
+	char *will_data; // 包含topic和payload
+	uint32_t will_len;
 } mqtt_broker_handle_t;
 
 
@@ -196,6 +198,10 @@ void mqtt_init_auth(mqtt_broker_handle_t* broker, const char* username, const ch
  */
 void mqtt_set_alive(mqtt_broker_handle_t* broker, uint16_t alive);
 
+int mqtt_set_will(mqtt_broker_handle_t* broker, const char* topic, 
+						const char* payload, size_t payload_len, 
+						uint8_t qos, size_t retain);
+
 /** Connect to the broker.
  * @param broker Data structure that contains the connection information with the broker.
  *

+ 24 - 0
components/network/libemqtt/luat_lib_mqtt.c

@@ -516,6 +516,29 @@ static int l_mqtt_ready(lua_State *L) {
 	return 1;
 }
 
+/*
+设置遗嘱消息
+@api mqttc:will(topic, payload, qos, retain)
+@string 遗嘱消息的topic
+@string 遗嘱消息的payload
+@string 遗嘱消息的qos, 默认0, 可以不填
+@string 遗嘱消息的retain, 默认0, 可以不填
+@return bool 成功返回true,否则返回false
+@usage
+-- 要在connect之前调用 
+mqttc:will("/xxx/xxx", "xxxxxx")
+*/
+static int l_mqtt_will(lua_State *L) {
+	luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
+	size_t payload_len = 0;
+	const char* topic = luaL_checkstring(L, 2);
+	const char* payload = luaL_checklstring(L, 3, &payload_len);
+	int qos = luaL_optinteger(L, 4, 0);
+	int retain = luaL_optinteger(L, 5, 0);
+	lua_pushboolean(L, luat_mqtt_set_will(mqtt_ctrl, topic, payload, payload_len, qos, retain) == 0 ? 1 : 0);
+	return 1;
+}
+
 static int _mqtt_struct_newindex(lua_State *L);
 
 void luat_mqtt_struct_init(lua_State *L) {
@@ -539,6 +562,7 @@ static const rotable_Reg_t reg_mqtt[] =
 	{"unsubscribe",		ROREG_FUNC(l_mqtt_unsubscribe)},
 	{"close",			ROREG_FUNC(l_mqtt_close)},
 	{"ready",			ROREG_FUNC(l_mqtt_ready)},
+	{"will",			ROREG_FUNC(l_mqtt_will)},
 
 	{ NULL,             ROREG_INT(0)}
 };

+ 3 - 1
components/network/libemqtt/luat_mqtt.h

@@ -65,5 +65,7 @@ int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *op
 
 int luat_mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl);
 int luat_mqtt_ping(luat_mqtt_ctrl_t *mqtt_ctrl);
-
+int luat_mqtt_set_will(luat_mqtt_ctrl_t *mqtt_ctrl, const char* topic, 
+						const char* payload, size_t payload_len, 
+						uint8_t qos, size_t retain);
 #endif

+ 12 - 0
components/network/libemqtt/luat_mqtt_client.c

@@ -114,6 +114,11 @@ void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
 		luat_release_rtos_timer(mqtt_ctrl->reconnect_timer);
     	mqtt_ctrl->reconnect_timer = NULL;
 	}
+	if (mqtt_ctrl->broker.will_data) {
+		mqtt_ctrl->broker.will_len = 0;
+		luat_heap_free(mqtt_ctrl->broker.will_data);
+		mqtt_ctrl->broker.will_data = NULL;
+	}
 	if (mqtt_ctrl->netc){
 		network_release_ctrl(mqtt_ctrl->netc);
     	mqtt_ctrl->netc = NULL;
@@ -387,3 +392,10 @@ int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
     return 0;
 }
 
+int luat_mqtt_set_will(luat_mqtt_ctrl_t *mqtt_ctrl, const char* topic, 
+						const char* payload, size_t payload_len, 
+						uint8_t qos, size_t retain) {
+	if (mqtt_ctrl == NULL || mqtt_ctrl->netc == NULL)
+		return -1;
+	return mqtt_set_will(&mqtt_ctrl->broker, topic, payload, payload_len, qos, retain);
+}