steam-server/tests/main.py
2024-12-12 20:04:36 +08:00

179 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
import time
import asyncio
from contextlib import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
# CORS 中间件配置
origins = ["*"]
ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe" # ffmpeg 可执行文件路径
# 管理推拉流的 ffmpeg 子进程
stream_manager: dict[str, dict] = {}
base_url = "rtsp:127.0.0.1:8554/"
# 定时清理空闲流
async def cleanup_streams():
while True:
await asyncio.sleep(60)
try:
print("定时任务执行-清空空闲流...")
now = int(time.time() * 1000)
for id, stream in list(stream_manager.items()):
if now - stream["create_time"] > 60 * 1000 * 5:
print(f"清空空闲流: {id}")
stop_stream(id)
except Exception as e:
print(f"清空空闲流异常: {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
print("正在启动服务...")
# 启动定时清理任务
asyncio.create_task(cleanup_streams())
yield # FastAPI 会在这里执行应用关闭时的清理任务
print("停止服务")
# 使用 lifespan 事件初始化 FastAPI 应用
app = FastAPI(lifespan=lifespan)
# 添加 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=origins, # 允许的来源
allow_credentials=True, # 是否允许携带凭证(如 Cookies
allow_methods=["*"], # 允许所有 HTTP 方法
allow_headers=["*"], # 允许所有请求头
)
# 记录并处理 ffmpeg 输出日志
async def log_output(process, output_url):
while True:
output = await asyncio.to_thread(process.stderr.readline) # 读取 ffmpeg 的错误日志
if output == "" and process.poll() is not None: # 检查是否完成
break
if output:
print(f"ffmpeg output for {output_url}: {output.strip()}") # 实时打印日志
# 启动流
async def start_stream(stream_id: str):
output_url = base_url + stream_id
input_url = r"rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101"
ffmpeg_path = r"D:\software\ffmpeg\ffmpeg-4.4.1\bin\ffmpeg.exe"
print(f"开始推流:{stream_id}")
try:
command = [
ffmpeg_path,
"-re", # 实时读取
"-i", input_url, # 输入文件或流
"-c:v", "libx264", # 视频编码器
"-c:a", "aac", # 音频编码器
"-f", "rtsp", # 推流 RTSP 格式
"-rtsp_transport", "tcp", # 指定 RTSP 传输协议
output_url
]
# 创建子进程
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# 保存子进程和创建时间
stream_manager[stream_id] = { "process": process, "create_time": int(time.time() * 1000)}
print("ffmpeg command: %s", " ".join(command))
# 使用 asyncio 运行日志捕获
asyncio.create_task(log_output(process, output_url))
return process
except Exception as e:
print(f"Error starting stream: {e}")
# 停止流
def stop_stream(id: str):
stream = stream_manager.pop(id, None)
if stream:
process = stream["process"]
process.terminate()
print(f"Stream {id} stopped.")
else:
print(f"没有找到该流: {id}")
# WebSocket 心跳检测
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
json_msg = await websocket.receive_json()
# eg: looking: 正在看的摄像头ID
if json_msg:
looking = json_msg["looking"]
if looking and looking in stream_manager:
stream = stream_manager[looking]
# 重置时间
new_time = int(time.time() * 1000)
stream["create_time"] = new_time
print(f"重置流时间: {looking}-{new_time}")
await websocket.send_text("ok")
except WebSocketDisconnect:
print("WebSocket disconnected")
# @app.get("/start_stream")
# async def start(id: str):
# # 判断是否已经存在
# if id in stream_manager:
# return {"message": "Stream already running", "url": base_url + id}
# await start_stream(id) # 使用 await 调用
# return {"message": "Stream started", "url": base_url + id}
# ffmpeg 命令将 RTSP 转换为 FLV 格式并直接输出
async def stream_rtsp_to_flv(stream_id: str):
# 检查流是否正在运行
if stream_id in stream_manager:
print(f"Stream {stream_id} is already running.")
process = stream_manager[stream_id]["process"]
return process.stdout
input_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101"
command = [
ffmpeg_path,
"-i", input_url, # 输入 RTSP 流地址
"-c:v", "libx264", # 使用 libx264 编码
"-c:a", "aac", # 使用 aac 编码
"-f", "flv", # 输出 FLV 格式
"-" # 直接输出到 stdout
]
try:
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
asyncio.create_task(log_output(process, "rtsp"))
# 保存子进程和创建时间
stream_manager[stream_id] = { "process": process, "create_time": int(time.time() * 1000)}
print(f"Started streaming RTSP {stream_id} to FLV format.")
return process.stdout
except Exception as e:
print(f"Error converting RTSP stream {stream_id} to FLV: {e}")
return None
# 提供 RTSP 流并将其转换为 FLV 格式
@app.get("/get_stream")
async def get_stream(id: str):
# 启动流并获取 stdout 流
stream_output = await stream_rtsp_to_flv(id)
if stream_output:
# 通过 StreamingResponse 流式传输数据
return StreamingResponse(stream_output, media_type="video/x-flv")
return {"message": "Unable to start RTSP to FLV conversion."}
# 停止流的 API 路由
@app.get("/stop_stream")
def stop(id: str):
stop_stream(id)
return {"message": "Stream stopped"}