index.uts 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import MqttClient from 'org.eclipse.paho.client.mqttv3.MqttClient';
  2. import MqttConnectOptions from 'org.eclipse.paho.client.mqttv3.MqttConnectOptions';
  3. import MqttMessage from 'org.eclipse.paho.client.mqttv3.MqttMessage';
  4. import MemoryPersistence from 'org.eclipse.paho.client.mqttv3.persist.MemoryPersistence';
  5. import MqttCallbackExtended from 'org.eclipse.paho.client.mqttv3.MqttCallbackExtended';
  6. import IMqttDeliveryToken from 'org.eclipse.paho.client.mqttv3.IMqttDeliveryToken';
  /**
   * Paho callback adapter that forwards broker events to a MyMqttManager.
   *
   * Every handler is defensive: the Paho MqttCallback documentation states that
   * an exception escaping messageArrived shuts the client down, so nothing is
   * allowed to propagate out of these callbacks.
   */
  class MyMqttCallback implements MqttCallbackExtended {
    // Manager that owns the client and holds the user-facing message handler.
    private manager: MyMqttManager

    /**
     * @param t Manager whose onMessageReceived / reconnect are driven by this callback.
     */
    constructor(t: MyMqttManager) {
      this.manager = t
    }

    /**
     * Logs the reason the connection was lost ("unknown" when no cause is supplied).
     */
    override connectionLost(cause: Throwable | null) {
      console.error("MQTT Lost: " + (cause != null ? cause.message : "unknown"));
    }

    /**
     * Decodes the payload as UTF-8 text and hands it to the manager's handler.
     *
     * @param topic Topic the message was published on.
     * @param message Raw Paho message; its payload bytes are decoded as UTF-8.
     */
    override messageArrived(topic: string, message: MqttMessage) {
      try {
        // 1. Fetch the raw bytes.
        const payload = message.getPayload();
        // 2. Decode as UTF-8 through the Java String constructor.
        const content = new java.lang.String(payload, "UTF-8").toString();
        console.log("收到主题:", topic);
        console.log("收到内容:", content);
        // 3. Forward to the registered handler, if any.
        // NOTE(review): the original comment says this runs off the UI thread and
        // that Vue copes with that automatically; not verifiable from this file.
        // The non-null assertion is kept because the null check on a mutable
        // property is not carried over to the call site.
        if (this.manager.onMessageReceived != null) {
          this.manager.onMessageReceived!(topic, content);
        }
      } catch (e) {
        // Swallow and log: a throwing handler must not take the connection down.
        console.error("消息处理异常: " + e);
      }
    }

    /**
     * Logs the delivery token; no further action is taken.
     */
    override deliveryComplete(token: IMqttDeliveryToken | null) {
      console.log(token)
    }

    /**
     * Logs the completed connection and asks the manager to re-subscribe its topics.
     *
     * @param reconnect Flag supplied by Paho; only logged here.
     * @param serverURI URI of the broker the connection was made to.
     */
    override connectComplete(reconnect: boolean, serverURI: string) {
      console.log("链接完成:", serverURI, reconnect)
      try {
        this.manager.reconnect()
      } catch (e) {
        // Keep a re-subscribe failure from escaping into the Paho callback thread.
        console.error("重新订阅异常: " + e);
      }
    }
  }
  /**
   * Small wrapper around the blocking Paho MqttClient: connect, publish,
   * subscribe, re-subscribe after a reconnect, and close.
   *
   * Typical order of use, as implied by the code: connect(), then
   * initCallback() (it is a no-op while no client exists), then subscribe().
   */
  export class MyMqttManager {
    // Current Paho client; null until connect() succeeds in creating one, and again after close().
    private client: MqttClient | null = null;
    // Parameters of the most recent connect() call.
    private brokerUrl: string = ""
    private clientId: string = ""
    private username: string = ""
    private password: string = ""
    // Topics requested through subscribe(); replayed by reconnect().
    private topics: string[] = []

    // Handler used to pass incoming messages back to the Vue layer.
    public onMessageReceived : ((topic : string, message : string) => void) | null = null;

    /**
     * Registers (or clears, when null) the incoming-message handler.
     */
    on(on: ((topic : string, message : string) => void) | null) {
      this.onMessageReceived = on
    }

    /**
     * Re-subscribes every recorded topic on the current client.
     *
     * Called from the Paho connectComplete callback. connect() requests a clean
     * session, so subscriptions are presumably not retained by the broker across
     * reconnects and have to be replayed here.
     */
    reconnect() {
      if (this.client == null) {
        console.error("重新订阅失败: 客户端未初始化");
        return;
      }
      this.topics.forEach((t : string) => {
        // Isolate each topic so one failing subscribe does not skip the rest.
        try {
          this.client!.subscribe(t);
          console.log("正在订阅单个主题:", t);
        } catch (e) {
          console.error("重新订阅失败: " + t + " " + e);
        }
      });
    }

    /**
     * Installs a MyMqttCallback on the current client. Does nothing when no
     * client has been created yet, so it must be called after connect().
     */
    initCallback() {
      if (this.client == null) return;
      // 1. Create the callback instance bound to this manager.
      const callbackInstance = new MyMqttCallback(this);
      // 2. Attach it to the client.
      this.client!.setCallback(callbackInstance);
    }

    /**
     * Disconnects and closes the current client (if any) and drops the reference.
     * Failures are logged, not thrown, so a broken old client never blocks a new connect.
     */
    private releaseClient() {
      if (this.client == null) return;
      try {
        // Paho refuses to close a client that is still connected.
        if (this.client!.isConnected()) {
          this.client!.disconnect();
        }
        this.client!.close();
      } catch (e) {
        console.error("释放旧客户端失败: " + e);
      }
      this.client = null;
    }

    /**
     * Connects to an MQTT broker.
     *
     * Any previously created client is released first; otherwise it would stay
     * connected in the background with no remaining reference to it.
     * Errors are logged and not rethrown.
     *
     * @param broker Broker URI passed to the MqttClient constructor.
     * @param clientId MQTT client identifier.
     * @param username User name for the connection.
     * @param password Password for the connection.
     */
    connect(broker: string, clientId: string, username: string, password: string) {
      // #ifdef APP-ANDROID
      try {
        this.releaseClient();
        this.brokerUrl = broker
        this.clientId = clientId
        this.username = username
        this.password = password
        const persistence = new MemoryPersistence();
        this.client = new MqttClient(broker, this.clientId, persistence);
        const connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setConnectionTimeout(10);
        connOpts.setUserName(this.username)
        connOpts.setPassword(this.password.toCharArray())
        // connOpts.setKeepAliveInterval(60)
        console.log("正在连接 MQTT: " + broker);
        connOpts.setAutomaticReconnect(true)
        this.client!.connect(connOpts);
        console.log("MQTT 连接成功");
      } catch (e) {
        console.error("MQTT 连接失败: " + e);
      }
      // #endif
    }

    /**
     * Publishes a text message with QoS 1.
     *
     * When the client is missing or disconnected the message is dropped and an
     * error is logged. Exceptions raised while publishing are logged and rethrown.
     *
     * @param topic Destination topic.
     * @param content Message text, converted to bytes with toByteArray().
     */
    publish(topic: string, content: string) {
      // #ifdef APP-ANDROID
      try {
        if (this.client != null && this.client!.isConnected()) {
          const message = new MqttMessage(content.toByteArray());
          message.setQos(1);
          this.client!.publish(topic, message);
          console.log("消息发送成功", topic, content);
        } else {
          console.error("发布失败: 客户端未连接");
        }
      } catch (e) {
        console.error("发布失败: " + e);
        throw e
      }
      // #endif
    }

    /**
     * Subscribes to a topic and records it for replay by reconnect().
     *
     * The topic is recorded even when the client is not connected; each topic is
     * stored once so repeated calls do not cause duplicate re-subscriptions.
     *
     * @param topic Topic filter to subscribe to.
     */
    subscribe(topic: string) {
      try {
        if (!this.topics.includes(topic)) {
          this.topics.push(topic)
        }
        if (this.client != null && this.client!.isConnected()) {
          this.client!.subscribe(topic);
          console.log("订阅成功: " + topic);
        } else {
          console.error("订阅失败: 客户端未连接");
        }
      } catch (e) {
        console.error("订阅操作异常: " + e);
      }
    }

    /**
     * Shuts the MQTT client down.
     *
     * Disconnects first when still connected, because Paho's close() throws on
     * a connected client, then closes it and clears the reference. The recorded
     * topic list is left untouched. Errors are logged and not rethrown.
     */
    close() {
      try {
        if (this.client == null) {
          console.error("关闭失败: 客户端未连接");
          return;
        }
        if (this.client!.isConnected()) {
          this.client!.disconnect();
        }
        this.client!.close();
        // Drop the reference once the client has been closed.
        this.client = null;
        console.log("关闭成功: ");
      } catch (e) {
        console.error("关闭操作异常: " + e);
      }
    }
  }