luat_remotem_mqtt.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. 通信层,基于MQTT
  3. */
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include "MQTTAsync.h"
  8. // #include "MQTTClient.h"
  9. #include "luat_remotem.h"
  10. #if !defined(_WIN32)
  11. #include <unistd.h>
  12. #else
  13. #include <windows.h>
  14. #endif
  15. #if defined(_WRS_KERNEL)
  16. #include <OsWrapper.h>
  17. #endif
  18. // #define ADDRESS "tcp://broker-cn.emqx.io:1883"
  19. // #define CLIENTID "123TTTZZVVV"
  20. // #define SUB_TOPIC "/sys/luatos/em/test/down"
  21. // #define PUB_TOPIC "/sys/luatos/em/test/up"
  22. // #define PAYLOAD "Hello World!"
  23. // #define QOS 1
  24. #define TIMEOUT 10000L
  25. // int finished = 0;
  26. static MQTTAsync client;
  27. static boolean mqtt_client_ready;
  28. static boolean mqtt_client_suback_ready;
  29. static MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  30. static boolean mqtt_client_isconneting;
  31. void luat_remotem_putbuff(char* buff, size_t len);
  32. extern luat_remotem_ctx_t rctx;
  33. static void mqtt_uplink_cb(char* buff, size_t len) {
  34. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  35. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  36. pubmsg.payload = buff;
  37. pubmsg.payloadlen = len;
  38. pubmsg.qos = 1;
  39. pubmsg.retained = 0;
  40. int rc = 0;
  41. if ((rc = MQTTAsync_sendMessage(client, rctx.mqtt.topic_uplink, &pubmsg, NULL)) != MQTTASYNC_SUCCESS)
  42. {
  43. printf("Failed to publish message, return code %d\n", rc);
  44. rc = EXIT_FAILURE;
  45. }
  46. }
  47. void connlost(void *context, char *cause)
  48. {
  49. MQTTAsync client = (MQTTAsync)context;
  50. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  51. int rc;
  52. printf("\nConnection lost\n");
  53. printf(" cause: %s\n", cause);
  54. Sleep(2000);
  55. printf("Reconnecting\n");
  56. conn_opts.keepAliveInterval = 20;
  57. conn_opts.cleansession = 1;
  58. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  59. {
  60. printf("Failed to start connect, return code %d\n", rc);
  61. // finished = 1;
  62. }
  63. }
  64. void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  65. {
  66. printf("Disconnect failed\n");
  67. // finished = 1;
  68. }
  69. void onDisconnect(void* context, MQTTAsync_successData* response)
  70. {
  71. printf("Successful disconnection\n");
  72. // finished = 1;
  73. }
  74. void onSendFailure(void* context, MQTTAsync_failureData* response)
  75. {
  76. MQTTAsync client = (MQTTAsync)context;
  77. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  78. int rc;
  79. printf("Message send failed token %d error code %d\n", response->token, response->code);
  80. opts.onSuccess = onDisconnect;
  81. opts.onFailure = onDisconnectFailure;
  82. opts.context = client;
  83. if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
  84. {
  85. printf("Failed to start disconnect, return code %d\n", rc);
  86. exit(EXIT_FAILURE);
  87. }
  88. }
  89. void onSend(void* context, MQTTAsync_successData* response)
  90. {
  91. }
  92. void onConnectFailure(void* context, MQTTAsync_failureData* response)
  93. {
  94. printf("Connect failed, rc %d\n", response ? response->code : 0);
  95. // finished = 1;
  96. }
  97. void onSubscribe(void* context, MQTTAsync_successData* response)
  98. {
  99. // printf("Subscribe succeeded\n");
  100. // subscribed = 1;
  101. mqtt_client_suback_ready = TRUE;
  102. }
  103. void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  104. {
  105. printf("Subscribe failed, rc %d\n", response->code);
  106. // finished = 1;
  107. }
  108. void onConnect(void* context, MQTTAsync_successData* response)
  109. {
  110. MQTTAsync client = (MQTTAsync)context;
  111. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  112. int rc;
  113. printf("mqtt connect ok\n");
  114. opts.onSuccess = onSubscribe;
  115. opts.onFailure = onSubscribeFailure;
  116. opts.context = client;
  117. rc = MQTTAsync_subscribe(client, rctx.mqtt.topic_downlink, 1, &opts);
  118. }
  119. int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
  120. {
  121. luat_remotem_putbuff((char*)m->payload, m->payloadlen);
  122. return 1;
  123. }
  124. int mqtt_main(void)
  125. {
  126. mqtt_client_ready = FALSE;
  127. int rc;
  128. char mqtturl[512];
  129. sprintf(mqtturl, "%s://%s:%d", rctx.mqtt.protocol, rctx.mqtt.host, rctx.mqtt.port);
  130. if ((rc = MQTTAsync_create(&client, mqtturl, rctx.self_id, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
  131. {
  132. printf("Failed to create client object, return code %d\n", rc);
  133. exit(EXIT_FAILURE);
  134. }
  135. if ((rc = MQTTAsync_setCallbacks(client, NULL, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS)
  136. {
  137. printf("Failed to set callback, return code %d\n", rc);
  138. exit(EXIT_FAILURE);
  139. }
  140. conn_opts.keepAliveInterval = 20;
  141. conn_opts.cleansession = 1;
  142. conn_opts.onSuccess = onConnect;
  143. conn_opts.onFailure = onConnectFailure;
  144. conn_opts.context = client;
  145. mqtt_client_ready = TRUE;
  146. mqtt_client_suback_ready = FALSE;
  147. if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
  148. {
  149. printf("Failed to start connect, return code %d\n", rc);
  150. exit(EXIT_FAILURE);
  151. }
  152. size_t wait_time = 15;
  153. size_t wait_ms = 10;
  154. for (size_t i = 0; i < wait_time * (1000 / wait_ms); i++)
  155. {
  156. if (mqtt_client_ready && MQTTAsync_isConnected(client) && mqtt_client_suback_ready) {
  157. printf("mqtt link ready\n");
  158. // 发送初始化命令
  159. luat_remotem_set_uplink(mqtt_uplink_cb);
  160. break;
  161. }
  162. Sleep(wait_ms);
  163. }
  164. return rc;
  165. }