# Source: uno-click/ffmpeg-api/app.py (685 lines, 27 KiB, Python)
# Snapshot: 2026-05-13 14:20:41 +00:00
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any, List, Literal
import subprocess
import os
import uuid
from pathlib import Path
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import time
import aiofiles
import boto3
from botocore.client import Config
app = FastAPI()
# All temp/work files (downloads, ffmpeg outputs, concat list files) live here.
FILES_DIR = Path("/files")
# ─── S3 / MinIO ───────────────────────────────────────────────────────────────
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "http://minio:9000")
S3_ACCESS = os.getenv("S3_ACCESS_KEY", "UN0-admin")
# NOTE(review): hard-coded credential fallback — this should come only from the
# environment / secret management; rotate the value if this repo is shared.
S3_SECRET = os.getenv("S3_SECRET_KEY", "RAygtZHqGN49qKn")
S3_REGION = os.getenv("S3_REGION", "us-east-1")
def get_s3():
    """Build a boto3 S3 client pointed at the configured MinIO/S3 endpoint."""
    client_kwargs = {
        "endpoint_url": S3_ENDPOINT,
        "aws_access_key_id": S3_ACCESS,
        "aws_secret_access_key": S3_SECRET,
        "region_name": S3_REGION,
        # MinIO requires SigV4 signing.
        "config": Config(signature_version="s3v4"),
    }
    return boto3.client("s3", **client_kwargs)
def parse_s3_uri(uri: str) -> tuple[str, str]:
    """Split an ``s3://bucket/key`` URI into ``(bucket, key)``.

    Raises:
        ValueError: if the URI lacks the ``s3://`` scheme, has no key part,
            or has an empty bucket or key component.
    """
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3:// URI: {uri}")
    parts = uri[5:].split("/", 1)
    # Reject "s3://bucket" (no key) and empty bucket/key components early so
    # callers return a clear 400 instead of a confusing S3 error later.
    if len(parts) != 2 or not parts[0] or not parts[1]:
        raise ValueError(f"Bad s3 URI: {uri}")
    return parts[0], parts[1]
# ─── Модели ───────────────────────────────────────────────────────────────────
class JobStatus(str, Enum):
    """Lifecycle states of a queued ffmpeg/concat job."""
    WAITING = "pending"  # waiting to start ("pending" value kept for backward compatibility)
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
class JobInfo(BaseModel):
    """Public job state returned by GET /status/{job_id}."""
    job_id: str
    status: JobStatus
    created_at: str  # ISO-8601 UTC timestamp (naive, from datetime.utcnow())
    updated_at: str  # ISO-8601 UTC timestamp of the last status change
    input_s3: Optional[str] = None   # for concat jobs this is a comma-joined list of URIs
    output_s3: Optional[str] = None  # s3:// URI of the result, set on completion
    error: Optional[str] = None      # tail of ffmpeg stderr or exception text on failure
    progress: Optional[int] = None   # reserved; never populated by the current code
class FfmpegS3Request(BaseModel):
    """Body for POST /run-s3: run ffmpeg on one S3 object."""
    input_s3: str            # s3://bucket/key of the source file
    ffmpeg_args: list[str]   # args inserted between "-i <input>" and the output path
    output_s3: str | None = None  # destination; defaults to "<input folder>/<job_id>_processed.<ext>"
    job_id: str | None = None     # caller-supplied id; a UUID is generated when omitted
class ConcatS3Request(BaseModel):
    """Body for POST /concat-s3: concatenate 2..20 videos into one."""
    inputs_s3: List[str]  # s3:// URIs, in output order
    output_s3: str        # s3:// URI for the result
    audio: Literal["keep", "mute", "first-only"] = "keep"  # audio handling mode
    reencode: bool = False  # force re-encode (skip the stream-copy attempt)
    job_id: Optional[str] = None  # caller-supplied id; UUID generated when omitted
