Эх сурвалжийг харах

update: 将mqtt库解耦成2个文件

Wendal Chen 3 жил өмнө
parent
commit
21503fc5c7

+ 36 - 439
components/network/libemqtt/luat_lib_mqtt.c

@@ -13,53 +13,13 @@
 #include "luat_rtos.h"
 #include "luat_zbuff.h"
 #include "luat_malloc.h"
+#include "luat_mqtt.h"
 
 #define LUAT_LOG_TAG "mqtt"
 #include "luat_log.h"
 
 #define LUAT_MQTT_CTRL_TYPE "MQTTCTRL*"
 
-#define MQTT_MSG_RELEASE 0
-#define MQTT_MSG_TIMER_PING 2
-
-#define MQTT_RECV_BUF_LEN_MAX 4096
-
-#define MQTT_DEBUG 1
-#if MQTT_DEBUG == 0
-#undef LLOGD
-#define LLOGD(...)
-#endif
-
-typedef struct{
-	mqtt_broker_handle_t broker;// mqtt broker
-	network_ctrl_t *netc;		// mqtt netc
-	luat_ip_addr_t ip_addr;		// mqtt ip
-	const char *host; 			// mqtt host
-	uint16_t buffer_offset; 	// 用于标识mqtt_packet_buffer当前有多少数据
-	uint8_t mqtt_packet_buffer[MQTT_RECV_BUF_LEN_MAX + 4];
-	int mqtt_cb;				// mqtt lua回调函数
-	uint16_t remote_port; 		// 远程端口号
-	uint32_t keepalive;   		// 心跳时长 单位s
-	uint8_t adapter_index; 		// 适配器索引号, 似乎并没有什么用
-	uint8_t mqtt_state;    		// mqtt状态
-	uint8_t reconnect;    		// mqtt是否重连
-	uint32_t reconnect_time;    // mqtt重连时间 单位ms
-	void* reconnect_timer;		// mqtt重连定时器
-	void* ping_timer;			// mqtt_ping定时器
-	int mqtt_ref;				// 强制引用自身避免被GC
-}luat_mqtt_ctrl_t;
-
-typedef struct{
-	uint16_t topic_len;
-    uint16_t payload_len;
-	uint8_t data[];
-}luat_mqtt_msg_t;
-
-static int luat_socket_connect(luat_mqtt_ctrl_t *mqtt_ctrl, const char *hostname, uint16_t port, uint16_t keepalive);
-static void mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl);
-static int mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl);
-static int32_t l_mqtt_callback(lua_State *L, void* ptr);
-
 static luat_mqtt_ctrl_t * get_mqtt_ctrl(lua_State *L){
 	if (luaL_testudata(L, 1, LUAT_MQTT_CTRL_TYPE)){
 		return ((luat_mqtt_ctrl_t *)luaL_checkudata(L, 1, LUAT_MQTT_CTRL_TYPE));
@@ -68,172 +28,6 @@ static luat_mqtt_ctrl_t * get_mqtt_ctrl(lua_State *L){
 	}
 }
 
-static LUAT_RT_RET_TYPE mqtt_timer_callback(LUAT_RT_CB_PARAM){
-	luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
-	rtos_msg_t msg = {0};
-	msg.handler = l_mqtt_callback;
-	msg.ptr = mqtt_ctrl;
-	msg.arg1 = MQTT_MSG_TIMER_PING;
-	luat_msgbus_put(&msg, 0);
-
-}
-
-static void reconnect_timer_cb(LUAT_RT_CB_PARAM){
-	luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
-	int ret = network_wait_link_up(mqtt_ctrl->netc, 0);
-	if (ret == 0){
-		int ret = luat_socket_connect(mqtt_ctrl, mqtt_ctrl->host, mqtt_ctrl->remote_port, mqtt_ctrl->keepalive);
-		if(ret){
-			LLOGI("reconnect init socket ret=%d\n", ret);
-			mqtt_close_socket(mqtt_ctrl);
-		}
-	}
-}
-
-static void mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl){
-	if (mqtt_ctrl->reconnect){
-		LLOGI("reconnect after %sms", mqtt_ctrl->reconnect_time);
-		mqtt_ctrl->buffer_offset = 0;
-		mqtt_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, mqtt_ctrl, NULL);
-		luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
-	}
-}
-
-static void mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
-	LLOGI("mqtt closing socket");
-	if (mqtt_ctrl->netc){
-		network_force_close_socket(mqtt_ctrl->netc);
-	}
-	luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
-	mqtt_ctrl->mqtt_state = 0;
-	mqtt_reconnect(mqtt_ctrl);
-}
-
-static void mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
-	rtos_msg_t msg = {0};
-	msg.handler = l_mqtt_callback;
-	msg.ptr = mqtt_ctrl;
-	msg.arg1 = MQTT_MSG_RELEASE;
-	luat_msgbus_put(&msg, 0);
-	if (mqtt_ctrl->netc){
-		network_release_ctrl(mqtt_ctrl->netc);
-    	mqtt_ctrl->netc = NULL;
-	}
-	if (mqtt_ctrl->host){
-		luat_heap_free(mqtt_ctrl->host);
-	}
-}
-
-static int mqtt_parse(luat_mqtt_ctrl_t *mqtt_ctrl) {
-	LLOGD("mqtt_parse offset %d", mqtt_ctrl->buffer_offset);
-	if (mqtt_ctrl->buffer_offset < 2) {
-		LLOGD("wait more data");
-		return 0;
-	}
-	//mqtt_ctrl->mqtt_packet_buffer[mqtt_ctrl->buffer_offset] = 0x00;
-	// 判断数据长度, 前几个字节能判断出够不够读出mqtt的头
-	char* buf = mqtt_ctrl->mqtt_packet_buffer;
-	int num_bytes = 1;
-	if ((buf[1] & 0x80) == 0x80) {
-		num_bytes++;
-		if (mqtt_ctrl->buffer_offset < 3) {
-			LLOGD("wait more data for mqtt head");
-			return 0;
-		}
-		if ((buf[2] & 0x80) == 0x80) {
-			num_bytes ++;
-			if (mqtt_ctrl->buffer_offset < 4) {
-				LLOGD("wait more data for mqtt head");
-				return 0;
-			}
-			if ((buf[3] & 0x80) == 0x80) {
-				num_bytes ++;
-			}
-		}
-	}
-	// if (num_bytes > mqtt_ctrl->buffer_offset - 1) {
-	// 	LLOGD("wait more data for mqtt head");
-	// 	return 0;
-	// }
-	// 判断数据总长, 这里rem_len只包含mqtt头部之外的数据
-	uint16_t rem_len = mqtt_parse_rem_len(mqtt_ctrl->mqtt_packet_buffer);
-	if (rem_len > mqtt_ctrl->buffer_offset - num_bytes - 1) {
-		LLOGD("wait more data for mqtt head");
-		return 0;
-	}
-	// 至此, mqtt包是完整的 解析类型, 处理之
-	int ret = mqtt_msg_cb(mqtt_ctrl);
-	if (ret != 0){
-		LLOGW("bad mqtt packet!! ret %d", ret);
-		return -1;
-	}
-	// 处理完成后, 如果还有数据, 移动数据, 继续处理
-	mqtt_ctrl->buffer_offset -= (1 + num_bytes + rem_len);
-	memmove(mqtt_ctrl->mqtt_packet_buffer, mqtt_ctrl->mqtt_packet_buffer+1 + num_bytes + rem_len, mqtt_ctrl->buffer_offset);
-	return 1;
-}
-
-static int mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl){
-	// LLOGD("mqtt_read_packet mqtt_ctrl->buffer_offset:%d",mqtt_ctrl->buffer_offset);
-	int ret = -1;
-	uint8_t *read_buff = NULL;
-	uint32_t total_len = 0;
-	uint32_t rx_len = 0;
-	int result = network_rx(mqtt_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
-	if (total_len > 0xFFF) {
-		LLOGE("too many data wait for recv %d", total_len);
-		mqtt_close_socket(mqtt_ctrl);
-		return -1;
-	}
-	if (total_len == 0) {
-		LLOGW("rx event but NO data wait for recv");
-		return 0;
-	}
-	if (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset <= 0) {
-		LLOGE("buff is FULL, mqtt packet too big");
-		mqtt_close_socket(mqtt_ctrl);
-		return -1;
-	}
-	#define MAX_READ (1024)
-	int recv_want = 0;
-
-	while (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset > 0) {
-		if (MAX_READ > (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset)) {
-			recv_want = MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset;
-		}
-		else {
-			recv_want = MAX_READ;
-		}
-		// 从网络接收数据
-		result = network_rx(mqtt_ctrl->netc, mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
-		if (rx_len == 0 || result != 0 ) {
-			LLOGD("rx_len %d result %d", rx_len, result);
-			break;
-		}
-		// 收到数据了, 传给处理函数继续处理
-		// 数据的长度变更, 触发传递
-		mqtt_ctrl->buffer_offset += rx_len;
-		LLOGD("data recv %d offset %d", rx_len, mqtt_ctrl->buffer_offset);
-further:
-		result = mqtt_parse(mqtt_ctrl);
-		if (result == 0) {
-			// OK
-		}else if(result == 1){
-			if (mqtt_ctrl->buffer_offset > 0)
-				goto further;
-			else {
-				continue;
-			}
-		}
-		else {
-			LLOGW("mqtt_parse ret %d, closing socket");
-			mqtt_close_socket(mqtt_ctrl);
-			break;
-		}
-	}
-	return 0;
-}
-
 static int32_t l_mqtt_callback(lua_State *L, void* ptr){
     rtos_msg_t* msg = (rtos_msg_t*)lua_topointer(L, -1);
     luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)msg->ptr;
@@ -324,197 +118,19 @@ static int32_t l_mqtt_callback(lua_State *L, void* ptr){
             break;
         }
     }
-    lua_pushinteger(L, 0);
-    return 1;
-}
-
-static int mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
-	rtos_msg_t msg = {0};
-    msg.handler = l_mqtt_callback;
-    uint8_t msg_tp = MQTTParseMessageType(mqtt_ctrl->mqtt_packet_buffer);
-	uint16_t msg_id = 0;
-	uint8_t qos = 0;
-    switch (msg_tp) {
-		// case MQTT_MSG_CONNECT : {
-		// 	// LLOGD("MQTT_MSG_CONNECT");
-		// 	break;
-		// }
-		case MQTT_MSG_CONNACK: {
-			// LLOGD("MQTT_MSG_CONNACK");
-			if(mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
-				LLOGW("CONACK 0x%02x",mqtt_ctrl->mqtt_packet_buffer[3]);
-                mqtt_close_socket(mqtt_ctrl);
-                return -1;
-            }
-			mqtt_ctrl->mqtt_state = 1;
-			msg.ptr = mqtt_ctrl;
-			msg.arg1 = MQTT_MSG_CONNACK;
-			luat_msgbus_put(&msg, 0);
-            break;
-        }
-        case MQTT_MSG_PUBLISH : {
-			// LLOGD("MQTT_MSG_PUBLISH");
-			const uint8_t* ptr;
-			qos = MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer);
-			uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
-			uint16_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
-			luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
-			mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
-            mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
-			msg.ptr = mqtt_ctrl;
-			msg.arg1 = MQTT_MSG_PUBLISH;
-			msg.arg2 = mqtt_msg;
-			luat_msgbus_put(&msg, 0);
-			// 还是回复puback
-			if (qos == 1) {
-				msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
-				mqtt_puback(&(mqtt_ctrl->broker), msg_id);
-			}
-            break;
-        }
-        case MQTT_MSG_PUBACK : {
-			// LLOGD("MQTT_MSG_PUBACK");
-			msg.ptr = mqtt_ctrl;
-			msg.arg1 = MQTT_MSG_PUBACK;
-			msg.arg2 = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
-			luat_msgbus_put(&msg, 0);
-			break;
-		}
-		case MQTT_MSG_PUBREC : {
-			msg_id=mqtt_parse_msg_id(&(mqtt_ctrl->broker));
-			mqtt_pubrel(&(mqtt_ctrl->broker), msg_id);
-			// LLOGD("MQTT_MSG_PUBREC");
-			break;
-		}
-		case MQTT_MSG_PUBCOMP : {
-			// LLOGD("MQTT_MSG_PUBCOMP");
-			msg.ptr = mqtt_ctrl;
-			msg.arg1 = MQTT_MSG_PUBCOMP;
-			msg.arg2 = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
-			luat_msgbus_put(&msg, 0);
-			break;
-		}
-		// case MQTT_MSG_SUBSCRIBE : {
-		// 	// LLOGD("MQTT_MSG_SUBSCRIBE");
-        //     break;
-        // }
-        case MQTT_MSG_SUBACK : {
-			LLOGD("MQTT_MSG_SUBACK");
-            break;
-        }
-		// case MQTT_MSG_UNSUBSCRIBE : {
-		// 	// LLOGD("MQTT_MSG_UNSUBSCRIBE");
-        //     break;
-        // }
-		case MQTT_MSG_UNSUBACK : {
-			LLOGD("MQTT_MSG_UNSUBACK");
-            break;
-        }
-		// case MQTT_MSG_PINGREQ : {
-		// 	// LLOGD("MQTT_MSG_PINGREQ");
-        //     break;
-        // }
-        case MQTT_MSG_PINGRESP : {
-			LLOGD("MQTT_MSG_PINGRESP");
-            break;
-        }
-		case MQTT_MSG_DISCONNECT : {
-			// LLOGD("MQTT_MSG_DISCONNECT");
-            break;
-        }
-        default : {
-			LLOGD("mqtt_msg_cb error msg_tp:%d",msg_tp);
-            break;
-        }
-    }
+    // lua_pushinteger(L, 0);
     return 0;
 }
 
