// Package mqtt is a thin convenience wrapper around the Eclipse Paho MQTT
// client. It remembers every successful subscription so that the
// subscriptions can be re-established whenever the connection comes up.
package mqtt

import (
	"strconv"
	"sync"
	"time"

	mqtts "github.com/eclipse/paho.mqtt.golang"
	uuid "github.com/satori/go.uuid"
	"github.com/sirupsen/logrus"
)

// log is the package logger; every entry carries module=mqtt.
var log = logrus.WithField("module", "mqtt")

// MqttTopicStore records the parameters of one subscription so that it can
// be replayed by connectHandler.
type MqttTopicStore struct {
	qos byte                                // QoS level passed to Subscribe
	fun func(topic string, payload []byte) // user callback for the topic
}

// MqttClient holds the broker settings, the underlying Paho client and the
// set of topics subscribed through it.
type MqttClient struct {
	Host     string // MQTT broker address
	Port     int    // MQTT broker port
	Id       string // client ID; when empty, Connect generates a UUID
	Username string // MQTT login user name
	Password string // MQTT login password
	Timeout  int    // timeout value; set by the constructors, not read in this file
	Retry    int    // retry count; set by the constructors, not read in this file

	mu     sync.Mutex                // guards topics
	topics map[string]MqttTopicStore // remembered subscriptions, keyed by topic filter
	client mqtts.Client              // underlying Paho client, set by Connect
}

// NewMqttClient returns an unconnected client for the given broker and
// credentials, with Timeout set to 30 and Retry set to 3. Connect assigns a
// random client ID because Id is left empty.
func NewMqttClient(host string, port int, username, password string) *MqttClient {
	return &MqttClient{
		Host:     host,
		Port:     port,
		Username: username,
		Password: password,
		Timeout:  30,
		topics:   make(map[string]MqttTopicStore),
		Retry:    3,
	}
}

// NewMqttClientWithId is like NewMqttClient but uses id as the MQTT client ID.
func NewMqttClientWithId(host string, port int, username, password, id string) *MqttClient {
	c := NewMqttClient(host, port, username, password)
	c.Id = id
	return c
}

// messagePubHandler is the default publish handler installed by Connect; it
// only logs the message.
var messagePubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
	log.Debug("Push Message:")
	// Debugf, not Debug: these are format strings and Debug does not
	// interpolate them.
	log.Debugf("TOPIC: %s\n", msg.Topic())
	log.Debugf("MSG: %s\n", msg.Payload())
}

// messageSubHandler is a message handler that only logs the received
// message. It is not referenced elsewhere in this file.
var messageSubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
	log.Debug("收到订阅消息:")
	log.Debugf("Sub Client Topic : %s \n", msg.Topic())
	log.Debugf("Sub Client msg : %s \n", msg.Payload())
}

// connectHandler is installed as the OnConnect callback. It replays every
// remembered subscription.
//
// It iterates over a snapshot of the topic table because Subscribe writes to
// the table, and because this callback is invoked by the Paho client rather
// than by the caller (NOTE(review): presumably on a Paho-owned goroutine —
// confirm against the Paho documentation).
func (m *MqttClient) connectHandler(client mqtts.Client) {
	log.Debug("新的连接! Connected")
	for topic, st := range m.snapshotTopics() {
		m.Subscribe(topic, st.qos, st.fun)
	}
}

// connectLostHandler is installed as the connection-lost callback; it only
// logs the error.
var connectLostHandler mqtts.ConnectionLostHandler = func(client mqtts.Client, err error) {
	log.Debugf("Connect loss: %v\n", err)
}

// Connect builds the Paho client from the configured fields and connects to
// tcp://Host:Port, blocking until the attempt completes. It returns the
// receiver so calls can be chained.
//
// Connect panics with the connection error if the attempt fails.
func (m *MqttClient) Connect() *MqttClient {
	opts := mqtts.NewClientOptions().AddBroker("tcp://" + m.Host + ":" + strconv.Itoa(m.Port))
	opts.SetKeepAlive(60 * time.Second)
	opts.SetPingTimeout(60 * time.Second)
	// Default handler for messages that match no subscription callback.
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.Username = m.Username
	opts.Password = m.Password
	opts.OnConnect = m.connectHandler
	opts.OnConnectionLost = connectLostHandler

	opts.ClientID = m.Id
	if len(m.Id) == 0 {
		// Only generate a random ID when the caller did not supply one.
		id := uuid.NewV4()
		opts.ClientID = id.String()
	}

	m.client = mqtts.NewClient(opts)
	if token := m.client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	return m
}