# ─── Хранилище задач и очередь ────────────────────────────────────────────────
# In-memory job registry: job_id → status dict (lost on process restart).
jobs: Dict[str, Dict[str, Any]] = {}
# FIFO work queue consumed by NUM_WORKERS job_worker tasks.
job_queue: asyncio.Queue = asyncio.Queue()
# Finished jobs are purged from `jobs` this long after their last update.
JOB_TTL = timedelta(hours=1)
# Guards all reads/writes of `jobs` across workers and request handlers.
jobs_lock = asyncio.Lock()
NUM_WORKERS = 4
# ─── Очистка старых задач ─────────────────────────────────────────────────────
async def cleanup_old_jobs():
    """Drop finished (completed/failed) jobs older than JOB_TTL from memory."""
    reference = datetime.utcnow()
    async with jobs_lock:
        expired = [
            jid
            for jid, info in jobs.items()
            if info["status"] in (JobStatus.COMPLETED, JobStatus.FAILED)
            and reference - datetime.fromisoformat(info["updated_at"]) > JOB_TTL
        ]
        for jid in expired:
            del jobs[jid]
# ─── Воркер очереди ───────────────────────────────────────────────────────────
async def job_worker(worker_id: int):
    """Queue consumer: pull jobs forever and dispatch them by kind."""
    while True:
        task = await job_queue.get()
        task_id = task["job_id"]
        # "concat" jobs get the multi-input pipeline; everything else is a
        # plain single-input ffmpeg job.
        handler = process_concat_job if task.get("kind", "ffmpeg") == "concat" else process_ffmpeg_job
        try:
            await handler(task_id, task)
        finally:
            job_queue.task_done()
async def process_ffmpeg_job(job_id: str, job_data: Dict[str, Any]):
    """Run one download → ffmpeg → upload job and record its outcome.

    Updates the shared ``jobs`` registry (under ``jobs_lock``) with
    processing/completed/failed status, and always removes its temp files.
    Never raises: failures are captured into the job record.
    """
    s3 = get_s3()
    in_bucket = job_data["in_bucket"]
    in_key = job_data["in_key"]
    out_bucket = job_data["out_bucket"]
    out_key = job_data["out_key"]
    output_s3 = job_data["output_s3"]
    ffmpeg_args = job_data["ffmpeg_args"]
    tmp_in: Path = job_data["tmp_in"]
    tmp_out: Path = job_data["tmp_out"]

    async def _set_status(status: JobStatus, **extra: Any) -> None:
        # Single place that touches the shared registry, always under the lock.
        async with jobs_lock:
            jobs[job_id]["status"] = status
            jobs[job_id].update(extra)
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

    try:
        await _set_status(JobStatus.PROCESSING)
        # 1. Download the input from MinIO (boto3 is blocking → worker thread).
        await asyncio.to_thread(s3.download_file, in_bucket, in_key, str(tmp_in))
        # 2. Run FFmpeg; -y overwrites any stale temp output.
        cmd = ["ffmpeg", "-i", str(tmp_in), *ffmpeg_args, "-y", str(tmp_out)]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=str(FILES_DIR),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            # Keep only the tail of stderr — ffmpeg logs can be huge.
            error_msg = stderr.decode("utf-8", errors="ignore")[-8000:] if stderr else "Unknown error"
            await _set_status(JobStatus.FAILED, error=error_msg)
            return
        # 3. Upload the result to MinIO. upload_file streams from disk in
        # chunks instead of reading the whole output into memory.
        await asyncio.to_thread(
            s3.upload_file,
            str(tmp_out),
            out_bucket,
            out_key,
            ExtraArgs={"ContentType": "video/mp4"},
        )
        await _set_status(JobStatus.COMPLETED, output_s3=output_s3)
    except Exception as e:
        await _set_status(JobStatus.FAILED, error=str(e))
    finally:
        # Best-effort temp cleanup; swallow only filesystem errors.
        try:
            tmp_in.unlink(missing_ok=True)
            tmp_out.unlink(missing_ok=True)
        except OSError:
            pass
