diff --git a/app/config.py b/app/config.py index 0c317a8..e3f47e3 100644 --- a/app/config.py +++ b/app/config.py @@ -2,4 +2,7 @@ ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" base_url = "rtsp:127.0.0.1:8554/" cleanup_interval = 60 # 定时清理间隔时间(秒) -stream_timeout = 60 * 1000 * 5 # 流空闲超时时间(毫秒) +expired_timeout = 60 * 1000 * 1 # 流空闲超时时间(毫秒) +rtmp_url = "rtmp://127.0.0.1/live" +flv_url = "http://127.0.0.1:8080/live" +md5_salt = "linxyun2024" \ No newline at end of file diff --git a/app/main.py b/app/main.py index 5c3c7ea..29e4c7c 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,5 @@ from app.middleware.cors import add_cors_middleware +from app.middleware.security import add_security_middleware from app.routes import websocket, stream from app.tasks.cleanup_task import cleanup_streams from fastapi import FastAPI @@ -7,7 +8,6 @@ import asyncio @asynccontextmanager async def lifespan(app: FastAPI): - print("正在启动服务...") # 启动定时清理任务 asyncio.create_task(cleanup_streams()) yield # FastAPI 会在这里执行应用关闭时的清理任务 @@ -16,5 +16,10 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) # 添加跨域中间件 add_cors_middleware(app) +# 添加鉴权中间件 +add_security_middleware(app) app.include_router(websocket.router) app.include_router(stream.router) + + + diff --git a/app/middleware/security.py b/app/middleware/security.py new file mode 100644 index 0000000..b800761 --- /dev/null +++ b/app/middleware/security.py @@ -0,0 +1,25 @@ +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.responses import Response + +# 鉴权中间件 +class SecurityMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + # 获取请求路径 + path = request.url.path + # 获取请求头 Token + # token = request.headers.get("token") + # if not token: + # return Response("Token is required", status_code=401) + # 获取请求参数 + query_params = request.query_params + # 获取请求体 + body = await request.body() + # 获取请求方法 + method = request.method + # 获取请求IP + ip = request.client.host + return await call_next(request) + + +def add_security_middleware(app): + app.add_middleware(SecurityMiddleware) \ No newline at end of file diff --git a/app/routes/stream.py b/app/routes/stream.py index 40c1666..581cc9f 100644 --- a/app/routes/stream.py +++ b/app/routes/stream.py @@ -1,17 +1,15 @@ -# app/routes/stream.py from fastapi import APIRouter -from fastapi.responses import StreamingResponse from app.services.stream_manager import stop_stream from app.services.ffmpeg_service import stream_rtsp_to_flv - +from app.utils.result import Result router = APIRouter() @router.get("/get_stream") -async def get_stream(id: str): - stream_output = await stream_rtsp_to_flv(id) - if stream_output: - return StreamingResponse(stream_output, media_type="video/x-flv") - return {"message": "Unable to start RTSP to FLV conversion."} +async def get_stream(camera_id: str): + flv_url = await stream_rtsp_to_flv(camera_id) + if flv_url is None: + return Result.error("0001", "Stream not started") + return Result.ok(flv_url) @router.get("/stop_stream") def stop(id: str): diff --git a/app/services/ffmpeg_service.py b/app/services/ffmpeg_service.py index f9d6ccc..d93583d 100644 --- a/app/services/ffmpeg_service.py +++ b/app/services/ffmpeg_service.py @@ -1,7 +1,15 @@ -# app/services/ffmpeg_service.py -import subprocess + import asyncio -from app.config import ffmpeg_path +import time +from app.config import ffmpeg_path, rtmp_url, flv_url +from app.services.stream_manager import stream_manager +from app.utils.logger import get_logger +from app.utils.md5 import generate_md5 +import subprocess +from urllib.parse import urljoin + +logger = get_logger(__name__) +test_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" async def log_output(process, output_url): while True: @@ -9,13 +17,59 @@ async def log_output(process, output_url): if output == "" and process.poll() is not None: break if output: - print(f"ffmpeg output for {output_url}: {output.strip()}") + logger.debug(f"ffmpeg output for {output_url}: {output.strip()}") + if process.returncode != 0: + logger.error(f"ffmpeg process for {output_url} failed with return code {process.returncode}") + + +async def start_stream(stream_id: str, output_url: str): + logger.info(f"Starting stream {stream_id}") + input_url = test_url # RTSP 流地址 + # 构建 ffmpeg 命令 + command = [ + ffmpeg_path, + '-i', input_url, # 输入 RTSP 流 + '-c:v', 'libx264', # 视频编码为 libx264 + '-preset', 'ultrafast', + '-tune', 'zerolatency', + '-g', '5', + '-b:v', '6000k', # 码率 6Mbps + '-r', '30', + '-b:a', '128k', + '-c:a', 'aac', # 音频编码为 aac + '-f', 'flv', + output_url # 推送到 MediaMTX 流服务 + ] + + # 使用 asyncio.to_thread 异步启动 ffmpeg 进程 + process = await asyncio.to_thread(subprocess.Popen, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + logger.info(f"ffmpeg command for {stream_id}: {' '.join(command)}") + + # 异步记录输出 + asyncio.create_task(log_output(process, stream_id)) + + # 保存进程信息 + stream_manager[stream_id] = { + "process": process, + "create_time": int(time.time() * 1000) + } + return process + async def stream_rtsp_to_flv(stream_id: str): - input_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" - command = [ - ffmpeg_path, "-i", input_url, "-c:v", "libx264", "-c:a", "aac", "-f", "flv", "-" - ] - process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - asyncio.create_task(log_output(process, stream_id)) - return process.stdout + md5_route = generate_md5(stream_id) + output_url = f"{rtmp_url}/{md5_route}" + # 如果流已经在运行,检查进程是否仍在运行 + if stream_id in stream_manager: + logger.info(f"Stream {stream_id} is already running.") + process = stream_manager[stream_id]["process"] + if process.poll() is None: + logger.info(f"Stream {stream_id} is still running.") + return output_url + # 如果流未在运行,启动新流 + logger.info(f"Stream {stream_id} is not running. Starting new stream.") + process = await start_stream(stream_id, output_url) + if process.poll() is None: + logger.info(f"Stream {stream_id} started successfully.") + return f"{flv_url}/{md5_route}.flv" + return None diff --git a/app/services/stream_manager.py b/app/services/stream_manager.py index 8aaa95c..9d4135e 100644 --- a/app/services/stream_manager.py +++ b/app/services/stream_manager.py @@ -1,14 +1,18 @@ -# app/services/stream_manager.py +from app.utils.logger import get_logger import time stream_manager = {} +logger = get_logger(__name__) + def reset_stream_time(stream_id: str): if stream_id in stream_manager: stream_manager[stream_id]["create_time"] = int(time.time() * 1000) def stop_stream(stream_id: str): - stream = stream_manager.pop(stream_id, None) - if stream: - process = stream["process"] + if stream_id in stream_manager: + process = stream_manager[stream_id]["process"] process.terminate() - print(f"Stream {stream_id} stopped.") + process.wait() # 确保进程已经完全终止 + logger.info(f"Stream {stream_id} stopped.") + del stream_manager[stream_id] + diff --git a/app/tasks/cleanup_task.py b/app/tasks/cleanup_task.py index 5b0150d..3298f57 100644 --- a/app/tasks/cleanup_task.py +++ b/app/tasks/cleanup_task.py @@ -3,15 +3,16 @@ import asyncio import time from app.services.stream_manager import stream_manager, stop_stream from app.utils.logger import get_logger +from app.config import expired_timeout, cleanup_interval logger = get_logger(__name__) async def cleanup_streams(): while True: - await asyncio.sleep(60) + await asyncio.sleep(cleanup_interval) now = int(time.time() * 1000) logger.info("定时任务执行:清理空闲流") for stream_id, stream in list(stream_manager.items()): - if now - stream["create_time"] > 60 * 1000 * 1: + if now - stream["create_time"] > expired_timeout: stop_stream(stream_id) logger.info(f"清理空闲流:{stream_id}") diff --git a/app/utils/md5.py b/app/utils/md5.py new file mode 100644 index 0000000..1adc9af --- /dev/null +++ b/app/utils/md5.py @@ -0,0 +1,10 @@ +import hashlib +from app.config import md5_salt + +# 生成md5 +def generate_md5(text: str): + content = text + md5_salt + return hashlib.md5(content.encode('utf-8')).hexdigest() + + + diff --git a/app/utils/result.py b/app/utils/result.py new file mode 100644 index 0000000..1c0bfb1 --- /dev/null +++ b/app/utils/result.py @@ -0,0 +1,19 @@ + +class Result: + code:str = "" + msg:str = "" + data:object = None + success:bool = False + def __init__(self, code:str, msg:str, data:object, success:bool) -> None: + self.code = code + self.msg = msg + self.data = data + self.success = success + + @staticmethod + def ok(data): + return Result("0000", "操作成功", data, True) + + @staticmethod + def error(self, code:str, message:str): + return Result(code, message, None, False)