// Disconnect closes the connection, passing a quiesce value of 250 to the
// Paho client. It is a no-op when Connect has not been called.
func (m *MqttClient) Disconnect() {
	if m.client == nil {
		return
	}
	m.client.Disconnect(250)
}

// Publish sends payload to topic with the given QoS (retained flag false)
// and waits for the operation to complete. A failure is logged; the receiver
// is returned in either case so calls can be chained.
func (m *MqttClient) Publish(topic string, qos byte, payload interface{}) *MqttClient {
	token := m.client.Publish(topic, qos, false, payload)
	if token.Wait() && token.Error() != nil {
		log.Debug(token.Error())
		return m
	}
	// NOTE(review): the message says "Data Size" but the payload itself is
	// logged; kept as in the original.
	log.Debug("Push Data : "+topic, "Data Size is ", payload)
	return m
}

// Subscribe subscribes to topic with the given QoS. handleFun is called with
// the topic and payload of every received message whose payload is
// non-empty. On success the subscription is remembered for replay by
// connectHandler and true is returned; on failure the error is logged and
// false is returned.
func (m *MqttClient) Subscribe(topic string, qos byte, handleFun func(topic string, payload []byte)) bool {
	token := m.client.Subscribe(topic, qos, newSubscribeCallback(handleFun))
	if token.Wait() && token.Error() != nil {
		log.Debug(token.Error())
		return false
	}
	m.rememberTopic(topic, MqttTopicStore{qos: qos, fun: handleFun})
	return true
}

// SubscribeMultiple subscribes to every topic filter in filters (filter ->
// QoS) with a single shared handleFun. On success every filter is remembered
// for replay by connectHandler and true is returned; on failure the error is
// logged and false is returned.
func (m *MqttClient) SubscribeMultiple(filters map[string]byte, handleFun func(topic string, payload []byte)) bool {
	token := m.client.SubscribeMultiple(filters, newSubscribeCallback(handleFun))
	if token.Wait() && token.Error() != nil {
		log.Debug(token.Error())
		return false
	}
	for filter, qos := range filters {
		m.rememberTopic(filter, MqttTopicStore{qos: qos, fun: handleFun})
	}
	return true
}

// UnSubscripte unsubscribes from topic. On success the topic is also removed
// from the remembered subscriptions, so connectHandler will not subscribe to
// it again, and true is returned; on failure the error is logged and false
// is returned.
func (m *MqttClient) UnSubscripte(topic string) bool {
	if token := m.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		log.Debug(token.Error())
		return false
	}
	m.forgetTopic(topic)
	return true
}

// newSubscribeCallback adapts handleFun to a Paho message handler that logs
// each message and forwards only messages with a non-empty payload.
func newSubscribeCallback(handleFun func(topic string, payload []byte)) mqtts.MessageHandler {
	return func(client mqtts.Client, msg mqtts.Message) {
		log.Debug("Receive Subscribe Message :")
		log.Debugf("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
		if len(msg.Payload()) > 0 {
			handleFun(msg.Topic(), msg.Payload())
		}
	}
}

// rememberTopic records a subscription, creating the table if the client was
// built without a constructor.
func (m *MqttClient) rememberTopic(topic string, st MqttTopicStore) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.topics == nil {
		m.topics = make(map[string]MqttTopicStore)
	}
	m.topics[topic] = st
}

// forgetTopic removes a subscription from the table.
func (m *MqttClient) forgetTopic(topic string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.topics, topic)
}

// snapshotTopics returns a copy of the subscription table that can be
// iterated without holding the lock.
func (m *MqttClient) snapshotTopics() map[string]MqttTopicStore {
	m.mu.Lock()
	defer m.mu.Unlock()
	snapshot := make(map[string]MqttTopicStore, len(m.topics))
	for topic, st := range m.topics {
		snapshot[topic] = st
	}
	return snapshot
}