فهرست منبع

update: rtmp,正确处理服务器下发的SetChunkSize指令,并在执行@setDataFrame之后立即通知摄像头输出帧数据,简化用户侧代码

Wendal Chen 1 ماه پیش
والد
کامیت
9574d33e30
2فایلهای تغییر یافته به همراه116 افزوده شده و 107 حذف شده
  1. 101 74
      components/rtmp/src/luat_rtmp_push.c
  2. 15 33
      olddemo/camera/rtmp_usb/main.lua

+ 101 - 74
components/rtmp/src/luat_rtmp_push.c

@@ -676,7 +676,7 @@ int rtmp_send_nalu(rtmp_ctx_t *ctx, const uint8_t *nalu_data,
         return RTMP_ERR_INVALID_PARAM;
     }
     
-    if (ctx->state != RTMP_STATE_CONNECTED && ctx->state != RTMP_STATE_PUBLISHING) {
+    if (ctx->state != RTMP_STATE_PUBLISHING) {
         return RTMP_ERR_FAILED;
     }
     
@@ -2128,90 +2128,113 @@ static int rtmp_send_command(rtmp_ctx_t *ctx, const char *command,
  * 处理收到的RTMP数据
  */
 static int rtmp_process_data(rtmp_ctx_t *ctx) {
-    /* RTMP消息解析和处理
-     * 
-     * RTMP消息格式:
-     * - 块头 (1-3字节)
-     * - 消息头 (0/3/7/11字节)
-     * - 块数据 (可变长)
-     * 
-     * 消息头格式(最多11字节):
-     * - 消息类型 (1字节): 8=音频, 9=视频, 18=数据, 20=命令(AMF0)
-     * - 消息长度 (3字节大端)
-     * - 时间戳 (3字节大端)
-     * - 扩展时间戳 (1字节)
-     * - 流ID (3字节大端)
-     */
-    
     if (!ctx || ctx->recv_pos == 0) {
         return RTMP_OK;
     }
-    
-    /* 简化实现: 寻找响应中的onStatus/NetConnection.Connect.Success */
-    /* 完整实现应该解析块头和消息头 */
-    
-    uint32_t pos = 0;
-    
-    /* 搜索响应字符串 */
+
     const char *success_str = "NetConnection.Connect.Success";
     const char *on_status_str = "onStatus";
     const char *failed_str = "NetConnection.Connect.Failed";
     const char *result_str = "_result";
     const char *publish_start_str = "NetStream.Publish.Start";
-    
+
     bool found_success = false;
     bool found_failed = false;
     bool found_on_status = false;
     bool found_result = false;
     bool found_publish_start = false;
-    
-    /* 在接收缓冲区中搜索这些字符串 */
-    while (pos < ctx->recv_pos) {
-        /* 检查 connect success */
-        if (!found_success && pos + strlen(success_str) <= ctx->recv_pos) {
-            if (memcmp(&ctx->recv_buf[pos], success_str, strlen(success_str)) == 0) {
-                found_success = true;
-                LLOGD("RTMP: Found NetConnection.Connect.Success");
-                break;
-            }
+
+    uint32_t pos = 0;
+    while (pos + 12 <= ctx->recv_pos) {
+        uint8_t chunk_header = ctx->recv_buf[pos];
+        uint8_t fmt = (chunk_header >> 6) & 0x03;
+        /* 仅处理格式0头,其他格式简单跳过到下一个字节 */
+        if (fmt != 0) {
+            pos++;
+            continue;
         }
-        
-        /* 检查 connect failed */
-        if (!found_failed && pos + strlen(failed_str) <= ctx->recv_pos) {
-            if (memcmp(&ctx->recv_buf[pos], failed_str, strlen(failed_str)) == 0) {
-                found_failed = true;
-                LLOGD("RTMP: Found NetConnection.Connect.Failed");
-                break;
-            }
+
+        /* 检查剩余长度是否包含完整头 */
+        if (pos + 12 > ctx->recv_pos) {
+            break; /* 不完整,等待更多数据 */
         }
-        
-        /* 检查 _result (createStream响应) */
-        if (!found_result && pos + strlen(result_str) <= ctx->recv_pos) {
-            if (memcmp(&ctx->recv_buf[pos], result_str, strlen(result_str)) == 0) {
-                found_result = true;
-                LLOGD("RTMP: Found _result response");
-            }
+
+        uint32_t msg_len = ((uint32_t)ctx->recv_buf[pos + 4] << 16) |
+                           ((uint32_t)ctx->recv_buf[pos + 5] << 8) |
+                           (uint32_t)ctx->recv_buf[pos + 6];
+        uint8_t msg_type = ctx->recv_buf[pos + 7];
+
+        /* 检查消息是否完整 */
+        if (pos + 12 + msg_len > ctx->recv_pos) {
+            break; /* 不完整,等待更多数据 */
         }
-        
-        /* 检查 publish start */
-        if (!found_publish_start && pos + strlen(publish_start_str) <= ctx->recv_pos) {
-            if (memcmp(&ctx->recv_buf[pos], publish_start_str, strlen(publish_start_str)) == 0) {
-                found_publish_start = true;
-                LLOGD("RTMP: Found NetStream.Publish.Start");
+
+        const uint8_t *payload = &ctx->recv_buf[pos + 12];
+
+        if (msg_type == RTMP_MSG_SET_CHUNK_SIZE && msg_len >= 4) {
+            uint32_t new_chunk_size = read_be32(payload) & 0x7FFFFFFF;
+            if (new_chunk_size > 0 && new_chunk_size <= 0x7FFFFFFF) {
+                ctx->in_chunk_size = new_chunk_size;
+                ctx->chunk_size = new_chunk_size;
+                LLOGI("RTMP: Received Set Chunk Size from server: %u, updated local chunk_size", new_chunk_size);
             }
-        }
-        
-        /* 检查 onStatus */
-        if (!found_on_status && pos + strlen(on_status_str) <= ctx->recv_pos) {
-            if (memcmp(&ctx->recv_buf[pos], on_status_str, strlen(on_status_str)) == 0) {
-                found_on_status = true;
-                LLOGD("RTMP: Found onStatus command");
+        } else if (msg_type == RTMP_MSG_COMMAND || msg_type == RTMP_MSG_EXTENDED_COMMAND || msg_type == RTMP_MSG_AMFDATAFILE) {
+            /* 在命令/数据消息体内查找关键字符串 */
+            if (!found_success && msg_len >= strlen(success_str)) {
+                for (uint32_t i = 0; i + strlen(success_str) <= msg_len; i++) {
+                    if (memcmp(&payload[i], success_str, strlen(success_str)) == 0) {
+                        found_success = true;
+                        break;
+                    }
+                }
+            }
+            if (!found_failed && msg_len >= strlen(failed_str)) {
+                for (uint32_t i = 0; i + strlen(failed_str) <= msg_len; i++) {
+                    if (memcmp(&payload[i], failed_str, strlen(failed_str)) == 0) {
+                        found_failed = true;
+                        break;
+                    }
+                }
+            }
+            if (!found_result && msg_len >= strlen(result_str)) {
+                for (uint32_t i = 0; i + strlen(result_str) <= msg_len; i++) {
+                    if (memcmp(&payload[i], result_str, strlen(result_str)) == 0) {
+                        found_result = true;
+                        break;
+                    }
+                }
+            }
+            if (!found_publish_start && msg_len >= strlen(publish_start_str)) {
+                for (uint32_t i = 0; i + strlen(publish_start_str) <= msg_len; i++) {
+                    if (memcmp(&payload[i], publish_start_str, strlen(publish_start_str)) == 0) {
+                        found_publish_start = true;
+                        break;
+                    }
+                }
+            }
+            if (!found_on_status && msg_len >= strlen(on_status_str)) {
+                for (uint32_t i = 0; i + strlen(on_status_str) <= msg_len; i++) {
+                    if (memcmp(&payload[i], on_status_str, strlen(on_status_str)) == 0) {
+                        found_on_status = true;
+                        break;
+                    }
+                }
             }
         }
-        
-        pos++;
+
+        /* 前进到下一条消息 */
+        pos += 12 + msg_len;
     }
-    
+
+    /* 如果有未处理完的部分,将剩余数据前移 */
+    if (pos < ctx->recv_pos) {
+        uint32_t remain = ctx->recv_pos - pos;
+        memmove(ctx->recv_buf, &ctx->recv_buf[pos], remain);
+        ctx->recv_pos = remain;
+    } else {
+        ctx->recv_pos = 0;
+    }
+
     /* 根据查找结果更新状态 */
     if (found_success) {
         /* 连接成功,开始发送发布流的控制命令 */
@@ -2219,14 +2242,18 @@ static int rtmp_process_data(rtmp_ctx_t *ctx) {
         
         /* 1. 发送 setChunkSize */
         uint8_t chunk_size_msg[4];
-        uint32_t new_chunk_size = 4096;  /* 设置为4KB */
-        write_be32(chunk_size_msg, new_chunk_size);
+        uint32_t negotiated_chunk = ctx->in_chunk_size ? ctx->in_chunk_size : (ctx->chunk_size ? ctx->chunk_size : RTMP_DEFAULT_CHUNK_SIZE);
+        negotiated_chunk &= 0x7FFFFFFF; /* 规范要求最高位为0 */
+        if (negotiated_chunk == 0) {
+            negotiated_chunk = RTMP_DEFAULT_CHUNK_SIZE;
+        }
+        write_be32(chunk_size_msg, negotiated_chunk);
         
-        int ret = rtmp_pack_message(ctx, 1, chunk_size_msg, sizeof(chunk_size_msg), 0, 0);
+        int ret = rtmp_pack_message(ctx, RTMP_MSG_SET_CHUNK_SIZE, chunk_size_msg, sizeof(chunk_size_msg), 0, 0);
         if (ret == RTMP_OK) {
-            ctx->out_chunk_size = new_chunk_size;
-            ctx->chunk_size = new_chunk_size;  /* 更新实际使用的chunk大小 */
-            LLOGI("RTMP: Sent setChunkSize: %u", new_chunk_size);
+            ctx->out_chunk_size = negotiated_chunk;
+            ctx->chunk_size = negotiated_chunk;  /* 更新实际使用的chunk大小 */
+            LLOGI("RTMP: Sent setChunkSize: %u", negotiated_chunk);
         }
         
         /* 2. 发送 releaseStream */
@@ -2275,6 +2302,9 @@ static int rtmp_process_data(rtmp_ctx_t *ctx) {
         if (rtmp_send_metadata(ctx) == RTMP_OK) {
             LLOGI("RTMP: Metadata sent, ready to send video data");
             rtmp_set_state(ctx, RTMP_STATE_PUBLISHING, 0);
+            // 通知摄像头开始采集
+            extern int luat_camera_capture(int id, uint8_t quality, const char *path);
+            luat_camera_capture(0, 80, "rtmp");
         } else {
             LLOGE("RTMP: Failed to send metadata");
             rtmp_set_state(ctx, RTMP_STATE_ERROR, RTMP_ERR_FAILED);
@@ -2290,9 +2320,6 @@ static int rtmp_process_data(rtmp_ctx_t *ctx) {
         LLOGD("RTMP: Received onStatus response");
     }
     
-    /* 清空接收缓冲区 */
-    ctx->recv_pos = 0;
-    
     return RTMP_OK;
 }
 

+ 15 - 33
olddemo/camera/rtmp_usb/main.lua

@@ -6,8 +6,8 @@ local camera_id = camera.USB
 
 local usb_camera_table = {
     id = camera_id,
-    sensor_width = 640,
-    sensor_height = 480,
+    sensor_width = 1280,
+    sensor_height = 720,
     usb_port = 1
 }
 
@@ -17,9 +17,16 @@ sys.taskInit(function()
 
     sys.waitUntil("IP_READY")
 
+    camera.config(0, camera.CONF_UVC_FPS, 15)
+    socket.sntp()
+    sys.wait(200)
+    result = camera.init(usb_camera_table)
+    log.info("摄像头初始化", result)
+    camera.start(camera_id)
+
     -- local rtmpc = rtmp.create("rtmp://192.168.1.10:1935/live/abc")
     -- local rtmpc = rtmp.create("rtmp://180.152.6.34:1935/stream1live/1ca786f5_23e5_4d89_8b1d_2eec6932775a_0001")
-    local rtmpc = rtmp.create("rtmp://47.94.236.172/live/1ca786f5")
+    local rtmpc = rtmp.create("rtmp://47.94.236.172/live/1ca786f5") -- 替换为你的推流地址
     rtmpc:setCallback(function(state, ...)
         if state == rtmp.STATE_CONNECTED then
             log.info("rtmp", "已连接到推流服务器")
@@ -30,46 +37,21 @@ sys.taskInit(function()
         end
     end)
     log.info("开始连接到推流服务器...")
-    -- sys.wait(100)
+    sys.wait(100)
     rtmpc:connect()
-
-    sys.wait(500)
+    sys.wait(300)
 
     -- 开始处理
     log.info("rtmp", "开始推流...")
-    rtmpc:start()
-
-    camera.config(0, camera.CONF_UVC_FPS, 15)
-
-    socket.sntp()
-    sys.wait(200)
-
-    -- 初始化摄像头
+    rtmpc:start() -- 已自动调用 camera.capture(camera_id, "rtmp", 1)
+    
     while 1 do
-        -- if true then rtos.reboot() end
-        result = camera.init(usb_camera_table)
-        log.info("摄像头初始化", result)
-        -- log.info("lua", rtos.meminfo())
-        -- log.info("sys", rtos.meminfo("sys"))
-        -- log.info("psram", rtos.meminfo("psram"))
-        if (result == 0) then
-            camera.start(camera_id)
-            -- 开始mp4录制
-            camera.capture(camera_id, "rtmp", 1)
-            sys.wait(3000000)
-
-            -- 结束MP4录制
-            camera.stop(camera_id)
-
-            log.info("保存成功")
-        end
-        camera.close(camera_id)
         --- 打印一下内存状态
+        sys.wait(30*1000)
         log.info("lua", rtos.meminfo())
         log.info("sys", rtos.meminfo("sys"))
         log.info("psram", rtos.meminfo("psram"))
         sys.wait(2000)
-        -- rtos.reboot()
     end
     -- #################################################