from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
import subprocess
import os
import uuid
from pathlib import Path
from datetime import datetime, timedelta
from enum import Enum
import asyncio
import aiofiles
import boto3
from botocore.client import Config

app = FastAPI()

# Working directory for temporary input/output media files.
FILES_DIR = Path("/files")

# ─── S3 / MinIO ───────────────────────────────────────────────────────────────
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "http://minio:9000")
# NOTE(review): hardcoded fallback credentials are acceptable for local dev
# only — make sure production always sets S3_ACCESS_KEY / S3_SECRET_KEY.
S3_ACCESS = os.getenv("S3_ACCESS_KEY", "UN0-admin")
S3_SECRET = os.getenv("S3_SECRET_KEY", "RAygtZHqGN49qKn")
S3_REGION = os.getenv("S3_REGION", "us-east-1")


def get_s3():
    """Build a boto3 S3 client pointed at the configured MinIO endpoint."""
    return boto3.client(
        "s3",
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS,
        aws_secret_access_key=S3_SECRET,
        region_name=S3_REGION,
        config=Config(signature_version="s3v4"),
    )


def parse_s3_uri(uri: str) -> tuple[str, str]:
    """Split an ``s3://bucket/key`` URI into ``(bucket, key)``.

    Raises:
        ValueError: if the scheme is not ``s3://`` or either the bucket or
            the key part is missing/empty.
    """
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3:// URI: {uri}")
    parts = uri[5:].split("/", 1)
    # Reject "s3://bucket" (no key), "s3://bucket/" (empty key) and
    # "s3:///key" (empty bucket) — the original accepted empty components.
    if len(parts) != 2 or not parts[0] or not parts[1]:
        raise ValueError(f"Bad s3 URI: {uri}")
    return parts[0], parts[1]


# ─── Models ───────────────────────────────────────────────────────────────────
class JobStatus(str, Enum):
    WAITING = "pending"  # waiting to start; value "pending" kept for backward compatibility
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class JobInfo(BaseModel):
    """Public view of a job record returned by the status endpoints."""
    job_id: str
    status: JobStatus
    created_at: str   # naive-UTC ISO timestamp
    updated_at: str   # naive-UTC ISO timestamp
    input_s3: Optional[str] = None
    output_s3: Optional[str] = None
    error: Optional[str] = None
    progress: Optional[int] = None


class FfmpegS3Request(BaseModel):
    """Request body for POST /run-s3."""
    input_s3: str                   # s3://bucket/key of the source file
    ffmpeg_args: list[str]          # extra args inserted between -i and the output
    output_s3: str | None = None    # optional destination; derived from input when omitted
    job_id: str | None = None       # optional caller-supplied id (else a uuid4)


# ─── Job store and queue ──────────────────────────────────────────────────────
jobs: Dict[str, Dict[str, Any]] = {}        # in-memory job records, keyed by job_id
job_queue: asyncio.Queue = asyncio.Queue()  # FIFO of pending job payloads
JOB_TTL = timedelta(hours=1)                # finished jobs are purged after this
jobs_lock = asyncio.Lock()                  # guards every access to `jobs`
NUM_WORKERS = 4

# ─── Cleanup of old jobs ─
# ─── Cleanup of old jobs ──────────────────────────────────────────────────────
async def cleanup_old_jobs():
    """Remove completed/failed jobs whose last update is older than JOB_TTL."""
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; every
    # timestamp in this module is naive UTC, so migrate all call sites together.
    now = datetime.utcnow()
    async with jobs_lock:
        expired = [
            job_id
            for job_id, job in jobs.items()
            if job["status"] in (JobStatus.COMPLETED, JobStatus.FAILED)
            and now - datetime.fromisoformat(job["updated_at"]) > JOB_TTL
        ]
        for job_id in expired:
            del jobs[job_id]


# ─── Queue worker ─────────────────────────────────────────────────────────────
async def job_worker(worker_id: int):
    """Consume jobs from the queue forever; one coroutine per worker."""
    while True:
        job_data = await job_queue.get()
        job_id = job_data["job_id"]
        try:
            await process_ffmpeg_job(job_id, job_data)
        finally:
            # Always acknowledge the queue item, even if processing blew up.
            job_queue.task_done()


async def _update_job(job_id: str, **fields: Any) -> None:
    """Atomically merge *fields* into a job record and bump its updated_at.

    Silently ignores jobs that were removed (DELETE /jobs/{id}) while the
    worker was still processing them — the original code raised KeyError.
    """
    async with jobs_lock:
        job = jobs.get(job_id)
        if job is not None:
            job.update(fields)
            job["updated_at"] = datetime.utcnow().isoformat()


