2024-12-13 18:21:25 +08:00
|
|
|
|
2024-12-12 20:04:36 +08:00
|
|
|
import asyncio
|
2024-12-13 18:21:25 +08:00
|
|
|
import time
|
2024-12-17 20:07:29 +08:00
|
|
|
from app.config import ffmpeg_path, rtmp_url, flv_url, rtc_url
|
2024-12-13 18:21:25 +08:00
|
|
|
from app.services.stream_manager import stream_manager
|
|
|
|
from app.utils.logger import get_logger
|
|
|
|
from app.utils.md5 import generate_md5
|
|
|
|
import subprocess
|
2024-12-18 17:49:49 +08:00
|
|
|
from linxyun.utils.result import Result, SysCodes
|
|
|
|
from app.model import Stream
|
2024-12-13 18:21:25 +08:00
|
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
test_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101"
|
2024-12-12 20:04:36 +08:00
|
|
|
|
2024-12-24 19:47:44 +08:00
|
|
|
async def reset_stream_time(stream_id: str) -> bool:
|
|
|
|
if stream_id in stream_manager:
|
|
|
|
new_time = int(time.time() * 1000)
|
|
|
|
logger.info(f"Stream {stream_id} reset time to {new_time}")
|
|
|
|
stream = stream_manager[stream_id]
|
|
|
|
process = stream["process"]
|
|
|
|
if process.poll() is not None:
|
|
|
|
# 流已经停止,重启流
|
|
|
|
result = await start_stream(stream_id, stream["output_url"], stream["config"])
|
|
|
|
if result is None:
|
|
|
|
return False
|
|
|
|
stream["create_time"] = new_time
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
2024-12-12 20:04:36 +08:00
|
|
|
async def log_output(process, output_url):
|
2024-12-18 17:49:49 +08:00
|
|
|
try:
|
|
|
|
while True:
|
|
|
|
# 异步读取 stderr 的输出
|
|
|
|
output = await asyncio.to_thread(process.stderr.readline)
|
|
|
|
if not output and process.poll() is not None: # 如果输出为空且进程已结束
|
|
|
|
break
|
|
|
|
if output:
|
|
|
|
logger.debug(f"ffmpeg output for {output_url}: {output.decode().strip()}")
|
|
|
|
except Exception as e:
|
|
|
|
logger.error(f"Error while logging output for {output_url}: {e}")
|
2024-12-12 20:04:36 +08:00
|
|
|
|
2024-12-13 18:21:25 +08:00
|
|
|
|
2024-12-18 17:49:49 +08:00
|
|
|
async def start_stream(stream_id: str, output_url: str, stream: Stream):
|
2024-12-13 18:21:25 +08:00
|
|
|
logger.info(f"Starting stream {stream_id}")
|
|
|
|
input_url = test_url # RTSP 流地址
|
2024-12-18 17:49:49 +08:00
|
|
|
|
2024-12-13 18:21:25 +08:00
|
|
|
# 构建 ffmpeg 命令
|
2024-12-12 20:04:36 +08:00
|
|
|
command = [
|
2024-12-13 18:21:25 +08:00
|
|
|
ffmpeg_path,
|
|
|
|
'-i', input_url, # 输入 RTSP 流
|
2024-12-18 17:49:49 +08:00
|
|
|
'-c:v', stream.cv, # 视频编码为 libx264
|
|
|
|
'-preset', stream.preset,
|
|
|
|
'-tune', stream.tune,
|
|
|
|
'-g', stream.g,
|
|
|
|
'-b:v', stream.bv, # 码率 6Mbps
|
|
|
|
'-r', stream.r,
|
|
|
|
'-b:a', stream.ba,
|
|
|
|
'-c:a', stream.ca, # 音频编码为 aac
|
2024-12-13 18:21:25 +08:00
|
|
|
'-f', 'flv',
|
|
|
|
output_url # 推送到 MediaMTX 流服务
|
2024-12-12 20:04:36 +08:00
|
|
|
]
|
2024-12-13 18:21:25 +08:00
|
|
|
logger.info(f"ffmpeg command for {stream_id}: {' '.join(command)}")
|
|
|
|
|
2024-12-18 17:49:49 +08:00
|
|
|
try:
|
|
|
|
# 使用 asyncio.to_thread 异步启动 ffmpeg 进程
|
|
|
|
process = await asyncio.to_thread(
|
|
|
|
subprocess.Popen, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
|
|
)
|
|
|
|
except Exception as e:
|
|
|
|
logger.error(f"Failed to start ffmpeg process for {stream_id}: {e}")
|
|
|
|
return None
|
|
|
|
|
2024-12-13 18:21:25 +08:00
|
|
|
# 异步记录输出
|
2024-12-18 17:49:49 +08:00
|
|
|
asyncio.create_task(log_output(process, output_url))
|
2024-12-13 18:21:25 +08:00
|
|
|
|
|
|
|
# 保存进程信息
|
|
|
|
stream_manager[stream_id] = {
|
|
|
|
"process": process,
|
2024-12-24 19:47:44 +08:00
|
|
|
"create_time": int(time.time() * 1000),
|
|
|
|
"output_url": output_url,
|
|
|
|
"config": stream
|
2024-12-13 18:21:25 +08:00
|
|
|
}
|
2024-12-18 17:49:49 +08:00
|
|
|
logger.info(f"Stream {stream_id} started successfully")
|
2024-12-13 18:21:25 +08:00
|
|
|
return process
|
|
|
|
|
|
|
|
|
2024-12-18 17:49:49 +08:00
|
|
|
|
2024-12-24 19:47:44 +08:00
|
|
|
async def stream_rtsp_to_url(stream_id: str, type: str, config: Stream):
|
2024-12-13 18:21:25 +08:00
|
|
|
md5_route = generate_md5(stream_id)
|
|
|
|
output_url = f"{rtmp_url}/{md5_route}"
|
|
|
|
# 如果流已经在运行,检查进程是否仍在运行
|
|
|
|
if stream_id in stream_manager:
|
|
|
|
logger.info(f"Stream {stream_id} is already running.")
|
|
|
|
process = stream_manager[stream_id]["process"]
|
|
|
|
if process.poll() is None:
|
2024-12-18 17:49:49 +08:00
|
|
|
# 子进程仍在运行
|
2024-12-13 18:21:25 +08:00
|
|
|
logger.info(f"Stream {stream_id} is still running.")
|
2024-12-17 20:07:29 +08:00
|
|
|
if type == "rtc":
|
2024-12-18 17:49:49 +08:00
|
|
|
result_url = f"{rtc_url}{md5_route}.flv"
|
|
|
|
else:
|
|
|
|
result_url = f"{flv_url}/{md5_route}.flv"
|
|
|
|
return Result.ok(result_url)
|
|
|
|
|
2024-12-13 18:21:25 +08:00
|
|
|
# 如果流未在运行,启动新流
|
|
|
|
logger.info(f"Stream {stream_id} is not running. Starting new stream.")
|
2024-12-24 19:47:44 +08:00
|
|
|
result = await start_stream(stream_id, output_url, config)
|
2024-12-18 17:49:49 +08:00
|
|
|
if result is None:
|
|
|
|
return Result.error(SysCodes.OPERATE_FAIL)
|
|
|
|
|
|
|
|
# 根据类型返回不同的 URL
|
|
|
|
if type == "rtc":
|
|
|
|
result_url = f"{rtc_url}{md5_route}.flv"
|
|
|
|
else:
|
|
|
|
result_url = f"{flv_url}/{md5_route}.flv"
|
|
|
|
|
|
|
|
return Result.ok(result_url)
|