import subprocess from fastapi import FastAPI, WebSocket, WebSocketDisconnect import time import asyncio from contextlib import asynccontextmanager # 管理推拉流的 ffmpeg 子进程 stream_manager: dict[str, dict] = {} base_url = "rtsp:127.0.0.1:8554/" # 定时清理空闲流 async def cleanup_streams(): while True: await asyncio.sleep(60) try: print("定时任务执行-清空空闲流...") now = int(time.time() * 1000) for id, stream in list(stream_manager.items()): if now - stream["create_time"] > 60 * 1000 * 1: print(f"清空空闲流: {id}") stop_stream(id) except Exception as e: print(f"清空空闲流异常: {e}") @asynccontextmanager async def lifespan(app: FastAPI): print("正在启动服务...") # 启动定时清理任务 asyncio.create_task(cleanup_streams()) yield # FastAPI 会在这里执行应用关闭时的清理任务 print("停止服务") # 使用 lifespan 事件初始化 FastAPI 应用 app = FastAPI(lifespan=lifespan) # 记录并处理 ffmpeg 输出日志 async def log_output(process, output_url): while True: output = await asyncio.to_thread(process.stderr.readline) # 读取 ffmpeg 的错误日志 if output == "" and process.poll() is not None: # 检查是否完成 break if output: print(f"ffmpeg output for {output_url}: {output.strip()}") # 实时打印日志 # 启动流 async def start_stream(stream_id: str): output_url = base_url + stream_id input_url = r"C:\Users\simon\Desktop\input.mp4" ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" print(f"开始推流:{stream_id}") try: command = [ ffmpeg_path, "-re", # 实时读取 "-i", input_url, # 输入文件或流 "-c:v", "libx264", # 视频编码器 "-c:a", "aac", # 音频编码器 "-f", "rtsp", # 推流 RTSP 格式 "-rtsp_transport", "tcp", # 指定 RTSP 传输协议 output_url ] # 创建子进程 process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 保存子进程和创建时间 stream_manager[stream_id] = { "process": process, "create_time": int(time.time() * 1000)} print("ffmpeg command: %s", " ".join(command)) # 使用 asyncio 运行日志捕获 asyncio.create_task(log_output(process, output_url)) return process except Exception as e: print(f"Error starting stream: {e}") # 停止流 def stop_stream(id: str): stream = stream_manager.pop(id, None) if stream: process = stream["process"] process.terminate() print(f"Stream {id} stopped.") else: print(f"没有找到该流: {id}") # WebSocket 心跳检测 @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() try: while True: json_msg = await websocket.receive_json() # eg: looking: 正在看的摄像头ID if json_msg: looking = json_msg["looking"] if looking and looking in stream_manager: stream = stream_manager[looking] # 重置时间 new_time = int(time.time() * 1000) stream["create_time"] = new_time print(f"重置流时间: {looking}-{new_time}") await websocket.send_text("ok") except WebSocketDisconnect: print("WebSocket disconnected") # 启动流的 API 路由 @app.post("/start_stream") async def start(id: str): # 判断是否已经存在 if id in stream_manager: return {"message": "Stream already running", "url": base_url + id} await start_stream(id) # 使用 await 调用 return {"message": "Stream started", "url": base_url + id} # 停止流的 API 路由 @app.post("/stop_stream") def stop(id: str): stop_stream(id) return {"message": "Stream stopped"}