commit dc77aaed9e0902ef4a5865f3c17de67c2c226651 Author: bolshakovsky Date: Tue May 19 10:12:57 2026 +0000 build: first commit diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..db113bf --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +# HuggingFace token (обязателен для pyannote) +# Получить: https://huggingface.co/settings/tokens +# Принять условия: +# https://huggingface.co/pyannote/segmentation-3.0 +# https://huggingface.co/pyannote/speaker-diarization-3.1 +HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +# Whisper settings +WHISPER_MODEL=large-v3 +WHISPER_BATCH_SIZE=16 + +# Pyannote device: cpu (рекомендуется) или cuda (нужен PyTorch с CUDA) +PYANNOTE_DEVICE=cuda diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2eea525 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..43a2a99 --- /dev/null +++ b/README.md @@ -0,0 +1,262 @@ +# 3_diarization — микросервисная транскрибация с диаризацией + +Три Docker-контейнера: Whisper (GPU) + pyannote (CPU) + API Gateway. +Работают параллельно. Итоговое время ≈ max(whisper, pyannote), не сумма. + +## Архитектура + +``` + POST /process (audio.aac) + │ + ┌──────┴──────┐ + │ Gateway │ :8000 + └──────┬──────┘ + ┌───────────┴───────────┐ + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ Whisper │ │ Pyannote │ + │ (GPU) │ │ (CPU) │ + │ :8001 │ │ :8002 │ + └──────┬───────┘ └──────┬───────┘ + │ │ + ▼ ▼ + текст + слова кто когда говорил + с таймкодами SPEAKER_00: 0–4с + │ │ + └───────────┬───────────┘ + ▼ + reconciliation + (сшивка по словам) + │ + ▼ + SPEAKER_00: Привет, как дела с проектом? + SPEAKER_01: Нормально, закончил doctype. +``` + +## Предварительные требования + +1. **Docker** + **Docker Compose** v2+ +2. **NVIDIA Container Toolkit** (`nvidia-ctk`) + + ```bash + # проверить, что GPU виден Docker'у + docker run --rm --gpus all ubuntu nvidia-smi + ``` + +3. 
**HuggingFace-токен** для pyannote + + - Создать токен: https://huggingface.co/settings/tokens + - Принять условия (заполнить форму на каждой странице): + - https://huggingface.co/pyannote/segmentation-3.0 + - https://huggingface.co/pyannote/speaker-diarization-3.1 + +## Быстрый старт + +```bash +# 1. Клонировать / скопировать папку 3_diarization +cd 3_diarization + +# 2. Создать .env из примера и вставить свой HF-токен +cp .env.example .env +nano .env # вставить HF_TOKEN=hf_... + +# 3. Собрать и запустить +docker compose up --build -d + +# 4. Подождать ~1–2 минуты, пока модели загрузятся +docker compose logs -f +# Ждём: +# whisper-service: "Model loaded in X.Xs" +# pyannote-service: "Pipeline loaded in X.Xs" + +# 5. Проверить здоровье +curl http://localhost:8000/health +``` + +## API + +### POST /transcribe — только транскрибация + +```bash +curl -X POST http://localhost:8000/transcribe \ + -F "file=@audio.aac" \ + -F "language=ru" \ + -F "initial_prompt=Event Forge, Frappe, doctype" \ + -F "batch_size=16" +``` + +Ответ (JSON): +```json +{ + "language": "ru", + "language_probability": 0.99, + "duration": 197.6, + "processing_time": 8.2, + "segments": [ + { + "start": 0.0, + "end": 3.2, + "text": "Привет, это запись.", + "words": [ + {"start": 0.0, "end": 0.4, "word": " Привет"}, + {"start": 0.4, "end": 0.6, "word": ","}, + {"start": 0.7, "end": 1.1, "word": " это"}, + {"start": 1.1, "end": 1.5, "word": " запись."} + ] + } + ] +} +``` + +### POST /diarize — только диаризация + +```bash +curl -X POST http://localhost:8000/diarize \ + -F "file=@audio.aac" \ + -F "num_speakers=2" +``` + +Ответ: +```json +{ + "speakers": ["SPEAKER_00", "SPEAKER_01"], + "num_speakers": 2, + "processing_time": 45.3, + "turns": [ + {"start": 0.0, "end": 4.2, "speaker": "SPEAKER_00"}, + {"start": 4.2, "end": 9.1, "speaker": "SPEAKER_01"} + ] +} +``` + +### POST /process — транскрибация + диаризация (параллельно) + +```bash +curl -X POST http://localhost:8000/process \ + -F 
"file=@audio.aac" \ + -F "language=ru" \ + -F "initial_prompt=Event Forge, Frappe, doctype, Vladimir" \ + -F "num_speakers=2" \ + -F "batch_size=16" +``` + +Ответ: +```json +{ + "language": "ru", + "duration": 197.6, + "processing_time": 47.1, + "whisper_time": 8.2, + "pyannote_time": 45.3, + "speakers": ["SPEAKER_00", "SPEAKER_01"], + "num_speakers": 2, + "utterances": [ + { + "speaker": "SPEAKER_00", + "start": 0.0, + "end": 4.1, + "text": "Привет, как дела с проектом?", + "words": [...] + }, + { + "speaker": "SPEAKER_01", + "start": 4.2, + "end": 9.0, + "text": "Нормально, закончил doctype для метрик.", + "words": [...] + } + ] +} +``` + +### Форматы вывода + +Добавьте `response_format` в `/transcribe` или `/process`: + +```bash +# SRT-субтитры с именами спикеров +curl -X POST http://localhost:8000/process \ + -F "file=@audio.aac" \ + -F "language=ru" \ + -F "response_format=srt" + +# VTT +-F "response_format=vtt" + +# Текст (спикер: реплика) +-F "response_format=txt" +``` + +## Параметры + +| Параметр | Где | По умолчанию | Описание | +|------------------|---------------|--------------|---------------------------------------| +| `language` | whisper | auto | Код языка (ru, en, de, ...) 
| +| `initial_prompt` | whisper | — | Термины/имена через запятую | +| `beam_size` | whisper | 5 | Размер beam search | +| `batch_size` | whisper | 16 | Размер батча (1–32) | +| `num_speakers` | pyannote | auto | Точное число спикеров | +| `min_speakers` | pyannote | — | Минимум спикеров | +| `max_speakers` | pyannote | — | Максимум спикеров | +| `response_format`| gateway | json | json / srt / vtt / txt | + +## Конфигурация через .env + +```bash +# Обязательные +HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +# Опциональные +WHISPER_MODEL=large-v3 # tiny/base/small/medium/large-v3/turbo +WHISPER_BATCH_SIZE=16 # размер батча +PYANNOTE_DEVICE=cpu # cpu или cuda +``` + +## Полезные команды + +```bash +# Логи конкретного сервиса +docker compose logs -f whisper-service +docker compose logs -f pyannote-service + +# Перезапустить один сервис (например, после смены модели) +docker compose restart whisper-service + +# Остановить всё +docker compose down + +# Пересобрать всё с нуля +docker compose down && docker compose up --build -d +``` + +## Производительность (RTX 5060 16 ГБ) + +Ориентировочные цифры для 1 часа аудио: + +| Операция | Время | +|-----------------------|------------| +| Whisper large-v3 b=16 | ~2–4 мин | +| Pyannote CPU | ~5–10 мин | +| /process (параллельно)| ~5–10 мин | +| Reconciliation | ~1 сек | + +Bottleneck — pyannote на CPU. Если нужно ещё быстрее — поменяйте +`PYANNOTE_DEVICE=cuda` в `.env` и пересоберите pyannote-service с GPU-версией +PyTorch (потребует изменения Dockerfile). + +## Траблшутинг + +**Контейнер whisper-service не стартует / `no CUDA-capable device`** — проверьте +`nvidia-container-toolkit`. Тест: `docker run --rm --gpus all ubuntu nvidia-smi`. + +**pyannote-service падает с `401 Unauthorized`** — невалидный HF_TOKEN или не +приняты условия моделей на HuggingFace. + +**`CUBLAS_STATUS_NOT_SUPPORTED`** — int8 на Blackwell. В конфиге уже стоит +`WHISPER_COMPUTE_TYPE=float16`. 
+ +**OOM при batch_size=16** — уменьшите `WHISPER_BATCH_SIZE=8` в `.env`. + +**Первый запрос медленный** — модели загружаются лениво, при первом запросе +(large-v3 ~3 ГБ, pyannote ~500 МБ). Дождитесь `Model loaded` в логах. +Дальнейшие запросы используют загруженные модели, пока те не выгружены по таймауту простоя. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..dd884c9 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,74 @@ +services: + + whisper-service: + build: ./whisper-service + ports: + - "8001:8001" + volumes: + - shared-data:/data + - whisper-cache:/root/.cache/huggingface + environment: + - WHISPER_MODEL=${WHISPER_MODEL:-large-v3} + - WHISPER_DEVICE=cuda + - WHISPER_COMPUTE_TYPE=float16 + - WHISPER_BATCH_SIZE=${WHISPER_BATCH_SIZE:-16} + - WHISPER_UNLOAD_AFTER=10 + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8001/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + restart: unless-stopped + + pyannote-service: + build: ./pyannote-service + ports: + - "8002:8002" + volumes: + - shared-data:/data + - pyannote-cache:/root/.cache/huggingface + environment: + - HF_TOKEN=${HF_TOKEN} + - PYANNOTE_DEVICE=${PYANNOTE_DEVICE:-cuda} + - PYANNOTE_UNLOAD_AFTER=10 + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8002/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 120s + restart: unless-stopped + + gateway: + build: ./gateway + ports: + - "8000:8000" + environment: + - WHISPER_URL=http://whisper-service:8001 + - PYANNOTE_URL=http://pyannote-service:8002 + depends_on: + whisper-service: + condition: service_healthy + pyannote-service: + condition: service_healthy + restart: unless-stopped + +volumes: + shared-data: + whisper-cache: + pyannote-cache: \ No newline at end of file diff --git 
a/gateway/Dockerfile b/gateway/Dockerfile new file mode 100644 index 0000000..e039be2 --- /dev/null +++ b/gateway/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.12-slim + +RUN apt-get update && \ + apt-get install -y --no-install-recommends curl && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app.py . + +EXPOSE 8000 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/gateway/app.py b/gateway/app.py new file mode 100644 index 0000000..4eb526d --- /dev/null +++ b/gateway/app.py @@ -0,0 +1,353 @@ +""" +API Gateway: маршрутизация запросов и reconciliation. + +POST /transcribe → whisper-service (только транскрибация) +POST /diarize → pyannote-service (только диаризация) +POST /process → оба параллельно + сшивка по словам + +Управление VRAM: + POST /whisper/load → загрузить whisper + POST /whisper/unload → выгрузить whisper + POST /pyannote/load → загрузить pyannote + POST /pyannote/unload → выгрузить pyannote + POST /ollama/unload → выгрузить текущую модель ollama (keep_alive=0) +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import time + +import httpx +from fastapi import FastAPI, File, Form, UploadFile +from fastapi.responses import JSONResponse + +logger = logging.getLogger("gateway") +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s: %(message)s") + +WHISPER_URL = os.getenv("WHISPER_URL", "http://whisper-service:8001") +PYANNOTE_URL = os.getenv("PYANNOTE_URL", "http://pyannote-service:8002") +OLLAMA_URL = os.getenv("OLLAMA_URL", "http://ollama:11434") + +TIMEOUT = httpx.Timeout(timeout=1800.0) + +app = FastAPI( + title="Speech Processing API", + description="Транскрибация + диаризация аудио. 
Whisper (GPU) + pyannote (CPU), параллельно.", + version="4.0.0", +) + + +# ────────────────────── VRAM management proxies ────────────────────── + +@app.post("/whisper/load") +async def whisper_load(): + async with httpx.AsyncClient() as client: + r = await client.post(f"{WHISPER_URL}/load", timeout=120.0) + return r.json() + + +@app.post("/whisper/unload") +async def whisper_unload(): + async with httpx.AsyncClient() as client: + r = await client.post(f"{WHISPER_URL}/unload", timeout=30.0) + return r.json() + + +@app.post("/pyannote/load") +async def pyannote_load(): + async with httpx.AsyncClient() as client: + r = await client.post(f"{PYANNOTE_URL}/load", timeout=120.0) + return r.json() + + +@app.post("/pyannote/unload") +async def pyannote_unload(): + async with httpx.AsyncClient() as client: + r = await client.post(f"{PYANNOTE_URL}/unload", timeout=30.0) + return r.json() + + +@app.post("/ollama/unload") +async def ollama_unload(model: str = "llama3.2"): + """Выгрузить модель ollama (keep_alive=0).""" + async with httpx.AsyncClient() as client: + r = await client.post( + f"{OLLAMA_URL}/api/generate", + json={"model": model, "keep_alive": 0}, + timeout=30.0, + ) + return {"status": "unloaded", "model": model} + + +# ────────────────────── helpers ────────────────────── + +def _format_timestamp_srt(seconds: float) -> str: + if seconds < 0: + seconds = 0 + total_ms = round(seconds * 1000) + hours, total_ms = divmod(total_ms, 3_600_000) + minutes, total_ms = divmod(total_ms, 60_000) + secs, ms = divmod(total_ms, 1000) + return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}" + + +def _format_timestamp_vtt(seconds: float) -> str: + if seconds < 0: + seconds = 0 + total_ms = round(seconds * 1000) + hours, total_ms = divmod(total_ms, 3_600_000) + minutes, total_ms = divmod(total_ms, 60_000) + secs, ms = divmod(total_ms, 1000) + return f"{hours:02d}:{minutes:02d}:{secs:02d}.{ms:03d}" + + +async def _call_whisper( + client: httpx.AsyncClient, + audio_bytes: bytes, + 
filename: str, + language: str | None, + initial_prompt: str | None, + beam_size: int, + batch_size: int, +) -> dict: + language = language if language and language not in ("string", "") else None + initial_prompt = initial_prompt if initial_prompt and initial_prompt not in ("string", "") else None + + data = {"beam_size": str(beam_size), "batch_size": str(batch_size), "word_timestamps": "true"} + if language: + data["language"] = language + if initial_prompt: + data["initial_prompt"] = initial_prompt + + logger.info(f"[whisper] Sending {filename} ({len(audio_bytes)} bytes)") + t0 = time.perf_counter() + resp = await client.post( + f"{WHISPER_URL}/transcribe", + files={"file": (filename, audio_bytes)}, + data=data, + timeout=TIMEOUT, + ) + logger.info(f"[whisper] {resp.status_code} in {time.perf_counter() - t0:.1f}s") + resp.raise_for_status() + return resp.json() + + +async def _call_pyannote( + client: httpx.AsyncClient, + audio_bytes: bytes, + filename: str, + num_speakers: int | None, + min_speakers: int | None, + max_speakers: int | None, + min_duration: float = 0.5, + merge_gap: float = 0.3, +) -> dict: + data = {"min_duration": str(min_duration), "merge_gap": str(merge_gap)} + if num_speakers is not None: + data["num_speakers"] = str(num_speakers) + if min_speakers is not None: + data["min_speakers"] = str(min_speakers) + if max_speakers is not None: + data["max_speakers"] = str(max_speakers) + + logger.info(f"[pyannote] Sending {filename} ({len(audio_bytes)} bytes)") + t0 = time.perf_counter() + resp = await client.post( + f"{PYANNOTE_URL}/diarize", + files={"file": (filename, audio_bytes)}, + data=data, + timeout=TIMEOUT, + ) + logger.info(f"[pyannote] {resp.status_code} in {time.perf_counter() - t0:.1f}s") + resp.raise_for_status() + return resp.json() + + +def _reconcile(whisper_result: dict, pyannote_result: dict) -> list[dict]: + turns = pyannote_result.get("turns", []) + if not turns: + return [ + {"speaker": "SPEAKER_00", "start": seg["start"], 
"end": seg["end"], "text": seg["text"]} + for seg in whisper_result.get("segments", []) + ] + + all_words = [] + for seg in whisper_result.get("segments", []): + words = seg.get("words") + if words: + all_words.extend(words) + else: + all_words.append({"start": seg["start"], "end": seg["end"], "word": seg["text"]}) + + if not all_words: + return [] + + def find_speaker(midpoint: float) -> str: + for turn in turns: + if turn["start"] <= midpoint <= turn["end"]: + return turn["speaker"] + min_dist = float("inf") + closest = turns[0]["speaker"] + for turn in turns: + dist = min(abs(midpoint - turn["start"]), abs(midpoint - turn["end"])) + if dist < min_dist: + min_dist = dist + closest = turn["speaker"] + return closest + + for w in all_words: + w["speaker"] = find_speaker((w["start"] + w["end"]) / 2) + + utterances: list[dict] = [] + current_speaker = all_words[0]["speaker"] + current_words = [all_words[0]] + + for w in all_words[1:]: + if w["speaker"] == current_speaker: + current_words.append(w) + else: + utterances.append({ + "speaker": current_speaker, + "start": round(current_words[0]["start"], 3), + "end": round(current_words[-1]["end"], 3), + "text": "".join(w["word"] for w in current_words).strip(), + }) + current_speaker = w["speaker"] + current_words = [w] + + if current_words: + utterances.append({ + "speaker": current_speaker, + "start": round(current_words[0]["start"], 3), + "end": round(current_words[-1]["end"], 3), + "text": "".join(w["word"] for w in current_words).strip(), + }) + + return utterances + + +def _to_srt(utterances: list[dict]) -> str: + lines = [] + for i, u in enumerate(utterances, 1): + lines += [str(i), f"{_format_timestamp_srt(u['start'])} --> {_format_timestamp_srt(u['end'])}", f"[{u['speaker']}] {u['text']}", ""] + return "\n".join(lines) + + +def _to_vtt(utterances: list[dict]) -> str: + lines = ["WEBVTT", ""] + for u in utterances: + lines += [f"{_format_timestamp_vtt(u['start'])} --> {_format_timestamp_vtt(u['end'])}", 
f"[{u['speaker']}] {u['text']}", ""] + return "\n".join(lines) + + +def _to_txt(utterances: list[dict]) -> str: + return "\n".join(f"{u['speaker']}: {u['text']}" for u in utterances) + + +# ────────────────────── endpoints ────────────────────── + +@app.get("/health") +async def health(): + async with httpx.AsyncClient() as client: + results = {} + for name, url in [("whisper", WHISPER_URL), ("pyannote", PYANNOTE_URL), ("ollama", OLLAMA_URL)]: + try: + r = await client.get(f"{url}/health" if name != "ollama" else f"{url}/", timeout=3.0) + results[name] = "ok" if r.status_code < 400 else "error" + except Exception: + results[name] = "unreachable" + return {"status": "ok", "services": results} + + +@app.post("/transcribe") +async def transcribe( + file: UploadFile = File(...), + language: str | None = Form(None), + initial_prompt: str | None = Form(None), + beam_size: int = Form(5), + batch_size: int = Form(16), + response_format: str = Form("json"), +): + audio_bytes = await file.read() + async with httpx.AsyncClient() as client: + result = await _call_whisper(client, audio_bytes, file.filename or "audio.wav", + language, initial_prompt, beam_size, batch_size) + + if response_format == "srt": + return JSONResponse({"format": "srt", "content": _to_srt([ + {"speaker": "", "start": s["start"], "end": s["end"], "text": s["text"]} + for s in result["segments"] + ])}) + if response_format == "vtt": + return JSONResponse({"format": "vtt", "content": _to_vtt([ + {"speaker": "", "start": s["start"], "end": s["end"], "text": s["text"]} + for s in result["segments"] + ])}) + if response_format == "txt": + return JSONResponse({"format": "txt", "content": " ".join(s["text"] for s in result["segments"])}) + return result + + +@app.post("/diarize") +async def diarize( + file: UploadFile = File(...), + num_speakers: int | None = Form(None), + min_speakers: int | None = Form(None), + max_speakers: int | None = Form(None), + min_duration: float = Form(0.5), + merge_gap: float = 
Form(0.3), +): + audio_bytes = await file.read() + async with httpx.AsyncClient() as client: + return await _call_pyannote(client, audio_bytes, file.filename or "audio.wav", + num_speakers, min_speakers, max_speakers, min_duration, merge_gap) + + +@app.post("/process") +async def process( + file: UploadFile = File(...), + language: str | None = Form(None), + initial_prompt: str | None = Form(None), + beam_size: int = Form(5), + batch_size: int = Form(16), + num_speakers: int | None = Form(None), + min_speakers: int | None = Form(None), + max_speakers: int | None = Form(None), + min_duration: float = Form(0.5), + merge_gap: float = Form(0.3), + response_format: str = Form("json"), +): + t0 = time.perf_counter() + audio_bytes = await file.read() + filename = file.filename or "audio.wav" + + async with httpx.AsyncClient() as client: + whisper_result, pyannote_result = await asyncio.gather( + _call_whisper(client, audio_bytes, filename, language, initial_prompt, beam_size, batch_size), + _call_pyannote(client, audio_bytes, filename, num_speakers, min_speakers, max_speakers, min_duration, merge_gap), + ) + + utterances = _reconcile(whisper_result, pyannote_result) + elapsed = time.perf_counter() - t0 + + if response_format == "srt": + return JSONResponse({"format": "srt", "content": _to_srt(utterances), "processing_time": round(elapsed, 3)}) + if response_format == "vtt": + return JSONResponse({"format": "vtt", "content": _to_vtt(utterances), "processing_time": round(elapsed, 3)}) + if response_format == "txt": + return JSONResponse({"format": "txt", "content": _to_txt(utterances), "processing_time": round(elapsed, 3)}) + + return { + "language": whisper_result.get("language"), + "language_probability": whisper_result.get("language_probability"), + "duration": whisper_result.get("duration"), + "processing_time": round(elapsed, 3), + "whisper_time": whisper_result.get("processing_time"), + "pyannote_time": pyannote_result.get("processing_time"), + "speakers": 
pyannote_result.get("speakers", []), + "num_speakers": pyannote_result.get("num_speakers", 0), + "utterances": utterances, + } \ No newline at end of file diff --git a/gateway/requirements.txt b/gateway/requirements.txt new file mode 100644 index 0000000..c3d7c77 --- /dev/null +++ b/gateway/requirements.txt @@ -0,0 +1,4 @@ +fastapi==0.115.* +uvicorn[standard]==0.34.* +python-multipart==0.0.* +httpx==0.28.* diff --git a/pyannote-service/Dockerfile b/pyannote-service/Dockerfile new file mode 100644 index 0000000..6b1be7a --- /dev/null +++ b/pyannote-service/Dockerfile @@ -0,0 +1,25 @@ +FROM nvidia/cuda:12.9.0-runtime-ubuntu24.04 + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + python3 python3-pip python3-venv libpython3.12-dev \ + ffmpeg libsndfile1 curl \ + libavutil-dev libavcodec-dev libavformat-dev libavdevice-dev libavfilter-dev && \ + ln -sf /usr/bin/python3 /usr/bin/python && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Точные версии из рабочего окружения: torch 2.11.0+cu130, sm_120 поддерживается +RUN pip install --no-cache-dir --break-system-packages \ + torch==2.11.0 torchaudio==2.11.0 \ + --index-url https://download.pytorch.org/whl/cu130 + +COPY requirements.txt . +RUN pip install --no-cache-dir --break-system-packages -r requirements.txt + +COPY app.py . + +EXPOSE 8002 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8002"] diff --git a/pyannote-service/app.py b/pyannote-service/app.py new file mode 100644 index 0000000..8044ac7 --- /dev/null +++ b/pyannote-service/app.py @@ -0,0 +1,269 @@ +""" +Pyannote microservice: диаризация (кто когда говорил). + +Модель загружается по требованию (lazy load) и выгружается +через UNLOAD_AFTER секунд простоя. Явные endpoints /load и /unload +позволяют управлять памятью вручную (например, из n8n). 
+""" + +from __future__ import annotations + +import asyncio +import gc +import logging +import os +import tempfile +import time +from contextlib import asynccontextmanager +from pathlib import Path + +import torch +from fastapi import FastAPI, File, Form, HTTPException, UploadFile +from pyannote.audio import Pipeline + +logger = logging.getLogger("pyannote-service") +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s: %(message)s") + +# ────────────────────────── config ────────────────────────── + +HF_TOKEN = os.getenv("HF_TOKEN") +DEVICE = os.getenv("PYANNOTE_DEVICE", "cuda") +PIPELINE_NAME = "pyannote/speaker-diarization-3.1" +UNLOAD_AFTER = int(os.getenv("PYANNOTE_UNLOAD_AFTER", "300")) # секунд простоя + +# ────────────────────────── state ────────────────────────── + +pipeline: Pipeline | None = None +_model_lock = asyncio.Lock() +_last_used_at: float = 0.0 +_unload_task: asyncio.Task | None = None + + +# ────────────────────────── постобработка (без изменений) ────────────────────────── + +def extract_segments(diarization) -> list[tuple[float, float, str]]: + annotation = getattr(diarization, "speaker_diarization", diarization) + segments: list[tuple[float, float, str]] = [] + if hasattr(annotation, "itertracks"): + for turn, _, speaker in annotation.itertracks(yield_label=True): + segments.append((turn.start, turn.end, speaker)) + else: + for turn, speaker in annotation: + segments.append((turn.start, turn.end, speaker)) + return segments + + +def filter_short(segments: list[tuple[float, float, str]], min_duration: float) -> list[tuple[float, float, str]]: + if min_duration <= 0: + return segments + return [s for s in segments if (s[1] - s[0]) >= min_duration] + + +def merge_adjacent(segments: list[tuple[float, float, str]], max_gap: float) -> list[tuple[float, float, str]]: + if max_gap < 0 or not segments: + return segments + segments = sorted(segments, key=lambda s: s[0]) + merged: list[tuple[float, float, str]] = [] + 
for start, end, speaker in segments: + if merged and merged[-1][2] == speaker and (start - merged[-1][1]) <= max_gap: + prev_start, prev_end, prev_speaker = merged[-1] + merged[-1] = (prev_start, max(prev_end, end), prev_speaker) + else: + merged.append((start, end, speaker)) + return merged + + +# ────────────────────────── model helpers ────────────────────────── + +def _load_pipeline_sync() -> Pipeline: + if not HF_TOKEN: + raise RuntimeError( + "HF_TOKEN environment variable is required. " + "Get it at https://huggingface.co/settings/tokens" + ) + logger.info(f"Loading pipeline '{PIPELINE_NAME}' on {DEVICE}…") + t0 = time.perf_counter() + p = Pipeline.from_pretrained(PIPELINE_NAME, token=HF_TOKEN) + if DEVICE == "cuda" and torch.cuda.is_available(): + p.to(torch.device("cuda")) + logger.info(f"Using GPU: {torch.cuda.get_device_name(0)}") + else: + logger.info("Using CPU") + logger.info(f"Pipeline loaded in {time.perf_counter() - t0:.1f}s") + return p + + +def _unload_pipeline_sync() -> None: + global pipeline + if pipeline is None: + return + del pipeline + pipeline = None + gc.collect() + if torch.cuda.is_available(): + torch.cuda.empty_cache() + torch.cuda.synchronize() + logger.info("Pipeline unloaded, VRAM released") + + +async def _ensure_loaded() -> Pipeline: + global pipeline, _last_used_at, _unload_task + async with _model_lock: + if pipeline is None: + loop = asyncio.get_running_loop() + pipeline = await loop.run_in_executor(None, _load_pipeline_sync) + + _last_used_at = time.monotonic() + + if UNLOAD_AFTER > 0: + if _unload_task and not _unload_task.done(): + _unload_task.cancel() + _unload_task = asyncio.create_task(_auto_unload_after(UNLOAD_AFTER)) + + return pipeline + + +async def _auto_unload_after(seconds: int) -> None: + await asyncio.sleep(seconds) + async with _model_lock: + if pipeline is not None and (time.monotonic() - _last_used_at) >= seconds: + logger.info(f"Auto-unloading after {seconds}s of inactivity…") + _unload_pipeline_sync() + + 
+# ────────────────────────── lifespan ────────────────────────── + +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info(f"Pyannote service ready (lazy load, unload_after={UNLOAD_AFTER}s)") + yield + async with _model_lock: + _unload_pipeline_sync() + + +app = FastAPI(title="Pyannote Service", lifespan=lifespan) + + +# ────────────────────────── control endpoints ────────────────────────── + +@app.get("/health") +async def health(): + return { + "status": "ok", + "pipeline": PIPELINE_NAME, + "device": DEVICE, + "loaded": pipeline is not None, + "idle_seconds": round(time.monotonic() - _last_used_at, 1) if _last_used_at else None, + } + + +@app.post("/load") +async def load_model(): + """Явная загрузка pipeline в VRAM.""" + await _ensure_loaded() + return {"status": "loaded", "pipeline": PIPELINE_NAME} + + +@app.post("/unload") +async def unload_model(): + """Явная выгрузка pipeline из VRAM.""" + global _unload_task + async with _model_lock: + if _unload_task and not _unload_task.done(): + _unload_task.cancel() + _unload_pipeline_sync() + return {"status": "unloaded"} + + +# ────────────────────────── diarize ────────────────────────── + +@app.post("/diarize") +async def diarize( + file: UploadFile = File(...), + num_speakers: int | None = Form(None), + min_speakers: int | None = Form(None), + max_speakers: int | None = Form(None), + min_duration: float = Form(0.5), + merge_gap: float = Form(0.3), +): + num_speakers = num_speakers if num_speakers and num_speakers > 0 else None + min_speakers = min_speakers if min_speakers and min_speakers > 0 else None + max_speakers = max_speakers if max_speakers and max_speakers > 0 else None + + suffix = Path(file.filename).suffix if file.filename else ".wav" + with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp: + content = await file.read() + tmp.write(content) + tmp_path = tmp.name + + try: + p = await _ensure_loaded() + + t0 = time.perf_counter() + logger.info(f"Starting diarization: 
{file.filename} ({len(content)} bytes)") + + kwargs: dict = {} + if num_speakers is not None: + kwargs["num_speakers"] = num_speakers + else: + if min_speakers is not None: + kwargs["min_speakers"] = min_speakers + if max_speakers is not None: + kwargs["max_speakers"] = max_speakers + + logger.info(f"Running pipeline (params: {kwargs})…") + try: + from pyannote.audio.pipelines.utils.hook import ProgressHook + with ProgressHook() as hook: + diarization = p(tmp_path, hook=hook, **kwargs) + except ImportError: + diarization = p(tmp_path, **kwargs) + + t1 = time.perf_counter() + logger.info(f"Pipeline finished in {t1 - t0:.1f}s") + + raw_segments = extract_segments(diarization) + raw_count = len(raw_segments) + segments = filter_short(raw_segments, min_duration) + after_filter = len(segments) + segments = merge_adjacent(segments, merge_gap) + after_merge = len(segments) + + turns = [ + { + "start": round(start, 3), + "end": round(end, 3), + "duration": round(end - start, 3), + "speaker": speaker, + } + for start, end, speaker in segments + ] + + speakers_stats: dict[str, float] = {} + for start, end, speaker in segments: + speakers_stats[speaker] = speakers_stats.get(speaker, 0.0) + (end - start) + + speakers_info = { + speaker: round(duration, 3) + for speaker, duration in sorted(speakers_stats.items(), key=lambda x: -x[1]) + } + + elapsed = time.perf_counter() - t0 + logger.info( + f"Diarized {file.filename}: {raw_count} raw → {after_filter} filtered → " + f"{after_merge} merged, {len(speakers_info)} speakers in {elapsed:.1f}s" + ) + + return { + "speakers": speakers_info, + "num_speakers": len(speakers_info), + "total_speech_duration": round(sum(speakers_stats.values()), 3), + "processing_time": round(elapsed, 3), + "segments_raw": raw_count, + "segments_filtered": after_filter, + "segments_final": after_merge, + "turns": turns, + } + + finally: + Path(tmp_path).unlink(missing_ok=True) \ No newline at end of file diff --git a/pyannote-service/requirements.txt 
b/pyannote-service/requirements.txt new file mode 100644 index 0000000..33ea71e --- /dev/null +++ b/pyannote-service/requirements.txt @@ -0,0 +1,6 @@ +pyannote.audio==4.0.4 +torchcodec==0.11.1 +soundfile>=0.12.0 +fastapi==0.115.* +uvicorn[standard]==0.34.* +python-multipart==0.0.* diff --git a/whisper-service/Dockerfile b/whisper-service/Dockerfile new file mode 100644 index 0000000..50d7bf6 --- /dev/null +++ b/whisper-service/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.12-slim + +# системные зависимости для аудио-декодинга +RUN apt-get update && \ + apt-get install -y --no-install-recommends ffmpeg curl && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# NVIDIA runtime libraries (cuBLAS + cuDNN) — ставим через pip, +# чтобы не тянуть полный CUDA Toolkit в образ +RUN pip install --no-cache-dir nvidia-cublas-cu12 "nvidia-cudnn-cu12==9.*" + +# LD_LIBRARY_PATH для CTranslate2 +ENV LD_LIBRARY_PATH="/usr/local/lib/python3.12/site-packages/nvidia/cublas/lib:/usr/local/lib/python3.12/site-packages/nvidia/cudnn/lib:${LD_LIBRARY_PATH}" + +COPY app.py . + +EXPOSE 8001 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8001"] diff --git a/whisper-service/app.py b/whisper-service/app.py new file mode 100644 index 0000000..7ca6da9 --- /dev/null +++ b/whisper-service/app.py @@ -0,0 +1,207 @@ +""" +Whisper microservice: транскрибация аудио через faster-whisper. + +Модель загружается по требованию (lazy load) и выгружается +через UNLOAD_AFTER секунд простоя. Явные endpoints /load и /unload +позволяют управлять памятью вручную (например, из n8n). 
"""Whisper microservice: audio transcription through faster-whisper.

The model is loaded on demand (lazy load) and unloaded after
``UNLOAD_AFTER`` seconds of inactivity. The explicit ``/load`` and
``/unload`` endpoints allow the memory to be managed manually
(for example, from n8n).
"""

