|
@@ -0,0 +1,162 @@
|
|
|
+package mqtt
|
|
|
+
|
|
|
+import (
|
|
|
+ mqtts "github.com/eclipse/paho.mqtt.golang"
|
|
|
+ "github.com/satori/go.uuid"
|
|
|
+ "github.com/sirupsen/logrus"
|
|
|
+ "strconv"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+var log = logrus.WithField("module", "mqtt")
|
|
|
+
|
|
|
+type MqttTopicStore struct {
|
|
|
+ qos byte
|
|
|
+ fun func(topic string, payload []byte)
|
|
|
+}
|
|
|
+type MqttClient struct {
|
|
|
+ Host string //mqtt服务器地址
|
|
|
+ Port int //mqtt 端口
|
|
|
+ Id string //clientId
|
|
|
+ Username string //mqtt 登录用户名
|
|
|
+ Password string //mqtt 登录密码
|
|
|
+ Timeout int //超时时间
|
|
|
+ Retry int //重试次数
|
|
|
+ topics map[string]MqttTopicStore //订阅的主题
|
|
|
+ client mqtts.Client //客户端
|
|
|
+}
|
|
|
+
|
|
|
+func NewMqttClient(host string, port int, username, password string) *MqttClient {
|
|
|
+ return &MqttClient{
|
|
|
+ Host: host,
|
|
|
+ Port: port,
|
|
|
+ Username: username,
|
|
|
+ Password: password,
|
|
|
+ Timeout: 30,
|
|
|
+ topics: make(map[string]MqttTopicStore),
|
|
|
+ Retry: 3,
|
|
|
+ }
|
|
|
+}
|
|
|
+func NewMqttClientWithId(host string, port int, username, password, id string) *MqttClient {
|
|
|
+ return &MqttClient{
|
|
|
+ Host: host,
|
|
|
+ Port: port,
|
|
|
+ Username: username,
|
|
|
+ Password: password,
|
|
|
+ Id: id,
|
|
|
+ Timeout: 30,
|
|
|
+ topics: make(map[string]MqttTopicStore),
|
|
|
+ Retry: 3,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 创建全局mqtt publish消息处理 handler
|
|
|
+var messagePubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
|
|
|
+ log.Debug("Push Message:")
|
|
|
+ log.Debug("TOPIC: %s\n", msg.Topic())
|
|
|
+ log.Debug("MSG: %s\n", msg.Payload())
|
|
|
+}
|
|
|
+
|
|
|
+// 创建全局mqtt sub消息处理 handler
|
|
|
+var messageSubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
|
|
|
+ log.Debug("收到订阅消息:")
|
|
|
+ log.Debug("Sub Client Topic : %s \n", msg.Topic())
|
|
|
+ log.Debug("Sub Client msg : %s \n", msg.Payload())
|
|
|
+}
|
|
|
+
|
|
|
+// 连接的回掉函数
|
|
|
+func (mqtt *MqttClient) connectHandler(client mqtts.Client) {
|
|
|
+ log.Debug("新的连接!" + " Connected")
|
|
|
+ for topic, st := range mqtt.topics {
|
|
|
+ mqtt.Subscribe(topic, st.qos, st.fun)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 丢失连接的回掉函数
|
|
|
+var connectLostHandler mqtts.ConnectionLostHandler = func(client mqtts.Client, err error) {
|
|
|
+ log.Debug("Connect loss: %v\n", err)
|
|
|
+}
|
|
|
+
|
|
|
+func (mqtt *MqttClient) Connect() *MqttClient {
|
|
|
+ opts := mqtts.NewClientOptions().AddBroker("tcp://" + mqtt.Host + ":" + strconv.Itoa(mqtt.Port))
|
|
|
+ opts.SetKeepAlive(60 * time.Second)
|
|
|
+
|
|
|
+ id := uuid.NewV4()
|
|
|
+ // Message callback handler,在没有任何订阅时,发布端调用此函数
|
|
|
+ opts.SetDefaultPublishHandler(messagePubHandler)
|
|
|
+ opts.Username = mqtt.Username
|
|
|
+ opts.Password = mqtt.Password
|
|
|
+ opts.SetPingTimeout(60 * time.Second)
|
|
|
+ opts.OnConnect = mqtt.connectHandler
|
|
|
+ opts.OnConnectionLost = connectLostHandler
|
|
|
+ if len(mqtt.Id) == 0 {
|
|
|
+ opts.ClientID = id.String()
|
|
|
+ } else {
|
|
|
+ opts.ClientID = mqtt.Id
|
|
|
+ }
|
|
|
+ mqtt.client = mqtts.NewClient(opts)
|
|
|
+ if token := mqtt.client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
+ panic(token.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return mqtt
|
|
|
+}
|
|
|
+
|
|
|
+func (mqtt *MqttClient) Disconnect() {
|
|
|
+ mqtt.client.Disconnect(250)
|
|
|
+}
|
|
|
+
|
|
|
+func (mqtt *MqttClient) Publish(topic string, qos byte, payload interface{}) *MqttClient {
|
|
|
+ token := mqtt.client.Publish(topic, qos, false, payload)
|
|
|
+ token.Wait()
|
|
|
+ log.Debug("Push Data : "+topic, "Data Size is ", payload)
|
|
|
+
|
|
|
+ return mqtt
|
|
|
+}
|
|
|
+
|
|
|
+func (mqtt *MqttClient) Subscribe(topic string, qos byte, handleFun func(topic string, payload []byte)) bool {
|
|
|
+ // 订阅消息
|
|
|
+ if token := mqtt.client.Subscribe(topic, qos, func(client mqtts.Client, msg mqtts.Message) {
|
|
|
+ log.Debug("Receive Subscribe Message :")
|
|
|
+ log.Debug("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
|
|
|
+
|
|
|
+ if len(msg.Payload()) > 0 {
|
|
|
+ handleFun(msg.Topic(), msg.Payload())
|
|
|
+ }
|
|
|
+ }); token.Wait() && token.Error() != nil {
|
|
|
+ log.Debug(token.Error())
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ mqtt.topics[topic] = MqttTopicStore{qos, handleFun}
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+func (mqtt *MqttClient) SubscribeMultiple(filters map[string]byte, handleFun func(topic string, payload []byte)) bool {
|
|
|
+
|
|
|
+ // 订阅消息
|
|
|
+ if token := mqtt.client.SubscribeMultiple(filters, func(client mqtts.Client, msg mqtts.Message) {
|
|
|
+ log.Debug("Receive Subscribe Message :")
|
|
|
+ log.Debug("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
|
|
|
+
|
|
|
+ if len(msg.Payload()) > 0 {
|
|
|
+ handleFun(msg.Topic(), msg.Payload())
|
|
|
+ }
|
|
|
+ }); token.Wait() && token.Error() != nil {
|
|
|
+ log.Debug(token.Error())
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ for k, v := range filters {
|
|
|
+ mqtt.topics[k] = MqttTopicStore{v, handleFun}
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|
|
|
+
|
|
|
+func (mqtt *MqttClient) UnSubscripte(topic string) bool {
|
|
|
+ if token := mqtt.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
|
|
|
+ log.Debug(token.Error())
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return true
|
|
|
+}
|