Просмотр исходного кода

add: mqtt库支持qos2的消息下发

Wendal Chen 2 лет назад
Родитель
Сommit
20270b4994

+ 34 - 0
components/network/libemqtt/libemqtt.c

@@ -446,6 +446,40 @@ int mqtt_puback(mqtt_broker_handle_t* broker, uint16_t message_id) {
 	return 1;
 }
 
+int mqtt_pubrec(mqtt_broker_handle_t* broker, uint16_t message_id) {
+	uint8_t packet[] = {
+		MQTT_MSG_PUBREC | MQTT_QOS0_FLAG, // Message Type, DUP flag, QoS level, Retain
+		0x02, // Remaining length
+		message_id>>8,
+		message_id&0xFF
+	};
+
+	// Send the packet
+	if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
+		return -1;
+	}
+
+	return 1;
+}
+
+
+
+int mqtt_pubcomp(mqtt_broker_handle_t* broker, uint16_t message_id) {
+	uint8_t packet[] = {
+		MQTT_MSG_PUBCOMP | MQTT_QOS0_FLAG, // Message Type, DUP flag, QoS level, Retain
+		0x02, // Remaining length
+		message_id>>8,
+		message_id&0xFF
+	};
+
+	// Send the packet
+	if(broker->send(broker->socket_info, packet, sizeof(packet)) < sizeof(packet)) {
+		return -1;
+	}
+
+	return 1;
+}
+
 int mqtt_subscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id, uint8_t qos) {
 	uint16_t topiclen = strlen(topic);
 	if (qos>2) qos=0;

+ 4 - 0
components/network/libemqtt/libemqtt.h

@@ -291,4 +291,8 @@ int mqtt_ping(mqtt_broker_handle_t* broker);
 
 int mqtt_puback(mqtt_broker_handle_t* broker, uint16_t message_id);
 
+int mqtt_pubrec(mqtt_broker_handle_t* broker, uint16_t message_id);
+
+int mqtt_pubcomp(mqtt_broker_handle_t* broker, uint16_t message_id);
+
 #endif // __LIBEMQTT_H__

+ 12 - 2
components/network/libemqtt/luat_mqtt_client.c

@@ -274,11 +274,15 @@ static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
 			mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
             mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
 			l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, mqtt_msg);
+			msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
+			LLOGD("msg %d qos %d", msg_id, qos);
 			// 还要回复puback
 			if (qos == 1) {
-				msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
 				mqtt_puback(&(mqtt_ctrl->broker), msg_id);
 			}
+			else if (qos == 2) {
+				mqtt_pubrec(&(mqtt_ctrl->broker), msg_id);
+			}
             break;
         }
         case MQTT_MSG_PUBACK : {
@@ -299,6 +303,12 @@ static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
             l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBCOMP, msg_id);
 			break;
 		}
+		case MQTT_MSG_PUBREL : {
+			msg_id = mqtt_parse_msg_id(&(mqtt_ctrl->mqtt_packet_buffer));
+			LLOGD("MQTT_MSG_PUBREL %d", msg_id);
+            mqtt_pubcomp(&(mqtt_ctrl->broker), msg_id);
+			break;
+		}
         case MQTT_MSG_SUBACK : {
 			LLOGD("MQTT_MSG_SUBACK");
             break;
@@ -312,7 +322,7 @@ static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
             break;
         }
 		case MQTT_MSG_DISCONNECT : {
-			// LLOGD("MQTT_MSG_DISCONNECT");
+			LLOGD("MQTT_MSG_DISCONNECT");
             break;
         }
         default : {