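"""FFmpeg job API.

A small FastAPI service that runs FFmpeg work against objects stored in
S3/MinIO: single-input conversions (/run-s3), concatenation of several clips
(/concat-s3), and URL ingestion into a bucket (/ingest-url), plus status,
delete and health endpoints. Jobs are queued in memory and handled by a fixed
pool of asyncio workers.
"""
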
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any, List, Literal
import subprocess
import os
import uuid
from pathlib import Path
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import time
import aiofiles

import boto3
from botocore.client import Config

app = FastAPI()

FILES_DIR = Path("/files")

# ─── S3 / MinIO ───────────────────────────────────────────────────────────────
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "http://minio:9000")
S3_ACCESS = os.getenv("S3_ACCESS_KEY", "UN0-admin")
S3_SECRET = os.getenv("S3_SECRET_KEY", "RAygtZHqGN49qKn")
S3_REGION = os.getenv("S3_REGION", "us-east-1")


def get_s3():
    return boto3.client(
        "s3",
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS,
        aws_secret_access_key=S3_SECRET,
        region_name=S3_REGION,
        config=Config(signature_version="s3v4"),
    )


def parse_s3_uri(uri: str) -> tuple[str, str]:
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3:// URI: {uri}")
    parts = uri[5:].split("/", 1)
    if len(parts) != 2:
        raise ValueError(f"Bad s3 URI: {uri}")
    return parts[0], parts[1]


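# Example (hypothetical object): parse_s3_uri("s3://videos/raw/clip.mp4") returns
# ("videos", "raw/clip.mp4"); a URI without the "s3://" scheme or without a key
# part raises ValueError, which the endpoints below turn into HTTP 400.

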
# ─── Models ───────────────────────────────────────────────────────────────────
class JobStatus(str, Enum):
    WAITING = "pending"  # waiting to start processing ("pending" kept for backward compatibility)
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class JobInfo(BaseModel):
    job_id: str
    status: JobStatus
    created_at: str
    updated_at: str
    input_s3: Optional[str] = None
    output_s3: Optional[str] = None
    error: Optional[str] = None
    progress: Optional[int] = None


class FfmpegS3Request(BaseModel):
    input_s3: str
    ffmpeg_args: list[str]
    output_s3: str | None = None
    job_id: str | None = None


class ConcatS3Request(BaseModel):
    inputs_s3: List[str]
    output_s3: str
    audio: Literal["keep", "mute", "first-only"] = "keep"
    reencode: bool = False
    job_id: Optional[str] = None


# ─── Job store and queue ──────────────────────────────────────────────────────
jobs: Dict[str, Dict[str, Any]] = {}
job_queue: asyncio.Queue = asyncio.Queue()
JOB_TTL = timedelta(hours=1)
jobs_lock = asyncio.Lock()
NUM_WORKERS = 4
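# NOTE: jobs live only in this process's memory; nothing is persisted, so a
# restart clears the queue and forgets all job statuses.

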
# ─── Cleanup of old jobs ──────────────────────────────────────────────────────
async def cleanup_old_jobs():
    now = datetime.utcnow()
    to_delete = []
    async with jobs_lock:
        for job_id, job in jobs.items():
            if job["status"] in (JobStatus.COMPLETED, JobStatus.FAILED):
                updated = datetime.fromisoformat(job["updated_at"])
                if now - updated > JOB_TTL:
                    to_delete.append(job_id)
        for job_id in to_delete:
            del jobs[job_id]


# ─── Queue worker ─────────────────────────────────────────────────────────────
async def job_worker(worker_id: int):
    """Process jobs from the queue."""
    while True:
        job_data = await job_queue.get()
        job_id = job_data["job_id"]
        kind = job_data.get("kind", "ffmpeg")
        try:
            if kind == "concat":
                await process_concat_job(job_id, job_data)
            else:
                await process_ffmpeg_job(job_id, job_data)
        finally:
            job_queue.task_done()


async def process_ffmpeg_job(job_id: str, job_data: Dict[str, Any]):
    """Process a single FFmpeg job asynchronously."""
    s3 = get_s3()

    in_bucket = job_data["in_bucket"]
    in_key = job_data["in_key"]
    out_bucket = job_data["out_bucket"]
    out_key = job_data["out_key"]
    output_s3 = job_data["output_s3"]
    ffmpeg_args = job_data["ffmpeg_args"]
    tmp_in = job_data["tmp_in"]
    tmp_out = job_data["tmp_out"]

    try:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.PROCESSING
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

        # 1. Download from MinIO
        await asyncio.get_event_loop().run_in_executor(
            None, lambda: s3.download_file(in_bucket, in_key, str(tmp_in))
        )

        # 2. Run FFmpeg
        cmd = ["ffmpeg", "-i", str(tmp_in), *ffmpeg_args, "-y", str(tmp_out)]
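        # With hypothetical args such as ffmpeg_args=["-vf", "scale=1280:-2", "-c:a", "copy"]
        # the command becomes roughly:
        #   ffmpeg -i /files/_in_<job>.mp4 -vf scale=1280:-2 -c:a copy -y /files/_out_<job>.mp4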
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=str(FILES_DIR),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()

        if proc.returncode != 0:
            error_msg = stderr.decode("utf-8", errors="ignore")[-8000:] if stderr else "Unknown error"
            async with jobs_lock:
                jobs[job_id]["status"] = JobStatus.FAILED
                jobs[job_id]["error"] = error_msg
                jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
            return

        # 3. Upload the result to MinIO
        async with aiofiles.open(str(tmp_out), "rb") as f:
            content = await f.read()
        await asyncio.get_event_loop().run_in_executor(
            None, lambda: s3.put_object(Bucket=out_bucket, Key=out_key, Body=content, ContentType="video/mp4")
        )

        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.COMPLETED
            jobs[job_id]["output_s3"] = output_s3
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

    except Exception as e:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.FAILED
            jobs[job_id]["error"] = str(e)
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

    finally:
        try:
            tmp_in.unlink(missing_ok=True)
            tmp_out.unlink(missing_ok=True)
        except Exception:
            pass


async def _probe_video_size(path: Path) -> tuple[int, int]:
    """Return (width, height) using ffprobe; (0, 0) on failure."""
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error", "-select_streams", "v:0",
        "-show_entries", "stream=width,height", "-of", "csv=p=0:s=x", str(path),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
    )
    out, _ = await proc.communicate()
    try:
        w, h = out.decode().strip().split("x")
        return int(w), int(h)
    except Exception:
        return 0, 0


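# With "-of csv=p=0:s=x" ffprobe joins width and height with "x", e.g. "1920x1080",
# which is exactly what the parser above expects.

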
async def _probe_main_video_index(path: Path) -> int:
    """Index of the first video stream that is NOT an attached_pic/cover art."""
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error", "-select_streams", "v",
        "-show_entries", "stream=index,disposition",
        "-of", "default=nw=1:nk=0", str(path),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
    )
    out, _ = await proc.communicate()
    text = out.decode("utf-8", errors="ignore").splitlines()
    streams: List[dict] = []
    cur: dict = {}
    for line in text:
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        k = k.lower()  # the default writer prints disposition keys as "DISPOSITION:..."
        if k == "index":
            if cur:
                streams.append(cur)
            cur = {"index": int(v)}
        else:
            cur[k] = v
    if cur:
        streams.append(cur)
    main = [s for s in streams if s.get("disposition:attached_pic", "0") != "1"]
    if not main:
        return 0
    # ffprobe video-only stream order is the per-type index; first non-attached_pic is 0.
    return next((i for i, s in enumerate(streams) if s.get("disposition:attached_pic", "0") != "1"), 0)


async def _probe_has_audio(path: Path) -> bool:
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error", "-select_streams", "a:0",
        "-show_entries", "stream=codec_type", "-of", "csv=p=0", str(path),
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL,
    )
    out, _ = await proc.communicate()
    return b"audio" in out


async def process_concat_job(job_id: str, job_data: Dict[str, Any]):
    """Concatenate several videos from MinIO into one via ffmpeg concat."""
    s3 = get_s3()

    inputs: List[Dict[str, str]] = job_data["inputs"]  # [{"bucket": .., "key": .., "tmp": Path}]
    out_bucket: str = job_data["out_bucket"]
    out_key: str = job_data["out_key"]
    output_s3: str = job_data["output_s3"]
    audio_mode: str = job_data["audio"]
    reencode: bool = job_data["reencode"]
    list_file: Path = job_data["list_file"]
    tmp_out: Path = job_data["tmp_out"]

    try:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.PROCESSING
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

        # 1. Download all input files from MinIO in parallel
        loop = asyncio.get_event_loop()
        await asyncio.gather(*[
            loop.run_in_executor(
                None,
                lambda b=item["bucket"], k=item["key"], t=item["tmp"]: s3.download_file(b, k, str(t))
            )
            for item in inputs
        ])

        # 2. Build list.txt for the concat demuxer.
        # The temp paths live under FILES_DIR and are written as-is;
        # "-safe 0" below allows such absolute paths.
        async with aiofiles.open(str(list_file), "w") as f:
            for item in inputs:
                # escape single quotes in the file name
                safe = str(item["tmp"]).replace("'", "'\\''")
                await f.write(f"file '{safe}'\n")
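        # The resulting list file has one line per clip, e.g. (hypothetical ids):
        #   file '/files/_concat_<job>_0.mp4'
        #   file '/files/_concat_<job>_1.mp4'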

        # 3. Build the ffmpeg command
        async def run_ffmpeg(args: List[str]) -> tuple[int, str]:
            proc = await asyncio.create_subprocess_exec(
                "ffmpeg", *args,
                cwd=str(FILES_DIR),
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )
            _, stderr = await proc.communicate()
            return proc.returncode, stderr.decode("utf-8", errors="ignore") if stderr else ""

        async def attempt(cmd_args: List[str]) -> tuple[int, str]:
            return await run_ffmpeg(cmd_args)

        rc = -1
        stderr_text = ""

        if audio_mode == "first-only":
            # Video from all inputs is concatenated (with normalization); audio is taken
            # from the first input only. Always re-encodes.
            n = len(inputs)
            sizes = await asyncio.gather(*[_probe_video_size(item["tmp"]) for item in inputs])
            vi = await asyncio.gather(*[_probe_main_video_index(item["tmp"]) for item in inputs])
            tgt_w = max((w for w, _ in sizes if w > 0), default=1280)
            tgt_h = max((h for _, h in sizes if h > 0), default=720)
            if tgt_w % 2: tgt_w += 1
            if tgt_h % 2: tgt_h += 1
            cmd: List[str] = []
            for item in inputs:
                cmd += ["-i", str(item["tmp"])]
            norm_filters = [
                f"[{i}:v:{vi[i]}]scale={tgt_w}:{tgt_h}:force_original_aspect_ratio=decrease,"
                f"pad={tgt_w}:{tgt_h}:(ow-iw)/2:(oh-ih)/2:color=black,"
                f"setsar=1,fps=30,format=yuv420p[v{i}]"
                for i in range(n)
            ]
            v_chain = "".join(f"[v{i}]" for i in range(n)) + f"concat=n={n}:v=1:a=0[outv]"
            filter_complex = ";".join(norm_filters + [v_chain])
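            # For two inputs and a hypothetical 1280x720 target, filter_complex looks like:
            #   [0:v:0]scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2:color=black,setsar=1,fps=30,format=yuv420p[v0];
            #   [1:v:0]...[v1];[v0][v1]concat=n=2:v=1:a=0[outv]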
            cmd += [
                "-filter_complex", filter_complex,
                "-map", "[outv]",
                "-map", "0:a:0?",
                "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                "-c:a", "aac", "-b:a", "128k",
                "-movflags", "+faststart",
                "-y", str(tmp_out),
            ]
            rc, stderr_text = await attempt(cmd)

        else:
            # keep / mute → if copy is allowed, first try the concat demuxer without re-encoding.
            base_cmd = ["-f", "concat", "-safe", "0", "-i", str(list_file)]
            if audio_mode == "mute":
                copy_cmd = base_cmd + ["-c:v", "copy", "-an", "-movflags", "+faststart", "-y", str(tmp_out)]
            else:  # keep
                copy_cmd = base_cmd + ["-c", "copy", "-movflags", "+faststart", "-y", str(tmp_out)]

            if not reencode:
                rc, stderr_text = await attempt(copy_cmd)

            # Re-encode with size/SAR/fps normalization for heterogeneous inputs.
            if reencode or rc != 0:
                n = len(inputs)
                # Target resolution = the maximum over all inputs (rounded to even).
                sizes = await asyncio.gather(*[_probe_video_size(item["tmp"]) for item in inputs])
                vi = await asyncio.gather(*[_probe_main_video_index(item["tmp"]) for item in inputs])
                tgt_w = max((w for w, _ in sizes if w > 0), default=1280)
                tgt_h = max((h for _, h in sizes if h > 0), default=720)
                if tgt_w % 2: tgt_w += 1
                if tgt_h % 2: tgt_h += 1

                cmd: List[str] = []
                for item in inputs:
                    cmd += ["-i", str(item["tmp"])]

                # In keep mode, check whether every input has audio; for clips without it
                # we mix in silence via anullsrc, otherwise concat=...:a=1 would fail.
                want_audio = audio_mode == "keep"
                has_audio: List[bool] = []
                if want_audio:
                    has_audio = list(await asyncio.gather(*[_probe_has_audio(item["tmp"]) for item in inputs]))
                    # Add an anullsrc input for every clip without audio.
                    for _missing in (i for i, ok in enumerate(has_audio) if not ok):
                        cmd += ["-f", "lavfi", "-t", "0.1", "-i", "anullsrc=channel_layout=stereo:sample_rate=44100"]

                norm_filters: List[str] = []
                concat_inputs: List[str] = []
                anull_idx = n  # index of the first anullsrc input
                for i in range(n):
                    norm_filters.append(
                        f"[{i}:v:{vi[i]}]scale={tgt_w}:{tgt_h}:force_original_aspect_ratio=decrease,"
                        f"pad={tgt_w}:{tgt_h}:(ow-iw)/2:(oh-ih)/2:color=black,"
                        f"setsar=1,fps=30,format=yuv420p[v{i}]"
                    )
                    if want_audio:
                        if has_audio[i]:
                            norm_filters.append(f"[{i}:a:0]aresample=async=1:first_pts=0[a{i}]")
                        else:
                            norm_filters.append(f"[{anull_idx}:a:0]aresample=async=1[a{i}]")
                            anull_idx += 1
                        concat_inputs.append(f"[v{i}][a{i}]")
                    else:
                        concat_inputs.append(f"[v{i}]")

                if want_audio:
                    concat = "".join(concat_inputs) + f"concat=n={n}:v=1:a=1[outv][outa]"
                    filter_complex = ";".join(norm_filters + [concat])
                    cmd += [
                        "-filter_complex", filter_complex,
                        "-map", "[outv]", "-map", "[outa]",
                        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                        "-c:a", "aac", "-b:a", "128k",
                        "-movflags", "+faststart",
                        "-y", str(tmp_out),
                    ]
                else:
                    concat = "".join(concat_inputs) + f"concat=n={n}:v=1:a=0[outv]"
                    filter_complex = ";".join(norm_filters + [concat])
                    cmd += [
                        "-filter_complex", filter_complex,
                        "-map", "[outv]",
                        "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
                        "-an",
                        "-movflags", "+faststart",
                        "-y", str(tmp_out),
                    ]
                rc, stderr_text = await attempt(cmd)

        if rc != 0:
            async with jobs_lock:
                jobs[job_id]["status"] = JobStatus.FAILED
                jobs[job_id]["error"] = stderr_text[-8000:] if stderr_text else "Unknown error"
                jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
            return

        # 4. Upload the result to MinIO
        async with aiofiles.open(str(tmp_out), "rb") as f:
            content = await f.read()
        await loop.run_in_executor(
            None,
            lambda: s3.put_object(Bucket=out_bucket, Key=out_key, Body=content, ContentType="video/mp4")
        )

        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.COMPLETED
            jobs[job_id]["output_s3"] = output_s3
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

    except Exception as e:
        async with jobs_lock:
            jobs[job_id]["status"] = JobStatus.FAILED
            jobs[job_id]["error"] = str(e)
            jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()

    finally:
        try:
            for item in inputs:
                item["tmp"].unlink(missing_ok=True)
            list_file.unlink(missing_ok=True)
            tmp_out.unlink(missing_ok=True)
        except Exception:
            pass


# ─── Start workers on startup ─────────────────────────────────────────────────
@app.on_event("startup")
async def startup_event():
    """Start the queue workers and clean up orphaned temporary files."""
    try:
        now = time.time()
        for pat in ("_in_*", "_out_*", "_concat_*", "_list_*"):
            for f in FILES_DIR.glob(pat):
                try:
                    if now - f.stat().st_mtime > 3600:  # 1 hour
                        f.unlink(missing_ok=True)
                except Exception:
                    pass
    except Exception:
        pass
    for i in range(NUM_WORKERS):
        asyncio.create_task(job_worker(i))


# ─── POST /run-s3 ─────────────────────────────────────────────────────────────
@app.post("/run-s3")
async def run_ffmpeg_s3(req: FfmpegS3Request):
    """Create a job, enqueue it, and return the job_id immediately."""
    tmp_id = req.job_id or str(uuid.uuid4())

    try:
        in_bucket, in_key = parse_s3_uri(req.input_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    if req.output_s3:
        try:
            out_bucket, out_key = parse_s3_uri(req.output_s3)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
    else:
        folder = "/".join(in_key.split("/")[:-1])
        ext = in_key.rsplit(".", 1)[-1] if "." in in_key else "mp4"
        out_key = f"{folder}/{tmp_id}_processed.{ext}" if folder else f"{tmp_id}_processed.{ext}"
        out_bucket = in_bucket

    tmp_in = FILES_DIR / f"_in_{tmp_id}.{in_key.rsplit('.', 1)[-1] if '.' in in_key else 'mp4'}"
    tmp_out = FILES_DIR / f"_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"
    now = datetime.utcnow().isoformat()

    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            "input_s3": req.input_s3,
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }

    # Enqueue the job
    await job_queue.put({
        "kind": "ffmpeg",
        "job_id": tmp_id,
        "in_bucket": in_bucket,
        "in_key": in_key,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "ffmpeg_args": req.ffmpeg_args,
        "tmp_in": tmp_in,
        "tmp_out": tmp_out,
    })

    await cleanup_old_jobs()

    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        "queue_position": job_queue.qsize(),
    }


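# Example request (hypothetical bucket and key):
#   POST /run-s3
#   {"input_s3": "s3://videos/raw/clip.mp4",
#    "ffmpeg_args": ["-vf", "scale=1280:-2", "-c:a", "copy"]}
# The response carries job_id, status_url and the current queue_position.

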
# ─── POST /concat-s3 ──────────────────────────────────────────────────────────
@app.post("/concat-s3")
async def concat_s3(req: ConcatS3Request):
    """Concatenate several videos from MinIO into one. Returns job_id and status_url."""
    if len(req.inputs_s3) < 2:
        raise HTTPException(status_code=400, detail="inputs_s3 must contain at least 2 items")
    if len(req.inputs_s3) > 20:
        raise HTTPException(status_code=400, detail="inputs_s3 too long (max 20)")

    tmp_id = req.job_id or str(uuid.uuid4())

    inputs: List[Dict[str, Any]] = []
    for idx, uri in enumerate(req.inputs_s3):
        try:
            b, k = parse_s3_uri(uri)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=f"inputs_s3[{idx}]: {e}")
        ext = k.rsplit(".", 1)[-1] if "." in k else "mp4"
        inputs.append({
            "bucket": b,
            "key": k,
            "tmp": FILES_DIR / f"_concat_{tmp_id}_{idx}.{ext}",
        })

    try:
        out_bucket, out_key = parse_s3_uri(req.output_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    list_file = FILES_DIR / f"_concat_{tmp_id}.txt"
    tmp_out = FILES_DIR / f"_concat_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"
    now = datetime.utcnow().isoformat()

    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            "input_s3": ",".join(req.inputs_s3),
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }

    await job_queue.put({
        "kind": "concat",
        "job_id": tmp_id,
        "inputs": inputs,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "audio": req.audio,
        "reencode": req.reencode,
        "list_file": list_file,
        "tmp_out": tmp_out,
    })

    await cleanup_old_jobs()

    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        "queue_position": job_queue.qsize(),
    }


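# Example request (hypothetical keys):
#   POST /concat-s3
#   {"inputs_s3": ["s3://videos/a.mp4", "s3://videos/b.mp4"],
#    "output_s3": "s3://videos/joined.mp4", "audio": "keep", "reencode": false}

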
# ─── GET /status/{job_id} ─────────────────────────────────────────────────────
@app.get("/status/{job_id}", response_model=JobInfo)
async def get_job_status(job_id: str):
    """Get the status of a job."""
    async with jobs_lock:
        if job_id not in jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        job = jobs[job_id].copy()
    return JobInfo(**job)


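# Clients poll this endpoint with the job_id returned by /run-s3 or /concat-s3;
# completed and failed jobs older than JOB_TTL are purged, after which it
# returns 404.

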
# ─── GET /status ──────────────────────────────────────────────────────────────
@app.get("/status")
async def list_statuses():
    """List all jobs."""
    async with jobs_lock:
        return [
            {
                "job_id": j["job_id"],
                "status": j["status"],
                "created_at": j["created_at"],
                "updated_at": j["updated_at"],
            }
            for j in jobs.values()
        ]


# ─── DELETE /jobs/{job_id} ────────────────────────────────────────────────────
@app.delete("/jobs/{job_id}")
async def delete_job(job_id: str):
    """Delete a job."""
    async with jobs_lock:
        if job_id not in jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        del jobs[job_id]
    return {"ok": True, "message": "Job deleted"}


# ─── Health check ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health_check():
    return {
        "status": "ok",
        "jobs_count": len(jobs),
        "queue_size": job_queue.qsize(),
        "workers": NUM_WORKERS,
    }


# ─── POST /ingest-url ─────────────────────────────────────────────────────────
import urllib.error
import urllib.request


class IngestUrlRequest(BaseModel):
    url: str
    output_s3: str
    content_type: Optional[str] = None


@app.post("/ingest-url")
async def ingest_url(req: IngestUrlRequest):
    """Download a file from a URL and put it into S3/MinIO. Returns {output_s3}."""
    try:
        out_bucket, out_key = parse_s3_uri(req.output_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    tmp = FILES_DIR / f"_ingest_{uuid.uuid4().hex}"

    def _download_and_upload():
        request = urllib.request.Request(req.url, headers={"User-Agent": "uno-ffmpeg-api/1.0"})
        with urllib.request.urlopen(request, timeout=300) as resp:
            with open(tmp, "wb") as f:
                while True:
                    chunk = resp.read(1024 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
        s3 = get_s3()
        extra = {"ContentType": req.content_type} if req.content_type else None
        if extra:
            s3.upload_file(str(tmp), out_bucket, out_key, ExtraArgs=extra)
        else:
            s3.upload_file(str(tmp), out_bucket, out_key)

    try:
        await asyncio.to_thread(_download_and_upload)
    except urllib.error.HTTPError as e:
        raise HTTPException(status_code=502, detail=f"Source URL HTTP {e.code}: {e.reason}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Ingest failed: {e}")
    finally:
        try:
            tmp.unlink(missing_ok=True)
        except Exception:
            pass

    return {"output_s3": req.output_s3, "content_type": req.content_type}
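

# Example request (hypothetical URL and key):
#   POST /ingest-url
#   {"url": "https://example.com/video.mp4",
#    "output_s3": "s3://videos/incoming/video.mp4", "content_type": "video/mp4"}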