Browse Source

update: 继续修改libtcpip

Wendal Chen 3 years ago
parent
commit
e7ec6b0817

+ 7 - 7
components/network/libtcpip/luat_libtcpip.h

@@ -11,16 +11,16 @@
 #define LUAT_IPPROTO_TCP     6
 #define LUAT_IPPROTO_UDP     17
 
-typedef int (*luat_libtcpip_socket)(int domain, int type, int protocol);
-typedef int (*luat_libtcpip_send)(int s, const void *data, size_t size, int flags);
-typedef int (*luat_libtcpip_recv)(int s, void *mem, size_t len, int flags);
-typedef int (*luat_libtcpip_recv_timeout)(int s, void *mem, size_t len, int flags, int timeout_ms);
+typedef void* (*luat_libtcpip_socket)(int domain, int type, int protocol);
+typedef int (*luat_libtcpip_send)(void* s, const void *data, size_t size, int flags);
+typedef int (*luat_libtcpip_recv)(void* s, void *mem, size_t len, int flags);
+typedef int (*luat_libtcpip_recv_timeout)(void* s, void *mem, size_t len, int flags, int timeout_ms);
 // typedef int (*luat_libtcpip_select)(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
 //             struct timeval *timeout);
-typedef int (*luat_libtcpip_close)(int fd);
-typedef int (*luat_libtcpip_connect)(int s, const char *hostname, uint16_t port);
+typedef int (*luat_libtcpip_close)(void* s);
+typedef int (*luat_libtcpip_connect)(void* s, const char *hostname, uint16_t port);
 typedef struct hostent* (*luat_libtcpip_gethostbyname)(const char* name);
-typedef int (*luat_libtcpip_setsockopt)(int s, int level, int optname, const void *optval, uint32_t optlen);
+typedef int (*luat_libtcpip_setsockopt)(void* s, int level, int optname, const void *optval, uint32_t optlen);
 
 typedef struct luat_libtcpip_opts
 {

+ 10 - 10
components/network/libtcpip/port/luat_libtcpip_mbedtls_ssl.c

@@ -36,12 +36,12 @@ typedef struct luat_libtcpip_mbedtls_ssl_ctx
 }luat_libtcpip_mbedtls_ssl_ctx_t;
 
 
