luat_websocket.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. #include "luat_base.h"
  2. #include "luat_network_adapter.h"
  3. #include "luat_rtos.h"
  4. #include "luat_zbuff.h"
  5. #include "luat_malloc.h"
  6. #include "luat_websocket.h"
  7. // #include "http_parser.h"
  8. #define LUAT_LOG_TAG "websocket"
  9. #include "luat_log.h"
  10. #define WEBSOCKET_DEBUG 0
  11. #if WEBSOCKET_DEBUG == 0
  12. #undef LLOGD
  13. #define LLOGD(...)
  14. #endif
  15. #if WEBSOCKET_DEBUG
  16. static void print_pkg(const char *tag, char *buff, luat_websocket_pkg_t *pkg)
  17. {
  18. if (pkg == NULL)
  19. {
  20. LLOGD("pkg is NULL");
  21. return;
  22. }
  23. LLOGD("%s pkg %02X%02X", tag, buff[0], buff[1]);
  24. LLOGD("%s pkg FIN %d R %d OPT %d MARK %d PLEN %d", tag, pkg->FIN, pkg->R, pkg->OPT_CODE, pkg->mark, pkg->plen);
  25. }
  26. #else
  27. #define print_pkg(...)
  28. #endif
  29. int luat_websocket_payload(char *buf, luat_websocket_pkg_t *pkg, size_t limit)
  30. {
  31. uint32_t pkg_len = 0;
  32. // 先处理FIN
  33. if (buf[0] && (1 << 7))
  34. {
  35. pkg->FIN = 1;
  36. }
  37. // 处理操作码
  38. pkg->OPT_CODE = buf[0] & 0xF;
  39. // 然后处理plen
  40. pkg->plen = buf[1] & 0x7F;
  41. print_pkg("downlink", buf, pkg);
  42. // websocket的payload长度支持3种情况:
  43. // 0字节(小于126,放在头部)
  44. // 2个字节 126 ~ 0xFFFF
  45. // 6个字节 0x1000 ~ 0xFFFFFFFF, 不打算支持.
  46. if (pkg->plen == 126)
  47. {
  48. if (limit < 4)
  49. {
  50. // 还缺1个字节,等吧
  51. LLOGD("wait more data offset %d", limit);
  52. return 0;
  53. }
  54. pkg->plen = (buf[2] & 0xFF) << 8;
  55. pkg->plen += (buf[3] & 0xFF);
  56. pkg_len = 4 + pkg->plen;
  57. pkg->payload = buf + 4;
  58. }
  59. else if (pkg->plen == 127)
  60. {
  61. // 后续还要8个字节,但这个包也太大了吧!!!
  62. LLOGE("websocket payload is too large!!!");
  63. return -1;
  64. }
  65. else
  66. {
  67. pkg_len = 2 + pkg->plen;
  68. pkg->payload = buf + 2;
  69. }
  70. LLOGD("payload %04X pkg %04X", pkg->plen, pkg_len);
  71. if (limit < pkg_len)
  72. {
  73. LLOGD("wait more data offset %d", limit);
  74. return 0;
  75. }
  76. return 1;
  77. }
  78. int luat_websocket_send_packet(void *socket_info, const void *buf, unsigned int count)
  79. {
  80. luat_websocket_ctrl_t *websocket_ctrl = (luat_websocket_ctrl_t *)socket_info;
  81. uint32_t tx_len = 0;
  82. int ret = network_tx(websocket_ctrl->netc, buf, count, 0, NULL, 0, &tx_len, 0);
  83. if (ret < 0)
  84. {
  85. LLOGI("network_tx %d , close socket", ret);
  86. luat_websocket_close_socket(websocket_ctrl);
  87. return 0;
  88. }
  89. return count;
  90. }
  91. void luat_websocket_ping(luat_websocket_ctrl_t *websocket_ctrl)
  92. {
  93. if (websocket_ctrl->websocket_state != 0)
  94. return;
  95. luat_websocket_pkg_t pkg = {
  96. .FIN = 1,
  97. .OPT_CODE = WebSocket_OP_PING,
  98. .plen = 0};
  99. luat_websocket_send_frame(websocket_ctrl, &pkg);
  100. }
  101. void luat_websocket_pong(luat_websocket_ctrl_t *websocket_ctrl)
  102. {
  103. luat_websocket_pkg_t pkg = {
  104. .FIN = 1,
  105. .OPT_CODE = WebSocket_OP_PONG,
  106. .plen = 0};
  107. luat_websocket_send_frame(websocket_ctrl, &pkg);
  108. }
  109. void luat_websocket_reconnect(luat_websocket_ctrl_t *websocket_ctrl) {
  110. int ret = luat_websocket_connect(websocket_ctrl);
  111. if (ret)
  112. {
  113. LLOGI("reconnect init socket ret=%d\n", ret);
  114. luat_websocket_close_socket(websocket_ctrl);
  115. }
  116. }
  117. LUAT_RT_RET_TYPE luat_websocket_timer_callback(LUAT_RT_CB_PARAM)
  118. {
  119. luat_websocket_ctrl_t *websocket_ctrl = (luat_websocket_ctrl_t *)param;
  120. l_luat_websocket_msg_cb(websocket_ctrl, WEBSOCKET_MSG_TIMER_PING, 0);
  121. }
  122. static void reconnect_timer_cb(LUAT_RT_CB_PARAM)
  123. {
  124. luat_websocket_ctrl_t *websocket_ctrl = (luat_websocket_ctrl_t *)param;
  125. l_luat_websocket_msg_cb(websocket_ctrl, WEBSOCKET_MSG_RECONNECT, 0);
  126. }
  127. int luat_websocket_init(luat_websocket_ctrl_t *websocket_ctrl, int adapter_index)
  128. {
  129. memset(websocket_ctrl, 0, sizeof(luat_websocket_ctrl_t));
  130. websocket_ctrl->adapter_index = adapter_index;
  131. websocket_ctrl->netc = network_alloc_ctrl(adapter_index);
  132. if (!websocket_ctrl->netc)
  133. {
  134. LLOGW("network_alloc_ctrl fail");
  135. return -1;
  136. }
  137. network_init_ctrl(websocket_ctrl->netc, NULL, luat_websocket_callback, websocket_ctrl);
  138. websocket_ctrl->websocket_state = 0;
  139. websocket_ctrl->netc->is_debug = 0;
  140. websocket_ctrl->keepalive = 60;
  141. network_set_base_mode(websocket_ctrl->netc, 1, 10000, 0, 0, 0, 0);
  142. network_set_local_port(websocket_ctrl->netc, 0);
  143. websocket_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, websocket_ctrl, NULL);
  144. websocket_ctrl->ping_timer = luat_create_rtos_timer(luat_websocket_timer_callback, websocket_ctrl, NULL);
  145. return 0;
  146. }
  147. int luat_websocket_set_connopts(luat_websocket_ctrl_t *websocket_ctrl, const char *url)
  148. {
  149. int is_tls = 0;
  150. const char *tmp = url;
  151. LLOGD("url %s", url);
  152. // TODO 支持基本授权的URL ws://wendal:123@wendal.cn:8080/abc
  153. websocket_ctrl->host[0] = 0;
  154. char port_tmp[6] = {0};
  155. uint16_t port = 0;
  156. if (!memcmp(tmp, "wss://", strlen("wss://")))
  157. {
  158. // LLOGD("using WSS");
  159. is_tls = 1;
  160. tmp += strlen("wss://");
  161. }
  162. else if (!memcmp(tmp, "ws://", strlen("ws://")))
  163. {
  164. // LLOGD("using ws");
  165. is_tls = 0;
  166. tmp += strlen("ws://");
  167. }
  168. // LLOGD("tmp %s", tmp);
  169. size_t uri_start_index = 0;
  170. for (size_t i = 0; i < strlen(tmp); i++)
  171. {
  172. if (tmp[i] == '/')
  173. {
  174. uri_start_index = i;
  175. break;
  176. }
  177. }
  178. if (uri_start_index < 2) {
  179. uri_start_index = strlen(tmp);
  180. }
  181. for (size_t j = 0; j < uri_start_index; j++)
  182. {
  183. if (tmp[j] == ':')
  184. {
  185. memcpy(websocket_ctrl->host, tmp, j);
  186. websocket_ctrl->host[j] = 0;
  187. memcpy(port_tmp, tmp + j + 1, uri_start_index - j - 1);
  188. port = atoi(port_tmp);
  189. //LLOGD("port str %s %d", port_tmp, port);
  190. // LLOGD("found custom host %s port %d", websocket_ctrl->host, port);
  191. break;
  192. }
  193. }
  194. // 没有自定义host
  195. if (websocket_ctrl->host[0] == 0)
  196. {
  197. memcpy(websocket_ctrl->host, tmp, uri_start_index);
  198. websocket_ctrl->host[uri_start_index] = 0;
  199. // LLOGD("found custom host %s", websocket_ctrl->host);
  200. }
  201. memcpy(websocket_ctrl->uri, tmp + uri_start_index, strlen(tmp) - uri_start_index);
  202. websocket_ctrl->uri[strlen(tmp) - uri_start_index] = 0;
  203. if (port == 0) {
  204. port = is_tls ? 443 : 80;
  205. }
  206. if (websocket_ctrl->uri[0] == 0)
  207. {
  208. websocket_ctrl->uri[0] = '/';
  209. websocket_ctrl->uri[1] = 0x00;
  210. }
  211. // LLOGD("host %s port %d uri %s", host, port, uri);
  212. // memcpy(websocket_ctrl->host, host, strlen(host) + 1);
  213. websocket_ctrl->remote_port = port;
  214. // memcpy(websocket_ctrl->uri, uri, strlen(uri) + 1);
  215. LLOGD("host %s port %d uri %s", websocket_ctrl->host, port, websocket_ctrl->uri);
  216. if (is_tls)
  217. {
  218. if (network_init_tls(websocket_ctrl->netc, 0)){
  219. return -1;
  220. }
  221. }
  222. else
  223. {
  224. network_deinit_tls(websocket_ctrl->netc);
  225. }
  226. return 0;
  227. }
  228. static void websocket_reconnect(luat_websocket_ctrl_t *websocket_ctrl)
  229. {
  230. LLOGI("reconnect after %dms", websocket_ctrl->reconnect_time);
  231. websocket_ctrl->buffer_offset = 0;
  232. //websocket_ctrl->reconnect_timer = luat_create_rtos_timer(reconnect_timer_cb, websocket_ctrl, NULL);
  233. luat_stop_rtos_timer(websocket_ctrl->reconnect_timer);
  234. luat_start_rtos_timer(websocket_ctrl->reconnect_timer, websocket_ctrl->reconnect_time, 0);
  235. }
  236. void luat_websocket_close_socket(luat_websocket_ctrl_t *websocket_ctrl)
  237. {
  238. LLOGI("websocket closing socket");
  239. if (websocket_ctrl->netc)
  240. {
  241. network_force_close_socket(websocket_ctrl->netc);
  242. }
  243. l_luat_websocket_msg_cb(websocket_ctrl, WEBSOCKET_MSG_DISCONNECT, 0);
  244. luat_stop_rtos_timer(websocket_ctrl->ping_timer);
  245. websocket_ctrl->websocket_state = 0;
  246. if (websocket_ctrl->reconnect) {
  247. websocket_reconnect(websocket_ctrl);
  248. }
  249. }
  250. void luat_websocket_release_socket(luat_websocket_ctrl_t *websocket_ctrl)
  251. {
  252. l_luat_websocket_msg_cb(websocket_ctrl, WEBSOCKET_MSG_RELEASE, 0);
  253. if (websocket_ctrl->ping_timer) {
  254. luat_release_rtos_timer(websocket_ctrl->ping_timer);
  255. websocket_ctrl->ping_timer = NULL;
  256. }
  257. if (websocket_ctrl->reconnect_timer) {
  258. luat_release_rtos_timer(websocket_ctrl->reconnect_timer);
  259. websocket_ctrl->reconnect_timer = NULL;
  260. }
  261. if (websocket_ctrl->headers) {
  262. luat_heap_free(websocket_ctrl->headers);
  263. websocket_ctrl->headers = NULL;
  264. }
  265. if (websocket_ctrl->netc)
  266. {
  267. network_release_ctrl(websocket_ctrl->netc);
  268. websocket_ctrl->netc = NULL;
  269. }
  270. }
  271. static const char* ws_headers =
  272. "Upgrade: websocket\r\n"
  273. "Connection: Upgrade\r\n"
  274. "Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==\r\n"
  275. "Sec-WebSocket-Version: 13\r\n"
  276. "\r\n";
  277. static int websocket_connect(luat_websocket_ctrl_t *websocket_ctrl)
  278. {
  279. LLOGD("request host %s port %d uri %s", websocket_ctrl->host, websocket_ctrl->remote_port, websocket_ctrl->uri);
  280. // 借用pkg_buff
  281. int ret = snprintf_((char*)websocket_ctrl->pkg_buff,
  282. WEBSOCKET_RECV_BUF_LEN_MAX,
  283. "GET %s HTTP/1.1\r\n"
  284. "Host: %s\r\n",
  285. websocket_ctrl->uri, websocket_ctrl->host);
  286. //LLOGD("Request %s", websocket_ctrl->pkg_buff);
  287. ret = luat_websocket_send_packet(websocket_ctrl, websocket_ctrl->pkg_buff, ret);
  288. if (websocket_ctrl->headers) {
  289. luat_websocket_send_packet(websocket_ctrl, websocket_ctrl->headers, strlen(websocket_ctrl->headers));
  290. }
  291. luat_websocket_send_packet(websocket_ctrl, ws_headers, strlen(ws_headers));
  292. LLOGD("websocket_connect ret %d", ret);
  293. return ret;
  294. }
  295. int luat_websocket_send_frame(luat_websocket_ctrl_t *websocket_ctrl, luat_websocket_pkg_t *pkg)
  296. {
  297. char *dst = luat_heap_malloc(pkg->plen + 6);
  298. memset(dst, 0, pkg->plen + 6);
  299. size_t offset = 0;
  300. size_t ret = 0;
  301. // first byte, FIN and OPTCODE
  302. dst[0] = pkg->FIN << 7;
  303. dst[0] |= pkg->OPT_CODE & 0xF;
  304. if (pkg->plen < 126)
  305. {
  306. dst[1] = pkg->plen;
  307. offset = 2;
  308. }
  309. else if (pkg->plen < 0xFFFF)
  310. {
  311. dst[1] = 126;
  312. dst[2] = pkg->plen >> 8;
  313. dst[4] = pkg->plen & 0xFF;
  314. offset = 4;
  315. }
  316. dst[1] |= 1 << 7;
  317. print_pkg("uplink", dst, pkg);
  318. // 添加mark, TODO 改成随机?
  319. char mark[] = {0, 1, 2, 3};
  320. memcpy(dst + offset, mark, 4);
  321. offset += 4;
  322. if (pkg->plen > 0)
  323. {
  324. for (size_t i = 0; i < pkg->plen; i++)
  325. {
  326. dst[offset + i] = pkg->payload[i] ^ (mark[i % 4]);
  327. }
  328. }
  329. ret = luat_websocket_send_packet(websocket_ctrl, dst, offset + pkg->plen);
  330. luat_heap_free(dst);
  331. return ret;
  332. }
  333. static int websocket_parse(luat_websocket_ctrl_t *websocket_ctrl)
  334. {
  335. int ret = 0;
  336. char *buf = (char*)websocket_ctrl->pkg_buff;
  337. LLOGD("websocket_parse offset %d %d", websocket_ctrl->buffer_offset, websocket_ctrl->websocket_state);
  338. if (websocket_ctrl->websocket_state == 0)
  339. {
  340. if (websocket_ctrl->buffer_offset < strlen("HTTP/1.1 101"))
  341. { // 最起码得等5个字符
  342. LLOGD("wait more data offset %d", websocket_ctrl->buffer_offset);
  343. return 0;
  344. }
  345. // 前3个字符肯定是101, 否则必然是不合法的
  346. if (memcmp("HTTP/1.1 101", buf, strlen("HTTP/1.1 101")))
  347. {
  348. buf[websocket_ctrl->buffer_offset] = 0;
  349. LLOGD("server not support websocket? resp code %s", buf);
  350. return -1;
  351. }
  352. // 然后找\r\n\r\n
  353. for (size_t i = 4; i < websocket_ctrl->buffer_offset; i++)
  354. {
  355. if (!memcmp("\r\n\r\n", buf + i, 4))
  356. {
  357. // LLOGD("Found \\r\\n\\r\\n");
  358. // 找到了!! 但貌似完全不用处理呢
  359. websocket_ctrl->buffer_offset = 0;
  360. LLOGD("ready!!");
  361. websocket_ctrl->websocket_state = 1;
  362. luat_stop_rtos_timer(websocket_ctrl->ping_timer);
  363. luat_start_rtos_timer(websocket_ctrl->ping_timer, 30000, 1);
  364. l_luat_websocket_msg_cb(websocket_ctrl, WEBSOCKET_MSG_CONNACK, 0);
  365. return 1;
  366. }
  367. }
  368. // LLOGD("Not Found \\r\\n\\r\\n %s", buf);
  369. return 0;
  370. }
  371. if (websocket_ctrl->buffer_offset < 2)
  372. {
  373. LLOGD("wait more data offset %d", websocket_ctrl->buffer_offset);
  374. return 0;
  375. }
  376. // 判断数据长度, 前几个字节能判断出够不够读出websocket的头
  377. luat_websocket_pkg_t pkg = {0};
  378. ret = luat_websocket_payload(buf, &pkg, websocket_ctrl->buffer_offset);
  379. if (ret == 0) {
  380. LLOGD("wait more data offset %d", websocket_ctrl->buffer_offset);
  381. return 0;
  382. }
  383. if (ret < 0) {
  384. LLOGI("payload too large!!!");
  385. return -1;
  386. }
  387. switch (pkg.OPT_CODE)
  388. {
  389. case 0x01: // 文本帧
  390. break;
  391. case 0x02: // 二进制帧
  392. break;
  393. case 0x08:
  394. // 主动断开? 我擦
  395. LLOGD("server say CLOSE");
  396. return -1;
  397. case 0x09:
  398. // ping->pong
  399. luat_websocket_pong(websocket_ctrl);
  400. break;
  401. case 0x0A:
  402. break;
  403. default:
  404. LLOGE("unkown optcode %0x2X", pkg.OPT_CODE);
  405. return -1;
  406. }
  407. size_t pkg_len = pkg.plen >= 126 ? pkg.plen + 4 : pkg.plen + 2;
  408. if (pkg.OPT_CODE <= 0x02)
  409. {
  410. char *buff = luat_heap_malloc(pkg_len);
  411. if (buff == NULL)
  412. {
  413. LLOGE("out of memory when malloc websocket buff");
  414. return -1;
  415. }
  416. memcpy(buff, buf, pkg_len);
  417. l_luat_websocket_msg_cb(websocket_ctrl, WEBSOCKET_MSG_PUBLISH, (int)buff);
  418. }
  419. // 处理完成后, 如果还有数据, 移动数据, 继续处理
  420. websocket_ctrl->buffer_offset -= pkg_len;
  421. if (websocket_ctrl->buffer_offset > 0)
  422. {
  423. memmove(websocket_ctrl->pkg_buff, websocket_ctrl->pkg_buff + pkg_len, websocket_ctrl->buffer_offset);
  424. return 1;
  425. }
  426. return 0;
  427. }
  428. int luat_websocket_read_packet(luat_websocket_ctrl_t *websocket_ctrl)
  429. {
  430. // LLOGD("luat_websocket_read_packet websocket_ctrl->buffer_offset:%d",websocket_ctrl->buffer_offset);
  431. // int ret = -1;
  432. // uint8_t *read_buff = NULL;
  433. uint32_t total_len = 0;
  434. uint32_t rx_len = 0;
  435. int result = network_rx(websocket_ctrl->netc, NULL, 0, 0, NULL, NULL, &total_len);
  436. // if (total_len > 0xFFFF)
  437. // {
  438. // LLOGE("too many data wait for recv %d", total_len);
  439. // //luat_websocket_close_socket(websocket_ctrl);
  440. // return -1;
  441. // }
  442. if (total_len == 0)
  443. {
  444. LLOGW("rx event but NO data wait for recv");
  445. return 0;
  446. }
  447. if (WEBSOCKET_RECV_BUF_LEN_MAX - websocket_ctrl->buffer_offset <= 0)
  448. {
  449. LLOGE("buff is FULL, websocket packet too big");
  450. //luat_websocket_close_socket(websocket_ctrl);
  451. return -1;
  452. }
  453. #define MAX_READ (1024)
  454. int recv_want = 0;
  455. while (WEBSOCKET_RECV_BUF_LEN_MAX - websocket_ctrl->buffer_offset > 0)
  456. {
  457. if (MAX_READ > (WEBSOCKET_RECV_BUF_LEN_MAX - websocket_ctrl->buffer_offset))
  458. {
  459. recv_want = WEBSOCKET_RECV_BUF_LEN_MAX - websocket_ctrl->buffer_offset;
  460. }
  461. else
  462. {
  463. recv_want = MAX_READ;
  464. }
  465. // 从网络接收数据
  466. result = network_rx(websocket_ctrl->netc, websocket_ctrl->pkg_buff + websocket_ctrl->buffer_offset, recv_want, 0, NULL, NULL, &rx_len);
  467. if (rx_len == 0 || result != 0)
  468. {
  469. LLOGD("rx_len %d result %d", rx_len, result);
  470. break;
  471. }
  472. // 收到数据了, 传给处理函数继续处理
  473. // 数据的长度变更, 触发传递
  474. websocket_ctrl->buffer_offset += rx_len;
  475. LLOGD("data recv %d offset %d", rx_len, websocket_ctrl->buffer_offset);
  476. further:
  477. result = websocket_parse(websocket_ctrl);
  478. if (result == 0)
  479. {
  480. // OK
  481. }
  482. else if (result == 1)
  483. {
  484. if (websocket_ctrl->buffer_offset > 0)
  485. goto further;
  486. else
  487. {
  488. continue;
  489. }
  490. }
  491. else
  492. {
  493. LLOGW("websocket_parse ret %d", result);
  494. //luat_websocket_close_socket(websocket_ctrl);
  495. return -1;
  496. }
  497. }
  498. return 0;
  499. }
  500. int32_t luat_websocket_callback(void *data, void *param)
  501. {
  502. OS_EVENT *event = (OS_EVENT *)data;
  503. luat_websocket_ctrl_t *websocket_ctrl = (luat_websocket_ctrl_t *)param;
  504. int ret = 0;
  505. // LLOGD("LINK %d ON_LINE %d EVENT %d TX_OK %d CLOSED %d",EV_NW_RESULT_LINK & 0x0fffffff,EV_NW_RESULT_CONNECT & 0x0fffffff,EV_NW_RESULT_EVENT & 0x0fffffff,EV_NW_RESULT_TX & 0x0fffffff,EV_NW_RESULT_CLOSE & 0x0fffffff);
  506. //LLOGD("network websocket cb %8X %s %8X", event->ID & 0x0ffffffff, network_ctrl_callback_event_string(event->ID), event->Param1);
  507. if (event->ID == EV_NW_RESULT_LINK)
  508. {
  509. return 0; // 这里应该直接返回, 不能往下调用network_wait_event
  510. }
  511. else if (event->ID == EV_NW_RESULT_CONNECT)
  512. {
  513. if (event->Param1 == 0) {
  514. ret = websocket_connect(websocket_ctrl);
  515. if (ret < 0) {
  516. return 0; // 发送失败, 那么
  517. }
  518. }
  519. else {
  520. // 连接失败, 重连吧.
  521. }
  522. }
  523. else if (event->ID == EV_NW_RESULT_EVENT)
  524. {
  525. if (event->Param1 == 0)
  526. {
  527. ret = luat_websocket_read_packet(websocket_ctrl);
  528. if (ret < 0) {
  529. luat_websocket_close_socket(websocket_ctrl);
  530. return ret;
  531. }
  532. // LLOGD("luat_websocket_read_packet ret:%d",ret);
  533. luat_stop_rtos_timer(websocket_ctrl->ping_timer);
  534. luat_start_rtos_timer(websocket_ctrl->ping_timer, websocket_ctrl->keepalive * 1000 * 0.75, 1);
  535. }
  536. }
  537. else if (event->ID == EV_NW_RESULT_TX)
  538. {
  539. luat_stop_rtos_timer(websocket_ctrl->ping_timer);
  540. luat_start_rtos_timer(websocket_ctrl->ping_timer, websocket_ctrl->keepalive * 1000 * 0.75, 1);
  541. if (websocket_ctrl->frame_wait) {
  542. websocket_ctrl->frame_wait --;
  543. l_luat_websocket_msg_cb(websocket_ctrl, WEBSOCKET_MSG_SENT, 0);
  544. }
  545. }
  546. else if (event->ID == EV_NW_RESULT_CLOSE)
  547. {
  548. }
  549. if (event->Param1)
  550. {
  551. LLOGW("websocket_callback param1 %d, closing socket", event->Param1);
  552. luat_websocket_close_socket(websocket_ctrl);
  553. return 0;
  554. }
  555. ret = network_wait_event(websocket_ctrl->netc, NULL, 0, NULL);
  556. if (ret < 0)
  557. {
  558. LLOGW("network_wait_event ret %d, closing socket", ret);
  559. luat_websocket_close_socket(websocket_ctrl);
  560. return -1;
  561. }
  562. return 0;
  563. }
  564. int luat_websocket_connect(luat_websocket_ctrl_t *websocket_ctrl)
  565. {
  566. int ret = 0;
  567. const char *hostname = websocket_ctrl->host;
  568. uint16_t port = websocket_ctrl->remote_port;
  569. LLOGI("connect host %s port %d", hostname, port);
  570. network_close(websocket_ctrl->netc, 0);
  571. ret = network_connect(websocket_ctrl->netc, hostname, strlen(hostname), (!network_ip_is_vaild(&websocket_ctrl->ip_addr)) ? NULL : &(websocket_ctrl->ip_addr), port, 0) < 0;
  572. LLOGD("network_connect ret %d", ret);
  573. if (ret < 0)
  574. {
  575. network_close(websocket_ctrl->netc, 0);
  576. return -1;
  577. }
  578. return 0;
  579. }
  580. int luat_websocket_set_headers(luat_websocket_ctrl_t *websocket_ctrl, const char *headers) {
  581. if (websocket_ctrl == NULL)
  582. return 0;
  583. if (websocket_ctrl->headers != NULL) {
  584. luat_heap_free(websocket_ctrl->headers);
  585. websocket_ctrl->headers = NULL;
  586. }
  587. if (headers) {
  588. websocket_ctrl->headers = headers;
  589. }
  590. return 0;
  591. }