async def _probe_video_size(path: Path) -> tuple[int, int]:
    """Ask ffprobe for the first video stream's (width, height); (0, 0) on failure."""
    probe = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error", "-select_streams", "v:0",
        "-show_entries", "stream=width,height", "-of", "csv=p=0:s=x", str(path),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
    )
    raw, _ = await probe.communicate()
    try:
        # Output looks like "1920x1080"; anything else falls into the except.
        width_str, height_str = raw.decode().strip().split("x")
        return int(width_str), int(height_str)
    except Exception:
        return 0, 0
async def _probe_main_video_index(path: Path) -> int:
    """Per-type index of the first video stream that is NOT attached_pic cover art.

    ffprobe's default writer emits keys like ``index=0`` and
    ``DISPOSITION:attached_pic=1`` — note the uppercase section prefix. The
    previous implementation looked up the lowercase key and therefore always
    fell back to 0; keys are now lowercased during parsing so the disposition
    check actually matches. Returns 0 when no real video stream is found.
    """
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error", "-select_streams", "v",
        "-show_entries", "stream=index,disposition",
        "-of", "default=nw=1:nk=0", str(path),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
    )
    out, _ = await proc.communicate()
    streams: List[dict] = []
    cur: dict = {}
    for line in out.decode("utf-8", errors="ignore").splitlines():
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        # Normalize "DISPOSITION:attached_pic" → "disposition:attached_pic".
        k = k.lower()
        if k == "index":
            # A new "index=" line starts the next stream's record.
            if cur:
                streams.append(cur)
            cur = {"index": int(v)}
        else:
            cur[k] = v
    if cur:
        streams.append(cur)
    # ffprobe's video-only stream order IS the per-type index, so the position
    # of the first non-cover stream in `streams` is the v:N index we need.
    return next(
        (i for i, s in enumerate(streams) if s.get("disposition:attached_pic", "0") != "1"),
        0,
    )
async def _probe_has_audio(path: Path) -> bool:
    """Return True when ffprobe reports at least one audio stream in *path*."""
    probe = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error", "-select_streams", "a:0",
        "-show_entries", "stream=codec_type", "-of", "csv=p=0", str(path),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
    )
    stdout_data, _ = await probe.communicate()
    # ffprobe prints "audio" for the selected stream; empty output means none.
    return b"audio" in stdout_data
