luat_mqtt_client.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. #include "luat_base.h"
  2. #include "luat_network_adapter.h"
  3. #include "libemqtt.h"
  4. #include "luat_rtos.h"
  5. #include "luat_mem.h"
  6. #include "luat_mqtt.h"
  7. #include "luat_websocket.h"
  8. #include <stddef.h>
  9. #define LUAT_LOG_TAG "mqtt"
  10. #include "luat_log.h"
  11. #ifndef LUAT_MQTT_DEBUG
  12. #define LUAT_MQTT_DEBUG 0
  13. #endif
  14. #if LUAT_MQTT_DEBUG == 0
  15. #undef LLOGD
  16. #define LLOGD(...)
  17. #endif
  18. static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl);
  19. static void mqtt_ws_on_event(luat_websocket_ctrl_t *ws_ctrl, int arg1, int arg2);
  20. static int luat_mqtt_ws_send_packet(void* socket_info, const void* buf, unsigned int count);
  21. #ifdef __LUATOS__
  22. #include "luat_msgbus.h"
  23. int32_t luatos_mqtt_callback(lua_State *L, void* ptr);
  24. #endif
  25. int l_luat_mqtt_msg_cb(luat_mqtt_ctrl_t * ptr, int arg1, int arg2) {
  26. #ifdef __LUATOS__
  27. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)ptr;
  28. rtos_msg_t msg = {
  29. .handler = luatos_mqtt_callback,
  30. .ptr = ptr,
  31. .arg1 = arg1,
  32. .arg2 = arg2
  33. };
  34. if (mqtt_ctrl->app_cb)
  35. {
  36. luat_mqtt_app_cb_t mqtt_cb = mqtt_ctrl->app_cb;
  37. if (mqtt_cb(mqtt_ctrl, arg1))
  38. {
  39. return 0;
  40. }
  41. }
  42. luat_msgbus_put(&msg, 0);
  43. #else
  44. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)ptr;
  45. if (mqtt_ctrl->mqtt_cb){
  46. luat_mqtt_cb_t mqtt_cb = mqtt_ctrl->mqtt_cb;
  47. mqtt_cb(mqtt_ctrl, arg1);
  48. }
  49. #endif
  50. return 0;
  51. }
  52. LUAT_RT_RET_TYPE luat_mqtt_timer_callback(LUAT_RT_CB_PARAM){
  53. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  54. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TIMER_PING, 0);
  55. }
  56. static LUAT_RT_RET_TYPE reconnect_timer_cb(LUAT_RT_CB_PARAM){
  57. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)param;
  58. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RECONNECT, 0);
  59. }
  60. int luat_mqtt_reconnect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  61. mqtt_ctrl->buffer_offset = 0;
  62. int ret = luat_mqtt_connect(mqtt_ctrl);
  63. if(ret){
  64. LLOGI("reconnect init socket ret=%d\n", ret);
  65. luat_mqtt_close_socket(mqtt_ctrl);
  66. }
  67. return ret;
  68. }
  69. int luat_mqtt_ping(luat_mqtt_ctrl_t *mqtt_ctrl) {
  70. mqtt_ping(&mqtt_ctrl->broker);
  71. return 0;
  72. }
  73. int luat_mqtt_init(luat_mqtt_ctrl_t *mqtt_ctrl, int adapter_index) {
  74. memset(mqtt_ctrl, 0, sizeof(luat_mqtt_ctrl_t));
  75. mqtt_ctrl->rxbuff_size = MQTT_RECV_BUF_LEN_MAX+4;
  76. mqtt_ctrl->adapter_index = adapter_index;
  77. mqtt_ctrl->netc = network_alloc_ctrl(adapter_index);
  78. if (!mqtt_ctrl->netc){
  79. LLOGW("network_alloc_ctrl fail");
  80. return -1;
  81. }
  82. network_init_ctrl(mqtt_ctrl->netc, NULL, luat_mqtt_callback, mqtt_ctrl);
  83. mqtt_ctrl->mqtt_state = MQTT_STATE_DISCONNECT;
  84. mqtt_ctrl->netc->is_debug = 0;
  85. mqtt_ctrl->keepalive = 240;
  86. network_set_base_mode(mqtt_ctrl->netc, 1, 10000, 0, 0, 0, 0);
  87. network_set_local_port(mqtt_ctrl->netc, 0);
  88. mqtt_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, mqtt_ctrl, NULL);
  89. mqtt_ctrl->ping_timer = luat_create_rtos_timer(luat_mqtt_timer_callback, mqtt_ctrl, NULL);
  90. return 0;
  91. }
  92. int luat_mqtt_set_rxbuff_size(luat_mqtt_ctrl_t *mqtt_ctrl, uint32_t rxbuff_size){
  93. mqtt_ctrl->rxbuff_size = rxbuff_size;
  94. return 0;
  95. }
  96. int luat_mqtt_set_keepalive(luat_mqtt_ctrl_t *mqtt_ctrl, uint32_t keepalive){
  97. mqtt_ctrl->keepalive = keepalive;
  98. return 0;
  99. }
  100. int luat_mqtt_set_auto_connect(luat_mqtt_ctrl_t *mqtt_ctrl, uint8_t auto_connect,uint32_t reconnect_time){
  101. mqtt_ctrl->reconnect = auto_connect;
  102. mqtt_ctrl->reconnect_time = reconnect_time;
  103. return 0;
  104. }
  105. int luat_mqtt_set_connopts(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_connopts_t *opts) {
  106. if (opts == NULL || opts->host == NULL) return -1;
  107. /* 检测是否为 WebSocket URL */
  108. if (!memcmp(opts->host, "ws://", 5) || !memcmp(opts->host, "wss://", 6)) {
  109. mqtt_ctrl->ws_mode = 1;
  110. memset(mqtt_ctrl->ws_url, 0, sizeof(mqtt_ctrl->ws_url));
  111. memcpy(mqtt_ctrl->ws_url, opts->host, strlen(opts->host));
  112. /* 初始化 WebSocket 控制块 */
  113. luat_websocket_init(&mqtt_ctrl->ws_ctrl, mqtt_ctrl->adapter_index);
  114. /* 强制无效IP,避免传入0.0.0.0 导致直连失败,走域名解析 */
  115. network_set_ip_invaild(&mqtt_ctrl->ws_ctrl.ip_addr);
  116. luat_websocket_connopts_t ws_opts = {0};
  117. ws_opts.url = mqtt_ctrl->ws_url;
  118. ws_opts.keepalive = 60;
  119. ws_opts.use_ipv6 = opts->is_ipv6;
  120. /* 透传 TLS 选项 */
  121. ws_opts.verify = opts->verify;
  122. ws_opts.server_cert = opts->server_cert;
  123. ws_opts.server_cert_len = opts->server_cert_len;
  124. ws_opts.client_cert = opts->client_cert;
  125. ws_opts.client_cert_len = opts->client_cert_len;
  126. ws_opts.client_key = opts->client_key;
  127. ws_opts.client_key_len = opts->client_key_len;
  128. ws_opts.client_password = opts->client_password;
  129. ws_opts.client_password_len = opts->client_password_len;
  130. luat_websocket_set_connopts(&mqtt_ctrl->ws_ctrl, &ws_opts);
  131. /* 增加子协议头 */
  132. static const char proto_hdr[] = "Sec-WebSocket-Protocol: mqtt\r\n";
  133. char *hdr = (char*)luat_heap_malloc(sizeof(proto_hdr));
  134. if (hdr) {
  135. LLOGD("WebSocket header allocation successful, size: %d", sizeof(proto_hdr));
  136. memcpy(hdr, proto_hdr, sizeof(proto_hdr));
  137. } else {
  138. LLOGW("WebSocket header allocation failed, size: %d", sizeof(proto_hdr));
  139. return -1;
  140. }
  141. luat_websocket_set_headers(&mqtt_ctrl->ws_ctrl, hdr);
  142. /* 绑定回调,切换发送函数 */
  143. luat_websocket_set_cb(&mqtt_ctrl->ws_ctrl, (luat_websocket_cb_t)mqtt_ws_on_event);
  144. mqtt_ctrl->broker.socket_info = mqtt_ctrl;
  145. mqtt_ctrl->broker.send = luat_mqtt_ws_send_packet;
  146. return 0;
  147. }
  148. /* 常规 TCP/MQTTS */
  149. memcpy(mqtt_ctrl->host, opts->host, strlen(opts->host) + 1);
  150. mqtt_ctrl->remote_port = opts->port;
  151. if (opts->is_tls){
  152. if (network_init_tls(mqtt_ctrl->netc, opts->verify)){
  153. LLOGE("初始化tls失败");
  154. return -1;
  155. }
  156. if (opts->server_cert){
  157. if (network_set_server_cert(mqtt_ctrl->netc, (const unsigned char *)opts->server_cert, opts->server_cert_len+1)){
  158. LLOGE("network_set_server_cert error");
  159. return -1;
  160. }
  161. }
  162. if (opts->client_cert){
  163. if (network_set_client_cert(mqtt_ctrl->netc, (const unsigned char*)opts->client_cert, opts->client_cert_len+1,
  164. (const unsigned char*)opts->client_key, opts->client_key_len+1,
  165. (const unsigned char*)opts->client_password, opts->client_password_len+1)){
  166. LLOGE("network_set_client_cert error");
  167. return -1;
  168. }
  169. }
  170. } else {
  171. network_deinit_tls(mqtt_ctrl->netc);
  172. }
  173. if (opts->is_ipv6) {
  174. network_connect_ipv6_domain(mqtt_ctrl->netc, 1);
  175. }
  176. mqtt_ctrl->broker.socket_info = mqtt_ctrl;
  177. mqtt_ctrl->broker.send = luat_mqtt_send_packet;
  178. return 0;
  179. }
  180. int luat_mqtt_set_triad(luat_mqtt_ctrl_t *mqtt_ctrl, const char* clientid, const char* username, const char* password){
  181. mqtt_init(&(mqtt_ctrl->broker), clientid);
  182. mqtt_init_auth(&(mqtt_ctrl->broker), username, password);
  183. return 0;
  184. }
  185. void luat_mqtt_close_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  186. LLOGI("mqtt closing socket netc:%p mqtt_state:%d",mqtt_ctrl->netc,mqtt_ctrl->mqtt_state);
  187. if (mqtt_ctrl->mqtt_state){
  188. mqtt_ctrl->mqtt_state = 0;
  189. mqtt_ctrl->buffer_offset = 0;
  190. if (mqtt_ctrl->ws_mode) {
  191. luat_websocket_close_socket(&mqtt_ctrl->ws_ctrl);
  192. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_DISCONNECT, mqtt_ctrl->error_state==0?MQTT_ERROR_STATE_SOCKET:mqtt_ctrl->error_state);
  193. } else if (mqtt_ctrl->netc){
  194. network_force_close_socket(mqtt_ctrl->netc);
  195. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_DISCONNECT, mqtt_ctrl->error_state==0?MQTT_ERROR_STATE_SOCKET:mqtt_ctrl->error_state);
  196. }
  197. luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  198. if (mqtt_ctrl->reconnect && mqtt_ctrl->reconnect_time > 0){
  199. luat_start_rtos_timer(mqtt_ctrl->reconnect_timer, mqtt_ctrl->reconnect_time, 0);
  200. }else{
  201. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CLOSE, 0);
  202. }
  203. }
  204. mqtt_ctrl->buffer_offset = 0;
  205. }
  206. void luat_mqtt_release_socket(luat_mqtt_ctrl_t *mqtt_ctrl){
  207. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_RELEASE, 0);
  208. if (mqtt_ctrl->ping_timer){
  209. luat_release_rtos_timer(mqtt_ctrl->ping_timer);
  210. mqtt_ctrl->ping_timer = NULL;
  211. }
  212. if (mqtt_ctrl->reconnect_timer){
  213. luat_release_rtos_timer(mqtt_ctrl->reconnect_timer);
  214. mqtt_ctrl->reconnect_timer = NULL;
  215. }
  216. if (mqtt_ctrl->broker.will_data) {
  217. mqtt_ctrl->broker.will_len = 0;
  218. luat_heap_free(mqtt_ctrl->broker.will_data);
  219. mqtt_ctrl->broker.will_data = NULL;
  220. }
  221. if (mqtt_ctrl->ws_mode){
  222. luat_websocket_release_socket(&mqtt_ctrl->ws_ctrl);
  223. }
  224. if (mqtt_ctrl->netc){
  225. network_release_ctrl(mqtt_ctrl->netc);
  226. mqtt_ctrl->netc = NULL;
  227. }
  228. if (mqtt_ctrl->mqtt_packet_buffer) {
  229. luat_heap_free(mqtt_ctrl->mqtt_packet_buffer);
  230. mqtt_ctrl->mqtt_packet_buffer = NULL;
  231. }
  232. }
  233. static int mqtt_parse(luat_mqtt_ctrl_t *mqtt_ctrl) {
  234. LLOGD("mqtt_parse offset %d", mqtt_ctrl->buffer_offset);
  235. if (mqtt_ctrl->buffer_offset < 2) {
  236. LLOGD("wait more data");
  237. return 0;
  238. }
  239. // 判断数据长度, 前几个字节能判断出够不够读出mqtt的头
  240. char* buf = (char*)mqtt_ctrl->mqtt_packet_buffer;
  241. int num_bytes = 1;
  242. if ((buf[1] & 0x80) == 0x80) {
  243. num_bytes++;
  244. if (mqtt_ctrl->buffer_offset < 3) {
  245. LLOGD("wait more data for mqtt head");
  246. return 0;
  247. }
  248. if ((buf[2] & 0x80) == 0x80) {
  249. num_bytes ++;
  250. if (mqtt_ctrl->buffer_offset < 4) {
  251. LLOGD("wait more data for mqtt head");
  252. return 0;
  253. }
  254. if ((buf[3] & 0x80) == 0x80) {
  255. num_bytes ++;
  256. }
  257. }
  258. }
  259. // 判断数据总长, 这里rem_len只包含mqtt头部之外的数据
  260. uint32_t rem_len = mqtt_parse_rem_len(mqtt_ctrl->mqtt_packet_buffer);
  261. if (rem_len > mqtt_ctrl->buffer_offset - num_bytes - 1) {
  262. LLOGD("wait more data for mqtt head");
  263. return 0;
  264. }
  265. // 至此, mqtt包是完整的 解析类型, 处理之
  266. int ret = luat_mqtt_msg_cb(mqtt_ctrl);
  267. if (ret != 0){
  268. LLOGW("bad mqtt packet!! ret %d", ret);
  269. return -1;
  270. }
  271. // 处理完成后, 如果还有数据, 移动数据, 继续处理
  272. mqtt_ctrl->buffer_offset -= (1 + num_bytes + rem_len);
  273. memmove(mqtt_ctrl->mqtt_packet_buffer, mqtt_ctrl->mqtt_packet_buffer+1 + num_bytes + rem_len, mqtt_ctrl->buffer_offset);
  274. return 1;
  275. }
  276. int luat_mqtt_read_packet(luat_mqtt_ctrl_t *mqtt_ctrl){
  277. LLOGD("luat_mqtt_read_packet mqtt_ctrl->buffer_offset:%d",mqtt_ctrl->buffer_offset);
  278. uint32_t total_len = 0;
  279. uint32_t rx_len = 0;
  280. int result = network_rx(mqtt_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
  281. if (total_len > mqtt_ctrl->rxbuff_size - mqtt_ctrl->buffer_offset) {
  282. LLOGE("too many data wait for recv %d", total_len);
  283. luat_mqtt_close_socket(mqtt_ctrl);
  284. return -1;
  285. }
  286. if (total_len == 0 && !mqtt_ctrl->netc->tls_mode) {
  287. LLOGW("rx event but NO data wait for recv");
  288. return 0;
  289. }
  290. if (mqtt_ctrl->rxbuff_size - mqtt_ctrl->buffer_offset <= 0) {
  291. LLOGE("buff is FULL, mqtt packet too big");
  292. luat_mqtt_close_socket(mqtt_ctrl);
  293. return -1;
  294. }
  295. #define MAX_READ (1024)
  296. int recv_want = 0;
  297. while (mqtt_ctrl->rxbuff_size - mqtt_ctrl->buffer_offset > 0) {
  298. if (MAX_READ > (mqtt_ctrl->rxbuff_size - mqtt_ctrl->buffer_offset)) {
  299. recv_want = mqtt_ctrl->rxbuff_size - mqtt_ctrl->buffer_offset;
  300. }
  301. else {
  302. recv_want = MAX_READ;
  303. }
  304. // 从网络接收数据
  305. result = network_rx(mqtt_ctrl->netc, mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
  306. if (rx_len == 0 || result != 0 ) {
  307. LLOGD("rx_len %d result %d", rx_len, result);
  308. break;
  309. }
  310. // 收到数据了, 传给处理函数继续处理
  311. // 数据的长度变更, 触发传递
  312. mqtt_ctrl->buffer_offset += rx_len;
  313. LLOGD("data recv %d offset %d", rx_len, mqtt_ctrl->buffer_offset);
  314. further:
  315. result = mqtt_parse(mqtt_ctrl);
  316. if (result == 0) {
  317. // OK
  318. }else if(result == 1){
  319. if (mqtt_ctrl->buffer_offset > 0)
  320. goto further;
  321. else {
  322. continue;
  323. }
  324. }
  325. else {
  326. LLOGW("mqtt_parse ret %d, closing socket",result);
  327. luat_mqtt_close_socket(mqtt_ctrl);
  328. return -1;
  329. }
  330. }
  331. return 0;
  332. }
  333. static int luat_mqtt_msg_cb(luat_mqtt_ctrl_t *mqtt_ctrl) {
  334. uint8_t msg_tp = MQTTParseMessageType(mqtt_ctrl->mqtt_packet_buffer);
  335. uint16_t msg_id = 0;
  336. uint8_t qos = 0;
  337. switch (msg_tp) {
  338. case MQTT_MSG_CONNACK: {
  339. LLOGD("MQTT_MSG_CONNACK");
  340. if(mqtt_ctrl->mqtt_packet_buffer[3] != 0x00){
  341. LLOGW("CONACK 0x%02x",mqtt_ctrl->mqtt_packet_buffer[3]);
  342. mqtt_ctrl->error_state = mqtt_ctrl->mqtt_packet_buffer[3];
  343. luat_mqtt_close_socket(mqtt_ctrl);
  344. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CONACK_ERROR, mqtt_ctrl->mqtt_packet_buffer[3]);
  345. return -1;
  346. }
  347. mqtt_ctrl->mqtt_state = MQTT_STATE_READY;
  348. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CONNACK, 0);
  349. break;
  350. }
  351. case MQTT_MSG_PUBLISH : {
  352. LLOGD("MQTT_MSG_PUBLISH");
  353. qos = MQTTParseMessageQos(mqtt_ctrl->mqtt_packet_buffer);
  354. #ifdef __LUATOS__
  355. if (mqtt_ctrl->app_cb)
  356. {
  357. luat_mqtt_app_cb_t mqtt_cb = mqtt_ctrl->app_cb;
  358. if (mqtt_cb(mqtt_ctrl, MQTT_MSG_PUBLISH))
  359. {
  360. goto MQTT_MSG_PUBLISH_DONE;
  361. }
  362. }
  363. const uint8_t* ptr;
  364. uint16_t topic_len = mqtt_parse_pub_topic_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  365. uint32_t payload_len = mqtt_parse_pub_msg_ptr(mqtt_ctrl->mqtt_packet_buffer, &ptr);
  366. luat_mqtt_msg_t *mqtt_msg = (luat_mqtt_msg_t *)luat_heap_malloc(sizeof(luat_mqtt_msg_t)+topic_len+payload_len);
  367. mqtt_msg->topic_len = mqtt_parse_pub_topic(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data);
  368. mqtt_msg->payload_len = mqtt_parse_publish_msg(mqtt_ctrl->mqtt_packet_buffer, mqtt_msg->data+topic_len);
  369. mqtt_msg->message_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  370. mqtt_msg->flags = mqtt_ctrl->mqtt_packet_buffer[0];
  371. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, (int)mqtt_msg);
  372. #else
  373. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBLISH, 0);
  374. #endif
  375. MQTT_MSG_PUBLISH_DONE:
  376. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  377. LLOGD("msg %d qos %d", msg_id, qos);
  378. // 还要回复puback
  379. if (qos == 1) {
  380. LLOGD("reply puback %d", msg_id);
  381. mqtt_puback(&(mqtt_ctrl->broker), msg_id);
  382. }
  383. else if (qos == 2) {
  384. LLOGD("reply pubrec %d", msg_id);
  385. mqtt_pubrec(&(mqtt_ctrl->broker), msg_id);
  386. }
  387. break;
  388. }
  389. case MQTT_MSG_PUBACK : {
  390. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  391. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBACK, msg_id);
  392. break;
  393. }
  394. case MQTT_MSG_PUBREC : {
  395. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  396. mqtt_pubrel(&(mqtt_ctrl->broker), msg_id);
  397. LLOGD("MQTT_MSG_PUBREC %d", msg_id);
  398. break;
  399. }
  400. case MQTT_MSG_PUBCOMP : {
  401. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  402. LLOGD("MQTT_MSG_PUBCOMP %d", msg_id);
  403. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PUBCOMP, msg_id);
  404. break;
  405. }
  406. case MQTT_MSG_PUBREL : {
  407. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  408. LLOGD("MQTT_MSG_PUBREL %d", msg_id);
  409. mqtt_pubcomp(&(mqtt_ctrl->broker), msg_id);
  410. break;
  411. }
  412. case MQTT_MSG_SUBACK : {
  413. LLOGD("MQTT_MSG_SUBACK");
  414. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  415. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_SUBACK, mqtt_ctrl->mqtt_packet_buffer[4]);
  416. break;
  417. }
  418. case MQTT_MSG_UNSUBACK : {
  419. LLOGD("MQTT_MSG_UNSUBACK");
  420. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  421. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_UNSUBACK, msg_id);
  422. break;
  423. }
  424. case MQTT_MSG_PINGRESP : {
  425. LLOGD("MQTT_MSG_PINGRESP");
  426. msg_id = mqtt_parse_msg_id(mqtt_ctrl->mqtt_packet_buffer);
  427. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_PINGRESP, msg_id);
  428. break;
  429. }
  430. case MQTT_MSG_DISCONNECT : {
  431. LLOGD("MQTT_MSG_DISCONNECT");
  432. mqtt_ctrl->error_state = MQTT_ERROR_STATE_DISCONNECT;
  433. break;
  434. }
  435. default : {
  436. LLOGD("luat_mqtt_msg_cb error msg_tp:%d",msg_tp);
  437. break;
  438. }
  439. }
  440. return 0;
  441. }
  442. int32_t luat_mqtt_callback(void *data, void *param) {
  443. OS_EVENT *event = (OS_EVENT *)data;
  444. luat_mqtt_ctrl_t *mqtt_ctrl =(luat_mqtt_ctrl_t *)param;
  445. int ret = 0;
  446. // LLOGD("LINK %08X ON_LINE %08X EVENT %08X TX_OK %08X CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
  447. LLOGD("network mqtt cb %8X %08X",event->ID & 0x0ffffffff, event->Param1);
  448. if (event->Param1){
  449. LLOGE("mqtt_callback param1 %d, event %d closing socket", event->Param1, event->ID - EV_NW_RESULT_BASE);
  450. switch(event->ID)
  451. {
  452. case EV_NW_RESULT_CONNECT:
  453. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_CON_ERROR, 0);
  454. break;
  455. case EV_NW_RESULT_TX:
  456. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TX_ERROR, 0);
  457. break;
  458. default:
  459. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_NET_ERROR, 0);
  460. break;
  461. }
  462. luat_mqtt_close_socket(mqtt_ctrl);
  463. return -1;
  464. }
  465. if (event->ID == EV_NW_RESULT_LINK){
  466. return 0; // 这里应该直接返回, 不能往下调用network_wait_event
  467. }else if(event->ID == EV_NW_RESULT_CONNECT){
  468. mqtt_ctrl->mqtt_state = MQTT_STATE_MQTT;
  469. ret = mqtt_connect(&(mqtt_ctrl->broker));
  470. if(ret==1){
  471. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*3/4, 1);
  472. }
  473. }else if(event->ID == EV_NW_RESULT_EVENT){
  474. if (event->Param1==0){
  475. ret = luat_mqtt_read_packet(mqtt_ctrl);
  476. if (ret){
  477. return -1;
  478. }
  479. // LLOGD("luat_mqtt_read_packet ret:%d",ret);
  480. // luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  481. // luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  482. }
  483. }else if(event->ID == EV_NW_RESULT_TX){
  484. #ifdef __LUATOS__
  485. #else
  486. if (MQTT_STATE_READY == mqtt_ctrl->mqtt_state) {
  487. l_luat_mqtt_msg_cb(mqtt_ctrl, MQTT_MSG_TCP_TX_DONE, 0);
  488. }
  489. #endif
  490. // luat_stop_rtos_timer(mqtt_ctrl->ping_timer);
  491. // luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*0.75, 1);
  492. }else if(event->ID == EV_NW_RESULT_CLOSE){
  493. }
  494. ret = network_wait_event(mqtt_ctrl->netc, NULL, 0, NULL);
  495. if (ret < 0){
  496. LLOGW("network_wait_event ret %d, closing socket", ret);
  497. luat_mqtt_close_socket(mqtt_ctrl);
  498. return -1;
  499. }
  500. return 0;
  501. }
  502. /* WebSocket 回调:握手成功后发起 MQTT CONNECT;收到数据则喂入 mqtt_parse */
  503. static void mqtt_ws_on_event(luat_websocket_ctrl_t *ws_ctrl, int arg1, int arg2) {
  504. luat_mqtt_ctrl_t *mqtt_ctrl = (luat_mqtt_ctrl_t *)((char*)ws_ctrl - offsetof(luat_mqtt_ctrl_t, ws_ctrl));
  505. if (arg1 == WEBSOCKET_MSG_CONNACK) {
  506. mqtt_ctrl->mqtt_state = MQTT_STATE_MQTT;
  507. int ret = mqtt_connect(&(mqtt_ctrl->broker));
  508. if (ret == 1) {
  509. luat_start_rtos_timer(mqtt_ctrl->ping_timer, mqtt_ctrl->keepalive*1000*3/4, 1);
  510. }
  511. } else if (arg1 == WEBSOCKET_MSG_PUBLISH) {
  512. /* arg2 指向的是复制的帧头+payload;解析出 MQTT 负载数据区域并追加到 mqtt_packet_buffer */
  513. uint8_t *frame = (uint8_t *)arg2;
  514. if (!frame) return;
  515. uint16_t plen = 0;
  516. if ((frame[1] & 0x7F) == 126) {
  517. plen = (frame[2] << 8) | frame[3];
  518. frame += 4;
  519. } else {
  520. plen = (frame[1] & 0x7F);
  521. frame += 2;
  522. }
  523. if (plen == 0) return;
  524. if (mqtt_ctrl->rxbuff_size - mqtt_ctrl->buffer_offset < plen) {
  525. LLOGW("mqtt rx buffer not enough for ws payload %d", plen);
  526. return;
  527. }
  528. memcpy(mqtt_ctrl->mqtt_packet_buffer + mqtt_ctrl->buffer_offset, frame, plen);
  529. mqtt_ctrl->buffer_offset += plen;
  530. /* 循环解析 */
  531. while (mqtt_parse(mqtt_ctrl) == 1) {
  532. if (mqtt_ctrl->buffer_offset == 0) break;
  533. }
  534. } else if (arg1 == WEBSOCKET_MSG_DISCONNECT || arg1 >= WEBSOCKET_MSG_ERROR_CONN) {
  535. mqtt_ctrl->error_state = MQTT_ERROR_STATE_SOCKET;
  536. luat_mqtt_close_socket(mqtt_ctrl);
  537. } else if (arg1 == WEBSOCKET_MSG_SENT) {
  538. /* no-op */
  539. } else if (arg1 == WEBSOCKET_MSG_TIMER_PING) {
  540. /* WS 层心跳已在 websocket 管理,这里无需处理 */
  541. }
  542. }
  543. /* 通过 WebSocket 发送 MQTT 报文 */
  544. static int luat_mqtt_ws_send_packet(void* socket_info, const void* buf, unsigned int count) {
  545. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
  546. luat_websocket_pkg_t pkg = {0};
  547. pkg.FIN = 1;
  548. pkg.OPT_CODE = WebSocket_OP_BINARY;
  549. pkg.plen = (uint16_t)count;
  550. pkg.payload = (const char*)buf;
  551. int ret = luat_websocket_send_frame(&mqtt_ctrl->ws_ctrl, &pkg);
  552. if (ret < 0) return 0;
  553. return count;
  554. }
  555. int luat_mqtt_send_packet(void* socket_info, const void* buf, unsigned int count){
  556. luat_mqtt_ctrl_t * mqtt_ctrl = (luat_mqtt_ctrl_t *)socket_info;
  557. if (mqtt_ctrl->ws_mode) {
  558. return luat_mqtt_ws_send_packet(socket_info, buf, count);
  559. }
  560. uint32_t tx_len = 0;
  561. int ret = network_tx(mqtt_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
  562. if (ret < 0) {
  563. LLOGW("network_tx ret %d, closing socket", ret);
  564. luat_mqtt_close_socket(mqtt_ctrl);
  565. return 0;
  566. }
  567. if (tx_len != count) {
  568. LLOGW("network_tx expect %d but %d", count, tx_len);
  569. luat_mqtt_close_socket(mqtt_ctrl);
  570. return 0;
  571. }
  572. return count;
  573. }
  574. int luat_mqtt_connect(luat_mqtt_ctrl_t *mqtt_ctrl) {
  575. int ret = 0;
  576. mqtt_ctrl->error_state=0;
  577. if (!mqtt_ctrl->mqtt_packet_buffer) {
  578. mqtt_ctrl->mqtt_packet_buffer = luat_heap_malloc(mqtt_ctrl->rxbuff_size+4);
  579. }
  580. if (mqtt_ctrl->mqtt_packet_buffer == NULL){
  581. return -1;
  582. }
  583. memset(mqtt_ctrl->mqtt_packet_buffer, 0, mqtt_ctrl->rxbuff_size+4);
  584. uint16_t keepalive = mqtt_ctrl->keepalive;
  585. mqtt_set_alive(&(mqtt_ctrl->broker), keepalive);
  586. if (mqtt_ctrl->ws_mode) {
  587. /* 通过 WebSocket 发起连接,完成握手后再 mqtt_connect */
  588. int r = luat_websocket_connect(&mqtt_ctrl->ws_ctrl);
  589. if (r < 0) {
  590. return -1;
  591. }
  592. mqtt_ctrl->mqtt_state = MQTT_STATE_SCONNECT;
  593. return 0;
  594. } else {
  595. const char *hostname = mqtt_ctrl->host;
  596. uint16_t port = mqtt_ctrl->remote_port;
  597. LLOGD("host %s port %d keepalive %d", hostname, port, keepalive);
  598. ret = network_connect(mqtt_ctrl->netc, hostname, strlen(hostname), NULL, port, 0) < 0;
  599. LLOGD("network_connect ret %d", ret);
  600. if (ret < 0) {
  601. network_close(mqtt_ctrl->netc, 0);
  602. return -1;
  603. }
  604. mqtt_ctrl->mqtt_state = MQTT_STATE_SCONNECT;
  605. return 0;
  606. }
  607. }
  608. int luat_mqtt_set_will(luat_mqtt_ctrl_t *mqtt_ctrl, const char* topic,
  609. const char* payload, size_t payload_len,
  610. uint8_t qos, size_t retain) {
  611. if (mqtt_ctrl == NULL || mqtt_ctrl->netc == NULL)
  612. return -1;
  613. return mqtt_set_will(&mqtt_ctrl->broker, topic, payload, payload_len, qos, retain);
  614. }
  615. int luat_mqtt_set_cb(luat_mqtt_ctrl_t *mqtt_ctrl, luat_mqtt_cb_t mqtt_cb){
  616. if (mqtt_ctrl == NULL || mqtt_ctrl->netc == NULL)
  617. return -1;
  618. mqtt_ctrl->mqtt_cb = mqtt_cb;
  619. return 0;
  620. }
  621. LUAT_MQTT_STATE_E luat_mqtt_state_get(luat_mqtt_ctrl_t *mqtt_ctrl){
  622. return mqtt_ctrl->mqtt_state;
  623. }