from __future__ import annotations

import asyncio
import gc
import logging
import os
import tempfile
import time
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI, File, Form, UploadFile
from faster_whisper import BatchedInferencePipeline, WhisperModel

logger = logging.getLogger("whisper-service")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s: %(message)s")

# ────────────────────────── config ──────────────────────────

MODEL_SIZE = os.getenv("WHISPER_MODEL", "large-v3")
DEVICE = os.getenv("WHISPER_DEVICE", "cuda")
COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "float16")
BATCH_SIZE = int(os.getenv("WHISPER_BATCH_SIZE", "16"))
UNLOAD_AFTER = int(os.getenv("WHISPER_UNLOAD_AFTER", "300"))  # idle seconds, 0 = never unload

# Uploads are copied to disk in chunks of this size instead of being read
# into memory in one piece.
_UPLOAD_CHUNK_SIZE = 1024 * 1024

# ────────────────────────── state ──────────────────────────

# The loaded pipeline, or None while the model is not in memory.
model: BatchedInferencePipeline | None = None
# Guards loading, unloading and inference. Holding it for the whole
# transcription serializes jobs and makes /unload wait for a running job.
_model_lock = asyncio.Lock()
# time.monotonic() of the last activity; 0.0 means "never used".
_last_used_at: float = 0.0
# Pending idle-unload timer, if any.
_unload_task: asyncio.Task | None = None

