From 1d47fd791384100e2c3050154eac66bab0077ff2 Mon Sep 17 00:00:00 2001 From: wenxin <1731551615@qq.com> Date: Thu, 12 Dec 2024 20:04:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=8C=96=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/__init__.py | 0 app/config.py | 5 +++ app/main.py | 20 +++++++++ app/middleware/__init__.py | 0 app/middleware/cors.py | 12 +++++ app/routes/__init__.py | 0 app/routes/stream.py | 19 ++++++++ app/routes/websocket.py | 17 ++++++++ app/services/__init__.py | 0 app/services/ffmpeg_service.py | 21 +++++++++ app/services/stream_manager.py | 14 ++++++ app/tasks/__init__.py | 0 app/tasks/cleanup_task.py | 17 ++++++++ app/utils/__init__.py | 0 app/utils/logger.py | 30 +++++++++++++ tests/__init__.py | 0 main.py => tests/main.py | 80 +++++++++++++++++++++++++++++----- 17 files changed, 223 insertions(+), 12 deletions(-) create mode 100644 app/__init__.py create mode 100644 app/config.py create mode 100644 app/main.py create mode 100644 app/middleware/__init__.py create mode 100644 app/middleware/cors.py create mode 100644 app/routes/__init__.py create mode 100644 app/routes/stream.py create mode 100644 app/routes/websocket.py create mode 100644 app/services/__init__.py create mode 100644 app/services/ffmpeg_service.py create mode 100644 app/services/stream_manager.py create mode 100644 app/tasks/__init__.py create mode 100644 app/tasks/cleanup_task.py create mode 100644 app/utils/__init__.py create mode 100644 app/utils/logger.py create mode 100644 tests/__init__.py rename main.py => tests/main.py (58%) diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..0c317a8 --- /dev/null +++ b/app/config.py @@ -0,0 +1,5 @@ +# app/config.py +ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" +base_url = "rtsp:127.0.0.1:8554/" +cleanup_interval = 60 # 定时清理间隔时间(秒) +stream_timeout = 60 * 1000 * 5 # 流空闲超时时间(毫秒) diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..5c3c7ea --- /dev/null +++ b/app/main.py @@ -0,0 +1,20 @@ +from app.middleware.cors import add_cors_middleware +from app.routes import websocket, stream +from app.tasks.cleanup_task import cleanup_streams +from fastapi import FastAPI +from contextlib import asynccontextmanager +import asyncio + +@asynccontextmanager +async def lifespan(app: FastAPI): + print("正在启动服务...") + # 启动定时清理任务 + asyncio.create_task(cleanup_streams()) + yield # FastAPI 会在这里执行应用关闭时的清理任务 + print("停止服务") + +app = FastAPI(lifespan=lifespan) +# 添加跨域中间件 +add_cors_middleware(app) +app.include_router(websocket.router) +app.include_router(stream.router) diff --git a/app/middleware/__init__.py b/app/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/middleware/cors.py b/app/middleware/cors.py new file mode 100644 index 0000000..341e817 --- /dev/null +++ b/app/middleware/cors.py @@ -0,0 +1,12 @@ +# app/middleware/cors.py +from fastapi.middleware.cors import CORSMiddleware + +def add_cors_middleware(app): + origins = ["*"] # 可以替换为更具体的来源 + app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) diff --git a/app/routes/__init__.py b/app/routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routes/stream.py b/app/routes/stream.py new file mode 100644 index 0000000..40c1666 --- /dev/null +++ b/app/routes/stream.py @@ -0,0 +1,19 @@ +# app/routes/stream.py +from fastapi import APIRouter +from fastapi.responses import StreamingResponse +from app.services.stream_manager import stop_stream +from app.services.ffmpeg_service import stream_rtsp_to_flv + +router = APIRouter() + +@router.get("/get_stream") +async def get_stream(id: str): + stream_output = await stream_rtsp_to_flv(id) + if stream_output: + return StreamingResponse(stream_output, media_type="video/x-flv") + return {"message": "Unable to start RTSP to FLV conversion."} + +@router.get("/stop_stream") +def stop(id: str): + stop_stream(id) + return {"message": "Stream stopped"} diff --git a/app/routes/websocket.py b/app/routes/websocket.py new file mode 100644 index 0000000..6b230c9 --- /dev/null +++ b/app/routes/websocket.py @@ -0,0 +1,17 @@ +# app/routes/websocket.py +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from app.services.stream_manager import reset_stream_time + +router = APIRouter() + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + try: + while True: + json_msg = await websocket.receive_json() + if json_msg: + reset_stream_time(json_msg.get("looking")) + await websocket.send_text("ok") + except WebSocketDisconnect: + print("WebSocket disconnected") diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/ffmpeg_service.py b/app/services/ffmpeg_service.py new file mode 100644 index 0000000..f9d6ccc --- /dev/null +++ b/app/services/ffmpeg_service.py @@ -0,0 +1,21 @@ +# app/services/ffmpeg_service.py +import subprocess +import asyncio +from app.config import ffmpeg_path + +async def log_output(process, output_url): + while True: + output = await asyncio.to_thread(process.stderr.readline) + if output == "" and process.poll() is not None: + break + if output: + print(f"ffmpeg output for {output_url}: {output.strip()}") + +async def stream_rtsp_to_flv(stream_id: str): + input_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" + command = [ + ffmpeg_path, "-i", input_url, "-c:v", "libx264", "-c:a", "aac", "-f", "flv", "-" + ] + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + asyncio.create_task(log_output(process, stream_id)) + return process.stdout diff --git a/app/services/stream_manager.py b/app/services/stream_manager.py new file mode 100644 index 0000000..8aaa95c --- /dev/null +++ b/app/services/stream_manager.py @@ -0,0 +1,14 @@ +# app/services/stream_manager.py +import time +stream_manager = {} + +def reset_stream_time(stream_id: str): + if stream_id in stream_manager: + stream_manager[stream_id]["create_time"] = int(time.time() * 1000) + +def stop_stream(stream_id: str): + stream = stream_manager.pop(stream_id, None) + if stream: + process = stream["process"] + process.terminate() + print(f"Stream {stream_id} stopped.") diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tasks/cleanup_task.py b/app/tasks/cleanup_task.py new file mode 100644 index 0000000..5b0150d --- /dev/null +++ b/app/tasks/cleanup_task.py @@ -0,0 +1,17 @@ +# app/tasks/cleanup_task.py +import asyncio +import time +from app.services.stream_manager import stream_manager, stop_stream +from app.utils.logger import get_logger + +logger = get_logger(__name__) + +async def cleanup_streams(): + while True: + await asyncio.sleep(60) + now = int(time.time() * 1000) + logger.info("定时任务执行:清理空闲流") + for stream_id, stream in list(stream_manager.items()): + if now - stream["create_time"] > 60 * 1000 * 1: + stop_stream(stream_id) + logger.info(f"清理空闲流:{stream_id}") diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/utils/logger.py b/app/utils/logger.py new file mode 100644 index 0000000..a364712 --- /dev/null +++ b/app/utils/logger.py @@ -0,0 +1,30 @@ +import logging +import os +from logging.handlers import RotatingFileHandler + +# 创建日志目录(如果不存在) +LOG_DIR = "logs" +os.makedirs(LOG_DIR, exist_ok=True) + +# 日志文件路径 +LOG_FILE = os.path.join(LOG_DIR, "app.log") + +# 日志格式 +LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + +# 配置日志 +logging.basicConfig( + level=logging.INFO, # 设置日志级别 + format=LOG_FORMAT, + handlers=[ + # 控制台输出 + logging.StreamHandler(), + # 文件输出,支持日志文件轮转 + RotatingFileHandler(LOG_FILE, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"), + ], +) + +# 获取日志实例 +def get_logger(name: str): + """获取带有模块名称的日志记录器""" + return logging.getLogger(name) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/tests/main.py similarity index 58% rename from main.py rename to tests/main.py index 37fdd70..3d405db 100644 --- a/main.py +++ b/tests/main.py @@ -1,9 +1,15 @@ import subprocess from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi.responses import StreamingResponse import time import asyncio from contextlib import asynccontextmanager +from fastapi.middleware.cors import CORSMiddleware + +# CORS 中间件配置 +origins = ["*"] +ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" # ffmpeg 可执行文件路径 # 管理推拉流的 ffmpeg 子进程 stream_manager: dict[str, dict] = {} base_url = "rtsp:127.0.0.1:8554/" @@ -17,13 +23,12 @@ async def cleanup_streams(): print("定时任务执行-清空空闲流...") now = int(time.time() * 1000) for id, stream in list(stream_manager.items()): - if now - stream["create_time"] > 60 * 1000 * 1: + if now - stream["create_time"] > 60 * 1000 * 5: print(f"清空空闲流: {id}") stop_stream(id) except Exception as e: print(f"清空空闲流异常: {e}") - @asynccontextmanager async def lifespan(app: FastAPI): print("正在启动服务...") @@ -35,6 +40,15 @@ async def lifespan(app: FastAPI): # 使用 lifespan 事件初始化 FastAPI 应用 app = FastAPI(lifespan=lifespan) +# 添加 CORS 中间件 +app.add_middleware( + CORSMiddleware, + allow_origins=origins, # 允许的来源 + allow_credentials=True, # 是否允许携带凭证(如 Cookies) + allow_methods=["*"], # 允许所有 HTTP 方法 + allow_headers=["*"], # 允许所有请求头 +) + # 记录并处理 ffmpeg 输出日志 async def log_output(process, output_url): while True: @@ -47,7 +61,7 @@ async def log_output(process, output_url): # 启动流 async def start_stream(stream_id: str): output_url = base_url + stream_id - input_url = r"C:\Users\simon\Desktop\input.mp4" + input_url = r"rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" print(f"开始推流:{stream_id}") try: @@ -104,17 +118,59 @@ async def websocket_endpoint(websocket: WebSocket): except WebSocketDisconnect: print("WebSocket disconnected") -# 启动流的 API 路由 -@app.post("/start_stream") -async def start(id: str): - # 判断是否已经存在 - if id in stream_manager: - return {"message": "Stream already running", "url": base_url + id} - await start_stream(id) # 使用 await 调用 - return {"message": "Stream started", "url": base_url + id} +# @app.get("/start_stream") +# async def start(id: str): +# # 判断是否已经存在 +# if id in stream_manager: +# return {"message": "Stream already running", "url": base_url + id} +# await start_stream(id) # 使用 await 调用 +# return {"message": "Stream started", "url": base_url + id} + + +# ffmpeg 命令将 RTSP 转换为 FLV 格式并直接输出 +async def stream_rtsp_to_flv(stream_id: str): + # 检查流是否正在运行 + if stream_id in stream_manager: + print(f"Stream {stream_id} is already running.") + process = stream_manager[stream_id]["process"] + return process.stdout + input_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" + command = [ + ffmpeg_path, + "-i", input_url, # 输入 RTSP 流地址 + "-c:v", "libx264", # 使用 libx264 编码 + "-c:a", "aac", # 使用 aac 编码 + "-f", "flv", # 输出 FLV 格式 + "-" # 直接输出到 stdout + ] + + try: + process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + asyncio.create_task(log_output(process, "rtsp")) + # 保存子进程和创建时间 + stream_manager[stream_id] = { "process": process, "create_time": int(time.time() * 1000)} + print(f"Started streaming RTSP {stream_id} to FLV format.") + return process.stdout + except Exception as e: + print(f"Error converting RTSP stream {stream_id} to FLV: {e}") + return None + + +# 提供 RTSP 流并将其转换为 FLV 格式 +@app.get("/get_stream") +async def get_stream(id: str): + # 启动流并获取 stdout 流 + stream_output = await stream_rtsp_to_flv(id) + if stream_output: + # 通过 StreamingResponse 流式传输数据 + return StreamingResponse(stream_output, media_type="video/x-flv") + return {"message": "Unable to start RTSP to FLV conversion."} + # 停止流的 API 路由 -@app.post("/stop_stream") +@app.get("/stop_stream") def stop(id: str): stop_stream(id) return {"message": "Stream stopped"}