重写处理流逻辑

This commit is contained in:
wenxin 2024-12-13 18:21:25 +08:00
parent 1d47fd7913
commit df5e03d7a0
9 changed files with 147 additions and 28 deletions

View File

@ -2,4 +2,7 @@
ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe"
base_url = "rtsp:127.0.0.1:8554/"
cleanup_interval = 60 # 定时清理间隔时间(秒)
stream_timeout = 60 * 1000 * 5 # 流空闲超时时间(毫秒)
expired_timeout = 60 * 1000 * 1 # 流空闲超时时间(毫秒)
rtmp_url = "rtmp://127.0.0.1/live"
flv_url = "http://127.0.0.1:8080/live"
md5_salt = "linxyun2024"

View File

@ -1,4 +1,5 @@
from app.middleware.cors import add_cors_middleware
from app.middleware.security import add_security_middleware
from app.routes import websocket, stream
from app.tasks.cleanup_task import cleanup_streams
from fastapi import FastAPI
@ -7,7 +8,6 @@ import asyncio
@asynccontextmanager
async def lifespan(app: FastAPI):
print("正在启动服务...")
# 启动定时清理任务
asyncio.create_task(cleanup_streams())
yield # FastAPI 会在这里执行应用关闭时的清理任务
@ -16,5 +16,10 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
# 添加跨域中间件
add_cors_middleware(app)
# 添加鉴权中间件
add_security_middleware(app)
app.include_router(websocket.router)
app.include_router(stream.router)

View File

@ -0,0 +1,25 @@
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
# 鉴权中间件
class SecurityMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
# 获取请求路径
path = request.url.path
# 获取请求头 Token
# token = request.headers.get("token")
# if not token:
# return Response("Token is required", status_code=401)
# 获取请求参数
query_params = request.query_params
# 获取请求体
body = await request.body()
# 获取请求方法
method = request.method
# 获取请求IP
ip = request.client.host
return await call_next(request)
def add_security_middleware(app):
app.add_middleware(SecurityMiddleware)

View File

@ -1,17 +1,15 @@
# app/routes/stream.py
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from app.services.stream_manager import stop_stream
from app.services.ffmpeg_service import stream_rtsp_to_flv
from app.utils.result import Result
router = APIRouter()
@router.get("/get_stream")
async def get_stream(id: str):
stream_output = await stream_rtsp_to_flv(id)
if stream_output:
return StreamingResponse(stream_output, media_type="video/x-flv")
return {"message": "Unable to start RTSP to FLV conversion."}
async def get_stream(camera_id: str):
flv_url = await stream_rtsp_to_flv(camera_id)
if flv_url is None:
return Result.error("0001", "Stream not started")
return Result.ok(flv_url)
@router.get("/stop_stream")
def stop(id: str):

View File