# ────────────────────────── model helpers ──────────────────────────

def _load_model_sync() -> BatchedInferencePipeline:
    """Build the Whisper model and wrap it in a batched pipeline (blocking)."""
    logger.info(f"Loading '{MODEL_SIZE}' on {DEVICE} ({COMPUTE_TYPE}), batch={BATCH_SIZE}…")
    t0 = time.perf_counter()
    base = WhisperModel(MODEL_SIZE, device=DEVICE, compute_type=COMPUTE_TYPE)
    pipeline = BatchedInferencePipeline(model=base)
    logger.info(f"Model loaded in {time.perf_counter() - t0:.1f}s")
    return pipeline


def _unload_model_sync() -> None:
    """Drop the global reference to the pipeline; no-op when nothing is loaded."""
    global model
    if model is None:
        return
    model = None
    gc.collect()
    # CTranslate2 frees VRAM when the object is destroyed,
    # no additional call is needed.
    logger.info("Model unloaded, VRAM released")


def _cancel_unload_timer() -> None:
    """Cancel the pending idle-unload timer, if there is one."""
    global _unload_task
    if _unload_task is not None and not _unload_task.done():
        _unload_task.cancel()
    _unload_task = None


def _mark_used_locked() -> None:
    """Record activity and restart the idle timer.

    The caller must hold ``_model_lock``.
    """
    global _last_used_at, _unload_task
    _last_used_at = time.monotonic()
    if UNLOAD_AFTER > 0:
        _cancel_unload_timer()
        _unload_task = asyncio.create_task(_auto_unload_after(UNLOAD_AFTER))