async def process_ffmpeg_job(job_id: str, job_data: Dict[str, Any]):
    """Download the input from S3, run FFmpeg, upload the result.

    State transitions (PROCESSING → COMPLETED/FAILED) are published through
    _update_job; temp files are removed in all cases.
    """
    s3 = get_s3()
    in_bucket = job_data["in_bucket"]
    in_key = job_data["in_key"]
    out_bucket = job_data["out_bucket"]
    out_key = job_data["out_key"]
    output_s3 = job_data["output_s3"]
    ffmpeg_args = job_data["ffmpeg_args"]
    tmp_in = job_data["tmp_in"]
    tmp_out = job_data["tmp_out"]

    try:
        await _update_job(job_id, status=JobStatus.PROCESSING)

        # 1. Download source from MinIO. asyncio.to_thread keeps the blocking
        # boto3 call off the event loop (replaces get_event_loop + lambda).
        await asyncio.to_thread(s3.download_file, in_bucket, in_key, str(tmp_in))

        # 2. Run FFmpeg. "-y" overwrites a stale temp output from a retried job;
        # stdout is discarded, stderr is kept for error reporting.
        cmd = ["ffmpeg", "-i", str(tmp_in), *ffmpeg_args, "-y", str(tmp_out)]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=str(FILES_DIR),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()

        if proc.returncode != 0:
            # Keep only the tail of stderr — FFmpeg prints the actual error last.
            error_msg = stderr.decode("utf-8", errors="ignore")[-8000:] if stderr else "Unknown error"
            await _update_job(job_id, status=JobStatus.FAILED, error=error_msg)
            return

        # 3. Upload the result to MinIO. upload_file streams from disk instead
        # of reading the whole output into memory (the original read the file
        # fully via aiofiles and sent it with put_object).
        await asyncio.to_thread(
            s3.upload_file,
            str(tmp_out),
            out_bucket,
            out_key,
            ExtraArgs={"ContentType": "video/mp4"},
        )

        await _update_job(job_id, status=JobStatus.COMPLETED, output_s3=output_s3)
    except Exception as e:
        await _update_job(job_id, status=JobStatus.FAILED, error=str(e))
    finally:
        # Best-effort temp cleanup; never let cleanup mask the job result.
        # (Was a bare `except: pass` — narrowed to OSError.)
        for tmp in (tmp_in, tmp_out):
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass


# ─── Start workers on startup ─────────────────────────────────────────────────
# Keep strong references to the worker tasks: asyncio only holds weak
# references to running tasks, so unreferenced ones can be garbage-collected.
_worker_tasks: List[asyncio.Task] = []


# NOTE(review): on_event is deprecated in current FastAPI; migrate to a
# lifespan handler when convenient.
@app.on_event("startup")
async def startup_event():
    """Spawn the queue worker tasks."""
    for i in range(NUM_WORKERS):
        _worker_tasks.append(asyncio.create_task(job_worker(i)))


# ─── POST /run-s3 ─────────────────────────────────────────────────────────────
@app.post("/run-s3")
async def run_ffmpeg_s3(req: FfmpegS3Request):
    """Create a job, enqueue it and immediately return the job_id."""
    tmp_id = req.job_id or str(uuid.uuid4())

    try:
        in_bucket, in_key = parse_s3_uri(req.input_s3)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Input extension, used for both the default output key and the temp input
    # file (the original computed this twice with the same expression).
    ext = in_key.rsplit(".", 1)[-1] if "." in in_key else "mp4"

    if req.output_s3:
        try:
            out_bucket, out_key = parse_s3_uri(req.output_s3)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
    else:
        # Default destination: same bucket, next to the input,
        # "<job_id>_processed.<ext>".
        folder = "/".join(in_key.split("/")[:-1])
        out_key = f"{folder}/{tmp_id}_processed.{ext}" if folder else f"{tmp_id}_processed.{ext}"
        out_bucket = in_bucket

    tmp_in = FILES_DIR / f"_in_{tmp_id}.{ext}"
    tmp_out = FILES_DIR / f"_out_{tmp_id}.mp4"
    output_s3 = f"s3://{out_bucket}/{out_key}"

    now = datetime.utcnow().isoformat()
    async with jobs_lock:
        jobs[tmp_id] = {
            "job_id": tmp_id,
            "status": JobStatus.WAITING,
            "created_at": now,
            "updated_at": now,
            "input_s3": req.input_s3,
            "output_s3": output_s3,
            "error": None,
            "progress": None,
        }

    # Hand the fully-resolved payload to a worker.
    await job_queue.put({
        "job_id": tmp_id,
        "in_bucket": in_bucket,
        "in_key": in_key,
        "out_bucket": out_bucket,
        "out_key": out_key,
        "output_s3": output_s3,
        "ffmpeg_args": req.ffmpeg_args,
        "tmp_in": tmp_in,
        "tmp_out": tmp_out,
    })

    # Opportunistic purge of stale finished jobs on every submission.
    await cleanup_old_jobs()

    return {
        "job_id": tmp_id,
        "status": JobStatus.WAITING,
        "status_url": f"/status/{tmp_id}",
        "queue_position": job_queue.qsize(),
    }


# ─── GET /status/{job_id} ─────────────────────────────────────────────────────
@app.get("/status/{job_id}", response_model=JobInfo)
async def get_job_status(job_id: str):
    """Return the full record of a single job."""
    async with jobs_lock:
        if job_id not in jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        # Copy so serialization happens outside the lock.
        job = jobs[job_id].copy()
    return JobInfo(**job)


# ─── GET /status ──────────────────────────────────────────────────────────────
@app.get("/status")
async def list_statuses():
    """Short listing (id + status + timestamps) of all known jobs."""
    async with jobs_lock:
        return [
            {
                "job_id": j["job_id"],
                "status": j["status"],
                "created_at": j["created_at"],
                "updated_at": j["updated_at"],
            }
            for j in jobs.values()
        ]


# ─── DELETE /jobs/{job_id} ────────────────────────────────────────────────────
@app.delete("/jobs/{job_id}")
async def delete_job(job_id: str):
    """Delete a job record (does not cancel a run already in progress)."""
    async with jobs_lock:
        if job_id not in jobs:
            raise HTTPException(status_code=404, detail="Job not found")
        del jobs[job_id]
    return {"ok": True, "message": "Job deleted"}


# ─── Health check ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health_check():
    """Liveness probe with basic queue statistics."""
    return {
        "status": "ok",
        "jobs_count": len(jobs),
        "queue_size": job_queue.qsize(),
        "workers": NUM_WORKERS,
    }