-static int luat_libtcpip_socket_mbedtls_ssl(int domain, int type, int protocol) {
+static void* luat_libtcpip_socket_mbedtls_ssl(int domain, int type, int protocol) {
     int ret = 0;
     const char *pers = "ssl_client1";
     luat_libtcpip_mbedtls_ssl_ctx_t* ctx = luat_heap_malloc(sizeof(luat_libtcpip_mbedtls_ssl_ctx_t));
     if (ctx == NULL)
-        return 0;
+        return NULL;
     mbedtls_net_init( &ctx->server_fd );
     mbedtls_ssl_init( &ctx->ssl );
     mbedtls_ssl_config_init( &ctx->conf );
@@ -67,20 +67,20 @@ static int luat_libtcpip_socket_mbedtls_ssl(int domain, int type, int protocol)
     //     goto fail;
     // }
 
-    return (int)ctx;
+    return (void*)ctx;
 
 fail:
     luat_heap_free(ctx);
-    return 0;
+    return NULL;
 
 }
 
-static int luat_libtcpip_send_mbedtls_ssl(int s, const void *data, size_t size, int flags) {
+static int luat_libtcpip_send_mbedtls_ssl(void* s, const void *data, size_t size, int flags) {
     luat_libtcpip_mbedtls_ssl_ctx_t* ctx = (luat_libtcpip_mbedtls_ssl_ctx_t*)s;
     return mbedtls_ssl_write(&ctx->ssl, data, size);
 }
 
-static int luat_libtcpip_recv_mbedtls_ssl(int s, void *mem, size_t len, int flags) {
+static int luat_libtcpip_recv_mbedtls_ssl(void* s, void *mem, size_t len, int flags) {
     luat_libtcpip_mbedtls_ssl_ctx_t* ctx = (luat_libtcpip_mbedtls_ssl_ctx_t*)s;
     // 设置好超时
     mbedtls_ssl_conf_read_timeout(&ctx->conf, 1);
@@ -97,7 +97,7 @@ static int luat_libtcpip_recv_mbedtls_ssl(int s, void *mem, size_t len, int flag
     return ret;
 }
 
-static int luat_libtcpip_recv_timeout_mbedtls_ssl(int s, void *mem, size_t len, int flags, int timeout) {
+static int luat_libtcpip_recv_timeout_mbedtls_ssl(void* s, void *mem, size_t len, int flags, int timeout) {
     luat_libtcpip_mbedtls_ssl_ctx_t* ctx = (luat_libtcpip_mbedtls_ssl_ctx_t*)s;
     // 设置好超时
     mbedtls_ssl_conf_read_timeout(&ctx->conf, timeout);
@@ -119,7 +119,7 @@ static int luat_libtcpip_recv_timeout_mbedtls_ssl(int s, void *mem, size_t len,
 //     return select(maxfdp1, readset, writeset, exceptset, timeout);
 // }
 
-static int luat_libtcpip_close_mbedtls_ssl(int s) {
+static int luat_libtcpip_close_mbedtls_ssl(void* s) {
     luat_libtcpip_mbedtls_ssl_ctx_t* ctx = (luat_libtcpip_mbedtls_ssl_ctx_t*)s;
     if (ctx == NULL)
         return 0;
@@ -134,7 +134,7 @@ static int luat_libtcpip_close_mbedtls_ssl(int s) {
     return 0;
 }
 
-static int luat_libtcpip_connect_mbedtls_ssl(int s, const char *hostname, uint16_t _port) {
+static int luat_libtcpip_connect_mbedtls_ssl(void* s, const char *hostname, uint16_t _port) {
     // 1. Start the connection
     int ret = 0;
     uint32_t flags;
@@ -201,7 +201,7 @@ static struct hostent* luat_libtcpip_gethostbyname_mbedtls_ssl(const char* name)
     return gethostbyname(name);
 }
 
-static int luat_libtcpip_setsockopt_mbedtls_ssl(int s, int level, int optname, const void *optval, uint32_t optlen) {
+static int luat_libtcpip_setsockopt_mbedtls_ssl(void* s, int level, int optname, const void *optval, uint32_t optlen) {
     // return setsockopt(s, level, optname, optval, optlen);
     return 0; // nop
 }

+ 15 - 15
components/network/libtcpip/port/luat_libtcpip_posix.c

@@ -17,24 +17,24 @@
 #define LUAT_LOG_TAG "posix"
 #include "luat_log.h"
 
-static int luat_libtcpip_socket_posix(int domain, int type, int protocol) {
+static void* luat_libtcpip_socket_posix(int domain, int type, int protocol) {
     LLOGD("create socket %d %d %d", domain, type, protocol);
-    return socket(domain, type, protocol);
+    return (void*)socket(domain, type, protocol);
 }
 
-static int luat_libtcpip_send_posix(int s, const void *data, size_t size, int flags) {
-    return send(s, data, size, flags);
+static int luat_libtcpip_send_posix(void* s, const void *data, size_t size, int flags) {
+    return send((int)s, data, size, flags);
 }
 
-static int luat_libtcpip_recv_posix(int s, void *mem, size_t len, int flags) {
-    return recv(s, mem, len, flags);
+static int luat_libtcpip_recv_posix(void* s, void *mem, size_t len, int flags) {
+    return recv((int)s, mem, len, flags);
 }
 
-static int luat_libtcpip_recv_timeout_posix(int s, void *mem, size_t len, int flags, int timeout) {
+static int luat_libtcpip_recv_timeout_posix(void* s, void *mem, size_t len, int flags, int timeout) {
     int ret;
     struct timeval tv;
     fd_set read_fds;
-    int fd = s;
+    int fd = (int)s;
 
     FD_ZERO( &read_fds );
     FD_SET( fd, &read_fds );
@@ -62,12 +62,12 @@ static int luat_libtcpip_select_posix(int maxfdp1, fd_set *readset, fd_set *writ
     return select(maxfdp1, readset, writeset, exceptset, timeout);
 }
 
-static int luat_libtcpip_close_posix(int s) {
+static int luat_libtcpip_close_posix(void* s) {
     // return close(s);
-    return close(s);
+    return close((int)s);
 }
 
-static int luat_libtcpip_connect_posix(int s, const char *hostname, uint16_t port) {
+static int luat_libtcpip_connect_posix(void* s, const char *hostname, uint16_t port) {
     struct sockaddr_in socket_address;
     struct hostent *hp;
     char tmp[32];
@@ -88,10 +88,10 @@ static int luat_libtcpip_connect_posix(int s, const char *hostname, uint16_t por
     socket_address.sin_port = htons(port);
     memcpy(&(socket_address.sin_addr), hp->h_addr, hp->h_length);
 
-    LLOGD("socket fd %d", s);
+    LLOGD("socket fd %d", (int)s);
 
     LLOGD("connect %s %s %d", hostname, tmp, port);
-    int ret = connect(s, &socket_address, sizeof(socket_address));
+    int ret = connect((int)s, &socket_address, sizeof(socket_address));
     if (ret == -1) {
         LLOGD("connect errno %d", errno);
     }
@@ -103,8 +103,8 @@ static struct hostent* luat_libtcpip_gethostbyname_posix(const char* name) {
     return gethostbyname(name);
 }
 
-static int luat_libtcpip_setsockopt_posix(int s, int level, int optname, const void *optval, uint32_t optlen) {
-    return setsockopt(s, level, optname, optval, (socklen_t)optlen);
+static int luat_libtcpip_setsockopt_posix(void* s, int level, int optname, const void *optval, uint32_t optlen) {
+    return setsockopt((int)s, level, optname, optval, (socklen_t)optlen);
 }
 
 luat_libtcpip_opts_t luat_libtcpip_posix = {

+ 120 - 0
components/network/libtcpip/port/luat_libtcpip_win32.c

@@ -0,0 +1,120 @@
+#include "luat_base.h"
+
+#include "string.h"
+
+// #define LUAT_USE_LIBTCPIP_WIN32
+#ifdef LUAT_USE_LIBTCPIP_WIN32
+
+#include "time.h"
+
+#include "luat_libtcpip.h"
+#include <unistd.h>
+#include "windows.h"
+#include <WinSock2.h>
+
+#define LUAT_LOG_TAG "win32"
+#include "luat_log.h"
+
+static int luat_libtcpip_socket_posix(int domain, int type, int protocol) {
+    LLOGD("create socket %d %d %d", domain, type, protocol);
+    return socket(domain, type, protocol);
+}
+
+static int luat_libtcpip_send_posix(int s, const void *data, size_t size, int flags) {
+    return send(s, data, size, flags);
+}
+
+static int luat_libtcpip_recv_posix(int s, void *mem, size_t len, int flags) {
+    return recv(s, mem, len, flags);
+}
+
+static int luat_libtcpip_recv_timeout_posix(int s, void *mem, size_t len, int flags, int timeout) {
+    int ret;
+    struct timeval tv;
+    fd_set read_fds;
+    int fd = s;
+
+    FD_ZERO( &read_fds );
+    FD_SET( fd, &read_fds );
+
+    tv.tv_sec  = timeout / 1000;
+    tv.tv_usec = ( timeout % 1000 ) * 1000;
+
+    ret = select( fd + 1, &read_fds, NULL, NULL, timeout == 0 ? NULL : &tv );
+
+    /* Zero fds ready means we timed out */
+    if( ret == 0 )
+        return -1;
+
+    if( ret < 0 )
+    {
+        return ret;
+    }
+
+    /* This call will not block */
+    return recv( fd, mem, len, flags);
+}
+
+static int luat_libtcpip_select_posix(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
+            struct timeval *timeout) {
+    return select(maxfdp1, readset, writeset, exceptset, timeout);
+}
+
+static int luat_libtcpip_close_posix(int s) {
+    // return close(s);
+    return closesocket(s);
+}
+
+static int luat_libtcpip_connect_posix(int s, const char *hostname, uint16_t port) {
+    struct sockaddr_in socket_address;
+    struct hostent *hp;
+    char tmp[32];
+
+    hp = gethostbyname(hostname);
+    if (hp == NULL )
+    {
+        LLOGW("DNS Query Fail %s", hostname);
+        return -2;
+    }
+    else {
+        inet_ntop(hp->h_addrtype, hp->h_addr_list[0], tmp, sizeof(tmp));
+        LLOGW("DNS Query OK %s %s", hostname, tmp);
+    }
+
+    memset(&socket_address, 0, sizeof(struct sockaddr_in));
+    socket_address.sin_family = AF_INET;
+    socket_address.sin_port = htons(port);
+    memcpy(&(socket_address.sin_addr), hp->h_addr, hp->h_length);
+
+    LLOGD("socket fd %d", s);
+
+    LLOGD("connect %s %s %d", hostname, tmp, port);
+    int ret = connect(s, &socket_address, sizeof(socket_address));
+    if (ret == -1) {
+        LLOGD("connect errno %d", errno);
+    }
+    //LLOGD("connect %s %s %d ret %d", hostname, tmp, port, ret);
+    return ret;
+}
+
+static struct hostent* luat_libtcpip_gethostbyname_posix(const char* name) {
+    return gethostbyname(name);
+}
+
+static int luat_libtcpip_setsockopt_posix(int s, int level, int optname, const void *optval, uint32_t optlen) {
+    return setsockopt(s, level, optname, optval, (socklen_t)optlen);
+}
+
+luat_libtcpip_opts_t luat_libtcpip_posix = {
+    ._socket = luat_libtcpip_socket_posix,
+    ._close = luat_libtcpip_close_posix,
+    ._connect = luat_libtcpip_connect_posix,
+    ._gethostbyname = luat_libtcpip_gethostbyname_posix,
+    ._recv = luat_libtcpip_recv_posix,
+    ._recv_timeout = luat_libtcpip_recv_timeout_posix,
+    // ._select = luat_libtcpip_select_posix,
+    ._send = luat_libtcpip_send_posix,
+    ._setsockopt = luat_libtcpip_setsockopt_posix
+};
+
+#endif

+ 66 - 93
components/network/libtcpip/protocol/mqtt/luat_libtcpip_mqtt.c

@@ -7,6 +7,8 @@
 #include "luat_libtcpip.h"
 #include "luat_libtcpip_mqtt.h"
 
+#include "luat_mcu.h"
+
 #include "libemqtt.h"
 #include <sys/socket.h>
 #include <sys/socket.h>
@@ -24,7 +26,6 @@
 
 #define MQTT_KEEPALIVE            240
 
-// static const mqtt_queue_msg_t MQTT_QUEUE_MSG_START = {MQTT_CMD_START};
 static const mqtt_queue_msg_t MQTT_QUEUE_MSG_LOOP =  {MQTT_CMD_LOOP};
 // static const mqtt_queue_msg_t MQTT_QUEUE_MSG_HEART = {MQTT_CMD_HEART};
 
@@ -33,40 +34,34 @@ LUAT_RET app_mqtt_ready(app_mqtt_ctx_t* ctx) {
     return ctx->conack_ready;
 }
 
-// static void app_mqtt_heart_timer(void *ptmr, void *parg)
-// {
-//     //tls_os_queue_send(app_mqtt_task_queue, (void *)&MQTT_QUEUE_MSG_HEART, 0);
-// }
-
 static int app_mqtt_close_socket(app_mqtt_ctx_t* ctx)
 {
-    int fd = ctx->socket_fd;
     ctx->conack_ready = LUAT_FALSE;
-    // XXX 替换原有posix的API调用
-    // return closesocket(fd);
-    return ctx->tcp_opts->_close(fd);
+    ctx->tcp_opts->_close(ctx->socket_ctx);
+    ctx->socket_ctx = NULL;
+    return 0;
 }
 
 static int app_mqtt_send_packet(void* userdata, const void *buf, unsigned int count)
 {
     app_mqtt_ctx_t* ctx = (app_mqtt_ctx_t*)userdata;
     LLOGD("ctx %p", ctx);
-    LLOGD("send fd %d len %d", ctx->socket_fd, count);
-    return ctx->tcp_opts->_send(ctx->socket_fd, buf, count, 0);
+    LLOGD("send fd %d len %d", ctx->socket_ctx, count);
+    return ctx->tcp_opts->_send(ctx->socket_ctx, buf, count, 0);
 }
 
 static int app_mqtt_read_packet(app_mqtt_ctx_t* ctx)
 {
     // int ret = 0;
     int total_bytes = 0, bytes_rcvd, packet_length;
-    int socket_fd = ctx->socket_fd;
+    // int socket_fd = ctx->socket_fd;
     uint8_t* packet_buff = ctx->packet_buffer;
     luat_libtcpip_opts_t* tcp_opts = ctx->tcp_opts;
 
     memset(packet_buff, 0, sizeof(MQTT_RECV_BUF_LEN_MAX));
     // XXX 替换原有posix的API调用
     // if((bytes_rcvd = recv(app_mqtt_socket_id, (app_mqtt_packet_buffer + total_bytes), MQTT_RECV_BUF_LEN_MAX, 0)) <= 0)
-    if((bytes_rcvd = ctx->tcp_opts->_recv_timeout(socket_fd, (packet_buff + total_bytes), 2, 0, 5)) <= 0)
+    if((bytes_rcvd = ctx->tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), 2, 0, 5)) <= 0)
     {
         // printf("%d, %d", bytes_rcvd, app_mqtt_socket_id);
         return MQTT_READ_TIMEOUT;
@@ -75,7 +70,7 @@ static int app_mqtt_read_packet(app_mqtt_ctx_t* ctx)
     total_bytes += bytes_rcvd; // Keep tally of total bytes
     if (total_bytes < 2) {
         // 少于2字节,那就肯定1个字节, 那我们再等15000ms
-        if((bytes_rcvd = tcp_opts->_recv_timeout(socket_fd, (packet_buff + total_bytes), 1, 0, 15000)) <= 0) {
+        if((bytes_rcvd = tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), 1, 0, 15000)) <= 0) {
             LLOGD("read package header timeout, close socket");
             app_mqtt_close_socket(ctx);
             return -1;
@@ -86,7 +81,7 @@ static int app_mqtt_read_packet(app_mqtt_ctx_t* ctx)
         for (size_t i = 1; i < 5; i++)
         {
             if (packet_buff[i] & 0x80) {
-                if((bytes_rcvd = tcp_opts->_recv_timeout(socket_fd, (packet_buff + total_bytes), 1, 0, 15000)) <= 0) {
+                if((bytes_rcvd = tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), 1, 0, 15000)) <= 0) {
                     LLOGD("read package header timeout, close socket");
                     app_mqtt_close_socket(ctx);
                     return -1;
@@ -110,11 +105,9 @@ static int app_mqtt_read_packet(app_mqtt_ctx_t* ctx)
     // LLOGD("packet_length %d total_bytes %d", packet_length, total_bytes);
     while(total_bytes < packet_length) // Reading the packet
     {
-        // XXX 替换原有posix的API调用
         // LLOGD("packet_length %d total_bytes %d", packet_length, total_bytes);
-        // if((bytes_rcvd = recv(app_mqtt_socket_id, (app_mqtt_packet_buffer + total_bytes), MQTT_RECV_BUF_LEN_MAX, 0)) <= 0)
         // LLOGD("more data %d", packet_length - total_bytes);
-        if((bytes_rcvd = tcp_opts->_recv_timeout(socket_fd, (packet_buff + total_bytes), packet_length - total_bytes, 0, 2000)) <= 0)
+        if((bytes_rcvd = tcp_opts->_recv_timeout(ctx->socket_ctx, (packet_buff + total_bytes), packet_length - total_bytes, 0, 2000)) <= 0)
             return -1;
         total_bytes += bytes_rcvd; // Keep tally of total bytes
     }
@@ -128,16 +121,12 @@ static int app_mqtt_init_socket(app_mqtt_ctx_t* ctx)
     // struct hostent *hp;
 
     // Create the socket
-    // XXX 替换原有posix的API调用
-    // if((app_mqtt_socket_id = socket(PF_INET, SOCK_STREAM, 0)) < 0)
-    if((ctx->socket_fd = ctx->tcp_opts->_socket(PF_INET, SOCK_STREAM, 0)) < 0) {
-        LLOGE("socket create error %d", ctx->socket_fd);
+    if((ctx->socket_ctx = ctx->tcp_opts->_socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+        LLOGE("socket create error %p", ctx->socket_ctx);
         return -1;
     }
     // Disable Nagle Algorithm
-    // XXX 替换原有posix的API调用
-    // if (setsockopt(app_mqtt_socket_id, IPPROTO_TCP, 0x01, (char *)&flag, sizeof(flag)) < 0)
-    if (ctx->tcp_opts->_setsockopt(ctx->socket_fd, IPPROTO_TCP, 0x01, (char *)&flag, sizeof(flag)) < 0){
+    if (ctx->tcp_opts->_setsockopt(ctx->socket_ctx, IPPROTO_TCP, 0x01, (char *)&flag, sizeof(flag)) < 0){
         LLOGE("socket setsockopt error");
         app_mqtt_close_socket(ctx);
         return -2;
@@ -146,7 +135,7 @@ static int app_mqtt_init_socket(app_mqtt_ctx_t* ctx)
     // XXX 替换原有posix的API调用
     // if((connect(app_mqtt_socket_id, (struct sockaddr *)&socket_address, sizeof(socket_address))) < 0)
     // if((tcp_opts->_connect(app_mqtt_socket_id, (struct sockaddr *)&socket_address, sizeof(socket_address))) < 0)
-    if(ctx->tcp_opts->_connect(ctx->socket_fd, ctx->host, ctx->port) < 0){
+    if(ctx->tcp_opts->_connect(ctx->socket_ctx, ctx->host, ctx->port) < 0){
         LLOGE("socket connect error");
         app_mqtt_close_socket(ctx);
         return -3;
@@ -167,8 +156,6 @@ static int app_mqtt_init_inner(app_mqtt_ctx_t* ctx)
     ctx->connect_ready = LUAT_FALSE;
     ctx->conack_ready = LUAT_FALSE;
 
-    // uint16_t msg_id, msg_id_rcv;
-    // LLOGD("step1: init mqtt lib.");
 #if 1
     LLOGD("step1: init mqtt lib.");
     LLOGD("mqtt client_id:%s", ctx->client_id);
@@ -192,12 +179,6 @@ static int app_mqtt_init_inner(app_mqtt_ctx_t* ctx)
         return -5;
     }
     ctx->connect_ready = LUAT_TRUE;
-    // LLOGD("step5: start the Heart-beat preservation timer");
-    // ret = tls_os_timer_create(&app_mqtt_heartbeat_timer,
-    //                           app_mqtt_heart_timer,
-    //                           NULL, (APP_MQTT_KEEPALIVE / 3 * HZ), TRUE, NULL);
-    // if (TLS_OS_SUCCESS == ret)
-    //     tls_os_timer_start(app_mqtt_heartbeat_timer);
 
     return 0;
 }
@@ -212,7 +193,7 @@ static int app_mqtt_msg_cb(app_mqtt_ctx_t* ctx) {
     LLOGD("mqtt msg %02X", msg_tp);
     switch (msg_tp) {
         case MQTT_MSG_PUBLISH : {
-            ctx->keepalive_mark = 0;
+            //ctx->keepalive_mark = 0;
             // uint8_t topic[128], *msg;
             topic_len = mqtt_parse_pub_topic_ptr(ctx->packet_buffer, &topic);
             LLOGD("recvd: topic len %d", topic_len);
@@ -231,10 +212,6 @@ static int app_mqtt_msg_cb(app_mqtt_ctx_t* ctx) {
             #endif
 
             ctx->publish_cb(ctx, (char*)topic, topic_len, (char*)payload, payload_len);
-            // LLOGD("recvd: %s >>> %d %d", topic,);
-            // TODO 禁用下面的回显
-            // mqtt_publish(&app_mqtt_mqtt_broker, (const char *)mqtt_iot_pub_topic, (const char *)msg, len, 0);
-            // LLOGD("pushed: %s <<< %s", MQTT_PUB_TOPIC, msg);
             break;
         }
         case MQTT_MSG_CONNACK: {
@@ -282,9 +259,7 @@ static int app_mqtt_msg_cb(app_mqtt_ctx_t* ctx) {
 }
 
 int app_mqtt_disconnect(app_mqtt_ctx_t *ctx) {
-    // tls_os_timer_stop(app_mqtt_heartbeat_timer);
-    app_mqtt_close_socket(ctx);
-    return 0;
+    return app_mqtt_close_socket(ctx);
 }
 
 static int app_mqtt_loop(app_mqtt_ctx_t *ctx)
@@ -300,74 +275,88 @@ static int app_mqtt_loop(app_mqtt_ctx_t *ctx)
         //LLOGD("recvd Packet Header: 0x%x...", app_mqtt_packet_buffer[0]);
         ret = app_mqtt_msg_cb(ctx);
         if (ret != 0) {
-            // tls_os_timer_stop(app_mqtt_heartbeat_timer);
             app_mqtt_close_socket(ctx);
-        }
-        else {
-            // tls_os_queue_send(app_mqtt_task_queue, (void *)&MQTT_QUEUE_MSG_LOOP, 0);
+            return -1;
         }
     }
     else if (packet_length == MQTT_READ_TIMEOUT)
     {
-        // tls_os_queue_send(app_mqtt_task_queue, (void *)&MQTT_QUEUE_MSG_LOOP, 0);
+        // nop
     }
     else if(packet_length == -1)
     {
-        LLOGD("mqtt error:(%d), stop mqtt iotda!", packet_length);
-        // tls_os_timer_stop(app_mqtt_heartbeat_timer);
+        LLOGD("mqtt error:(%d), stop mqtt!", packet_length);
         app_mqtt_close_socket(ctx);
+        return -1;
     }
-
     return 0;
 }
 
 extern int app_mqtt_authentication_get(app_mqtt_ctx_t* ctx);
 
+app_mqtt_ctx_t* app_mqtt_configure_create(void) {
+    app_mqtt_ctx_t* ctx = luat_heap_malloc(sizeof(app_mqtt_ctx_t));
+    if (ctx == NULL) {
+        LLOGE("out of memory when mallo app_mqtt_ctx_t");
+        return NULL;
+    }
+    memset(ctx, 0, sizeof(app_mqtt_ctx_t));
+
+    ctx->keepalive = 240;
+    ctx->port = 1883;
+    ctx->keep_run = 1;
+
+    luat_queue_create(&ctx->msg_queue, 128, 4);
+
+    return ctx;
+}
+
 void app_mqtt_task(void *p)
 {
     int ret = 0;
     mqtt_queue_msg_t *msg;
 	app_mqtt_pub_data_t* pmsg;
-    uint32_t retry_time = 2;
+    // uint32_t retry_time = 2;
     app_mqtt_ctx_t* ctx = (app_mqtt_ctx_t*)p;
 
     msg = &MQTT_QUEUE_MSG_LOOP;
 
-    while (1)
+    // 计算ping的时机
+    // uint64_t last_pkg_ticks = 0;
+    size_t hz = luat_mcu_hz();
+    uint64_t tick_used;
+
+    while (ctx->keep_run)
     {
         if (ctx->connect_ready == LUAT_FALSE)
         {
-            // 固定延迟2秒再启动
-            //luat_timer_mdelay(2 * 1000);
-
             ret = app_mqtt_init_inner(ctx);
             if (ret) {
                 LLOGD("mqtt init fail %d", ret);
-                continue; // 开始下一轮重连循环
+                if (ctx->keep_run) {
+                    luat_timer_mdelay(ctx->reconnet_delay);
+                    continue; // 开始下一轮重连循环
+                }
+                else {
+                    LLOGD("mqtt exit");
+                    break;
+                }
             }
-            //app_mqtt_state = LUAT_TRUE;
-            // tls_os_queue_send(app_mqtt_task_queue, (void *)&MQTT_QUEUE_MSG_LOOP, 0);
+            ctx->last_pkg_tick = luat_mcu_ticks();
         }
-        
-        // ret = tls_os_queue_receive(app_mqtt_task_queue, (void **)&msg, 0, 1);
+
+        ret = luat_queue_recv(&ctx->msg_queue, &msg, sizeof(mqtt_queue_msg_t), 1);
         if (!ret)
         {
             switch((uint32_t)msg->type)
             {
-            // case MQTT_CMD_START:
-            //     break;
-            case MQTT_CMD_HEART:
-                // LLOGD("MQTT_CMD_HEART");
-                if (ctx->keepalive_mark == 1)
-                    mqtt_ping(&ctx->broker);
-                ctx->keepalive_mark = 1;
-                break;
             case MQTT_CMD_LOOP:
                 // LLOGD("MQTT_CMD_LOOP");
                 app_mqtt_loop(ctx);
                 break;
             case PUB_MSG_MAGIC:
-                ctx->keepalive_mark = 0;
+                // ctx->keepalive_mark = 0;
+                ctx->last_pkg_tick = luat_mcu_ticks();
                 pmsg = msg;
                 ret = mqtt_publish_with_qos(&ctx->broker, pmsg->topic, pmsg->data, pmsg->data_len, pmsg->retain, pmsg->qos, NULL);
                 LLOGD("app_mqtt_pub_data_t free %p", msg);
@@ -381,6 +370,12 @@ void app_mqtt_task(void *p)
         else {
             app_mqtt_loop(ctx);
         }
+        
+        tick_used = luat_mcu_ticks() - ctx->last_pkg_tick;
+        if (tick_used > (ctx->keepalive * hz  / 3)) {
+            mqtt_ping(&ctx->broker);
+            ctx->last_pkg_tick = luat_mcu_ticks();
+        }
     }
 }
 
@@ -407,28 +402,6 @@ int app_mqtt_publish(app_mqtt_ctx_t* ctx, const char* topic, char* data, size_t
         memcpy(msg->topic, topic, strlen(topic) + 1);
     else
         memcpy(msg->topic, ctx->pub_topic, strlen(ctx->pub_topic) + 1);
-    // int ret = tls_os_queue_send(app_mqtt_task_queue, (void *)msg, 0);
-    // if (ret != 0) {
-    //     LLOGD("app_mqtt_publish fail, free msg %p", msg);
-    //     free(msg);
-    //     return -2;
-    // }
+    luat_queue_send(&ctx->msg_queue, msg, sizeof(app_mqtt_pub_data_t), 1);
     return 0;
 }
-
-//----------------------------------------------------
-//                   MQTT主线程
-//----------------------------------------------------
-
-// static OS_STK app_mqtt_task_stk[MQTT_TASK_SIZE];
-
-// int app_mqtt_init(void) {
-//     ctx = malloc(sizeof(app_ctx_t));
-//     memset(ctx, 0, sizeof(app_ctx_t));
-//     tls_os_queue_create(&app_mqtt_task_queue, 32);
-//     tls_os_task_create(NULL, "mqtt", app_mqtt_task,
-//                         NULL, (void *)app_mqtt_task_stk,  /* task's stack start address */
-//                         MQTT_TASK_SIZE * sizeof(u32), /* task's stack size, unit:byte */
-//                         MQTT_TASK_PRIO, 0);
-//     return WM_SUCCESS;
-// }

+ 7 - 2
components/network/libtcpip/protocol/mqtt/luat_libtcpip_mqtt.h

@@ -3,6 +3,7 @@
 
 #include "luat_libtcpip.h"
 #include "libemqtt.h"
+#include "luat_rtos.h"
 
 #define MQTT_READ_TIMEOUT        (-1000)
 
@@ -40,9 +41,13 @@ typedef struct app_mqtt_ctx
     uint8_t packet_buffer[MQTT_RECV_BUF_LEN_MAX];
     int conack_ready; // 判断是否收到CONACK
     int connect_ready; // 判断连接是否已经就绪
-    int socket_fd;
-    int keepalive_mark;
+
+    void* socket_ctx;
+    uint64_t last_pkg_tick;
+    uint16_t reconnet_delay;
+    uint8_t keep_run;
     mqtt_publish_cb publish_cb;
+    luat_rtos_queue_t msg_queue;
 }app_mqtt_ctx_t;
 
 

+ 50 - 0
components/rtos/win32/luat_rtos_win32_queue.c

@@ -0,0 +1,50 @@
+#include "luat_base.h"
+#include "luat_rtos.h"
+
+#include "FreeRTOS.h"
+#include "task.h"
+#include "queue.h"
+#include "semaphore.h"
+
+LUAT_RET luat_queue_create(luat_rtos_queue_t* queue, size_t msgcount, size_t msgsize) {
+    QueueHandle_t q = xQueueCreate(msgcount, msgsize);
+    if (q == NULL) {
+        return -1;
+    }
+    queue->userdata = q;
+    return LUAT_ERR_OK;
+}
+
+LUAT_RET luat_queue_send(luat_rtos_queue_t*   queue, void* msg,  size_t msg_size, size_t timeout) {
+    if (queue->userdata == NULL)
+        return LUAT_ERR_FAIL;
+    if (xQueueSend(queue->userdata, msg, timeout) == pdTRUE) {
+        return LUAT_ERR_OK;
+    }
+    return LUAT_ERR_FAIL;
+}
+LUAT_RET luat_queue_recv(luat_rtos_queue_t*   queue, void* msg, size_t msg_size, size_t timeout) {
+    if (queue->userdata == NULL)
+        return LUAT_ERR_FAIL;
+    if (xQueueReceive(queue->userdata, msg, timeout) == pdTRUE) {
+        return LUAT_ERR_OK;
+    }
+    return LUAT_ERR_FAIL;
+}
+
+LUAT_RET luat_queue_reset(luat_rtos_queue_t*   queue) {
+    if (queue->userdata == NULL)
+        return LUAT_ERR_FAIL;
+    if (pdTRUE == xQueueReset((QueueHandle_t)queue->userdata)) {
+        return LUAT_ERR_OK;
+    }
+    return LUAT_ERR_FAIL;
+}
+
+LUAT_RET luat_queue_delete(luat_rtos_queue_t*   queue) {
+    if (queue->userdata) {
+        vQueueDelete((QueueHandle_t)queue->userdata);
+    }
+    queue->userdata = NULL;
+    return LUAT_ERR_OK;
+}

+ 36 - 0
components/rtos/win32/luat_rtos_win32_task.c

@@ -0,0 +1,36 @@
+#include "luat_base.h"
+#include "luat_rtos.h"
+
+#include "windows.h"
+	#define thread_type HANDLE
+	#define thread_id_type DWORD
+	#define thread_return_type DWORD
+	#define thread_fn LPTHREAD_START_ROUTINE
+	#define cond_type HANDLE
+	#define sem_type HANDLE
+	#undef ETIMEDOUT
+	#define ETIMEDOUT WSAETIMEDOUT
+
+#define LUAT_LOG_TAG "thread"
+#include "luat_log.h"
+
+static void task_proxy(void* params) {
+    luat_thread_t* thread = (luat_thread_t*)params;
+    thread->entry(thread->userdata);
+}
+
+LUAT_RET luat_thread_start(luat_thread_t* thread) {
+    thread_type thread = NULL;
+    thread = CreateThread(NULL, 0, task_proxy, thread, 0, NULL);
+    CloseHandle(thread);
+    LLOGD("thread start fail %d", ret);
+    return LUAT_ERR_FAIL;
+}
+
+LUAT_RET luat_thread_stop(luat_thread_t* thread) {
+    return LUAT_ERR_FAIL;
+}
+
+LUAT_RET luat_thread_delete(luat_thread_t* thread) {
+    return LUAT_ERR_FAIL;
+}