"""FFmpeg-over-S3 processing service.

Accepts jobs via POST /run-s3, downloads the input object from an S3/MinIO
bucket, runs FFmpeg on it, uploads the result, and exposes job-status,
deletion and health endpoints. Job state is kept in memory only.
"""
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
from typing import Optional, Dict, Any, List
|
|
import subprocess
|
|
import os
|
|
import uuid
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
import asyncio
|
|
import aiofiles
|
|
|
|
import boto3
|
|
from botocore.client import Config
|
|
|
|
app = FastAPI()

# Shared scratch directory for temporary FFmpeg input/output files.
# Assumed to be a writable volume mount inside the container — TODO confirm.
FILES_DIR = Path("/files")

# ─── S3 / MinIO ───────────────────────────────────────────────────────────────
# Connection settings for the S3-compatible object store (MinIO by default).
# NOTE(review): hard-coded default credentials are a security risk if this
# image ever runs outside a trusted environment — prefer requiring the env
# vars to be set and failing fast when they are missing.
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "http://minio:9000")
S3_ACCESS = os.getenv("S3_ACCESS_KEY", "UN0-admin")
S3_SECRET = os.getenv("S3_SECRET_KEY", "RAygtZHqGN49qKn")
S3_REGION = os.getenv("S3_REGION", "us-east-1")
|
def get_s3():
    """Build a fresh boto3 S3 client configured for the MinIO endpoint.

    Signature v4 is forced because MinIO requires it for presigned/regional
    request signing.
    """
    client_kwargs = {
        "endpoint_url": S3_ENDPOINT,
        "aws_access_key_id": S3_ACCESS,
        "aws_secret_access_key": S3_SECRET,
        "region_name": S3_REGION,
        "config": Config(signature_version="s3v4"),
    }
    return boto3.client("s3", **client_kwargs)
|
def parse_s3_uri(uri: str) -> tuple[str, str]:
    """Split an ``s3://bucket/key`` URI into ``(bucket, key)``.

    Raises:
        ValueError: if the URI lacks the ``s3://`` scheme, has no key
            separator, or either the bucket or key component is empty
            (e.g. ``s3://bucket/`` or ``s3:///key``).
    """
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3:// URI: {uri}")
    parts = uri[5:].split("/", 1)
    # Reject an empty bucket or empty key explicitly — previously
    # "s3://bucket/" slipped through with key == "" and only failed later
    # inside the S3 call with a much less helpful error.
    if len(parts) != 2 or not parts[0] or not parts[1]:
        raise ValueError(f"Bad s3 URI: {uri}")
    return parts[0], parts[1]
|
|
|
# ─── Models ───────────────────────────────────────────────────────────────────
|
class JobStatus(str, Enum):
    """Lifecycle states of an FFmpeg job (str-valued so it serializes to JSON)."""

    WAITING = "pending"  # Waiting to be processed ("pending" value kept for backward compatibility)
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
|
|
class JobInfo(BaseModel):
    """Public job record returned by the status endpoints."""

    job_id: str
    status: JobStatus
    created_at: str  # ISO-8601 timestamp (naive UTC, from datetime.utcnow())
    updated_at: str  # ISO-8601 timestamp of the last state transition
    input_s3: Optional[str] = None   # source object, s3://bucket/key
    output_s3: Optional[str] = None  # result object, set on completion
    error: Optional[str] = None      # FFmpeg stderr tail or exception text on failure
    progress: Optional[int] = None   # reserved; never populated by the current code
|
|
class FfmpegS3Request(BaseModel):
    """Request body for POST /run-s3."""

    input_s3: str                 # source object, s3://bucket/key
    ffmpeg_args: list[str]        # args inserted between the -i input and the output path
    output_s3: str | None = None  # destination; derived from the input when omitted
    job_id: str | None = None     # caller-supplied id; a UUID4 is generated when omitted
