123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- package mqtt
- import (
- mqtts "github.com/eclipse/paho.mqtt.golang"
- "github.com/satori/go.uuid"
- "github.com/sirupsen/logrus"
- "strconv"
- "time"
- )
- var log = logrus.WithField("module", "mqtt")
// MqttTopicStore remembers how a topic was subscribed (QoS level and user
// callback) so that the same subscription can be issued again later.
//
// The field order matters: values are built with positional literals.
type MqttTopicStore struct {
	// qos is the QoS level the topic was subscribed with.
	qos byte
	// fun is the user callback; it receives the topic and the payload of a
	// delivered message.
	fun func(topic string, payload []byte)
}
- type MqttClient struct {
- Host string //mqtt服务器地址
- Port int //mqtt 端口
- Id string //clientId
- Username string //mqtt 登录用户名
- Password string //mqtt 登录密码
- Timeout int //超时时间
- Retry int //重试次数
- topics map[string]MqttTopicStore //订阅的主题
- client mqtts.Client //客户端
- }
- func NewMqttClient(host string, port int, username, password string) *MqttClient {
- return &MqttClient{
- Host: host,
- Port: port,
- Username: username,
- Password: password,
- Timeout: 30,
- topics: make(map[string]MqttTopicStore),
- Retry: 3,
- }
- }
- func NewMqttClientWithId(host string, port int, username, password, id string) *MqttClient {
- return &MqttClient{
- Host: host,
- Port: port,
- Username: username,
- Password: password,
- Id: id,
- Timeout: 30,
- topics: make(map[string]MqttTopicStore),
- Retry: 3,
- }
- }
- // 创建全局mqtt publish消息处理 handler
- var messagePubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
- log.Debug("Push Message:")
- log.Debug("TOPIC: %s\n", msg.Topic())
- log.Debug("MSG: %s\n", msg.Payload())
- }
- // 创建全局mqtt sub消息处理 handler
- var messageSubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
- log.Debug("收到订阅消息:")
- log.Debug("Sub Client Topic : %s \n", msg.Topic())
- log.Debug("Sub Client msg : %s \n", msg.Payload())
- }
- // 连接的回掉函数
- func (mqtt *MqttClient) connectHandler(client mqtts.Client) {
- log.Debug("新的连接!" + " Connected")
- for topic, st := range mqtt.topics {
- mqtt.Subscribe(topic, st.qos, st.fun)
- }
- }
- // 丢失连接的回掉函数
- var connectLostHandler mqtts.ConnectionLostHandler = func(client mqtts.Client, err error) {
- log.Debug("Connect loss: %v\n", err)
- }
- func (mqtt *MqttClient) Connect() *MqttClient {
- opts := mqtts.NewClientOptions().AddBroker("tcp://" + mqtt.Host + ":" + strconv.Itoa(mqtt.Port))
- opts.SetKeepAlive(60 * time.Second)
- id := uuid.NewV4()
- // Message callback handler,在没有任何订阅时,发布端调用此函数
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.Username = mqtt.Username
- opts.Password = mqtt.Password
- opts.SetPingTimeout(60 * time.Second)
- opts.OnConnect = mqtt.connectHandler
- opts.OnConnectionLost = connectLostHandler
- if len(mqtt.Id) == 0 {
- opts.ClientID = id.String()
- } else {
- opts.ClientID = mqtt.Id
- }
- mqtt.client = mqtts.NewClient(opts)
- if token := mqtt.client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- return mqtt
- }
- func (mqtt *MqttClient) Disconnect() {
- mqtt.client.Disconnect(250)
- }
- func (mqtt *MqttClient) Publish(topic string, qos byte, payload interface{}) *MqttClient {
- token := mqtt.client.Publish(topic, qos, false, payload)
- token.Wait()
- log.Debug("Push Data : "+topic, "Data Size is ", payload)
- return mqtt
- }
- func (mqtt *MqttClient) Subscribe(topic string, qos byte, handleFun func(topic string, payload []byte)) bool {
- // 订阅消息
- if token := mqtt.client.Subscribe(topic, qos, func(client mqtts.Client, msg mqtts.Message) {
- log.Debug("Receive Subscribe Message :")
- log.Debug("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
- if len(msg.Payload()) > 0 {
- handleFun(msg.Topic(), msg.Payload())
- }
- }); token.Wait() && token.Error() != nil {
- log.Debug(token.Error())
- return false
- }
- mqtt.topics[topic] = MqttTopicStore{qos, handleFun}
- return true
- }
- func (mqtt *MqttClient) SubscribeMultiple(filters map[string]byte, handleFun func(topic string, payload []byte)) bool {
- // 订阅消息
- if token := mqtt.client.SubscribeMultiple(filters, func(client mqtts.Client, msg mqtts.Message) {
- log.Debug("Receive Subscribe Message :")
- log.Debug("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
- if len(msg.Payload()) > 0 {
- handleFun(msg.Topic(), msg.Payload())
- }
- }); token.Wait() && token.Error() != nil {
- log.Debug(token.Error())
- return false
- }
- for k, v := range filters {
- mqtt.topics[k] = MqttTopicStore{v, handleFun}
- }
- return true
- }
- func (mqtt *MqttClient) UnSubscripte(topic string) bool {
- if token := mqtt.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
- log.Debug(token.Error())
- return false
- }
- return true
- }
|