-static const char* event2str(uint32_t id) {
-	switch (id)
-	{
-	case EV_NW_RESULT_LINK:
-		return "EV_NW_RESULT_LINK";
-	case EV_NW_RESULT_CONNECT:
-		return "EV_NW_RESULT_CONNECT";
-	case EV_NW_RESULT_EVENT:
-		return "EV_NW_RESULT_EVENT";
-	case EV_NW_RESULT_TX:
-		return "EV_NW_RESULT_TX";
-	case EV_NW_RESULT_CLOSE:
-		return "EV_NW_RESULT_CLOSE";
-	default:
-		return "UNKOWN";
-	}
-}
-
-static int32_t luat_lib_mqtt_callback(void *data, void *param){
-	OS_EVENT *event = (OS_EVENT *)data;
-	luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)param;
-	int ret = 0;
-	// LLOGD("LINK %d ON_LINE %d EVENT %d TX_OK %d CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
-	LLOGD("network mqtt cb %8X %s %8X",event->ID & 0x0ffffffff, event2str(event->ID & 0x0ffffffff) ,event->Param1);
-	if (event->ID == EV_NW_RESULT_LINK){
-		// int ret = luat_socket_connect(mqtt_ctrl, mqtt_ctrl->host, mqtt_ctrl->remote_port, mqtt_ctrl->keepalive);
-		// if(ret){
-		// 	LLOGE("socket connect ret=%d\n", ret);
-		// 	mqtt_close_socket(mqtt_ctrl);
-		// }
-		return 0; // 这里应该直接返回, 不能往下调用network_wait_event
-	}else if(event->ID == EV_NW_RESULT_CONNECT){
-		ret = mqtt_connect(&(mqtt_ctrl->broker));
-		if(ret==1){
-			luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
-		}
-	}else if(event->ID == EV_NW_RESULT_EVENT){
-		if (event->Param1==0){
-			ret = mqtt_read_packet(mqtt_ctrl);
-			// LLOGD("mqtt_read_packet ret:%d",ret);
-			luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
-			luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
-		}
-	}else if(event->ID == EV_NW_RESULT_TX){
-		luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
-		luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
-	}else if(event->ID == EV_NW_RESULT_CLOSE){
-
-	}
-	if (event->Param1){
-		LLOGW("mqtt_callback param1 %d, closing socket", event->Param1);
-		mqtt_close_socket(mqtt_ctrl);
-	}
-	ret = network_wait_event(mqtt_ctrl->netc, NULL, 0, NULL);
-	if (ret < 0){
-		LLOGW("network_wait_event ret %d, closing socket", ret);
-		mqtt_close_socket(mqtt_ctrl);
-		return -1;
-	}
-    return 0;
-}
-
-static int mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
-    luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
-	uint32_t tx_len = 0;
-	int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
-	if (ret < 0)
-		return 0;
-	return count;
-}
-
-static int luat_socket_connect(luat_mqtt_ctrl_t *mqtt_ctrl, const char *hostname, uint16_t port, uint16_t keepalive){
-	int ret = 0;
-    mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
-#ifdef LUAT_USE_LWIP
-	ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.type)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
-#else
-	ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.is_ipv6)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
-#endif
-	LLOGD("network_connect ret %d", ret);
-	if (ret < 0) {
-        network_close(mqtt_ctrl->netc, 0);
-        return -1;
-    }
-    return 0;
+int l_luat_mqtt_msg_cb(luat_mqtt_ctrl_t * ptr, int arg1, int arg2) {
+	rtos_msg_t msg = {
+		.handler = l_mqtt_callback,
+		.ptr = ptr,
+		.arg1 = arg1,
+		.arg2 = arg2
+	};
+	luat_msgbus_put(&msg, 0);
+	return 0;
 }
 
 /*
@@ -623,33 +239,26 @@ mqttc = mqtt.create(nil,"120.55.137.106", 8883, true)
 mqttc = mqtt.create(nil,"120.55.137.106", 8883, true, io.readFile("/luadb/ca.crt"), "123", "456")
 */
 static int l_mqtt_create(lua_State *L) {
-	size_t client_cert_len, client_key_len, client_password_len;
-	const char *client_cert = NULL;
-	const char *client_key = NULL;
-	const char *client_password = NULL;
+	int ret = 0;
 	int adapter_index = luaL_optinteger(L, 1, network_get_last_register_adapter());
 	if (adapter_index < 0 || adapter_index >= NW_ADAPTER_QTY){
 		return 0;
 	}
 	luat_mqtt_ctrl_t *mqtt_ctrl = (luat_mqtt_ctrl_t *)lua_newuserdata(L, sizeof(luat_mqtt_ctrl_t));
 	if (!mqtt_ctrl){
+		LLOGE("out of memory when malloc mqtt_ctrl");
 		return 0;
 	}
-	memset(mqtt_ctrl, 0, sizeof(luat_mqtt_ctrl_t));
-	mqtt_ctrl->adapter_index = adapter_index;
-	mqtt_ctrl->netc = network_alloc_ctrl(adapter_index);
-	if (!mqtt_ctrl->netc){
-		LLOGD("create fail");
+
+	ret = luat_mqtt_init(mqtt_ctrl, adapter_index);
+	if (ret) {
+		LLOGE("mqtt init FAID ret %d", ret);
 		return 0;
 	}
-	network_init_ctrl(mqtt_ctrl->netc, NULL, luat_lib_mqtt_callback, mqtt_ctrl);
 
-	mqtt_ctrl->mqtt_state = 0;
-	mqtt_ctrl->netc->is_debug = 0;
-	mqtt_ctrl->keepalive = 240;
-	network_set_base_mode(mqtt_ctrl->netc, 1, 10000, 0, 0, 0, 0);
-	network_set_local_port(mqtt_ctrl->netc, 0);
+	luat_mqtt_connopts_t opts = {0};
 
+	// 连接参数相关
 	const char *ip;
 	size_t ip_len = 0;
 #ifdef LUAT_USE_LWIP
@@ -669,41 +278,29 @@ static int l_mqtt_create(lua_State *L) {
 		ip_len = 0;
 	}else{
 		ip_len = 0;
-		ip = luaL_checklstring(L, 2, &ip_len);
+		opts.host = luaL_checklstring(L, 2, &ip_len);
+		// TODO 判断 host的长度,超过191就不行了
 	}
-	mqtt_ctrl->host = luat_heap_malloc(ip_len + 1);
-	memset(mqtt_ctrl->host, 0, ip_len + 1);
-	memcpy(mqtt_ctrl->host, ip, ip_len);
-	mqtt_ctrl->remote_port = luaL_checkinteger(L, 3);
-	mqtt_ctrl->ping_timer = luat_create_rtos_timer(mqtt_timer_callback, mqtt_ctrl, NULL);
-	
-	uint8_t is_tls = 0;
+
+	opts.port = luaL_checkinteger(L, 3);
+
+	// 加密相关
 	if (lua_isboolean(L, 4)){
-		is_tls = lua_toboolean(L, 4);
+		opts.is_tls = lua_toboolean(L, 4);
 	}
 	if (lua_isstring(L, 5)){
-		client_cert = luaL_checklstring(L, 5, &client_cert_len);
+		opts.client_cert = luaL_checklstring(L, 5, &opts.client_cert_len);
 	}
 	if (lua_isstring(L, 6)){
-		client_key = luaL_checklstring(L, 6, &client_key_len);
+		opts.client_key = luaL_checklstring(L, 6, &opts.client_key_len);
 	}
 	if (lua_isstring(L, 7)){
-		client_password = luaL_checklstring(L, 7, &client_password_len);
-	}
-	if (is_tls){
-		network_init_tls(mqtt_ctrl->netc, client_cert?2:0);
-		if (client_cert){
-			network_set_client_cert(mqtt_ctrl->netc, client_cert, client_cert_len,
-					client_key, client_key_len,
-					client_password, client_password_len);
-		}
-	}else{
-		network_deinit_tls(mqtt_ctrl->netc);
+		opts.client_password = luaL_checklstring(L, 7, &opts.client_password_len);
 	}
 
-	
-    mqtt_ctrl->broker.socket_info = mqtt_ctrl;
-    mqtt_ctrl->broker.send = mqtt_send_packet;
+	ret = luat_mqtt_set_connopts(mqtt_ctrl, &opts);
+
+	// TODO 判断ret, 如果初始化失败, 应该终止
 
 	luaL_setmetatable(L, LUAT_MQTT_CTRL_TYPE);
 	lua_pushvalue(L, -1);
@@ -785,10 +382,10 @@ mqttc:connect()
 */
 static int l_mqtt_connect(lua_State *L) {
 	luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
-	int ret = ret = luat_socket_connect(mqtt_ctrl, mqtt_ctrl->host, mqtt_ctrl->remote_port, mqtt_ctrl->keepalive);
+	int ret = ret = luat_mqtt_connect(mqtt_ctrl);
 	if (ret) {
 		LLOGE("socket connect ret=%d\n", ret);
-		mqtt_close_socket(mqtt_ctrl);
+		luat_mqtt_close_socket(mqtt_ctrl);
 		lua_pushboolean(L, 0);
 		return 1;
 	}
@@ -867,12 +464,12 @@ mqttc:close()
 static int l_mqtt_close(lua_State *L) {
 	luat_mqtt_ctrl_t * mqtt_ctrl = get_mqtt_ctrl(L);
 	mqtt_disconnect(&(mqtt_ctrl->broker));
-	mqtt_close_socket(mqtt_ctrl);
+	luat_mqtt_close_socket(mqtt_ctrl);
 	if (mqtt_ctrl->mqtt_cb != 0) {
 		luaL_unref(L, LUA_REGISTRYINDEX, mqtt_ctrl->mqtt_cb);
 		mqtt_ctrl->mqtt_cb = 0;
 	}
-	mqtt_release_socket(mqtt_ctrl);
+	luat_mqtt_release_socket(mqtt_ctrl);
 	return 0;
 }
 

+ 63 - 0
components/network/libemqtt/luat_mqtt.h

@@ -0,0 +1,63 @@
+#ifndef LUAT_MQTT_H
+#define LUAT_MQTT_H
+
+#define MQTT_MSG_RELEASE 0
+#define MQTT_MSG_TIMER_PING 2
+
+#define MQTT_RECV_BUF_LEN_MAX 4096
+
+
+typedef struct{
+	mqtt_broker_handle_t broker;// mqtt broker
+	network_ctrl_t *netc;		// mqtt netc
+	luat_ip_addr_t ip_addr;		// mqtt ip
+	char host[192]; 			// mqtt host
+	uint16_t buffer_offset; 	// 用于标识mqtt_packet_buffer当前有多少数据
+	uint8_t mqtt_packet_buffer[MQTT_RECV_BUF_LEN_MAX + 4];
+	int mqtt_cb;				// mqtt lua回调函数
+	uint16_t remote_port; 		// 远程端口号
+	uint32_t keepalive;   		// 心跳时长 单位s
+	uint8_t adapter_index; 		// 适配器索引号, 似乎并没有什么用
+	uint8_t mqtt_state;    		// mqtt状态
+	uint8_t reconnect;    		// mqtt是否重连
+	uint32_t reconnect_time;    // mqtt重连时间 单位ms
+	void* reconnect_timer;		// mqtt重连定时器
+	void* ping_timer;			// mqtt_ping定时器
+	int mqtt_ref;				// 强制引用自身避免被GC
+}luat_mqtt_ctrl_t;
+
+typedef struct{
+	uint16_t topic_len;
+    uint16_t payload_len;
+	uint8_t data[];
+}luat_mqtt_msg_t;
+
+typedef struct luat_mqtt_connopts
+{
+    const char* host;
+    uint16_t port;
+    uint16_t is_tls;
+    const char* client_cert;
+    size_t client_cert_len;
+    const char* client_key;
+    size_t client_key_len;
+    const char* client_password;
+    size_t client_password_len;
+}luat_mqtt_connopts_t;
+
+
+int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl);
+static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl);
+// int l_mqtt_callback(lua_State *L, void* ptr);
+int l_luat_mqtt_msg_cb(luat_mqtt_ctrl_t * ctrl, int arg1, int arg2);
+int32_t luat_mqtt_callback(void *data, void *param);
+LUAT_RT_RET_TYPE luat_mqtt_timer_callback(LUAT_RT_CB_PARAM);
+// int luat_mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl);
+int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count);
+void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl);
+void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl);
+
+int luat_mqtt_init(luat_mqtt_ctrl_t *mqtt_ctrl, int adapter_index);
+int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts);
+
+#endif