async def process_concat_job(job_id: str, job_data: Dict[str, Any]):
    """Concatenate several videos from MinIO into one via ffmpeg concat.

    Flow: download all inputs in parallel → try a fast stream-copy concat
    (when permitted) → fall back to a normalizing re-encode → upload the
    result. Status and errors are written to the shared ``jobs`` registry
    under ``jobs_lock``; temp files are always removed in ``finally``.
    """
    s3 = get_s3()
    inputs: List[Dict[str, str]] = job_data["inputs"]  # [{"bucket":..,"key":..,"tmp":Path}]
    out_bucket: str = job_data["out_bucket"]
    out_key: str = job_data["out_key"]
    output_s3: str = job_data["output_s3"]
    audio_mode: str = job_data["audio"]    # "keep" | "mute" | "first-only"
    reencode: bool = job_data["reencode"]  # force re-encode even if copy would do
    list_file: Path = job_data["list_file"]  # concat-demuxer list file path
    tmp_out: Path = job_data["tmp_out"]
    try:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.PROCESSING
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
        # 1. Download all input files from MinIO in parallel.
        loop = asyncio.get_event_loop()
        await asyncio.gather(*[
            loop.run_in_executor(
                None,
                # Default args pin the current item's values into each lambda.
                lambda b=item["bucket"], k=item["key"], t=item["tmp"]: s3.download_file(b, k, str(t))
            )
            for item in inputs
        ])
        # 2. Write list.txt for the concat demuxer.
        # NOTE(review): entries are written as absolute paths (str(item["tmp"]))
        # despite the original comment about relative names; -safe 0 below is
        # what makes absolute paths acceptable to ffmpeg.
        async with aiofiles.open(str(list_file), "w") as f:
            for item in inputs:
                # escape single quotes in the file name for the concat syntax
                safe = str(item["tmp"]).replace("'", "'\\''")
                await f.write(f"file '{safe}'\n")
        # 3. Build and run the ffmpeg command.
        async def run_ffmpeg(args: List[str]) -> tuple[int, str]:
            # Run ffmpeg under FILES_DIR; capture only stderr for diagnostics.
            proc = await asyncio.create_subprocess_exec(
                "ffmpeg", *args,
                cwd=str(FILES_DIR),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
            return proc.returncode, stderr.decode("utf-8", errors="ignore") if stderr else ""
        async def attempt(cmd_args: List[str]) -> tuple[int, str]:
            return await run_ffmpeg(cmd_args)
        rc = -1
        stderr_text = ""
        if audio_mode == "first-only":
            # Video concat across all inputs (with normalization), audio taken
            # only from the first input. Always re-encodes.
            n = len(inputs)
            sizes = await asyncio.gather(*[_probe_video_size(item["tmp"]) for item in inputs])
            vi = await asyncio.gather(*[_probe_main_video_index(item["tmp"]) for item in inputs])
            # Target canvas = max dimensions over all inputs, forced even
            # (required by yuv420p/libx264).
            tgt_w = max((w for w, _ in sizes if w > 0), default=1280)
            tgt_h = max((h for _, h in sizes if h > 0), default=720)
            if tgt_w % 2: tgt_w += 1
            if tgt_h % 2: tgt_h += 1
            cmd: List[str] = []
            for item in inputs:
                cmd += ["-i", str(item["tmp"])]
            # Scale/pad every clip onto the target canvas, unify SAR/fps/pixfmt.
            norm_filters = [
                f"[{i}:v:{vi[i]}]scale={tgt_w}:{tgt_h}:force_original_aspect_ratio=decrease,"
                f"pad={tgt_w}:{tgt_h}:(ow-iw)/2:(oh-ih)/2:color=black,"
                f"setsar=1,fps=30,format=yuv420p[v{i}]"
                for i in range(n)
            ]
            v_chain = "".join(f"[v{i}]" for i in range(n)) + f"concat=n={n}:v=1:a=0[outv]"
            filter_complex = ";".join(norm_filters + [v_chain])
            cmd += [
                "-filter_complex", filter_complex,
                "-map", "[outv]",
                "-map", "0:a:0?",  # first input's audio, optional ("?")
                "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                "-c:a", "aac", "-b:a", "128k",
                "-movflags", "+faststart",
                "-y", str(tmp_out),
            ]
            rc, stderr_text = await attempt(cmd)
        else:
            # keep / mute → when copy is allowed, try the concat demuxer
            # without re-encoding first.
            base_cmd = ["-f", "concat", "-safe", "0", "-i", str(list_file)]
            if audio_mode == "mute":
                copy_cmd = base_cmd + ["-c:v", "copy", "-an", "-movflags", "+faststart", "-y", str(tmp_out)]
            else:  # keep
                copy_cmd = base_cmd + ["-c", "copy", "-movflags", "+faststart", "-y", str(tmp_out)]
            if not reencode:
                rc, stderr_text = await attempt(copy_cmd)
            # Re-encode with size/SAR/fps normalization for heterogeneous
            # inputs — also the fallback when the stream-copy attempt failed.
            if reencode or rc != 0:
                n = len(inputs)
                # Target resolution = maximum over all inputs (made even).
                sizes = await asyncio.gather(*[_probe_video_size(item["tmp"]) for item in inputs])
                vi = await asyncio.gather(*[_probe_main_video_index(item["tmp"]) for item in inputs])
                tgt_w = max((w for w, _ in sizes if w > 0), default=1280)
                tgt_h = max((h for _, h in sizes if h > 0), default=720)
                if tgt_w % 2: tgt_w += 1
                if tgt_h % 2: tgt_h += 1
                cmd: List[str] = []
                for item in inputs:
                    cmd += ["-i", str(item["tmp"])]
                # For keep-mode, check each input for audio; where missing,
                # mix in silence via anullsrc, otherwise concat=...:a=1 fails.
                want_audio = audio_mode == "keep"
                has_audio: List[bool] = []
                if want_audio:
                    has_audio = list(await asyncio.gather(*[_probe_has_audio(item["tmp"]) for item in inputs]))
                    # Add one anullsrc input per clip that has no audio.
                    for missing in (i for i, ok in enumerate(has_audio) if not ok):
                        cmd += ["-f", "lavfi", "-t", "0.1", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100"]
                norm_filters: List[str] = []
                concat_inputs: List[str] = []
                anull_idx = n  # index of the first anullsrc input
                for i in range(n):
                    norm_filters.append(
                        f"[{i}:v:{vi[i]}]scale={tgt_w}:{tgt_h}:force_original_aspect_ratio=decrease,"
                        f"pad={tgt_w}:{tgt_h}:(ow-iw)/2:(oh-ih)/2:color=black,"
                        f"setsar=1,fps=30,format=yuv420p[v{i}]"
                    )
                    if want_audio:
                        if has_audio[i]:
                            norm_filters.append(f"[{i}:a:0]aresample=async=1:first_pts=0[a{i}]")
                        else:
                            norm_filters.append(f"[{anull_idx}:a:0]aresample=async=1[a{i}]")
                            anull_idx += 1
                        concat_inputs.append(f"[v{i}][a{i}]")
                    else:
                        concat_inputs.append(f"[v{i}]")
                if want_audio:
                    concat = "".join(concat_inputs) + f"concat=n={n}:v=1:a=1[outv][outa]"
                    filter_complex = ";".join(norm_filters + [concat])
                    cmd += [
                        "-filter_complex", filter_complex,
                        "-map", "[outv]", "-map", "[outa]",
                        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                        "-c:a", "aac", "-b:a", "128k",
                        "-movflags", "+faststart",
                        "-y", str(tmp_out),
                    ]
                else:
                    concat = "".join(concat_inputs) + f"concat=n={n}:v=1:a=0[outv]"
                    filter_complex = ";".join(norm_filters + [concat])
                    cmd += [
                        "-filter_complex", filter_complex,
                        "-map", "[outv]",
                        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                        "-an",
                        "-movflags", "+faststart",
                        "-y", str(tmp_out),
                    ]
                rc, stderr_text = await attempt(cmd)
        if rc != 0:
            async with jobs_lock:
                jobs[job_id]["status"] = JobStatus.FAILED
                # Only keep the tail of stderr — ffmpeg logs can be huge.
                jobs[job_id]["error"] = stderr_text[-8000:] if stderr_text else "Unknown error"
                jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
            return
        # 4. Upload the result to MinIO.
        # NOTE(review): reads the whole output into memory before upload —
        # fine for short clips, worth streaming for large outputs.
        async with aiofiles.open(str(tmp_out), "rb") as f:
            content = await f.read()
        await loop.run_in_executor(
            None,
            lambda: s3.put_object(Bucket=out_bucket, Key=out_key, Body=content, ContentType="video/mp4")
        )
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.COMPLETED
            jobs[job_id]["output_s3"] = output_s3
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
    except Exception as e:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.FAILED
            jobs[job_id]["error"] = str(e)
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
    finally:
        # Best-effort cleanup of every temp artifact.
        try:
            for item in inputs:
                item["tmp"].unlink(missing_ok=True)
            list_file.unlink(missing_ok=True)
            tmp_out.unlink(missing_ok=True)
        except:
            pass
# ─── Запуск воркеров при старте ───────────────────────────────────────────────
@app.on_event("startup")
async def startup_event():
    """Start the queue workers and sweep orphaned temp files from FILES_DIR."""
    try:
        cutoff = time.time() - 3600  # anything untouched for over an hour is orphaned
        for pattern in ("_in_*", "_out_*", "_concat_*", "_list_*"):
            for leftover in FILES_DIR.glob(pattern):
                try:
                    if leftover.stat().st_mtime < cutoff:
                        leftover.unlink(missing_ok=True)
                except Exception:
                    pass  # best-effort: a vanished/unreadable file is not fatal
    except Exception:
        pass  # cleanup must never block startup
    for worker_id in range(NUM_WORKERS):
        asyncio.create_task(job_worker(worker_id))
# ─── POST /run-s3 ─────────────────────────────────────────────────────────────
@app.post("/run-s3")
async def run_ffmpeg_s3(req: FfmpegS3Request):
    """Create a job, add it to the queue, and return job_id immediately.

    Processing happens asynchronously in the worker pool; poll the returned
    ``status_url`` for progress. When ``output_s3`` is omitted, the result is
    written next to the input as ``<folder>/<job_id>_processed.<ext>``.
    """
    tmp_id = req.job_id or str(uuid.uuid4())
    try:
        in_bucket, in_key = parse_s3_uri(req.input_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if req.output_s3:
        try:
            out_bucket, out_key = parse_s3_uri(req.output_s3)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
    else:
        # Derive an output key next to the input, preserving its extension.
        folder = "/".join(in_key.split("/")[:-1])
        ext = in_key.rsplit(".", 1)[-1] if "." in in_key else "mp4"
        out_key = f"{folder}/{tmp_id}_processed.{ext}" if folder else f"{tmp_id}_processed.{ext}"
        out_bucket = in_bucket
    # Temp file names embed the job id so the startup sweep can find orphans.
    tmp_in = FILES_DIR / f"_in_{tmp_id}.{in_key.rsplit('.',1)[-1] if '.' in in_key else 'mp4'}"
    tmp_out = FILES_DIR / f"_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"
    now = datetime.utcnow().isoformat()
    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            "input_s3": req.input_s3,
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }
    # Enqueue for a worker to pick up.
    await job_queue.put({
        "kind": "ffmpeg",
        "job_id": tmp_id,
        "in_bucket": in_bucket,
        "in_key": in_key,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "ffmpeg_args": req.ffmpeg_args,
        "tmp_in": tmp_in,
        "tmp_out": tmp_out,
    })
    await cleanup_old_jobs()
    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        # Approximate: queue size at enqueue time, not a stable position.
        "queue_position": job_queue.qsize(),
    }
# ─── POST /concat-s3 ──────────────────────────────────────────────────────────
@app.post("/concat-s3")
async def concat_s3(req: ConcatS3Request):
    """Concatenate several videos from MinIO into one. Returns job_id and status_url.

    Validates the request, records a WAITING job, and enqueues it for the
    worker pool; the actual ffmpeg work happens in process_concat_job.
    """
    # Bound the job size: at least 2 clips, at most 20.
    if len(req.inputs_s3) < 2:
        raise HTTPException(status_code=400, detail="inputs_s3 must contain at least 2 items")
    if len(req.inputs_s3) > 20:
        raise HTTPException(status_code=400, detail="inputs_s3 too long (max 20)")
    tmp_id = req.job_id or str(uuid.uuid4())
    inputs: List[Dict[str, Any]] = []
    for idx, uri in enumerate(req.inputs_s3):
        try:
            b, k = parse_s3_uri(uri)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=f"inputs_s3[{idx}]: {e}")
        # Keep the source extension so ffmpeg's format detection works.
        ext = k.rsplit(".", 1)[-1] if "." in k else "mp4"
        inputs.append({
            "bucket": b,
            "key": k,
            "tmp": FILES_DIR / f"_concat_{tmp_id}_{idx}.{ext}",
        })
    try:
        out_bucket, out_key = parse_s3_uri(req.output_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    list_file = FILES_DIR / f"_concat_{tmp_id}.txt"
    tmp_out = FILES_DIR / f"_concat_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"
    now = datetime.utcnow().isoformat()
    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            # Stored comma-joined to fit JobInfo.input_s3 (a single string).
            "input_s3": ",".join(req.inputs_s3),
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }
    await job_queue.put({
        "kind": "concat",
        "job_id": tmp_id,
        "inputs": inputs,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "audio": req.audio,
        "reencode": req.reencode,
        "list_file": list_file,
        "tmp_out": tmp_out,
    })
    await cleanup_old_jobs()
    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        # Approximate: queue size at enqueue time, not a stable position.
        "queue_position": job_queue.qsize(),
    }