|
|
# ─── Job storage and queue ────────────────────────────────────────────────────
|
# In-memory job registry keyed by job_id; lost on process restart.
jobs: Dict[str, Dict[str, Any]] = {}
# FIFO work queue consumed by the worker tasks.
# NOTE(review): created at import time — on Python < 3.10 asyncio.Queue binds
# the event loop at construction; confirm the runtime version if this moves.
job_queue: asyncio.Queue = asyncio.Queue()
# Completed/failed jobs older than this are purged by cleanup_old_jobs().
JOB_TTL = timedelta(hours=1)
# Guards all reads/writes of the `jobs` dict across concurrent tasks.
jobs_lock = asyncio.Lock()
# Number of concurrent FFmpeg worker tasks started at application startup.
NUM_WORKERS = 4
|
|
|
# ─── Cleanup of stale jobs ────────────────────────────────────────────────────
|
async def cleanup_old_jobs():
    """Purge completed/failed jobs whose last update is older than JOB_TTL."""
    cutoff = datetime.utcnow() - JOB_TTL
    terminal = (JobStatus.COMPLETED, JobStatus.FAILED)
    async with jobs_lock:
        expired = [
            job_id
            for job_id, job in jobs.items()
            if job["status"] in terminal
            and datetime.fromisoformat(job["updated_at"]) < cutoff
        ]
        for job_id in expired:
            del jobs[job_id]
|
|
# ─── Queue worker ─────────────────────────────────────────────────────────────
|
async def job_worker(worker_id: int):
    """Consume jobs from the queue forever.

    Previously any exception escaping process_ffmpeg_job killed this worker
    task permanently — e.g. a job deleted via DELETE /jobs/{id} while still
    queued makes process_ffmpeg_job's own except-handler raise KeyError on
    the missing record. Catch Exception here so the pool keeps running.
    """
    while True:
        job_data = await job_queue.get()
        job_id = job_data["job_id"]
        try:
            await process_ffmpeg_job(job_id, job_data)
        except Exception as e:
            # Best effort: record the failure if the job record still exists.
            async with jobs_lock:
                if job_id in jobs:
                    jobs[job_id]["status"] = JobStatus.FAILED
                    jobs[job_id]["error"] = str(e)
                    jobs[job_id]["updated_at"] = datetime.utcnow().isoformat()
        finally:
            job_queue.task_done()
|
|
async def _update_job(job_id: str, **fields: Any) -> None:
    """Merge *fields* into the job record (if it still exists) and bump updated_at.

    Using jobs.get() makes the update a no-op when the job was deleted via
    DELETE /jobs/{id} mid-flight — previously every unguarded jobs[job_id]
    access raised KeyError, and the one inside the except-handler escaped
    and crashed the worker task.
    """
    async with jobs_lock:
        job = jobs.get(job_id)
        if job is not None:
            job.update(fields)
            job["updated_at"] = datetime.utcnow().isoformat()


async def process_ffmpeg_job(job_id: str, job_data: Dict[str, Any]):
    """Run one job end to end: download from S3, run FFmpeg, upload the result.

    All job-record updates go through _update_job so a concurrently deleted
    job can no longer raise KeyError. Temp files are removed on every path.
    """
    s3 = get_s3()

    in_bucket = job_data["in_bucket"]
    in_key = job_data["in_key"]
    out_bucket = job_data["out_bucket"]
    out_key = job_data["out_key"]
    output_s3 = job_data["output_s3"]
    ffmpeg_args = job_data["ffmpeg_args"]
    tmp_in = job_data["tmp_in"]
    tmp_out = job_data["tmp_out"]

    try:
        await _update_job(job_id, status=JobStatus.PROCESSING)

        # 1. Download the source object from MinIO (boto3 blocks -> executor).
        await asyncio.get_event_loop().run_in_executor(
            None, lambda: s3.download_file(in_bucket, in_key, str(tmp_in))
        )

        # 2. Run FFmpeg. Caller-supplied args sit between input and output;
        #    -y overwrites a stale temp file left by a crashed previous run.
        cmd = ["ffmpeg", "-i", str(tmp_in), *ffmpeg_args, "-y", str(tmp_out)]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=str(FILES_DIR),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()

        if proc.returncode != 0:
            # Keep only the tail of stderr; FFmpeg output can be huge.
            error_msg = stderr.decode("utf-8", errors="ignore")[-8000:] if stderr else "Unknown error"
            await _update_job(job_id, status=JobStatus.FAILED, error=error_msg)
            return

        # 3. Upload the result to MinIO.
        # NOTE(review): the whole output is read into memory; fine for short
        # clips, but consider s3.upload_file for large renders.
        # NOTE(review): ContentType is always video/mp4 even when the output
        # key carries another extension — presumably acceptable downstream;
        # confirm before changing.
        async with aiofiles.open(str(tmp_out), "rb") as f:
            content = await f.read()
        await asyncio.get_event_loop().run_in_executor(
            None, lambda: s3.put_object(Bucket=out_bucket, Key=out_key, Body=content, ContentType="video/mp4")
        )

        await _update_job(job_id, status=JobStatus.COMPLETED, output_s3=output_s3)

    except Exception as e:
        await _update_job(job_id, status=JobStatus.FAILED, error=str(e))

    finally:
        # Best-effort temp cleanup; swallow only filesystem errors, not everything.
        try:
            tmp_in.unlink(missing_ok=True)
            tmp_out.unlink(missing_ok=True)
        except OSError:
            pass
