from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any, List, Literal
import os
import uuid
from pathlib import Path
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import time
import aiofiles
import boto3
from botocore.client import Config

app = FastAPI()
FILES_DIR = Path("/files")

# ─── S3 / MinIO ───────────────────────────────────────────────────────────────
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "http://minio:9000")
S3_ACCESS = os.getenv("S3_ACCESS_KEY", "UN0-admin")
S3_SECRET = os.getenv("S3_SECRET_KEY", "RAygtZHqGN49qKn")
S3_REGION = os.getenv("S3_REGION", "us-east-1")


def get_s3():
    return boto3.client(
        "s3",
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS,
        aws_secret_access_key=S3_SECRET,
        region_name=S3_REGION,
        config=Config(signature_version="s3v4"),
    )


def parse_s3_uri(uri: str) -> tuple[str, str]:
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3:// URI: {uri}")
    parts = uri[5:].split("/", 1)
    if len(parts) != 2:
        raise ValueError(f"Bad s3 URI: {uri}")
    return parts[0], parts[1]
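
# Illustrative usage of parse_s3_uri (bucket/key names are made up for the example):
#   parse_s3_uri("s3://media/in/clip.mp4")  -> ("media", "in/clip.mp4")
#   parse_s3_uri("s3://media")              -> ValueError (no key part)
#   parse_s3_uri("http://minio:9000/x")     -> ValueError (not an s3:// URI)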


# ─── Models ───────────────────────────────────────────────────────────────────
class JobStatus(str, Enum):
    WAITING = "pending"  # waiting to be processed ("pending" kept for backward compatibility)
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class JobInfo(BaseModel):
    job_id: str
    status: JobStatus
    created_at: str
    updated_at: str
    input_s3: Optional[str] = None
    output_s3: Optional[str] = None
    error: Optional[str] = None
    progress: Optional[int] = None


class FfmpegS3Request(BaseModel):
    input_s3: str
    ffmpeg_args: list[str]
    output_s3: str | None = None
    job_id: str | None = None


class ConcatS3Request(BaseModel):
    inputs_s3: List[str]
    output_s3: str
    audio: Literal["keep", "mute", "first-only"] = "keep"
    reencode: bool = False
    job_id: Optional[str] = None


# ─── Job store and queue ──────────────────────────────────────────────────────
jobs: Dict[str, Dict[str, Any]] = {}
job_queue: asyncio.Queue = asyncio.Queue()
JOB_TTL = timedelta(hours=1)
jobs_lock = asyncio.Lock()
NUM_WORKERS = 4


# ─── Cleanup of old jobs ──────────────────────────────────────────────────────
async def cleanup_old_jobs():
    now = datetime.utcnow()
    to_delete = []
    async with jobs_lock:
        for job_id, job in jobs.items():
            if job["status"] in (JobStatus.COMPLETED, JobStatus.FAILED):
                updated = datetime.fromisoformat(job["updated_at"])
                if now - updated > JOB_TTL:
                    to_delete.append(job_id)
        for job_id in to_delete:
            del jobs[job_id]


# ─── Queue worker ─────────────────────────────────────────────────────────────
async def job_worker(worker_id: int):
    """Pulls jobs from the queue and dispatches them by kind."""
    while True:
        job_data = await job_queue.get()
        job_id = job_data["job_id"]
        kind = job_data.get("kind", "ffmpeg")
        try:
            if kind == "concat":
                await process_concat_job(job_id, job_data)
            else:
                await process_ffmpeg_job(job_id, job_data)
        finally:
            job_queue.task_done()


async def process_ffmpeg_job(job_id: str, job_data: Dict[str, Any]):
    """Processes a single FFmpeg job asynchronously."""
    s3 = get_s3()
    in_bucket = job_data["in_bucket"]
    in_key = job_data["in_key"]
    out_bucket = job_data["out_bucket"]
    out_key = job_data["out_key"]
    output_s3 = job_data["output_s3"]
    ffmpeg_args = job_data["ffmpeg_args"]
    tmp_in = job_data["tmp_in"]
    tmp_out = job_data["tmp_out"]
    try:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.PROCESSING
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

        # 1. Download from MinIO
        await asyncio.get_event_loop().run_in_executor(
            None, lambda: s3.download_file(in_bucket, in_key, str(tmp_in))
        )

        # 2. Run FFmpeg
        cmd = ["ffmpeg", "-i", str(tmp_in), *ffmpeg_args, "-y", str(tmp_out)]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=str(FILES_DIR),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            error_msg = stderr.decode("utf-8", errors="ignore")[-8000:] if stderr else "Unknown error"
            async with jobs_lock:
                jobs[job_id]["status"] = JobStatus.FAILED
                jobs[job_id]["error"] = error_msg
                jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
            return

        # 3. Upload the result to MinIO
        async with aiofiles.open(str(tmp_out), "rb") as f:
            content = await f.read()
        await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: s3.put_object(Bucket=out_bucket, Key=out_key, Body=content, ContentType="video/mp4"),
        )
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.COMPLETED
            jobs[job_id]["output_s3"] = output_s3
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
    except Exception as e:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.FAILED
            jobs[job_id]["error"] = str(e)
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
    finally:
        try:
            tmp_in.unlink(missing_ok=True)
            tmp_out.unlink(missing_ok=True)
        except Exception:
            pass
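
# Illustrative command line built in step 2 above (job id and extension vary
# per request): a job with ffmpeg_args=["-vf", "scale=1280:-2", "-c:a", "copy"]
# runs roughly
#   ffmpeg -i /files/_in_<job>.mp4 -vf scale=1280:-2 -c:a copy -y /files/_out_<job>.mp4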


async def _probe_video_size(path: Path) -> tuple[int, int]:
    """Return (width, height) using ffprobe; (0, 0) on failure."""
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=p=0:s=x",
        str(path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    out, _ = await proc.communicate()
    try:
        w, h = out.decode().strip().split("x")
        return int(w), int(h)
    except Exception:
        return 0, 0


async def _probe_main_video_index(path: Path) -> int:
    """Index of the first video stream that is NOT an attached_pic/cover art."""
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error",
        "-select_streams", "v",
        "-show_entries", "stream=index,disposition",
        "-of", "default=nw=1:nk=0",
        str(path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    out, _ = await proc.communicate()
    text = out.decode("utf-8", errors="ignore").splitlines()
    streams: List[dict] = []
    cur: dict = {}
    for line in text:
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        # ffprobe's default writer prints disposition keys as
        # "DISPOSITION:attached_pic=..."; normalize case so lookups below match.
        k = k.lower()
        if k == "index":
            if cur:
                streams.append(cur)
            cur = {"index": int(v)}
        else:
            cur[k] = v
    if cur:
        streams.append(cur)
    main = [s for s in streams if s.get("disposition:attached_pic", "0") != "1"]
    if not main:
        return 0
    # ffprobe's video-only stream order is the per-type index; the first
    # non-attached_pic stream is what a "v:N" specifier should point at.
    return next((i for i, s in enumerate(streams) if s.get("disposition:attached_pic", "0") != "1"), 0)


async def _probe_has_audio(path: Path) -> bool:
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error",
        "-select_streams", "a:0",
        "-show_entries", "stream=codec_type",
        "-of", "csv=p=0",
        str(path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    out, _ = await proc.communicate()
    return b"audio" in out


async def process_concat_job(job_id: str, job_data: Dict[str, Any]):
    """Concatenates several videos from MinIO into one via ffmpeg concat."""
    s3 = get_s3()
    inputs: List[Dict[str, str]] = job_data["inputs"]  # [{"bucket": .., "key": .., "tmp": Path}]
    out_bucket: str = job_data["out_bucket"]
    out_key: str = job_data["out_key"]
    output_s3: str = job_data["output_s3"]
    audio_mode: str = job_data["audio"]
    reencode: bool = job_data["reencode"]
    list_file: Path = job_data["list_file"]
    tmp_out: Path = job_data["tmp_out"]
    try:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.PROCESSING
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

        # 1. Download all input files from MinIO in parallel
        loop = asyncio.get_event_loop()
        await asyncio.gather(*[
            loop.run_in_executor(
                None,
                lambda b=item["bucket"], k=item["key"], t=item["tmp"]: s3.download_file(b, k, str(t)),
            )
            for item in inputs
        ])

        # 2. Build list.txt for the concat demuxer. The entries are the absolute
        # paths of the temp files under FILES_DIR; "-safe 0" below is what lets
        # the demuxer accept absolute paths.
        async with aiofiles.open(str(list_file), "w") as f:
            for item in inputs:
                # escape single quotes in the file name (concat-demuxer quoting)
                safe = str(item["tmp"]).replace("'", "'\\''")
                await f.write(f"file '{safe}'\n")
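
        # For reference, a sketch of list.txt for two inputs (the <job> id
        # placeholder is illustrative):
        #   file '/files/_concat_<job>_0.mp4'
        #   file '/files/_concat_<job>_1.mp4'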

        # 3. Build the ffmpeg command
        async def run_ffmpeg(args: List[str]) -> tuple[int, str]:
            proc = await asyncio.create_subprocess_exec(
                "ffmpeg", *args,
                cwd=str(FILES_DIR),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
            return proc.returncode, stderr.decode("utf-8", errors="ignore") if stderr else ""

        async def attempt(cmd_args: List[str]) -> tuple[int, str]:
            return await run_ffmpeg(cmd_args)

        rc = -1
        stderr_text = ""

        if audio_mode == "first-only":
            # Video concat across all inputs (normalized); audio only from the
            # first input. Always re-encodes.
            n = len(inputs)
            sizes = await asyncio.gather(*[_probe_video_size(item["tmp"]) for item in inputs])
            vi = await asyncio.gather(*[_probe_main_video_index(item["tmp"]) for item in inputs])
            tgt_w = max((w for w, _ in sizes if w > 0), default=1280)
            tgt_h = max((h for _, h in sizes if h > 0), default=720)
            if tgt_w % 2:
                tgt_w += 1
            if tgt_h % 2:
                tgt_h += 1
            cmd: List[str] = []
            for item in inputs:
                cmd += ["-i", str(item["tmp"])]
            norm_filters = [
                f"[{i}:v:{vi[i]}]scale={tgt_w}:{tgt_h}:force_original_aspect_ratio=decrease,"
                f"pad={tgt_w}:{tgt_h}:(ow-iw)/2:(oh-ih)/2:color=black,"
                f"setsar=1,fps=30,format=yuv420p[v{i}]"
                for i in range(n)
            ]
            v_chain = "".join(f"[v{i}]" for i in range(n)) + f"concat=n={n}:v=1:a=0[outv]"
            filter_complex = ";".join(norm_filters + [v_chain])
            cmd += [
                "-filter_complex", filter_complex,
                "-map", "[outv]",
                "-map", "0:a:0?",
                "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                "-c:a", "aac", "-b:a", "128k",
                "-movflags", "+faststart",
                "-y", str(tmp_out),
            ]
            rc, stderr_text = await attempt(cmd)
        else:
            # keep / mute → if stream copy is allowed, first try the concat
            # demuxer without re-encoding.
            base_cmd = ["-f", "concat", "-safe", "0", "-i", str(list_file)]
            if audio_mode == "mute":
                copy_cmd = base_cmd + ["-c:v", "copy", "-an", "-movflags", "+faststart", "-y", str(tmp_out)]
            else:  # keep
                copy_cmd = base_cmd + ["-c", "copy", "-movflags", "+faststart", "-y", str(tmp_out)]
            if not reencode:
                rc, stderr_text = await attempt(copy_cmd)

            # Re-encode with size/SAR/fps normalization for heterogeneous inputs.
            if reencode or rc != 0:
                n = len(inputs)
                # Target resolution = maximum over all inputs (rounded up to even).
                sizes = await asyncio.gather(*[_probe_video_size(item["tmp"]) for item in inputs])
                vi = await asyncio.gather(*[_probe_main_video_index(item["tmp"]) for item in inputs])
                tgt_w = max((w for w, _ in sizes if w > 0), default=1280)
                tgt_h = max((h for _, h in sizes if h > 0), default=720)
                if tgt_w % 2:
                    tgt_w += 1
                if tgt_h % 2:
                    tgt_h += 1
                cmd: List[str] = []
                for item in inputs:
                    cmd += ["-i", str(item["tmp"])]
                # For "keep" mode, check every input for audio; where it is
                # missing, mix in silence via anullsrc, otherwise
                # concat=...:a=1 would fail.
                want_audio = audio_mode == "keep"
                has_audio: List[bool] = []
                if want_audio:
                    has_audio = list(await asyncio.gather(*[_probe_has_audio(item["tmp"]) for item in inputs]))
                    # Add one anullsrc input per clip without audio.
                    for _missing in (i for i, ok in enumerate(has_audio) if not ok):
                        cmd += ["-f", "lavfi", "-t", "0.1", "-i",
                                "anullsrc=channel_layout=stereo:sample_rate=44100"]
                norm_filters: List[str] = []
                concat_inputs: List[str] = []
                anull_idx = n  # index of the first anullsrc input
                for i in range(n):
                    norm_filters.append(
                        f"[{i}:v:{vi[i]}]scale={tgt_w}:{tgt_h}:force_original_aspect_ratio=decrease,"
                        f"pad={tgt_w}:{tgt_h}:(ow-iw)/2:(oh-ih)/2:color=black,"
                        f"setsar=1,fps=30,format=yuv420p[v{i}]"
                    )
                    if want_audio:
                        if has_audio[i]:
                            norm_filters.append(f"[{i}:a:0]aresample=async=1:first_pts=0[a{i}]")
                        else:
                            norm_filters.append(f"[{anull_idx}:a:0]aresample=async=1[a{i}]")
                            anull_idx += 1
                        concat_inputs.append(f"[v{i}][a{i}]")
                    else:
                        concat_inputs.append(f"[v{i}]")
                if want_audio:
                    concat = "".join(concat_inputs) + f"concat=n={n}:v=1:a=1[outv][outa]"
                    filter_complex = ";".join(norm_filters + [concat])
                    cmd += [
                        "-filter_complex", filter_complex,
                        "-map", "[outv]", "-map", "[outa]",
                        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                        "-c:a", "aac", "-b:a", "128k",
                        "-movflags", "+faststart",
                        "-y", str(tmp_out),
                    ]
                else:
                    concat = "".join(concat_inputs) + f"concat=n={n}:v=1:a=0[outv]"
                    filter_complex = ";".join(norm_filters + [concat])
                    cmd += [
                        "-filter_complex", filter_complex,
                        "-map", "[outv]",
                        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                        "-an",
                        "-movflags", "+faststart",
                        "-y", str(tmp_out),
                    ]
                rc, stderr_text = await attempt(cmd)
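
        # For reference, the re-encode graph for n=2 "keep" inputs that both
        # have audio, with a 1280x720 target, looks roughly like this
        # (illustrative; the real string is a single line, "..." abbreviates
        # the repeated scale/pad chain):
        #   [0:v:0]scale=1280:720:force_original_aspect_ratio=decrease,
        #     pad=1280:720:(ow-iw)/2:(oh-ih)/2:color=black,setsar=1,fps=30,format=yuv420p[v0];
        #   [0:a:0]aresample=async=1:first_pts=0[a0];
        #   [1:v:0]scale=...[v1];[1:a:0]aresample=async=1:first_pts=0[a1];
        #   [v0][a0][v1][a1]concat=n=2:v=1:a=1[outv][outa]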

        if rc != 0:
            async with jobs_lock:
                jobs[job_id]["status"] = JobStatus.FAILED
                jobs[job_id]["error"] = stderr_text[-8000:] if stderr_text else "Unknown error"
                jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
            return

        # 4. Upload the result to MinIO
        async with aiofiles.open(str(tmp_out), "rb") as f:
            content = await f.read()
        await loop.run_in_executor(
            None,
            lambda: s3.put_object(Bucket=out_bucket, Key=out_key, Body=content, ContentType="video/mp4"),
        )
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.COMPLETED
            jobs[job_id]["output_s3"] = output_s3
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
    except Exception as e:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.FAILED
            jobs[job_id]["error"] = str(e)
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
    finally:
        try:
            for item in inputs:
                item["tmp"].unlink(missing_ok=True)
            list_file.unlink(missing_ok=True)
            tmp_out.unlink(missing_ok=True)
        except Exception:
            pass


# ─── Start workers on startup ─────────────────────────────────────────────────
@app.on_event("startup")
async def startup_event():
    """Starts the queue workers and cleans up orphaned temporary files."""
    try:
        now = time.time()
        for pat in ("_in_*", "_out_*", "_concat_*", "_list_*"):
            for f in FILES_DIR.glob(pat):
                try:
                    if now - f.stat().st_mtime > 3600:  # 1 hour
                        f.unlink(missing_ok=True)
                except Exception:
                    pass
    except Exception:
        pass
    for i in range(NUM_WORKERS):
        asyncio.create_task(job_worker(i))


# ─── POST /run-s3 ─────────────────────────────────────────────────────────────
@app.post("/run-s3")
async def run_ffmpeg_s3(req: FfmpegS3Request):
    """Creates a job, enqueues it, and returns the job_id immediately."""
    tmp_id = req.job_id or str(uuid.uuid4())
    try:
        in_bucket, in_key = parse_s3_uri(req.input_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if req.output_s3:
        try:
            out_bucket, out_key = parse_s3_uri(req.output_s3)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
    else:
        folder = "/".join(in_key.split("/")[:-1])
        ext = in_key.rsplit(".", 1)[-1] if "." in in_key else "mp4"
        out_key = f"{folder}/{tmp_id}_processed.{ext}" if folder else f"{tmp_id}_processed.{ext}"
        out_bucket = in_bucket
    tmp_in = FILES_DIR / f"_in_{tmp_id}.{in_key.rsplit('.', 1)[-1] if '.' in in_key else 'mp4'}"
    tmp_out = FILES_DIR / f"_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"
    now = datetime.utcnow().isoformat()
    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            "input_s3": req.input_s3,
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }
    # Enqueue
    await job_queue.put({
        "kind": "ffmpeg",
        "job_id": tmp_id,
        "in_bucket": in_bucket,
        "in_key": in_key,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "ffmpeg_args": req.ffmpeg_args,
        "tmp_in": tmp_in,
        "tmp_out": tmp_out,
    })
    await cleanup_old_jobs()
    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        "queue_position": job_queue.qsize(),
    }
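
# Illustrative exchange (bucket, key, and args are examples, not defaults):
#   POST /run-s3
#   {"input_s3": "s3://media/in/clip.mp4",
#    "ffmpeg_args": ["-vf", "scale=1280:-2", "-c:a", "copy"]}
# returns immediately with something like
#   {"job_id": "3f2a...", "status": "pending",
#    "status_url": "/status/3f2a...", "queue_position": 1}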


# ─── POST /concat-s3 ──────────────────────────────────────────────────────────
@app.post("/concat-s3")
async def concat_s3(req: ConcatS3Request):
    """Concatenates several videos from MinIO into one. Returns job_id and status_url."""
    if len(req.inputs_s3) < 2:
        raise HTTPException(status_code=400, detail="inputs_s3 must contain at least 2 items")
    if len(req.inputs_s3) > 20:
        raise HTTPException(status_code=400, detail="inputs_s3 too long (max 20)")
    tmp_id = req.job_id or str(uuid.uuid4())
    inputs: List[Dict[str, Any]] = []
    for idx, uri in enumerate(req.inputs_s3):
        try:
            b, k = parse_s3_uri(uri)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=f"inputs_s3[{idx}]: {e}")
        ext = k.rsplit(".", 1)[-1] if "." in k else "mp4"
        inputs.append({
            "bucket": b,
            "key": k,
            "tmp": FILES_DIR / f"_concat_{tmp_id}_{idx}.{ext}",
        })
    try:
        out_bucket, out_key = parse_s3_uri(req.output_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    list_file = FILES_DIR / f"_concat_{tmp_id}.txt"
    tmp_out = FILES_DIR / f"_concat_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"
    now = datetime.utcnow().isoformat()
    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            "input_s3": ",".join(req.inputs_s3),
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }
    await job_queue.put({
        "kind": "concat",
        "job_id": tmp_id,
        "inputs": inputs,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "audio": req.audio,
        "reencode": req.reencode,
        "list_file": list_file,
        "tmp_out": tmp_out,
    })
    await cleanup_old_jobs()
    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        "queue_position": job_queue.qsize(),
    }


# ─── GET /status/{job_id} ─────────────────────────────────────────────────────
@app.get("/status/{job_id}", response_model=JobInfo)
async def get_job_status(job_id: str):
    """Returns the status of a single job."""
    async with jobs_lock:
        if job_id not in jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        job = jobs[job_id].copy()
    return JobInfo(**job)


# ─── GET /status ──────────────────────────────────────────────────────────────
@app.get("/status")
async def list_statuses():
    """Lists all jobs."""
    async with jobs_lock:
        return [
            {
                "job_id": j["job_id"],
                "status": j["status"],
                "created_at": j["created_at"],
                "updated_at": j["updated_at"],
            }
            for j in jobs.values()
        ]


# ─── DELETE /jobs/{job_id} ────────────────────────────────────────────────────
@app.delete("/jobs/{job_id}")
async def delete_job(job_id: str):
    """Deletes a job."""
    async with jobs_lock:
        if job_id not in jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        del jobs[job_id]
    return {"ok": True, "message": "Job deleted"}


# ─── Health check ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health_check():
    return {
        "status": "ok",
        "jobs_count": len(jobs),
        "queue_size": job_queue.qsize(),
        "workers": NUM_WORKERS,
    }
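
# Minimal client sketch, kept as a comment (the "requests" library, host, and
# bucket names are assumptions, not part of this service — any HTTP client
# works the same way):
#
#   import time, requests
#   base = "http://localhost:8000"
#   job = requests.post(f"{base}/concat-s3", json={
#       "inputs_s3": ["s3://media/a.mp4", "s3://media/b.mp4"],
#       "output_s3": "s3://media/out/joined.mp4",
#       "audio": "keep",
#   }).json()
#   while True:
#       st = requests.get(base + job["status_url"]).json()
#       if st["status"] in ("completed", "failed"):
#           break
#       time.sleep(2)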