async def _get_model_locked() -> BatchedInferencePipeline:
    """Return the pipeline, loading it first if needed.

    The caller must hold ``_model_lock``. Loading runs in the default
    executor so the event loop is not blocked while the model is built.
    """
    global model
    if model is None:
        loop = asyncio.get_running_loop()
        model = await loop.run_in_executor(None, _load_model_sync)
    _mark_used_locked()
    return model


async def _ensure_loaded() -> BatchedInferencePipeline:
    """Return the loaded pipeline, loading it on first use, and restart the idle timer."""
    async with _model_lock:
        return await _get_model_locked()


async def _auto_unload_after(seconds: int) -> None:
    """Unload the model if it has stayed idle for at least ``seconds`` seconds."""
    await asyncio.sleep(seconds)
    async with _model_lock:
        # Re-check under the lock: activity may have happened while sleeping
        # or while waiting for the lock.
        if model is not None and (time.monotonic() - _last_used_at) >= seconds:
            logger.info(f"Auto-unloading after {seconds}s of inactivity…")
            _unload_model_sync()

# ────────────────────────── lifespan ──────────────────────────

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log readiness on startup; stop the idle timer and unload the model on shutdown."""
    logger.info(f"Whisper service ready (lazy load, unload_after={UNLOAD_AFTER}s)")
    yield
    async with _model_lock:
        # Do not leave a pending timer task behind when the loop shuts down.
        _cancel_unload_timer()
        _unload_model_sync()


app = FastAPI(title="Whisper Service", lifespan=lifespan)

# ────────────────────────── control endpoints ──────────────────────────

@app.get("/health")
async def health():
    """Report the service configuration and whether the model is loaded."""
    return {
        "status": "ok",
        "model": MODEL_SIZE,
        "device": DEVICE,
        "loaded": model is not None,
        "idle_seconds": round(time.monotonic() - _last_used_at, 1) if _last_used_at else None,
    }


@app.post("/load")
async def load_model():
    """Explicitly load the model into VRAM (before a series of requests)."""
    await _ensure_loaded()
    return {"status": "loaded", "model": MODEL_SIZE}


@app.post("/unload")
async def unload_model():
    """Explicitly unload the model from VRAM (frees memory for Ollama)."""
    async with _model_lock:
        _cancel_unload_timer()
        _unload_model_sync()
    return {"status": "unloaded"}

# ────────────────────────── transcribe ──────────────────────────

def _blank_to_none(value: str | None) -> str | None:
    """Map an empty value or the literal "string" to None.

    "string" is presumably the placeholder Swagger UI pre-fills for text
    form fields — TODO confirm.
    """
    return value if value and value != "string" else None


def _transcribe_sync(
    pipeline: BatchedInferencePipeline,
    audio_path: str,
    options: dict,
    started_at: float,
) -> tuple[list[dict], object]:
    """Run the blocking transcription and collect the segments.

    Meant to be executed in a worker thread. Returns the list of
    JSON-ready segment dicts together with the ``info`` object produced
    by ``pipeline.transcribe``.
    """
    segments_iter, info = pipeline.transcribe(audio_path, **options)
    logger.info(
        f"Transcribe call returned in {time.perf_counter() - started_at:.1f}s. "
        f"Language: {info.language} ({info.language_probability:.2f}), "
        f"duration: {info.duration:.1f}s"
    )

    with_words = options.get("word_timestamps", False)
    segments = []
    # The iterator is consumed here, in the worker thread, so none of this
    # work runs on the event loop.
    for seg in segments_iter:
        seg_data = {
            "start": round(seg.start, 3),
            "end": round(seg.end, 3),
            "text": seg.text.strip(),
        }
        if with_words and seg.words:
            seg_data["words"] = [
                {"start": round(w.start, 3), "end": round(w.end, 3), "word": w.word}
                for w in seg.words
            ]
        segments.append(seg_data)
    return segments, info


@app.post("/transcribe")
async def transcribe(
    file: UploadFile = File(...),
    language: str | None = Form(None),
    initial_prompt: str | None = Form(None),
    beam_size: int = Form(5),
    batch_size: int = Form(BATCH_SIZE),
    word_timestamps: bool = Form(True),
    vad_filter: bool = Form(True),
):
    """Transcribe an uploaded audio file.

    The upload is copied to a temporary file, transcribed in a worker
    thread and the temporary file is always removed afterwards. Returns
    the detected language, its probability, the audio duration, the
    processing time and the list of segments (with per-word timestamps
    when ``word_timestamps`` is true and words are available).
    """
    language = _blank_to_none(language)
    initial_prompt = _blank_to_none(initial_prompt)

    suffix = Path(file.filename).suffix if file.filename else ".wav"
    tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    tmp_path = tmp.name

    try:
        # Copy the upload inside the try block so that a failed read or write
        # cannot leave the temporary file behind.
        size = 0
        with tmp:
            while chunk := await file.read(_UPLOAD_CHUNK_SIZE):
                tmp.write(chunk)
                size += len(chunk)

        options = {
            "language": language,
            "initial_prompt": initial_prompt,
            "beam_size": beam_size,
            "batch_size": batch_size,
            "word_timestamps": word_timestamps,
            "vad_filter": vad_filter,
        }

        # The lock is held for the whole job: transcriptions run one at a
        # time and /unload waits for the running one, while the event loop
        # stays free to answer /health.
        async with _model_lock:
            pipeline = await _get_model_locked()

            t0 = time.perf_counter()
            logger.info(
                f"Received: {file.filename} ({size} bytes), "
                f"language={language}, model={MODEL_SIZE}, batch_size={batch_size}"
            )
            try:
                segments, info = await asyncio.to_thread(
                    _transcribe_sync, pipeline, tmp_path, options, t0
                )
            finally:
                # Count idle time from the end of the job, not from its start.
                _mark_used_locked()
            elapsed = time.perf_counter() - t0

        # Zero-length audio must not turn a finished job into an error.
        rtf = f"{elapsed / info.duration:.3f}" if info.duration > 0 else "n/a"
        logger.info(
            f"Done: {file.filename} — {info.duration:.1f}s audio, "
            f"{len(segments)} segments, {elapsed:.1f}s total "
            f"(RTF={rtf})"
        )

        return {
            "language": info.language,
            "language_probability": round(info.language_probability, 3),
            "duration": round(info.duration, 3),
            "processing_time": round(elapsed, 3),
            "segments": segments,
        }

    finally:
        Path(tmp_path).unlink(missing_ok=True)