|
|
# ─── Start workers on startup ─────────────────────────────────────────────────
|
# Keep strong references to the worker tasks: the event loop holds only weak
# references to tasks, so a task with no other reference can be garbage-
# collected mid-run (see asyncio.create_task docs). Previously the results
# of create_task were discarded.
_worker_tasks: List[asyncio.Task] = []


@app.on_event("startup")
async def startup_event():
    """Start the NUM_WORKERS queue-worker tasks."""
    for i in range(NUM_WORKERS):
        _worker_tasks.append(asyncio.create_task(job_worker(i)))
|
|
# ─── POST /run-s3 ─────────────────────────────────────────────────────────────
@app.post("/run-s3")
async def run_ffmpeg_s3(req: FfmpegS3Request):
    """Create a job, enqueue it, and immediately return the job_id.

    When output_s3 is omitted the result is written next to the input as
    <folder>/<job_id>_processed.<ext> in the same bucket.
    """
    tmp_id = req.job_id or str(uuid.uuid4())

    try:
        in_bucket, in_key = parse_s3_uri(req.input_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Extension of the input key's final component. The previous
    # `in_key.rsplit(".", 1)[-1] if "." in in_key else "mp4"` broke on keys
    # with a dot in a folder name ("v1.2/video" -> ext "2/video", yielding an
    # invalid temp path); Path.suffix only inspects the last path component.
    ext = Path(in_key).suffix.lstrip(".") or "mp4"

    if req.output_s3:
        try:
            out_bucket, out_key = parse_s3_uri(req.output_s3)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
    else:
        folder = "/".join(in_key.split("/")[:-1])
        out_key = f"{folder}/{tmp_id}_processed.{ext}" if folder else f"{tmp_id}_processed.{ext}"
        out_bucket = in_bucket

    tmp_in = FILES_DIR / f"_in_{tmp_id}.{ext}"
    tmp_out = FILES_DIR / f"_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"
    now = datetime.utcnow().isoformat()

    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            "input_s3": req.input_s3,
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }

    # Hand the job to the worker pool.
    await job_queue.put({
        "job_id": tmp_id,
        "in_bucket": in_bucket,
        "in_key": in_key,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "ffmpeg_args": req.ffmpeg_args,
        "tmp_in": tmp_in,
        "tmp_out": tmp_out,
    })

    # Opportunistic purge of stale finished jobs (there is no background timer).
    await cleanup_old_jobs()

    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        # Approximate: measured after enqueueing, workers may already have it.
        "queue_position": job_queue.qsize(),
    }
|
|
# ─── GET /status/{job_id} ─────────────────────────────────────────────────────
@app.get("/status/{job_id}", response_model=JobInfo)
async def get_job_status(job_id: str):
    """Return the current state of a single job."""
    async with jobs_lock:
        record = jobs.get(job_id)
        if record is None:
            raise HTTPException(status_code=404, detail="Job not found")
        # Snapshot under the lock so the model never sees a half-updated dict.
        snapshot = dict(record)
    return JobInfo(**snapshot)
|
|
# ─── GET /status ──────────────────────────────────────────────────────────────
@app.get("/status")
async def list_statuses():
    """Return a summary (id, status, timestamps) of every known job."""
    summary_keys = ("job_id", "status", "created_at", "updated_at")
    async with jobs_lock:
        return [{key: job[key] for key in summary_keys} for job in jobs.values()]
|
|
# ─── DELETE /jobs/{job_id} ────────────────────────────────────────────────────
@app.delete("/jobs/{job_id}")
async def delete_job(job_id: str):
    """Remove a job record from the in-memory registry."""
    async with jobs_lock:
        removed = jobs.pop(job_id, None)
    if removed is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"ok": True, "message": "Job deleted"}
|
|
# ─── Health check ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health_check():
    """Liveness endpoint with basic job/queue metrics (lock-free reads)."""
    report = {"status": "ok"}
    report["jobs_count"] = len(jobs)
    report["queue_size"] = job_queue.qsize()
    report["workers"] = NUM_WORKERS
    return report