luat_libtcpip_mqtt.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. #include "luat_base.h"
  2. #include "luat_rtos.h"
  3. #include "luat_malloc.h"
  4. #include "luat_timer.h"
  5. #include "luat_libtcpip.h"
  6. #include "luat_libtcpip_mqtt.h"
  7. #include "luat_mcu.h"
  8. #include "libemqtt.h"
  9. #include <sys/socket.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. // #include <netinet/ip.h> /* superset of previous */
  13. #define PUB_MSG_MAGIC (0x1314)
  14. #define LUAT_LOG_TAG "mqtt"
  15. #include "luat_log.h"
  16. #define MQTT_CMD_START 0x1
  17. #define MQTT_CMD_HEART 0x2
  18. #define MQTT_CMD_LOOP 0x3
  19. #define MQTT_KEEPALIVE 240
  20. static const mqtt_queue_msg_t MQTT_QUEUE_MSG_LOOP = {MQTT_CMD_LOOP};
  21. // static const mqtt_queue_msg_t MQTT_QUEUE_MSG_HEART = {MQTT_CMD_HEART};
  22. LUAT_RET app_mqtt_ready(app_mqtt_ctx_t* ctx) {
  23. return ctx->conack_ready;
  24. }
  25. static int app_mqtt_close_socket(app_mqtt_ctx_t* ctx)
  26. {
  27. ctx->conack_ready = LUAT_FALSE;
  28. ctx->tcp_opts->_close(ctx->socket_ctx);
  29. ctx->socket_ctx = NULL;
  30. return 0;
  31. }
  32. static int app_mqtt_send_packet(void* userdata, const void *buf, unsigned int count)
  33. {
  34. app_mqtt_ctx_t* ctx = (app_mqtt_ctx_t*)userdata;
  35. LLOGD("ctx %p", ctx);
  36. LLOGD("send fd %d len %d", ctx->socket_ctx, count);
  37. return ctx->tcp_opts->_send(ctx->socket_ctx, buf, count, 0);
  38. }
  39. static int app_mqtt_read_packet(app_mqtt_ctx_t* ctx)
  40. {
  41. // int ret = 0;
  42. int total_bytes = 0, bytes_rcvd, packet_length;
  43. // int socket_fd = ctx->socket_fd;
  44. uint8_t* packet_buff = ctx->packet_buffer;
  45. luat_libtcpip_opts_t* tcp_opts = ctx->tcp_opts;
  46. memset(packet_buff, 0, sizeof(MQTT_RECV_BUF_LEN_MAX));
  47. // XXX 替换原有posix的API调用
  48. // if((bytes_rcvd = recv(app_mqtt_socket_id, (app_mqtt_packet_buffer + total_bytes), MQTT_RECV_BUF_LEN_MAX, 0)) <= 0)
  49. if((bytes_rcvd = ctx->tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), 2, 0, 5)) <= 0)
  50. {
  51. // printf("%d, %d", bytes_rcvd, app_mqtt_socket_id);
  52. return MQTT_READ_TIMEOUT;
  53. }
  54. // printf("recv [len=%d] : %s", bytes_rcvd, app_mqtt_packet_buffer);
  55. total_bytes += bytes_rcvd; // Keep tally of total bytes
  56. if (total_bytes < 2) {
  57. // 少于2字节,那就肯定1个字节, 那我们再等15000ms
  58. if((bytes_rcvd = tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), 1, 0, 15000)) <= 0) {
  59. LLOGD("read package header timeout, close socket");
  60. app_mqtt_close_socket(ctx);
  61. return -1;
  62. }
  63. total_bytes += bytes_rcvd;
  64. }
  65. // if (app_mqtt_packet_buffer[1] & 0x80) {
  66. for (size_t i = 1; i < 5; i++)
  67. {
  68. if (packet_buff[i] & 0x80) {
  69. if((bytes_rcvd = tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), 1, 0, 15000)) <= 0) {
  70. LLOGD("read package header timeout, close socket");
  71. app_mqtt_close_socket(ctx);
  72. return -1;
  73. }
  74. total_bytes += bytes_rcvd;
  75. }
  76. else {
  77. break;
  78. }
  79. }
  80. // }
  81. // now we have the full fixed header in app_mqtt_packet_buffer
  82. // parse it for remaining length and number of bytes
  83. uint16_t rem_len = mqtt_parse_rem_len(packet_buff);
  84. uint8_t rem_len_bytes = mqtt_num_rem_len_bytes(packet_buff);
  85. //packet_length = app_mqtt_packet_buffer[1] + 2; // Remaining length + fixed header length
  86. // total packet length = remaining length + byte 1 of fixed header + remaning length part of fixed header
  87. packet_length = rem_len + rem_len_bytes + 1;
  88. // LLOGD("packet_length %d total_bytes %d", packet_length, total_bytes);
  89. while(total_bytes < packet_length) // Reading the packet
  90. {
  91. // LLOGD("packet_length %d total_bytes %d", packet_length, total_bytes);
  92. // LLOGD("more data %d", packet_length - total_bytes);
  93. if((bytes_rcvd = tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), packet_length - total_bytes, 0, 2000)) <= 0)
  94. return -1;
  95. total_bytes += bytes_rcvd; // Keep tally of total bytes
  96. }
  97. // LLOGD("packet_length %d", packet_length);
  98. return packet_length;
  99. }
  100. static int app_mqtt_init_socket(app_mqtt_ctx_t* ctx)
  101. {
  102. int flag = 1;
  103. // struct hostent *hp;
  104. // Create the socket
  105. if((ctx->socket_ctx = ctx->tcp_opts->_socket(PF_INET, SOCK_STREAM, 0)) < 0) {
  106. LLOGE("socket create error %p", ctx->socket_ctx);
  107. return -1;
  108. }
  109. // Disable Nagle Algorithm
  110. if (ctx->tcp_opts->_setsockopt(ctx->socket_ctx, IPPROTO_TCP, 0x01, (char *)&flag, sizeof(flag)) < 0){
  111. LLOGE("socket setsockopt error");
  112. app_mqtt_close_socket(ctx);
  113. return -2;
  114. }
  115. // Connect the socket
  116. // XXX 替换原有posix的API调用
  117. // if((connect(app_mqtt_socket_id, (struct sockaddr *)&socket_address, sizeof(socket_address))) < 0)
  118. // if((tcp_opts->_connect(app_mqtt_socket_id, (struct sockaddr *)&socket_address, sizeof(socket_address))) < 0)
  119. if(ctx->tcp_opts->_connect(ctx->socket_ctx, ctx->host, ctx->port) < 0){
  120. LLOGE("socket connect error");
  121. app_mqtt_close_socket(ctx);
  122. return -3;
  123. }
  124. // MQTT stuffs
  125. mqtt_set_alive(&ctx->broker, ctx->keepalive > 0 ? ctx->keepalive : 240);
  126. ctx->broker.userdata = ctx;
  127. ctx->broker.mqttsend = app_mqtt_send_packet;
  128. //LLOGD("socket id = %d", app_mqtt_socket_id);
  129. return 0;
  130. }
  131. static int app_mqtt_init_inner(app_mqtt_ctx_t* ctx)
  132. {
  133. int ret = 0;
  134. // 将SUBACK的状态设置为未收到
  135. ctx->connect_ready = LUAT_FALSE;
  136. ctx->conack_ready = LUAT_FALSE;
  137. #if 1
  138. LLOGD("step1: init mqtt lib.");
  139. LLOGD("mqtt client_id:%s", ctx->client_id);
  140. LLOGD("mqtt username: %s", ctx->username);
  141. LLOGD("mqtt password: %s", ctx->password);
  142. LLOGD("mqtt host: %s", ctx->host);
  143. LLOGD("mqtt port: %d", ctx->port);
  144. #endif
  145. mqtt_init(&ctx->broker, ctx->client_id);
  146. mqtt_init_auth(&ctx->broker, ctx->username, ctx->password);
  147. LLOGD("step2: establishing TCP connection.");
  148. ret = app_mqtt_init_socket(ctx);
  149. if(ret){
  150. LLOGD("init_socket ret=%d", ret);
  151. return -4;
  152. }
  153. LLOGD("step3: establishing mqtt connection.");
  154. ret = mqtt_connect(&ctx->broker);
  155. if(ret){
  156. LLOGD("mqtt_connect ret=%d", ret);
  157. return -5;
  158. }
  159. ctx->connect_ready = LUAT_TRUE;
  160. return 0;
  161. }
  162. static int app_mqtt_msg_cb(app_mqtt_ctx_t* ctx) {
  163. const uint8_t *topic;
  164. const uint8_t *payload;
  165. uint16_t topic_len;
  166. uint16_t payload_len;
  167. uint8_t msg_tp = MQTTParseMessageType(ctx->packet_buffer);
  168. LLOGD("mqtt msg %02X", msg_tp);
  169. switch (msg_tp) {
  170. case MQTT_MSG_PUBLISH : {
  171. //ctx->keepalive_mark = 0;
  172. // uint8_t topic[128], *msg;
  173. topic_len = mqtt_parse_pub_topic_ptr(ctx->packet_buffer, &topic);
  174. LLOGD("recvd: topic len %d", topic_len);
  175. payload_len = mqtt_parse_pub_msg_ptr(ctx->packet_buffer, &payload);
  176. LLOGD("recvd: msg len %d", payload_len);
  177. // TODO 输出到回调函数, 例如uart
  178. LLOGD("topic %.*s", topic_len, topic);
  179. LLOGD("payload %.*s", payload_len, payload);
  180. // printf("%.*s",payload);
  181. //app_uart_write(payload, payload_len);
  182. #ifdef USE_OTA_MQTT
  183. if (0 != app_mqtt_ota_on_publish(topic, topic_len, payload, payload_len))
  184. break;
  185. #endif
  186. ctx->publish_cb(ctx, (char*)topic, topic_len, (char*)payload, payload_len);
  187. break;
  188. }
  189. case MQTT_MSG_CONNACK: {
  190. LLOGD("CONNACK %02X%02X%02X%02X",ctx->packet_buffer[0],ctx->packet_buffer[1],ctx->packet_buffer[2],ctx->packet_buffer[3]);
  191. if(ctx->packet_buffer[3] != 0x00)
  192. {
  193. LLOGD("CONNACK failed!");
  194. app_mqtt_close_socket(ctx);
  195. return -2;
  196. }
  197. ctx->conack_ready = LUAT_TRUE;
  198. LLOGD("step4: subscribe %s", ctx->sub_topic);
  199. int subscribe_state = mqtt_subscribe(&ctx->broker, ctx->sub_topic, NULL);
  200. if (subscribe_state<0)
  201. {
  202. LLOGD("Error(%d) on subscribe mqtt!", subscribe_state);
  203. app_mqtt_close_socket(ctx);
  204. return -1;
  205. }
  206. #ifdef USE_OTA_MQTT
  207. app_mqtt_ota_init(&ctx->broker);
  208. #endif
  209. break;
  210. }
  211. case MQTT_MSG_PINGRESP : {
  212. break;
  213. }
  214. case MQTT_MSG_SUBACK : {
  215. // 订阅应该成功吧
  216. LLOGD("SUBACK %02X%02X%02X%02X%02X",
  217. ctx->packet_buffer[0],ctx->packet_buffer[1],
  218. ctx->packet_buffer[2],ctx->packet_buffer[3],
  219. ctx->packet_buffer[4]);
  220. break;
  221. }
  222. case MQTT_MSG_UNSUBACK : {
  223. break;
  224. }
  225. default : {
  226. break;
  227. }
  228. }
  229. return 0;
  230. }
  231. int app_mqtt_disconnect(app_mqtt_ctx_t *ctx) {
  232. return app_mqtt_close_socket(ctx);
  233. }
  234. static int app_mqtt_loop(app_mqtt_ctx_t *ctx)
  235. {
  236. int ret = 0;
  237. int packet_length = 0;
  238. int counter = 0;
  239. counter++;
  240. packet_length = app_mqtt_read_packet(ctx);
  241. if(packet_length > 0)
  242. {
  243. //LLOGD("recvd Packet Header: 0x%x...", app_mqtt_packet_buffer[0]);
  244. ret = app_mqtt_msg_cb(ctx);
  245. if (ret != 0) {
  246. app_mqtt_close_socket(ctx);
  247. return -1;
  248. }
  249. }
  250. else if (packet_length == MQTT_READ_TIMEOUT)
  251. {
  252. // nop
  253. }
  254. else if(packet_length == -1)
  255. {
  256. LLOGD("mqtt error:(%d), stop mqtt!", packet_length);
  257. app_mqtt_close_socket(ctx);
  258. return -1;
  259. }
  260. return 0;
  261. }
  262. extern int app_mqtt_authentication_get(app_mqtt_ctx_t* ctx);
  263. app_mqtt_ctx_t* app_mqtt_configure_create(void) {
  264. app_mqtt_ctx_t* ctx = luat_heap_malloc(sizeof(app_mqtt_ctx_t));
  265. if (ctx == NULL) {
  266. LLOGE("out of memory when mallo app_mqtt_ctx_t");
  267. return NULL;
  268. }
  269. memset(ctx, 0, sizeof(app_mqtt_ctx_t));
  270. ctx->keepalive = 240;
  271. ctx->port = 1883;
  272. ctx->keep_run = 1;
  273. luat_queue_create(&ctx->msg_queue, 128, 4);
  274. return ctx;
  275. }
  276. void app_mqtt_task(void *p)
  277. {
  278. int ret = 0;
  279. mqtt_queue_msg_t *msg;
  280. app_mqtt_pub_data_t* pmsg;
  281. // uint32_t retry_time = 2;
  282. app_mqtt_ctx_t* ctx = (app_mqtt_ctx_t*)p;
  283. msg = &MQTT_QUEUE_MSG_LOOP;
  284. // 计算ping的时机
  285. // uint64_t last_pkg_ticks = 0;
  286. size_t hz = luat_mcu_hz();
  287. uint64_t tick_used;
  288. while (ctx->keep_run)
  289. {
  290. if (ctx->connect_ready == LUAT_FALSE)
  291. {
  292. ret = app_mqtt_init_inner(ctx);
  293. if (ret) {
  294. LLOGD("mqtt init fail %d", ret);
  295. if (ctx->keep_run) {
  296. luat_timer_mdelay(ctx->reconnet_delay);
  297. continue; // 开始下一轮重连循环
  298. }
  299. else {
  300. LLOGD("mqtt exit");
  301. break;
  302. }
  303. }
  304. ctx->last_pkg_tick = luat_mcu_ticks();
  305. }
  306. ret = luat_queue_recv(&ctx->msg_queue, &msg, sizeof(mqtt_queue_msg_t), 1);
  307. if (!ret)
  308. {
  309. switch((uint32_t)msg->type)
  310. {
  311. case MQTT_CMD_LOOP:
  312. // LLOGD("MQTT_CMD_LOOP");
  313. app_mqtt_loop(ctx);
  314. break;
  315. case PUB_MSG_MAGIC:
  316. // ctx->keepalive_mark = 0;
  317. ctx->last_pkg_tick = luat_mcu_ticks();
  318. pmsg = msg;
  319. ret = mqtt_publish_with_qos(&ctx->broker, pmsg->topic, pmsg->data, pmsg->data_len, pmsg->retain, pmsg->qos, NULL);
  320. LLOGD("app_mqtt_pub_data_t free %p", msg);
  321. luat_heap_free(msg);
  322. break;
  323. default :
  324. LLOGD("unknow mqtt queue msg %08X", msg->type);
  325. break;
  326. }
  327. }
  328. else {
  329. app_mqtt_loop(ctx);
  330. }
  331. tick_used = luat_mcu_ticks() - ctx->last_pkg_tick;
  332. if (tick_used > (ctx->keepalive * hz / 3)) {
  333. mqtt_ping(&ctx->broker);
  334. ctx->last_pkg_tick = luat_mcu_ticks();
  335. }
  336. }
  337. }
  338. // 发送数据到mqtt
  339. int app_mqtt_publish(app_mqtt_ctx_t* ctx, const char* topic, char* data, size_t len, int qos, int retain) {
  340. // 未连接, 就不准发数据
  341. if (ctx->conack_ready != LUAT_TRUE) {
  342. LLOGD("mqtt not ready yet");
  343. return -1;
  344. }
  345. app_mqtt_pub_data_t* msg = luat_heap_malloc(sizeof(app_mqtt_pub_data_t) + len - 4);
  346. LLOGD("app_mqtt_pub_data_t malloc %p", msg);
  347. if (msg == NULL) {
  348. LLOGD("out of memory app_mqtt_publish!!!");
  349. return -1;
  350. }
  351. msg->magic = PUB_MSG_MAGIC;
  352. msg->qos = 1;
  353. msg->retain = retain;
  354. msg->data_len = len;
  355. memcpy(msg->data, data, len);
  356. if (topic)
  357. memcpy(msg->topic, topic, strlen(topic) + 1);
  358. else
  359. memcpy(msg->topic, ctx->pub_topic, strlen(ctx->pub_topic) + 1);
  360. luat_queue_send(&ctx->msg_queue, msg, sizeof(app_mqtt_pub_data_t), 1);
  361. return 0;
  362. }