# ─── GET /status/{job_id} ─────────────────────────────────────────────────────
@app.get("/status/{job_id}", response_model=JobInfo)
async def get_job_status(job_id: str):
    """Return the current state of one job, or 404 if it is unknown."""
    async with jobs_lock:
        record = jobs.get(job_id)
        if record is None:
            raise HTTPException(status_code=404, detail="Job not found")
        # Copy under the lock so the response is a consistent snapshot.
        snapshot = record.copy()
    return JobInfo(**snapshot)
# ─── GET /status ──────────────────────────────────────────────────────────────
@app.get("/status")
async def list_statuses():
    """Summarize every known job: id, status, and timestamps."""
    summary_fields = ("job_id", "status", "created_at", "updated_at")
    async with jobs_lock:
        return [
            {field: record[field] for field in summary_fields}
            for record in jobs.values()
        ]
# ─── DELETE /jobs/{job_id} ────────────────────────────────────────────────────
@app.delete("/jobs/{job_id}")
async def delete_job(job_id: str):
    """Remove a job record from the in-memory registry; 404 if unknown."""
    async with jobs_lock:
        removed = jobs.pop(job_id, None)
        if removed is None:
            raise HTTPException(status_code=404, detail="Job not found")
    return {"ok": True, "message": "Job deleted"}
