import asyncio import time from app.config import ffmpeg_path, rtmp_url, flv_url, rtc_url from app.services.stream_manager import stream_manager from app.utils.logger import get_logger from app.utils.md5 import generate_md5 import subprocess from linxyun.utils.result import Result, SysCodes from app.model import Stream logger = get_logger(__name__) test_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" async def log_output(process, output_url): try: while True: # 异步读取 stderr 的输出 output = await asyncio.to_thread(process.stderr.readline) if not output and process.poll() is not None: # 如果输出为空且进程已结束 break if output: logger.debug(f"ffmpeg output for {output_url}: {output.decode().strip()}") except Exception as e: logger.error(f"Error while logging output for {output_url}: {e}") async def start_stream(stream_id: str, output_url: str, stream: Stream): logger.info(f"Starting stream {stream_id}") input_url = test_url # RTSP 流地址 # 构建 ffmpeg 命令 command = [ ffmpeg_path, '-i', input_url, # 输入 RTSP 流 '-c:v', stream.cv, # 视频编码为 libx264 '-preset', stream.preset, '-tune', stream.tune, '-g', stream.g, '-b:v', stream.bv, # 码率 6Mbps '-r', stream.r, '-b:a', stream.ba, '-c:a', stream.ca, # 音频编码为 aac '-f', 'flv', output_url # 推送到 MediaMTX 流服务 ] logger.info(f"ffmpeg command for {stream_id}: {' '.join(command)}") try: # 使用 asyncio.to_thread 异步启动 ffmpeg 进程 process = await asyncio.to_thread( subprocess.Popen, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) except Exception as e: logger.error(f"Failed to start ffmpeg process for {stream_id}: {e}") return None # 等待短时间以确认进程是否启动成功 # await asyncio.sleep(10) # 等待 1 秒,确保进程已稳定运行 # 检查进程是否已退出 # if process.poll() is not None: # poll() 检查子进程状态 # logger.error(f"Stream {stream_id} failed to start. Exit code: {process.poll()}") # stderr_output = await asyncio.to_thread(process.stderr.read) # logger.error(f"Error: {stderr_output.decode().strip()}") # return None # 异步记录输出 asyncio.create_task(log_output(process, output_url)) # 保存进程信息 stream_manager[stream_id] = { "process": process, "create_time": int(time.time() * 1000) } logger.info(f"Stream {stream_id} started successfully") return process async def stream_rtsp_to_url(stream_id: str, type: str, stream: Stream): md5_route = generate_md5(stream_id) output_url = f"{rtmp_url}/{md5_route}" # 如果流已经在运行,检查进程是否仍在运行 if stream_id in stream_manager: logger.info(f"Stream {stream_id} is already running.") process = stream_manager[stream_id]["process"] if process.poll() is None: # 子进程仍在运行 logger.info(f"Stream {stream_id} is still running.") if type == "rtc": result_url = f"{rtc_url}{md5_route}.flv" else: result_url = f"{flv_url}/{md5_route}.flv" return Result.ok(result_url) # 如果流未在运行,启动新流 logger.info(f"Stream {stream_id} is not running. Starting new stream.") result = await start_stream(stream_id, output_url, stream) if result is None: return Result.error(SysCodes.OPERATE_FAIL) # 根据类型返回不同的 URL if type == "rtc": result_url = f"{rtc_url}{md5_route}.flv" else: result_url = f"{flv_url}/{md5_route}.flv" return Result.ok(result_url)