mqtt.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package mqtt
  2. import (
  3. mqtts "github.com/eclipse/paho.mqtt.golang"
  4. "github.com/satori/go.uuid"
  5. "github.com/sirupsen/logrus"
  6. "strconv"
  7. "time"
  8. )
  9. var log = logrus.WithField("module", "mqtt")
  // MqttTopicStore records how one topic was subscribed so that the
  // subscription can be issued again later.
  //
  // Field order matters: Subscribe and SubscribeMultiple build values with
  // positional literals (qos first, handler second).
  type MqttTopicStore struct {
  	// qos is the quality-of-service level the topic was subscribed with.
  	qos byte
  	// fun is the user callback; it receives the message topic and payload.
  	fun func(string, []byte)
  }
  // MqttClient bundles the connection settings for one MQTT broker together
  // with the underlying paho client and the set of topics subscribed through
  // it.
  type MqttClient struct {
  	Host     string // MQTT broker address
  	Port     int    // MQTT broker port
  	Id       string // client ID; when empty, Connect uses a generated UUID instead
  	Username string // MQTT login user name
  	Password string // MQTT login password
  	// Timeout is the timeout setting; the constructors default it to 30.
  	// NOTE(review): not read by any code visible in this file, and the unit
  	// is presumably seconds — confirm.
  	Timeout int
  	// Retry is the retry count; the constructors default it to 3.
  	// NOTE(review): not read by any code visible in this file.
  	Retry  int
  	topics map[string]MqttTopicStore // subscribed topics, keyed by topic filter
  	client mqtts.Client              // underlying paho client, created by Connect
  }
  25. func NewMqttClient(host string, port int, username, password string) *MqttClient {
  26. return &MqttClient{
  27. Host: host,
  28. Port: port,
  29. Username: username,
  30. Password: password,
  31. Timeout: 30,
  32. topics: make(map[string]MqttTopicStore),
  33. Retry: 3,
  34. }
  35. }
  36. func NewMqttClientWithId(host string, port int, username, password, id string) *MqttClient {
  37. return &MqttClient{
  38. Host: host,
  39. Port: port,
  40. Username: username,
  41. Password: password,
  42. Id: id,
  43. Timeout: 30,
  44. topics: make(map[string]MqttTopicStore),
  45. Retry: 3,
  46. }
  47. }
  48. // 创建全局mqtt publish消息处理 handler
  49. var messagePubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
  50. log.Debug("Push Message:")
  51. log.Debug("TOPIC: %s\n", msg.Topic())
  52. log.Debug("MSG: %s\n", msg.Payload())
  53. }
  54. // 创建全局mqtt sub消息处理 handler
  55. var messageSubHandler mqtts.MessageHandler = func(client mqtts.Client, msg mqtts.Message) {
  56. log.Debug("收到订阅消息:")
  57. log.Debug("Sub Client Topic : %s \n", msg.Topic())
  58. log.Debug("Sub Client msg : %s \n", msg.Payload())
  59. }
  60. // 连接的回掉函数
  61. func (mqtt *MqttClient) connectHandler(client mqtts.Client) {
  62. log.Debug("新的连接!" + " Connected")
  63. for topic, st := range mqtt.topics {
  64. mqtt.Subscribe(topic, st.qos, st.fun)
  65. }
  66. }
  67. // 丢失连接的回掉函数
  68. var connectLostHandler mqtts.ConnectionLostHandler = func(client mqtts.Client, err error) {
  69. log.Debug("Connect loss: %v\n", err)
  70. }
  71. func (mqtt *MqttClient) Connect() *MqttClient {
  72. opts := mqtts.NewClientOptions().AddBroker("tcp://" + mqtt.Host + ":" + strconv.Itoa(mqtt.Port))
  73. opts.SetKeepAlive(60 * time.Second)
  74. id := uuid.NewV4()
  75. // Message callback handler,在没有任何订阅时,发布端调用此函数
  76. opts.SetDefaultPublishHandler(messagePubHandler)
  77. opts.Username = mqtt.Username
  78. opts.Password = mqtt.Password
  79. opts.SetPingTimeout(60 * time.Second)
  80. opts.OnConnect = mqtt.connectHandler
  81. opts.OnConnectionLost = connectLostHandler
  82. if len(mqtt.Id) == 0 {
  83. opts.ClientID = id.String()
  84. } else {
  85. opts.ClientID = mqtt.Id
  86. }
  87. mqtt.client = mqtts.NewClient(opts)
  88. if token := mqtt.client.Connect(); token.Wait() && token.Error() != nil {
  89. panic(token.Error())
  90. }
  91. return mqtt
  92. }
  93. func (mqtt *MqttClient) Disconnect() {
  94. mqtt.client.Disconnect(250)
  95. }
  96. func (mqtt *MqttClient) Publish(topic string, qos byte, payload interface{}) *MqttClient {
  97. token := mqtt.client.Publish(topic, qos, false, payload)
  98. token.Wait()
  99. log.Debug("Push Data : "+topic, "Data Size is ", payload)
  100. return mqtt
  101. }
  102. func (mqtt *MqttClient) Subscribe(topic string, qos byte, handleFun func(topic string, payload []byte)) bool {
  103. // 订阅消息
  104. if token := mqtt.client.Subscribe(topic, qos, func(client mqtts.Client, msg mqtts.Message) {
  105. log.Debug("Receive Subscribe Message :")
  106. log.Debug("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
  107. if len(msg.Payload()) > 0 {
  108. handleFun(msg.Topic(), msg.Payload())
  109. }
  110. }); token.Wait() && token.Error() != nil {
  111. log.Debug(token.Error())
  112. return false
  113. }
  114. mqtt.topics[topic] = MqttTopicStore{qos, handleFun}
  115. return true
  116. }
  117. func (mqtt *MqttClient) SubscribeMultiple(filters map[string]byte, handleFun func(topic string, payload []byte)) bool {
  118. // 订阅消息
  119. if token := mqtt.client.SubscribeMultiple(filters, func(client mqtts.Client, msg mqtts.Message) {
  120. log.Debug("Receive Subscribe Message :")
  121. log.Debug("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
  122. if len(msg.Payload()) > 0 {
  123. handleFun(msg.Topic(), msg.Payload())
  124. }
  125. }); token.Wait() && token.Error() != nil {
  126. log.Debug(token.Error())
  127. return false
  128. }
  129. for k, v := range filters {
  130. mqtt.topics[k] = MqttTopicStore{v, handleFun}
  131. }
  132. return true
  133. }
  134. func (mqtt *MqttClient) UnSubscripte(topic string) bool {
  135. if token := mqtt.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
  136. log.Debug(token.Error())
  137. return false
  138. }
  139. return true
  140. }