@ -1,7 +1,15 @@
# app/services/ffmpeg_service.py
import subprocess
import asyncio
from app.config import ffmpeg_path
import time
from app.config import ffmpeg_path, rtmp_url, flv_url
from app.services.stream_manager import stream_manager
from app.utils.logger import get_logger
from app.utils.md5 import generate_md5
import subprocess
from urllib.parse import urljoin
logger = get_logger(__name__)
test_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101"
async def log_output(process, output_url):
while True:
@ -9,13 +17,59 @@ async def log_output(process, output_url):
if output == "" and process.poll() is not None:
break
if output:
print(f"ffmpeg output for {output_url}: {output.strip()}")
logger.debug(f"ffmpeg output for {output_url}: {output.strip()}")
if process.returncode != 0:
logger.error(f"ffmpeg process for {output_url} failed with return code {process.returncode}")
async def start_stream(stream_id: str, output_url: str):
logger.info(f"Starting stream {stream_id}")
input_url = test_url # RTSP 流地址
# 构建 ffmpeg 命令
command = [
ffmpeg_path,
'-i', input_url, # 输入 RTSP 流
'-c:v', 'libx264', # 视频编码为 libx264
'-preset', 'ultrafast',
'-tune', 'zerolatency',
'-g', '5',
'-b:v', '6000k', # 码率 6Mbps
'-r', '30',
'-b:a', '128k',
'-c:a', 'aac', # 音频编码为 aac
'-f', 'flv',
output_url # 推送到 MediaMTX 流服务
]
# 使用 asyncio.to_thread 异步启动 ffmpeg 进程
process = await asyncio.to_thread(subprocess.Popen, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.info(f"ffmpeg command for {stream_id}: {' '.join(command)}")
# 异步记录输出
asyncio.create_task(log_output(process, stream_id))
# 保存进程信息
stream_manager[stream_id] = {
"process": process,
"create_time": int(time.time() * 1000)
}
return process
async def stream_rtsp_to_flv(stream_id: str):
input_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101"
command = [
ffmpeg_path, "-i", input_url, "-c:v", "libx264", "-c:a", "aac", "-f", "flv", "-"
]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
asyncio.create_task(log_output(process, stream_id))
return process.stdout
md5_route = generate_md5(stream_id)
output_url = f"{rtmp_url}/{md5_route}"
# 如果流已经在运行,检查进程是否仍在运行
if stream_id in stream_manager:
logger.info(f"Stream {stream_id} is already running.")
process = stream_manager[stream_id]["process"]
if process.poll() is None:
logger.info(f"Stream {stream_id} is still running.")
return output_url
# 如果流未在运行,启动新流
logger.info(f"Stream {stream_id} is not running. Starting new stream.")
process = await start_stream(stream_id, output_url)
if process.poll() is None:
logger.info(f"Stream {stream_id} started successfully.")
return f"{flv_url}/{md5_route}.flv"
return None

View File

@ -1,14 +1,18 @@
# app/services/stream_manager.py
from app.utils.logger import get_logger
import time
stream_manager = {}
logger = get_logger(__name__)
def reset_stream_time(stream_id: str):
if stream_id in stream_manager:
stream_manager[stream_id]["create_time"] = int(time.time() * 1000)
def stop_stream(stream_id: str):
stream = stream_manager.pop(stream_id, None)
if stream:
process = stream["process"]
if stream_id in stream_manager:
process = stream_manager[stream_id]["process"]
process.terminate()
print(f"Stream {stream_id} stopped.")
process.wait() # 确保进程已经完全终止
logger.info(f"Stream {stream_id} stopped.")
del stream_manager[stream_id]

View File

@ -3,15 +3,16 @@ import asyncio
import time
from app.services.stream_manager import stream_manager, stop_stream
from app.utils.logger import get_logger
from app.config import expired_timeout, cleanup_interval
logger = get_logger(__name__)
async def cleanup_streams():
while True:
await asyncio.sleep(60)
await asyncio.sleep(cleanup_interval)
now = int(time.time() * 1000)
logger.info("定时任务执行:清理空闲流")
for stream_id, stream in list(stream_manager.items()):
if now - stream["create_time"] > 60 * 1000 * 1:
if now - stream["create_time"] > expired_timeout:
stop_stream(stream_id)
logger.info(f"清理空闲流:{stream_id}")

10
app/utils/md5.py Normal file
View File

@ -0,0 +1,10 @@
import hashlib
from app.config import md5_salt
# 生成md5
def generate_md5(text: str):
content = text + md5_salt
return hashlib.md5(content.encode('utf-8')).hexdigest()

19
app/utils/result.py Normal file
View File

@ -0,0 +1,19 @@
class Result:
code:str = ""
msg:str = ""
data:object = None
success:bool = False
def __init__(self, code:str, msg:str, data:object, success:bool) -> None:
self.code = code
self.msg = msg
self.data = data
self.success = success
@staticmethod
def ok(data):
return Result("0000", "操作成功", data, True)
@staticmethod
def error(self, code:str, message:str):
return Result(code, message, None, False)