# ─── Health check ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health_check():
    """Liveness probe reporting a few in-memory queue metrics."""
    metrics = {
        "status": "ok",
        "jobs_count": len(jobs),
        "queue_size": job_queue.qsize(),
        "workers": NUM_WORKERS,
    }
    return metrics
# ─── POST /ingest-url ─────────────────────────────────────────────────────────
import urllib.request
class IngestUrlRequest(BaseModel):
    """Body for POST /ingest-url: copy a remote file into S3/MinIO."""
    # Source URL to download from (fetched server-side — untrusted input).
    url: str
    # Destination as an s3://bucket/key URI.
    output_s3: str
    # Optional Content-Type to set on the uploaded object.
    content_type: Optional[str] = None
@app.post("/ingest-url")
async def ingest_url(req: IngestUrlRequest):
    """Download a file from a URL and store it in S3/MinIO.

    Returns ``{"output_s3", "content_type"}``. Responds 400 for a bad s3 URI,
    502 when the source URL answers with an HTTP error, 500 for anything else.
    The download runs in a worker thread; the temp file is always removed.
    """
    # Explicit import: the handler matches urllib.error.HTTPError below, and
    # relying on `import urllib.request` to pull in urllib.error as an import
    # side effect is fragile.
    import urllib.error
    try:
        out_bucket, out_key = parse_s3_uri(req.output_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    tmp = FILES_DIR / f"_ingest_{uuid.uuid4().hex}"

    def _download_and_upload():
        # NOTE(review): req.url is fetched server-side with no scheme/host
        # allow-list or size cap — confirm SSRF exposure is acceptable here.
        request = urllib.request.Request(req.url, headers={"User-Agent": "uno-ffmpeg-api/1.0"})
        with urllib.request.urlopen(request, timeout=300) as resp, open(tmp, "wb") as f:
            # Stream in 1 MiB chunks to keep memory flat for large files.
            while chunk := resp.read(1024 * 1024):
                f.write(chunk)
        s3 = get_s3()
        extra = {"ContentType": req.content_type} if req.content_type else None
        if extra:
            s3.upload_file(str(tmp), out_bucket, out_key, ExtraArgs=extra)
        else:
            s3.upload_file(str(tmp), out_bucket, out_key)

    try:
        await asyncio.to_thread(_download_and_upload)
    except urllib.error.HTTPError as e:
        # Chain the cause so the original stack survives into logs.
        raise HTTPException(status_code=502, detail=f"Source URL HTTP {e.code}: {e.reason}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Ingest failed: {e}") from e
    finally:
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass
    return {"output_s3": req.output_s3, "content_type": req.content_type}