import subprocess from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import StreamingResponse import time import asyncio from contextlib import asynccontextmanager from fastapi.middleware.cors import CORSMiddleware # CORS 中间件配置 origins = ["*"] ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" # ffmpeg 可执行文件路径 # 管理推拉流的 ffmpeg 子进程 stream_manager: dict[str, dict] = {} base_url = "rtsp:127.0.0.1:8554/" # 定时清理空闲流 async def cleanup_streams(): while True: await asyncio.sleep(60) try: print("定时任务执行-清空空闲流...") now = int(time.time() * 1000) for id, stream in list(stream_manager.items()): if now - stream["create_time"] > 60 * 1000 * 5: print(f"清空空闲流: {id}") stop_stream(id) except Exception as e: print(f"清空空闲流异常: {e}") @asynccontextmanager async def lifespan(app: FastAPI): print("正在启动服务...") # 启动定时清理任务 asyncio.create_task(cleanup_streams()) yield # FastAPI 会在这里执行应用关闭时的清理任务 print("停止服务") # 使用 lifespan 事件初始化 FastAPI 应用 app = FastAPI(lifespan=lifespan) # 添加 CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=origins, # 允许的来源 allow_credentials=True, # 是否允许携带凭证(如 Cookies) allow_methods=["*"], # 允许所有 HTTP 方法 allow_headers=["*"], # 允许所有请求头 ) # 记录并处理 ffmpeg 输出日志 async def log_output(process, output_url): while True: output = await asyncio.to_thread(process.stderr.readline) # 读取 ffmpeg 的错误日志 if output == "" and process.poll() is not None: # 检查是否完成 break if output: print(f"ffmpeg output for {output_url}: {output.strip()}") # 实时打印日志 # 启动流 async def start_stream(stream_id: str): output_url = base_url + stream_id input_url = r"rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" print(f"开始推流:{stream_id}") try: command = [ ffmpeg_path, "-re", # 实时读取 "-i", input_url, # 输入文件或流 "-c:v", "libx264", # 视频编码器 "-c:a", "aac", # 音频编码器 "-f", "rtsp", # 推流 RTSP 格式 "-rtsp_transport", "tcp", # 指定 RTSP 传输协议 output_url ] # 创建子进程 process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 保存子进程和创建时间 stream_manager[stream_id] = { "process": process, "create_time": int(time.time() * 1000)} print("ffmpeg command: %s", " ".join(command)) # 使用 asyncio 运行日志捕获 asyncio.create_task(log_output(process, output_url)) return process except Exception as e: print(f"Error starting stream: {e}") # 停止流 def stop_stream(id: str): stream = stream_manager.pop(id, None) if stream: process = stream["process"] process.terminate() print(f"Stream {id} stopped.") else: print(f"没有找到该流: {id}") # WebSocket 心跳检测 @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() try: while True: json_msg = await websocket.receive_json() # eg: looking: 正在看的摄像头ID if json_msg: looking = json_msg["looking"] if looking and looking in stream_manager: stream = stream_manager[looking] # 重置时间 new_time = int(time.time() * 1000) stream["create_time"] = new_time print(f"重置流时间: {looking}-{new_time}") await websocket.send_text("ok") except WebSocketDisconnect: print("WebSocket disconnected") # @app.get("/start_stream") # async def start(id: str): # # 判断是否已经存在 # if id in stream_manager: # return {"message": "Stream already running", "url": base_url + id} # await start_stream(id) # 使用 await 调用 # return {"message": "Stream started", "url": base_url + id} # ffmpeg 命令将 RTSP 转换为 FLV 格式并直接输出 async def stream_rtsp_to_flv(stream_id: str): # 检查流是否正在运行 if stream_id in stream_manager: print(f"Stream {stream_id} is already running.") process = stream_manager[stream_id]["process"] return process.stdout input_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" command = [ ffmpeg_path, "-i", input_url, # 输入 RTSP 流地址 "-c:v", "libx264", # 使用 libx264 编码 "-c:a", "aac", # 使用 aac 编码 "-f", "flv", # 输出 FLV 格式 "-" # 直接输出到 stdout ] try: process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) asyncio.create_task(log_output(process, "rtsp")) # 保存子进程和创建时间 stream_manager[stream_id] = { "process": process, "create_time": int(time.time() * 1000)} print(f"Started streaming RTSP {stream_id} to FLV format.") return process.stdout except Exception as e: print(f"Error converting RTSP stream {stream_id} to FLV: {e}") return None # 提供 RTSP 流并将其转换为 FLV 格式 @app.get("/get_stream") async def get_stream(id: str): # 启动流并获取 stdout 流 stream_output = await stream_rtsp_to_flv(id) if stream_output: # 通过 StreamingResponse 流式传输数据 return StreamingResponse(stream_output, media_type="video/x-flv") return {"message": "Unable to start RTSP to FLV conversion."} # 停止流的 API 路由 @app.get("/stop_stream") def stop(id: str): stop_stream(id) return {"message": "Stream stopped"}