Эх сурвалжийг харах

update: rtmp,优化拥堵控制算法

1. 效果: 连续跑8小时正常传输中, 30w像素, 1.3M码率上传
2. 策略: 入队前若超限,优先丢弃未发送的非关键帧,再丢弃未发送帧;若仍超限则拒绝当前帧
3. VLC连续播放4.5小时后,不再显示图像,原因待查,但数据包一直有下行,重新连接就正常播放
Wendal Chen 1 сар өмнө
parent
commit
ff9974ee27

+ 8 - 0
components/rtmp/include/luat_rtmp_push.h

@@ -35,6 +35,9 @@ extern "C" {
 /** RTMP缓冲区大小(字节) - 需要足够大以容纳I帧 */
 #define RTMP_BUFFER_SIZE (512 * 1024)
 
+/** 发送帧队列最大字节数上限,超出将丢弃未发送帧(优先丢弃非关键帧) */
+#define RTMP_MAX_QUEUE_BYTES (1024 * 1024)
+
 /** RTMP握手数据大小(字节) */
 #define RTMP_HANDSHAKE_SIZE 1536
 
@@ -174,6 +177,11 @@ typedef struct {
     uint8_t *send_buf;              /**< 发送缓冲区 */
     uint32_t send_buf_size;         /**< 发送缓冲区大小 */
     uint32_t send_pos;              /**< 发送缓冲区写位置 */
+
+    /** ============ 帧发送队列 ============ */
+    struct rtmp_frame_node *frame_head; /**< 待发送帧队列头 */
+    struct rtmp_frame_node *frame_tail; /**< 待发送帧队列尾 */
+    uint32_t frame_queue_bytes;          /**< 队列占用的总字节数 */
     
     /** ============ 时间戳管理 ============ */
     uint32_t video_timestamp;       /**< 当前视频时间戳(ms) */

+ 285 - 104
components/rtmp/src/luat_rtmp_push.c

@@ -122,6 +122,17 @@ static int rtmp_pack_video_tag(uint8_t *buffer, uint32_t buffer_len,
                               const uint8_t *video_data, uint32_t video_len,
                               bool is_key_frame);
 
+/**
+ * 帧发送队列节点
+ */
+typedef struct rtmp_frame_node {
+    uint8_t *data;              /* 完整RTMP消息(包含chunk头) */
+    uint32_t len;               /* 消息总长度 */
+    uint32_t sent;              /* 已发送字节数 */
+    bool is_key;                /* 是否关键帧 */
+    struct rtmp_frame_node *next;
+} rtmp_frame_node_t;
+
 /**
  * 发送发出的缓冲数据
  */
@@ -144,6 +155,13 @@ static int rtmp_send_single_nalu(rtmp_ctx_t *ctx, const uint8_t *nalu_data,
  */
 static int rtmp_send_avc_sequence_header(rtmp_ctx_t *ctx, const uint8_t *seq_header,
                                         uint32_t seq_len, uint32_t timestamp);
+static int rtmp_build_rtmp_message(rtmp_ctx_t *ctx, uint8_t msg_type,
+                                  const uint8_t *payload, uint32_t payload_len,
+                                  uint32_t timestamp, uint32_t stream_id,
+                                  uint8_t **out_buf, uint32_t *out_len);
+static void rtmp_free_frame_node(rtmp_frame_node_t *node);
+static int rtmp_queue_frame(rtmp_ctx_t *ctx, rtmp_frame_node_t *node);
+static void rtmp_try_send_queue(rtmp_ctx_t *ctx);
 
 /**
  * 生成随机时间戳
@@ -544,6 +562,14 @@ int rtmp_destroy(rtmp_ctx_t *ctx) {
     if (ctx->auth) luat_heap_free(ctx->auth);
     if (ctx->recv_buf) luat_heap_free(ctx->recv_buf);
     if (ctx->send_buf) luat_heap_free(ctx->send_buf);
+
+    /* 释放未发送的帧队列 */
+    rtmp_frame_node_t *cur = ctx->frame_head;
+    while (cur) {
+        rtmp_frame_node_t *next = cur->next;
+        rtmp_free_frame_node(cur);
+        cur = next;
+    }
     
     luat_heap_free(ctx);
     
@@ -740,7 +766,7 @@ int rtmp_send_nalu(rtmp_ctx_t *ctx, const uint8_t *nalu_data,
     
     /* 如果有SPS和PPS,发送AVC Sequence Header */
     if (sps_data && pps_data) {
-        LLOGI("RTMP: Sending AVC Sequence Header (SPS+PPS)");
+        // LLOGI("RTMP: Sending AVC Sequence Header (SPS+PPS)");
         
         /* 构建AVC Sequence Header
          * 格式: [configurationVersion(1)] [AVCProfileIndication(1)] [profile_compatibility(1)] 
@@ -858,22 +884,34 @@ static int rtmp_send_avc_sequence_header(rtmp_ctx_t *ctx, const uint8_t *seq_hea
     memcpy(&msg_buf[offset], seq_header, seq_len);
     offset += seq_len;
     
-    /* 发送消息 (消息类型9=视频, 流ID=1) */
-    int ret = rtmp_pack_message(ctx, 9, msg_buf, msg_len, timestamp, 1);
+    /* 发送消息 (消息类型9=视频, 流ID=1),放入帧队列 */
+    uint8_t *rtmp_buf = NULL;
+    uint32_t rtmp_len = 0;
+    int ret = rtmp_build_rtmp_message(ctx, 9, msg_buf, msg_len, timestamp, 1, &rtmp_buf, &rtmp_len);
     luat_heap_free(msg_buf);
-    
     if (ret != RTMP_OK) {
-        LLOGE("RTMP: Failed to send AVC sequence header");
+        LLOGE("RTMP: Failed to build RTMP message for AVC sequence header");
         return ret;
     }
     
-    /* 立即刷新发送缓冲区 */
-    ret = rtmp_flush_send_buffer(ctx);
+    rtmp_frame_node_t *node = (rtmp_frame_node_t *)luat_heap_malloc(sizeof(rtmp_frame_node_t));
+    if (!node) {
+        luat_heap_free(rtmp_buf);
+        return RTMP_ERR_NO_MEMORY;
+    }
+    node->data = rtmp_buf;
+    node->len = rtmp_len;
+    node->sent = 0;
+    node->is_key = true; /* 配置按关键帧优先处理 */
+    node->next = NULL;
+    
+    ret = rtmp_queue_frame(ctx, node);
     if (ret != RTMP_OK) {
+        rtmp_free_frame_node(node);
         return ret;
     }
     
-    LLOGI("RTMP: AVC Sequence Header sent successfully, size=%u", seq_len);
+    // LLOGI("RTMP: AVC Sequence Header queued, size=%u", rtmp_len);
     
     return RTMP_OK;
 }
@@ -921,83 +959,47 @@ static int rtmp_send_single_nalu(rtmp_ctx_t *ctx, const uint8_t *nalu_data,
     
     /* 完整视频消息 = 头(11字节) + NALU数据 */
     uint32_t total_msg_len = header_len + nalu_len;
-    
-    /* 对于大数据帧(>32KB),分块传输以避免单个RTMP消息过大 */
-    #define RTMP_CHUNK_SIZE (32 * 1024)  /* 32KB块 */
-    
-    if (total_msg_len <= RTMP_CHUNK_SIZE) {
-        /* 小消息: 一次发送 */
-        uint8_t *msg_buf = (uint8_t *)luat_heap_malloc(total_msg_len);
-        if (!msg_buf) {
-            LLOGE("RTMP: Failed to allocate buffer for video message");
-            return RTMP_ERR_NO_MEMORY;
-        }
-        
-        /* 复制头和数据 */
-        memcpy(msg_buf, header_buf, header_len);
-        memcpy(&msg_buf[header_len], nalu_data, nalu_len);
-        
-        /* 发送 */
-        int ret = rtmp_pack_message(ctx, 9, msg_buf, total_msg_len, timestamp, 1);
-        luat_heap_free(msg_buf);
-        
-        if (ret != RTMP_OK) {
-            return ret;
-        }
-    } else {
-        /* 大消息: 分块发送 */
-        uint32_t pos = 0;
-        uint32_t chunk_count = 0;
-        
-        while (pos < nalu_len) {
-            uint32_t chunk_len = nalu_len - pos;
-            if (chunk_len > RTMP_CHUNK_SIZE - header_len) {
-                chunk_len = RTMP_CHUNK_SIZE - header_len;
-            }
-            
-            /* 只有第一块包含头 */
-            uint32_t current_msg_len = (pos == 0) ? (header_len + chunk_len) : chunk_len;
-            uint8_t *chunk_buf = (uint8_t *)luat_heap_malloc(current_msg_len);
-            
-            if (!chunk_buf) {
-                LLOGE("RTMP: Failed to allocate buffer for chunk");
-                return RTMP_ERR_NO_MEMORY;
-            }
-            
-            uint32_t buf_pos = 0;
-            
-            /* 第一块包含头 */
-            if (pos == 0) {
-                memcpy(chunk_buf, header_buf, header_len);
-                buf_pos = header_len;
-            }
-            
-            /* 复制数据 */
-            memcpy(&chunk_buf[buf_pos], &nalu_data[pos], chunk_len);
-            
-            /* 发送块 */
-            int ret = rtmp_pack_message(ctx, 9, chunk_buf, current_msg_len, timestamp, 1);
-            luat_heap_free(chunk_buf);
-            
-            if (ret != RTMP_OK) {
-                return ret;
-            }
-            
-            pos += chunk_len;
-            chunk_count++;
-            
-            LLOGD("RTMP: Video chunk %d sent, chunk_size=%d, total_progress=%d/%d",
-                  chunk_count, chunk_len, pos, nalu_len);
-        }
+
+    uint8_t *msg_buf = (uint8_t *)luat_heap_malloc(total_msg_len);
+    if (!msg_buf) {
+        LLOGE("RTMP: Failed to allocate buffer for video message");
+        return RTMP_ERR_NO_MEMORY;
     }
-    
+
+    memcpy(msg_buf, header_buf, header_len);
+    memcpy(&msg_buf[header_len], nalu_data, nalu_len);
+
+    /* 构建完整RTMP消息并入队 */
+    uint8_t *rtmp_buf = NULL;
+    uint32_t rtmp_len = 0;
+    int ret = rtmp_build_rtmp_message(ctx, 9, msg_buf, total_msg_len, timestamp, 1, &rtmp_buf, &rtmp_len);
+    luat_heap_free(msg_buf);
+    if (ret != RTMP_OK) {
+        return ret;
+    }
+
+    rtmp_frame_node_t *node = (rtmp_frame_node_t *)luat_heap_malloc(sizeof(rtmp_frame_node_t));
+    if (!node) {
+        luat_heap_free(rtmp_buf);
+        return RTMP_ERR_NO_MEMORY;
+    }
+    node->data = rtmp_buf;
+    node->len = rtmp_len;
+    node->sent = 0;
+    node->is_key = is_key_frame;
+    node->next = NULL;
+
+    ret = rtmp_queue_frame(ctx, node);
+    if (ret != RTMP_OK) {
+        rtmp_free_frame_node(node);
+        return ret;
+    }
+
     /* 更新统计 */
     ctx->video_timestamp = timestamp;
     ctx->packets_sent++;
     ctx->bytes_sent += nalu_len;
-    
-    // LLOGD("RTMP: NALU sent, type=%d, is_key=%d, size=%d", nal_type, is_key_frame, nalu_len);
-    
+
     return RTMP_OK;
 }
 
@@ -1038,6 +1040,9 @@ int rtmp_poll(rtmp_ctx_t *ctx) {
     if (!ctx) {
         return RTMP_ERR_INVALID_PARAM;
     }
+
+    /* 优先尝试发送队列中的数据 */
+    rtmp_try_send_queue(ctx);
     
     /* 检查超时 */
     uint32_t now = rtmp_gen_timestamp();
@@ -1409,6 +1414,8 @@ static err_t rtmp_tcp_sent_callback(void *arg, struct tcp_pcb *pcb, u16_t len) {
     //LLOGD("RTMP: TCP sent callback, len=%d, total_sent=%llu", len, total_sent);
     if (ctx) {
         ctx->bytes_sent += len;
+        /* 继续发送队列中的数据 */
+        rtmp_try_send_queue(ctx);
     }
     
     return ERR_OK;
@@ -1657,6 +1664,201 @@ static int rtmp_send_metadata(rtmp_ctx_t *ctx) {
     return ret;
 }
 
+/* ========== 帧队列与发送 ========== */
+
+static void rtmp_free_frame_node(rtmp_frame_node_t *node) {
+    if (!node) return;
+    if (node->data) {
+        luat_heap_free(node->data);
+    }
+    luat_heap_free(node);
+}
+
+/* 构建完整的RTMP消息(包含chunk分片),返回新分配的缓冲区 */
+static int rtmp_build_rtmp_message(rtmp_ctx_t *ctx, uint8_t msg_type,
+                                  const uint8_t *payload, uint32_t payload_len,
+                                  uint32_t timestamp, uint32_t stream_id,
+                                  uint8_t **out_buf, uint32_t *out_len) {
+    if (!ctx || !payload || payload_len == 0 || !out_buf || !out_len) {
+        return RTMP_ERR_INVALID_PARAM;
+    }
+
+    uint32_t chunk_size = ctx->chunk_size ? ctx->chunk_size : RTMP_DEFAULT_CHUNK_SIZE;
+    uint8_t chunk_stream_id = (msg_type == 20 || msg_type == 17) ? 3 : 4;
+
+    uint32_t num_chunks = (payload_len + chunk_size - 1) / chunk_size;
+    uint32_t total_len = 12 + payload_len;           /* 首块含完整头 */
+    if (num_chunks > 1) {
+        total_len += (num_chunks - 1);               /* 每个后续块1字节继续头 */
+    }
+
+    uint8_t *buf = (uint8_t *)luat_heap_malloc(total_len);
+    if (!buf) {
+        return RTMP_ERR_NO_MEMORY;
+    }
+
+    uint32_t offset = 0;
+
+    /* 首块头:fmt0 */
+    buf[offset++] = (0 << 6) | (chunk_stream_id & 0x3F);
+    buf[offset++] = (timestamp >> 16) & 0xFF;
+    buf[offset++] = (timestamp >> 8) & 0xFF;
+    buf[offset++] = timestamp & 0xFF;
+    buf[offset++] = (payload_len >> 16) & 0xFF;
+    buf[offset++] = (payload_len >> 8) & 0xFF;
+    buf[offset++] = payload_len & 0xFF;
+    buf[offset++] = msg_type;
+    buf[offset++] = stream_id & 0xFF;
+    buf[offset++] = (stream_id >> 8) & 0xFF;
+    buf[offset++] = (stream_id >> 16) & 0xFF;
+    buf[offset++] = (stream_id >> 24) & 0xFF;
+
+    /* 数据拷贝,带继续块头 */
+    uint32_t sent = 0;
+    uint32_t first_copy = (payload_len < chunk_size) ? payload_len : chunk_size;
+    memcpy(&buf[offset], payload, first_copy);
+    offset += first_copy;
+    sent += first_copy;
+
+    while (sent < payload_len) {
+        uint32_t remain = payload_len - sent;
+        uint32_t copy_len = (remain < chunk_size) ? remain : chunk_size;
+        buf[offset++] = (3 << 6) | (chunk_stream_id & 0x3F); /* continuation header */
+        memcpy(&buf[offset], &payload[sent], copy_len);
+        offset += copy_len;
+        sent += copy_len;
+    }
+
+    *out_buf = buf;
+    *out_len = offset;
+    return RTMP_OK;
+}
+
+/* 入队帧,必要时丢弃未开始发送的旧帧 */
+static int rtmp_queue_frame(rtmp_ctx_t *ctx, rtmp_frame_node_t *node) {
+    if (!ctx || !node) return RTMP_ERR_INVALID_PARAM;
+
+    /* 拥堵且来了关键帧,丢弃所有未开始发送的帧(sent==0) */
+    if (node->is_key && ctx->frame_head) {
+        rtmp_frame_node_t *cur = ctx->frame_head;
+        rtmp_frame_node_t *prev = NULL;
+        while (cur) {
+            if (cur->sent == 0) {
+                rtmp_frame_node_t *to_free = cur;
+                cur = cur->next;
+                if (prev) prev->next = cur; else ctx->frame_head = cur;
+                if (to_free == ctx->frame_tail) ctx->frame_tail = prev;
+                ctx->frame_queue_bytes -= to_free->len;
+                rtmp_free_frame_node(to_free);
+                continue;
+            }
+            prev = cur;
+            cur = cur->next;
+        }
+    }
+
+    /* 水位控制:超限则优先丢弃未发送的非关键帧,再丢未发送的旧帧 */
+    uint32_t need_bytes = node->len;
+    rtmp_frame_node_t *cur = ctx->frame_head;
+    rtmp_frame_node_t *prev = NULL;
+    while (ctx->frame_queue_bytes + need_bytes > RTMP_MAX_QUEUE_BYTES && cur) {
+        if (cur->sent == 0 && !cur->is_key) {
+            rtmp_frame_node_t *to_free = cur;
+            cur = cur->next;
+            if (prev) prev->next = cur; else ctx->frame_head = cur;
+            if (to_free == ctx->frame_tail) ctx->frame_tail = prev;
+            ctx->frame_queue_bytes -= to_free->len;
+            rtmp_free_frame_node(to_free);
+            continue;
+        }
+        prev = cur;
+        cur = cur->next;
+    }
+
+    cur = ctx->frame_head;
+    prev = NULL;
+    while (ctx->frame_queue_bytes + need_bytes > RTMP_MAX_QUEUE_BYTES && cur) {
+        if (cur->sent == 0) {
+            rtmp_frame_node_t *to_free = cur;
+            cur = cur->next;
+            if (prev) prev->next = cur; else ctx->frame_head = cur;
+            if (to_free == ctx->frame_tail) ctx->frame_tail = prev;
+            ctx->frame_queue_bytes -= to_free->len;
+            rtmp_free_frame_node(to_free);
+            continue;
+        }
+        prev = cur;
+        cur = cur->next;
+    }
+
+    /* 仍然超限,则放弃当前帧 */
+    if (ctx->frame_queue_bytes + need_bytes > RTMP_MAX_QUEUE_BYTES) {
+        LLOGE("RTMP: Drop frame, queue bytes %u exceed max %u", ctx->frame_queue_bytes + need_bytes, RTMP_MAX_QUEUE_BYTES);
+        return RTMP_ERR_BUFFER_OVERFLOW;
+    }
+
+    /* 追加到队尾 */
+    node->next = NULL;
+    if (ctx->frame_tail) {
+        ctx->frame_tail->next = node;
+    } else {
+        ctx->frame_head = node;
+    }
+    ctx->frame_tail = node;
+    ctx->frame_queue_bytes += node->len;
+
+    /* 尝试立即发送 */
+    rtmp_try_send_queue(ctx);
+
+    return RTMP_OK;
+}
+
+/* 发送队列中的数据,逐chunk写入lwip */
+static void rtmp_try_send_queue(rtmp_ctx_t *ctx) {
+    if (!ctx || !ctx->pcb) return;
+
+    while (ctx->frame_head) {
+        rtmp_frame_node_t *node = ctx->frame_head;
+        uint32_t remaining = node->len - node->sent;
+        if (remaining == 0) {
+            ctx->frame_head = node->next;
+            if (ctx->frame_head == NULL) ctx->frame_tail = NULL;
+            if (ctx->frame_queue_bytes >= node->len) ctx->frame_queue_bytes -= node->len; else ctx->frame_queue_bytes = 0;
+            rtmp_free_frame_node(node);
+            continue;
+        }
+
+        u16_t snd_avail = tcp_sndbuf(ctx->pcb);
+        if (snd_avail == 0) {
+            tcp_output(ctx->pcb);
+            break;
+        }
+
+        uint32_t to_send = remaining < snd_avail ? remaining : snd_avail;
+        /* 在空闲时多发,拥堵时受snd_avail限制;上限设为8KB */
+        if (to_send > 8192) to_send = 8192;
+
+        err_t err = tcp_write(ctx->pcb, node->data + node->sent, to_send, TCP_WRITE_FLAG_COPY);
+        if (err != ERR_OK) {
+            LLOGE("RTMP: tcp_write queue failed %d", err);
+            break;
+        }
+        node->sent += to_send;
+
+        tcp_output(ctx->pcb);
+
+        if (node->sent >= node->len) {
+            ctx->frame_head = node->next;
+            if (ctx->frame_head == NULL) ctx->frame_tail = NULL;
+            if (ctx->frame_queue_bytes >= node->len) ctx->frame_queue_bytes -= node->len; else ctx->frame_queue_bytes = 0;
+            rtmp_free_frame_node(node);
+        } else {
+            /* 发送缓冲区不足,等待sent回调继续 */
+            break;
+        }
+    }
+}
+
 /**
  * 发送RTMP命令
  */
@@ -2181,18 +2383,9 @@ static int rtmp_flush_send_buffer(rtmp_ctx_t *ctx) {
         /* 检查TCP发送缓冲区可用空间 */
         u16_t available = tcp_sndbuf(ctx->pcb);
         if (available == 0) {
-            /* 缓冲区已满,先输出已有数据 */
+            /* 缓冲区已满,先输出已有数据,等待sent回调继续 */
             tcp_output(ctx->pcb);
-            
-            /* 等待一小段时间让数据发送出去 */
-            LLOGE("RTMP: TCP send buffer full, waiting...");
-            luat_rtos_task_sleep(10);
-            available = tcp_sndbuf(ctx->pcb);
-            
-            if (available == 0) {
-                LLOGE("RTMP: TCP send buffer still full after wait");
-                return RTMP_ERR_NETWORK;
-            }
+            return RTMP_ERR_NETWORK;
         }
         
         /* 计算本次可以发送的字节数 */
@@ -2208,19 +2401,7 @@ static int rtmp_flush_send_buffer(rtmp_ctx_t *ctx) {
         err_t err = tcp_write(ctx->pcb, &ctx->send_buf[bytes_sent], to_send, TCP_WRITE_FLAG_COPY);
         if (err != ERR_OK) {
             LLOGE("RTMP: tcp_write failed: %d, sent %u/%u bytes", err, bytes_sent, total_bytes);
-            
-            /* 如果是内存不足,尝试输出后重试一次 */
-            if (err == ERR_MEM && bytes_sent > 0) {
-                tcp_output(ctx->pcb);
-                LLOGE("RTMP: TCP send buffer full, waiting...");
-                luat_rtos_task_sleep(10);
-                err = tcp_write(ctx->pcb, &ctx->send_buf[bytes_sent], to_send, TCP_WRITE_FLAG_COPY);
-                if (err != ERR_OK) {
-                    return RTMP_ERR_NETWORK;
-                }
-            } else {
-                return RTMP_ERR_NETWORK;
-            }
+            return RTMP_ERR_NETWORK;
         }
         
         bytes_sent += to_send;