+ 368 - 0
components/network/libemqtt/luat_mqtt_client.c

@@ -0,0 +1,368 @@
+#include "luat_base.h"
+
+#include "luat_network_adapter.h"
+#include "libemqtt.h"
+#include "luat_rtos.h"
+#include "luat_zbuff.h"
+#include "luat_malloc.h"
+#include "luat_mqtt.h"
+
+#define LUAT_LOG_TAG "mqtt"
+#include "luat_log.h"
+
+
+#define MQTT_DEBUG 0
+#if MQTT_DEBUG == 0
+#undef LLOGD
+#define LLOGD(...)
+#endif
+
+
+LUAT_RT_RET_TYPE luat_mqtt_timer_callback(LUAT_RT_CB_PARAM){
+	luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
+    l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TIMER_PING, 0);
+}
+
+static void reconnect_timer_cb(LUAT_RT_CB_PARAM){
+	luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
+	int ret = network_wait_link_up(mqtt_ctrl->netc, 0);
+	if (ret == 0){
+		int ret = luat_mqtt_connect(mqtt_ctrl);
+		if(ret){
+			LLOGI("reconnect init socket ret=%d\n", ret);
+			luat_mqtt_close_socket(mqtt_ctrl);
+		}
+	}
+}
+
+int luat_mqtt_init(luat_mqtt_ctrl_t *mqtt_ctrl, int adapter_index) {
+	memset(mqtt_ctrl, 0, sizeof(luat_mqtt_ctrl_t));
+	mqtt_ctrl->adapter_index = adapter_index;
+	mqtt_ctrl->netc = network_alloc_ctrl(adapter_index);
+	if (!mqtt_ctrl->netc){
+		LLOGW("network_alloc_ctrl fail");
+		return -1;
+	}
+	network_init_ctrl(mqtt_ctrl->netc, NULL, luat_mqtt_callback, mqtt_ctrl);
+
+	mqtt_ctrl->mqtt_state = 0;
+	mqtt_ctrl->netc->is_debug = 0;
+	mqtt_ctrl->keepalive = 240;
+	network_set_base_mode(mqtt_ctrl->netc, 1, 10000, 0, 0, 0, 0);
+	network_set_local_port(mqtt_ctrl->netc, 0);
+	mqtt_ctrl->ping_timer = luat_create_rtos_timer(luat_mqtt_timer_callback, mqtt_ctrl, NULL);
+    return 0;
+}
+
+int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts) {
+    memcpy(mqtt_ctrl->host, opts->host, strlen(opts->host) + 1);
+    mqtt_ctrl->remote_port = opts->port;
+	if (opts->is_tls){
+		network_init_tls(mqtt_ctrl->netc, opts->client_cert?2:0);
+		if (opts->client_cert){
+			network_set_client_cert(mqtt_ctrl->netc, opts->client_cert, opts->client_cert_len,
+					opts->client_key, opts->client_key_len,
+					opts->client_password, opts->client_password_len);
+		}
+	} else {
+		network_deinit_tls(mqtt_ctrl->netc);
+	}
+
+    mqtt_ctrl->broker.socket_info = mqtt_ctrl;
+    mqtt_ctrl->broker.send = luat_mqtt_send_packet;
+    return 0;
+}
+
+static void mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl){
+	if (mqtt_ctrl->reconnect){
+		LLOGI("reconnect after %dms", mqtt_ctrl->reconnect_time);
+		mqtt_ctrl->buffer_offset = 0;
+		mqtt_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, mqtt_ctrl, NULL);
+		luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
+	}
+}
+
+void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
+	LLOGI("mqtt closing socket");
+	if (mqtt_ctrl->netc){
+		network_force_close_socket(mqtt_ctrl->netc);
+	}
+	luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
+	mqtt_ctrl->mqtt_state = 0;
+	mqtt_reconnect(mqtt_ctrl);
+}
+
+void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
+    l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RELEASE, 0);
+	if (mqtt_ctrl->netc){
+		network_release_ctrl(mqtt_ctrl->netc);
+    	mqtt_ctrl->netc = NULL;
+	}
+}
+
+static int mqtt_parse(luat_mqtt_ctrl_t *mqtt_ctrl) {
+	LLOGD("mqtt_parse offset %d", mqtt_ctrl->buffer_offset);
+	if (mqtt_ctrl->buffer_offset < 2) {
+		LLOGD("wait more data");
+		return 0;
+	}
+	// 判断数据长度, 前几个字节能判断出够不够读出mqtt的头
+	char* buf = mqtt_ctrl->mqtt_packet_buffer;
+	int num_bytes = 1;
+	if ((buf[1] & 0x80) == 0x80) {
+		num_bytes++;
+		if (mqtt_ctrl->buffer_offset < 3) {
+			LLOGD("wait more data for mqtt head");
+			return 0;
+		}
+		if ((buf[2] & 0x80) == 0x80) {
+			num_bytes ++;
+			if (mqtt_ctrl->buffer_offset < 4) {
+				LLOGD("wait more data for mqtt head");
+				return 0;
+			}
+			if ((buf[3] & 0x80) == 0x80) {
+				num_bytes ++;
+			}
+		}
+	}
+	// 判断数据总长, 这里rem_len只包含mqtt头部之外的数据
+	uint16_t rem_len = mqtt_parse_rem_len(mqtt_ctrl->mqtt_packet_buffer);
+	if (rem_len > mqtt_ctrl->buffer_offset - num_bytes - 1) {
+		LLOGD("wait more data for mqtt head");
+		return 0;
+	}
+	// 至此, mqtt包是完整的 解析类型, 处理之
+	int ret = luat_mqtt_msg_cb(mqtt_ctrl);
+	if (ret != 0){
+		LLOGW("bad mqtt packet!! ret %d", ret);
+		return -1;
+	}
+	// 处理完成后, 如果还有数据, 移动数据, 继续处理
+	mqtt_ctrl->buffer_offset -= (1 + num_bytes + rem_len);
+	memmove(mqtt_ctrl->mqtt_packet_buffer, mqtt_ctrl->mqtt_packet_buffer+1 + num_bytes + rem_len, mqtt_ctrl->buffer_offset);
+	return 1;
+}
+
+int luat_mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl){
+	// LLOGD("luat_mqtt_read_packet mqtt_ctrl->buffer_offset:%d",mqtt_ctrl->buffer_offset);
+	int ret = -1;
+	uint8_t *read_buff = NULL;
+	uint32_t total_len = 0;
+	uint32_t rx_len = 0;
+	int result = network_rx(mqtt_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
+	if (total_len > 0xFFF) {
+		LLOGE("too many data wait for recv %d", total_len);
+		luat_mqtt_close_socket(mqtt_ctrl);
+		return -1;
+	}
+	if (total_len == 0) {
+		LLOGW("rx event but NO data wait for recv");
+		return 0;
+	}
+	if (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset <= 0) {
+		LLOGE("buff is FULL, mqtt packet too big");
+		luat_mqtt_close_socket(mqtt_ctrl);
+		return -1;
+	}
+	#define MAX_READ (1024)
+	int recv_want = 0;
+
+	while (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset > 0) {
+		if (MAX_READ > (MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset)) {
+			recv_want = MQTT_RECV_BUF_LEN_MAX - mqtt_ctrl->buffer_offset;
+		}
+		else {
+			recv_want = MAX_READ;
+		}
+		// 从网络接收数据
+		result = network_rx(mqtt_ctrl->netc, mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
+		if (rx_len == 0 || result != 0 ) {
+			LLOGD("rx_len %d result %d", rx_len, result);
+			break;
+		}
+		// 收到数据了, 传给处理函数继续处理
+		// 数据的长度变更, 触发传递
+		mqtt_ctrl->buffer_offset += rx_len;
+		LLOGD("data recv %d offset %d", rx_len, mqtt_ctrl->buffer_offset);
+further:
+		result = mqtt_parse(mqtt_ctrl);
+		if (result == 0) {
+			// OK
+		}else if(result == 1){
+			if (mqtt_ctrl->buffer_offset > 0)
+				goto further;
+			else {
+				continue;
+			}
+		}
+		else {
+			LLOGW("mqtt_parse ret %d, closing socket");
+			luat_mqtt_close_socket(mqtt_ctrl);
+			break;
+		}
+	}
+	return 0;
+}
+
+
+static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
+	rtos_msg_t msg = {0};
+    // msg.handler = l_mqtt_callback;
+    uint8_t msg_tp = MQTTParseMessageType(mqtt_ctrl->mqtt_packet_buffer);
+	uint16_t msg_id = 0;
+	uint8_t qos = 0;
+    switch (msg_tp) {
+		case MQTT_MSG_CONNACK: {
+			// LLOGD("MQTT_MSG_CONNACK");
+			if(mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
+				LLOGW("CONACK 0x%02x",mqtt_ctrl->mqtt_packet_buffer[3]);
+                luat_mqtt_close_socket(mqtt_ctrl);
+                return -1;
+            }
+			mqtt_ctrl->mqtt_state = 1;
+            l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CONNACK, 0);
+            break;
+        }
+        case MQTT_MSG_PUBLISH : {
+			// LLOGD("MQTT_MSG_PUBLISH");
+			const uint8_t* ptr;
+			qos = MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer);
+			uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
+			uint16_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
+			luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
+			mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
+            mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
+			l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, mqtt_msg);
+			// 还要回复puback
+			if (qos == 1) {
+				msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
+				mqtt_puback(&(mqtt_ctrl->broker), msg_id);
+			}
+            break;
+        }
+        case MQTT_MSG_PUBACK : {
+			// LLOGD("MQTT_MSG_PUBACK");
+            l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBACK, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
+			break;
+		}
+		case MQTT_MSG_PUBREC : {
+			msg_id = mqtt_parse_msg_id(&(mqtt_ctrl->broker));
+			mqtt_pubrel(&(mqtt_ctrl->broker), msg_id);
+			// LLOGD("MQTT_MSG_PUBREC");
+			break;
+		}
+		case MQTT_MSG_PUBCOMP : {
+			// LLOGD("MQTT_MSG_PUBCOMP");
+            l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBCOMP, mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer));
+			break;
+		}
+        case MQTT_MSG_SUBACK : {
+			LLOGD("MQTT_MSG_SUBACK");
+            break;
+        }
+		case MQTT_MSG_UNSUBACK : {
+			LLOGD("MQTT_MSG_UNSUBACK");
+            break;
+        }
+        case MQTT_MSG_PINGRESP : {
+			LLOGD("MQTT_MSG_PINGRESP");
+            break;
+        }
+		case MQTT_MSG_DISCONNECT : {
+			// LLOGD("MQTT_MSG_DISCONNECT");
+            break;
+        }
+        default : {
+			LLOGD("luat_mqtt_msg_cb error msg_tp:%d",msg_tp);
+            break;
+        }
+    }
+    return 0;
+}
+
+static const char* event2str(uint32_t id) {
+	switch (id)
+	{
+	case EV_NW_RESULT_LINK:
+		return "EV_NW_RESULT_LINK";
+	case EV_NW_RESULT_CONNECT:
+		return "EV_NW_RESULT_CONNECT";
+	case EV_NW_RESULT_EVENT:
+		return "EV_NW_RESULT_EVENT";
+	case EV_NW_RESULT_TX:
+		return "EV_NW_RESULT_TX";
+	case EV_NW_RESULT_CLOSE:
+		return "EV_NW_RESULT_CLOSE";
+	default:
+		return "UNKOWN";
+	}
+}
+
+int32_t luat_mqtt_callback(void *data, void *param) {
+	OS_EVENT *event = (OS_EVENT *)data;
+	luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)param;
+	int ret = 0;
+	// LLOGD("LINK %d ON_LINE %d EVENT %d TX_OK %d CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
+	LLOGD("network mqtt cb %8X %s %8X",event->ID & 0x0ffffffff, event2str(event->ID & 0x0ffffffff) ,event->Param1);
+	if (event->ID == EV_NW_RESULT_LINK){
+		return 0; // 这里应该直接返回, 不能往下调用network_wait_event
+	}else if(event->ID == EV_NW_RESULT_CONNECT){
+		ret = mqtt_connect(&(mqtt_ctrl->broker));
+		if(ret==1){
+			luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
+		}
+	}else if(event->ID == EV_NW_RESULT_EVENT){
+		if (event->Param1==0){
+			ret = luat_mqtt_read_packet(mqtt_ctrl);
+			// LLOGD("luat_mqtt_read_packet ret:%d",ret);
+			luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
+			luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
+		}
+	}else if(event->ID == EV_NW_RESULT_TX){
+		luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
+		luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
+	}else if(event->ID == EV_NW_RESULT_CLOSE){
+
+	}
+	if (event->Param1){
+		LLOGW("mqtt_callback param1 %d, closing socket", event->Param1);
+		luat_mqtt_close_socket(mqtt_ctrl);
+	}
+	ret = network_wait_event(mqtt_ctrl->netc, NULL, 0, NULL);
+	if (ret < 0){
+		LLOGW("network_wait_event ret %d, closing socket", ret);
+		luat_mqtt_close_socket(mqtt_ctrl);
+		return -1;
+	}
+    return 0;
+}
+
+int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
+    luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
+	uint32_t tx_len = 0;
+	int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
+	if (ret < 0)
+		return 0;
+	return count;
+}
+
+int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
+	int ret = 0;
+    const char *hostname = mqtt_ctrl->host;
+    uint16_t port = mqtt_ctrl->remote_port;
+    uint16_t keepalive = mqtt_ctrl->keepalive;
+    LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
+    mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
+#ifdef LUAT_USE_LWIP
+	ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.type)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
+#else
+	ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), (0xff == mqtt_ctrl->ip_addr.is_ipv6)?NULL:&(mqtt_ctrl->ip_addr), port, 0) < 0;
+#endif
+	LLOGD("network_connect ret %d", ret);
+	if (ret < 0) {
+        network_close(mqtt_ctrl->netc, 0);
+        return -1;
+    }
+    return 0;
+}