diff --git a/app/routes/stream.py b/app/routes/stream.py index 83d0284..62a40ab 100644 --- a/app/routes/stream.py +++ b/app/routes/stream.py @@ -1,6 +1,5 @@ from fastapi import APIRouter -from app.services.stream_manager import reset_stream_time -from app.services.ffmpeg_service import stream_rtsp_to_url +from app.services.ffmpeg_service import stream_rtsp_to_url, reset_stream_time from linxyun.utils.result import Result, SysCodes from app.model import Stream router = APIRouter() @@ -12,7 +11,7 @@ async def get_stream(camera_id: str, stream: Stream = Stream(), type: str="flv", @router.get("/looking/{camera_id}") async def looking(camera_id: str): - status = reset_stream_time(camera_id) + status = await reset_stream_time(camera_id) if status is True: return Result.ok() else: diff --git a/app/services/ffmpeg_service.py b/app/services/ffmpeg_service.py index 814440d..dbc42ca 100644 --- a/app/services/ffmpeg_service.py +++ b/app/services/ffmpeg_service.py @@ -12,6 +12,21 @@ from app.model import Stream logger = get_logger(__name__) test_url = "rtsp://admin:jitu0818@192.168.4.102:554/Streaming/Channels/101" +async def reset_stream_time(stream_id: str) -> bool: + if stream_id in stream_manager: + new_time = int(time.time() * 1000) + logger.info(f"Stream {stream_id} reset time to {new_time}") + stream = stream_manager[stream_id] + process = stream["process"] + if process.poll() is not None: + # 流已经停止,重启流 + result = await start_stream(stream_id, stream["output_url"], stream["config"]) + if result is None: + return False + stream["create_time"] = new_time + return True + return False + async def log_output(process, output_url): try: while True: @@ -55,30 +70,22 @@ async def start_stream(stream_id: str, output_url: str, stream: Stream): logger.error(f"Failed to start ffmpeg process for {stream_id}: {e}") return None - # 等待短时间以确认进程是否启动成功 - # await asyncio.sleep(10) # 等待 1 秒,确保进程已稳定运行 - - # 检查进程是否已退出 - # if process.poll() is not None: # poll() 检查子进程状态 - # logger.error(f"Stream {stream_id} failed to start. Exit code: {process.poll()}") - # stderr_output = await asyncio.to_thread(process.stderr.read) - # logger.error(f"Error: {stderr_output.decode().strip()}") - # return None - # 异步记录输出 asyncio.create_task(log_output(process, output_url)) # 保存进程信息 stream_manager[stream_id] = { "process": process, - "create_time": int(time.time() * 1000) + "create_time": int(time.time() * 1000), + "output_url": output_url, + "config": stream } logger.info(f"Stream {stream_id} started successfully") return process -async def stream_rtsp_to_url(stream_id: str, type: str, stream: Stream): +async def stream_rtsp_to_url(stream_id: str, type: str, config: Stream): md5_route = generate_md5(stream_id) output_url = f"{rtmp_url}/{md5_route}" # 如果流已经在运行,检查进程是否仍在运行 @@ -96,7 +103,7 @@ async def stream_rtsp_to_url(stream_id: str, type: str, stream: Stream): # 如果流未在运行,启动新流 logger.info(f"Stream {stream_id} is not running. Starting new stream.") - result = await start_stream(stream_id, output_url, stream) + result = await start_stream(stream_id, output_url, config) if result is None: return Result.error(SysCodes.OPERATE_FAIL) diff --git a/app/services/stream_manager.py b/app/services/stream_manager.py index 86c8088..4ca0fc5 100644 --- a/app/services/stream_manager.py +++ b/app/services/stream_manager.py @@ -4,13 +4,6 @@ stream_manager = {} logger = get_logger(__name__) -def reset_stream_time(stream_id: str) -> bool: - if stream_id in stream_manager: - new_time = int(time.time() * 1000) - logger.info(f"Stream {stream_id} reset time to {new_time}") - stream_manager[stream_id]["create_time"] = new_time - return True - return False def stop_stream(stream_id: str): if stream_id in stream_manager: