mirror of
https://github.com/mudler/LocalAI.git
synced 2026-01-30 01:02:37 -05:00
* feat(vibevoice): add ASR support Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Add tests Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * chore(tests): download voice files Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Small fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Small fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Try to run on bigger runner Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * debug Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * CI can't hold vibevoice Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
757 lines
34 KiB
Python
757 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
This is an extra gRPC server of LocalAI for VibeVoice
|
|
"""
|
|
from concurrent import futures
|
|
import time
|
|
import argparse
|
|
import signal
|
|
import sys
|
|
import os
|
|
import copy
|
|
import traceback
|
|
from pathlib import Path
|
|
import backend_pb2
|
|
import backend_pb2_grpc
|
|
import torch
|
|
from vibevoice.modular.modeling_vibevoice_streaming_inference import VibeVoiceStreamingForConditionalGenerationInference
|
|
from vibevoice.processor.vibevoice_streaming_processor import VibeVoiceStreamingProcessor
|
|
from vibevoice.modular.modeling_vibevoice_asr import VibeVoiceASRForConditionalGeneration
|
|
from vibevoice.processor.vibevoice_asr_processor import VibeVoiceASRProcessor
|
|
|
|
import grpc
|
|
|
|
def is_float(s):
    """Check if a value can be converted to float.

    Args:
        s: The value to probe, normally a string taken from an option.

    Returns:
        True when ``float(s)`` succeeds, False otherwise. Values of an
        unsupported type (for example ``None``) yield False instead of
        raising.
    """
    try:
        float(s)
    except (TypeError, ValueError):
        # ValueError: malformed text; TypeError: not a string or number.
        return False
    return True
|
|
def is_int(s):
    """Check if a value can be converted to int.

    Args:
        s: The value to probe, normally a string taken from an option.

    Returns:
        True when ``int(s)`` succeeds, False otherwise. Values of an
        unsupported type (for example ``None``) yield False instead of
        raising.
    """
    try:
        int(s)
    except (TypeError, ValueError):
        # ValueError: malformed text; TypeError: not a string or number.
        return False
    return True
|
|
|
|
# Length of one sleep interval used by serve() to keep the main thread
# alive while the gRPC server runs.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24

# If MAX_WORKERS are specified in the environment use it, otherwise default to 1
# (read from PYTHON_GRPC_MAX_WORKERS; a non-integer value raises ValueError at import time)
MAX_WORKERS = int(os.environ.get('PYTHON_GRPC_MAX_WORKERS', '1'))
|
|
|
|
# Implement the BackendServicer class with the service methods
|
|
class BackendServicer(backend_pb2_grpc.BackendServicer):
    """
    BackendServicer is the class that implements the gRPC service.

    LoadModel puts the instance in one of two modes, selected by the
    ``asr_mode`` option: text-to-speech (default), served by TTS, or speech
    recognition, served by AudioTranscription.
    """

    # Maps typographic quotes to their ASCII counterparts before synthesis.
    # NOTE(review): in the reviewed text each replacement mapped a character
    # to itself (a no-op); this presumably lost the curly quotes in transit.
    # Explicit escapes are used so the mapping cannot be flattened again.
    _QUOTE_TRANSLATION = str.maketrans({
        "\u2019": "'",
        "\u201c": '"',
        "\u201d": '"',
    })

    def Health(self, request, context):
        """Reply with the bytes ``OK``."""
        return backend_pb2.Reply(message=bytes("OK", 'utf-8'))

    @staticmethod
    def _parse_options(raw_options):
        """Turn ``optname:optvalue`` strings into a dict of typed values.

        Entries without a colon are skipped. Values are converted to int,
        then float, then bool (``true``/``false``, case-insensitive), and
        otherwise kept as strings. Integers are tried first: every integer
        literal is also a valid float, so the reverse order would turn
        ``"10"`` into ``10.0`` and make integer-only options unusable.
        """
        parsed = {}
        for opt in raw_options:
            if ":" not in opt:
                continue
            key, value = opt.split(":", 1)  # Split only on first colon
            if is_int(value):
                value = int(value)
            elif is_float(value):
                value = float(value)
            elif value.lower() in ("true", "false"):
                value = value.lower() == "true"
            parsed[key] = value
        return parsed

    def _numeric_option(self, key, default, *, integer=False, allow_zero=False):
        """Return option ``key`` if it is a valid number, else ``default``.

        Args:
            key: Option name to look up in ``self.options``.
            default: Value returned when the option is missing or invalid.
            integer: Accept only ints when True, ints and floats otherwise.
            allow_zero: Whether 0 is acceptable; negatives never are.
        """
        value = self.options.get(key, default)
        accepted = int if integer else (int, float)
        # bool is a subclass of int; "true"/"false" are not numeric settings.
        if isinstance(value, bool) or not isinstance(value, accepted):
            return default
        if value < 0 or (value == 0 and not allow_zero):
            return default
        return value

    def _resolve_voices_dir(self, request):
        """Locate the directory holding voice preset files.

        Priority order:
        1. ``voices_dir`` option (relative values are resolved against
           ModelPath, then the ModelFile directory, then the working dir)
        2. ``voices/streaming_model`` next to ModelFile
        3. ``voices/streaming_model`` under ModelPath
        4. ``vibevoice/voices/streaming_model`` under the backend directory
        5. The directory of AudioPath, when AudioPath is absolute

        Returns:
            The directory path, or None when no candidate applies. Only
            candidates 1-4 are checked for existence.
        """
        model_path = getattr(request, "ModelPath", None)
        voices_dir = None

        requested = self.options.get("voices_dir")
        if isinstance(requested, str) and requested.strip():
            voices_dir = requested.strip()
            if not os.path.isabs(voices_dir):
                if model_path:
                    voices_dir = os.path.join(model_path, voices_dir)
                elif request.ModelFile:
                    voices_dir = os.path.join(os.path.dirname(request.ModelFile), voices_dir)
                # If still relative, make it absolute from current working directory
                if not os.path.isabs(voices_dir):
                    voices_dir = os.path.abspath(voices_dir)
            if not os.path.exists(voices_dir):
                print(f"Warning: voices_dir option specified but directory does not exist: {voices_dir}", file=sys.stderr)
                voices_dir = None

        if not voices_dir and request.ModelFile:
            candidate = os.path.join(os.path.dirname(request.ModelFile), "voices", "streaming_model")
            voices_dir = candidate if os.path.exists(candidate) else None

        if not voices_dir and model_path:
            candidate = os.path.join(model_path, "voices", "streaming_model")
            voices_dir = candidate if os.path.exists(candidate) else None

        if not voices_dir:
            backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            voices_dir = os.path.join(backend_dir, "vibevoice", "voices", "streaming_model")
            if not os.path.exists(voices_dir):
                # Last resort: the directory of an absolute AudioPath
                if request.AudioPath and os.path.isabs(request.AudioPath):
                    voices_dir = os.path.dirname(request.AudioPath)
                else:
                    voices_dir = None

        return voices_dir

    def _load_asr_model(self, model_path, load_dtype, attn_impl):
        """Load the ASR model and move it to ``self.device``."""
        model = VibeVoiceASRForConditionalGeneration.from_pretrained(
            model_path,
            dtype=load_dtype,
            device_map=None,  # Always use None, move manually to avoid JSON serialization issues
            attn_implementation=attn_impl,
            trust_remote_code=True,
        )
        return model.to(self.device)

    def _load_tts_model(self, model_path, load_dtype, attn_impl):
        """Load the streaming TTS model for ``self.device``.

        On MPS the model is loaded without a device map and moved
        afterwards; on CUDA and CPU the device name is the device map.
        """
        on_mps = self.device == "mps"
        model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
            model_path,
            torch_dtype=load_dtype,
            device_map=None if on_mps else self.device,
            attn_implementation=attn_impl,
        )
        if on_mps:
            model.to("mps")
        return model

    @staticmethod
    def _load_with_attention_fallback(loader, attn_impl, fallback_notice):
        """Call ``loader(attn_impl)``, retrying with SDPA if flash attention fails.

        Args:
            loader: Callable taking an attention implementation name and
                returning a loaded model.
            attn_impl: Preferred attention implementation.
            fallback_notice: Message logged before the SDPA retry.

        Raises:
            Exception: Whatever ``loader`` raised, when ``attn_impl`` is not
                ``flash_attention_2`` or when the SDPA retry fails as well.
        """
        try:
            return loader(attn_impl)
        except Exception as e:
            if attn_impl != 'flash_attention_2':
                raise
            print(f"[ERROR] : {type(e).__name__}: {e}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            print(fallback_notice, file=sys.stderr)
            return loader('sdpa')

    def LoadModel(self, request, context):
        """Select a device, parse options and load the TTS or ASR model.

        Reads ``request.Options``, ``request.Model``, ``request.CUDA``,
        ``request.ModelFile``, ``request.ModelPath`` and ``request.AudioPath``.

        Returns:
            A ``backend_pb2.Result``; ``success`` is False when CUDA was
            requested but is unavailable, or when loading raised.
        """
        # Get device
        if torch.cuda.is_available():
            print("CUDA is available", file=sys.stderr)
            device = "cuda"
        else:
            print("CUDA is not available", file=sys.stderr)
            device = "cpu"
        # MPS, when available, overrides the choice above.
        if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            device = "mps"
        if not torch.cuda.is_available() and request.CUDA:
            return backend_pb2.Result(success=False, message="CUDA is not available")

        self.device = device
        self._torch_device = torch.device(device)

        # The options are a list of strings in this form optname:optvalue
        # We are storing all the options in a dict so we can use it later when
        # generating the audio
        self.options = self._parse_options(request.Options)

        # Check if ASR mode is enabled
        asr_mode = self.options.get("asr_mode", False)
        if not isinstance(asr_mode, bool):
            asr_mode = str(asr_mode).lower() == "true"
        self.asr_mode = asr_mode

        # Get model path from request, falling back to the per-mode default
        model_path = request.Model
        if not model_path:
            if self.asr_mode:
                model_path = "microsoft/VibeVoice-ASR"  # Default ASR model
            else:
                model_path = "microsoft/VibeVoice-Realtime-0.5B"  # Default TTS model

        load_dtype = torch.bfloat16 if self.device == "cuda" else torch.float32
        if "torch_dtype" in self.options:
            # The option is consumed here so it is not seen again later.
            requested_dtype = str(self.options.pop("torch_dtype")).lower()
            dtype_aliases = {
                "fp16": torch.float16,
                "bf16": torch.bfloat16,
                "fp32": torch.float32,
            }
            if requested_dtype in dtype_aliases:
                load_dtype = dtype_aliases[requested_dtype]
            else:
                print(f"Warning: unknown torch_dtype '{requested_dtype}', using {load_dtype}", file=sys.stderr)

        # TTS-only generation settings
        self.inference_steps = self._numeric_option("inference_steps", 5, integer=True)
        self.cfg_scale = self._numeric_option("cfg_scale", 1.5)

        # ASR generation settings
        self.max_new_tokens = self._numeric_option("max_new_tokens", 512, integer=True)
        self.temperature = self._numeric_option("temperature", 0.0, allow_zero=True)
        self.top_p = self._numeric_option("top_p", 1.0)
        self.num_beams = self._numeric_option("num_beams", 1, integer=True)
        self.repetition_penalty = self._numeric_option("repetition_penalty", 1.0)

        do_sample = self.options.get("do_sample", None)
        if do_sample is None:
            # Default: use sampling if temperature > 0
            do_sample = self.temperature > 0
        elif not isinstance(do_sample, bool):
            do_sample = str(do_sample).lower() == "true"
        self.do_sample = do_sample

        # Initialize voice-related attributes (TTS only)
        self.voices_dir = self._resolve_voices_dir(request)
        self.voice_presets = {}
        self._voice_cache = {}
        self.default_voice_key = None

        # Store AudioPath, ModelFile, and ModelPath from LoadModel request for use in TTS
        self.audio_path = getattr(request, 'AudioPath', None) or None
        self.model_file = getattr(request, 'ModelFile', None) or None
        self.model_path = getattr(request, 'ModelPath', None) or None

        # flash_attention_2 is only attempted on CUDA; MPS and CPU use SDPA
        attn_impl_primary = "flash_attention_2" if self.device == "cuda" else "sdpa"

        try:
            if self.asr_mode:
                print(f"Loading ASR processor & model from {model_path}", file=sys.stderr)
                self.processor = VibeVoiceASRProcessor.from_pretrained(
                    model_path,
                    language_model_pretrained_name="Qwen/Qwen2.5-7B"
                )

                print(f"Using device: {self.device}, torch_dtype: {load_dtype}, attn_implementation: {attn_impl_primary}", file=sys.stderr)
                print(f"Using attention implementation: {attn_impl_primary}", file=sys.stderr)

                self.model = self._load_with_attention_fallback(
                    lambda impl: self._load_asr_model(model_path, load_dtype, impl),
                    attn_impl_primary,
                    "Error loading the ASR model. Trying to use SDPA.",
                )
                self.model.eval()
                print("ASR model loaded successfully", file=sys.stderr)
            else:
                # Load voice presets if directory exists
                if self.voices_dir and os.path.exists(self.voices_dir):
                    self._load_voice_presets()
                else:
                    print("Warning: Voices directory not found. Voice presets will not be available.", file=sys.stderr)

                print(f"Loading TTS processor & model from {model_path}", file=sys.stderr)
                self.processor = VibeVoiceStreamingProcessor.from_pretrained(model_path)

                print(f"Using device: {self.device}, torch_dtype: {load_dtype}, attn_implementation: {attn_impl_primary}", file=sys.stderr)

                self.model = self._load_with_attention_fallback(
                    lambda impl: self._load_tts_model(model_path, load_dtype, impl),
                    attn_impl_primary,
                    "Error loading the model. Trying to use SDPA. However, note that only flash_attention_2 has been fully tested, and using SDPA may result in lower audio quality.",
                )
                self.model.eval()
                self.model.set_ddpm_inference_steps(num_steps=self.inference_steps)

                # Set default voice key
                if self.voice_presets:
                    # Try to get default from environment or use first available
                    preset_name = os.environ.get("VOICE_PRESET")
                    self.default_voice_key = self._determine_voice_key(preset_name)
                    print(f"Default voice preset: {self.default_voice_key}", file=sys.stderr)
                else:
                    print("Warning: No voice presets available. Voice selection will not work.", file=sys.stderr)

        except Exception as err:
            # Report type and message only, so the reply is always a plain string
            error_msg = str(err)
            error_type = type(err).__name__
            print(f"[ERROR] LoadModel failed: {error_type}: {error_msg}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            return backend_pb2.Result(success=False, message=f"{error_type}: {error_msg}")

        return backend_pb2.Result(message="Model loaded successfully", success=True)

    def _load_voice_presets(self):
        """Load voice presets from the voices directory.

        Fills ``self.voice_presets`` with ``{name: path}`` for every ``.pt``
        file (extension matched case-insensitively) directly inside
        ``self.voices_dir``, sorted by name. The mapping is left empty when
        the directory is unset or missing.
        """
        self.voice_presets = {}
        if not self.voices_dir or not os.path.exists(self.voices_dir):
            return

        found = {}
        for entry in os.listdir(self.voices_dir):
            full_path = os.path.join(self.voices_dir, entry)
            if entry.lower().endswith('.pt') and os.path.isfile(full_path):
                # The file name without its extension is the preset name
                found[os.path.splitext(entry)[0]] = full_path

        # Sort the voice presets alphabetically by name
        self.voice_presets = dict(sorted(found.items()))

        print(f"Found {len(self.voice_presets)} voice files in {self.voices_dir}", file=sys.stderr)
        if self.voice_presets:
            print(f"Available voices: {', '.join(self.voice_presets.keys())}", file=sys.stderr)

    def _determine_voice_key(self, name):
        """Determine voice key from name or use default.

        Returns ``name`` when it is a known preset, else ``en-WHTest_man``
        when present, else the first preset, else None.
        """
        if name and name in self.voice_presets:
            return name

        default_key = "en-WHTest_man"
        if default_key in self.voice_presets:
            return default_key

        if self.voice_presets:
            first_key = next(iter(self.voice_presets))
            print(f"Using fallback voice preset: {first_key}", file=sys.stderr)
            return first_key

        return None

    def _get_voice_path(self, speaker_name):
        """Get voice file path for a given speaker name.

        Tries an exact preset name, then a case-insensitive substring match
        in either direction, then the default preset, then the first preset.
        Returns None only when no presets are loaded.
        """
        if not self.voice_presets:
            return None

        if speaker_name:
            # First try exact match
            if speaker_name in self.voice_presets:
                return self.voice_presets[speaker_name]

            # Try partial matching (case insensitive)
            speaker_lower = speaker_name.lower()
            for preset_name, path in self.voice_presets.items():
                preset_lower = preset_name.lower()
                if preset_lower in speaker_lower or speaker_lower in preset_lower:
                    return path

        if self.default_voice_key and self.default_voice_key in self.voice_presets:
            return self.voice_presets[self.default_voice_key]

        # Presets are known to be non-empty here, so a first entry exists
        default_voice = next(iter(self.voice_presets.values()))
        print(f"Warning: No voice preset found for '{speaker_name}', using default voice: {default_voice}", file=sys.stderr)
        return default_voice

    def _ensure_voice_cached(self, voice_path):
        """Load and cache voice preset.

        Returns:
            The object stored in the preset file, loaded once per path, or
            None when ``voice_path`` is empty or does not exist.
        """
        if not voice_path or not os.path.exists(voice_path):
            return None

        # Ensure cache exists (should be initialized in LoadModel)
        if not hasattr(self, '_voice_cache'):
            self._voice_cache = {}

        if voice_path not in self._voice_cache:
            print(f"Loading prefilled prompt from {voice_path}", file=sys.stderr)
            # Fall back to CPU if LoadModel has not set a device yet
            map_loc = str(getattr(self, 'device', 'cpu'))
            # SECURITY: weights_only=False unpickles arbitrary objects, so a
            # crafted preset file can execute code. Only load trusted files.
            self._voice_cache[voice_path] = torch.load(voice_path, map_location=map_loc, weights_only=False)

        return self._voice_cache[voice_path]

    def TTS(self, request, context):
        """Synthesize ``request.text`` and save the audio to ``request.dst``.

        Voice priority: ``request.voice`` > AudioPath given to LoadModel >
        default preset.

        Returns:
            A ``backend_pb2.Result``; ``success`` is False when no voice file
            is found, the preset cannot be loaded, no audio is produced, or
            an exception is raised.
        """
        try:
            voice_path = None
            if request.voice:
                # Try to get voice by name
                voice_path = self._get_voice_path(request.voice)
            elif self.audio_path:
                # Use AudioPath from LoadModel as voice file; relative paths
                # resolve against the ModelFile directory, then ModelPath
                if os.path.isabs(self.audio_path):
                    voice_path = self.audio_path
                elif self.model_file:
                    voice_path = os.path.join(os.path.dirname(self.model_file), self.audio_path)
                elif self.model_path:
                    voice_path = os.path.join(self.model_path, self.audio_path)
                else:
                    voice_path = self.audio_path
            elif self.default_voice_key:
                voice_path = self._get_voice_path(self.default_voice_key)

            if not voice_path or not os.path.exists(voice_path):
                return backend_pb2.Result(
                    success=False,
                    message=f"Voice file not found: {voice_path}. Please provide a valid voice preset or AudioPath."
                )

            # Load voice preset
            prefilled_outputs = self._ensure_voice_cached(voice_path)
            if prefilled_outputs is None:
                return backend_pb2.Result(
                    success=False,
                    message=f"Failed to load voice preset from {voice_path}"
                )

            # Use the values validated by LoadModel rather than the raw
            # options, so an invalid setting cannot reach the model. The
            # inference steps were already applied to the model there.
            cfg_scale = self.cfg_scale
            inference_steps = self.inference_steps
            do_sample = self.options.get("do_sample", False)
            temperature = self.options.get("temperature", 0.9)
            top_p = self.options.get("top_p", 0.9)

            # Prepare text
            text = request.text.strip().translate(self._QUOTE_TRANSLATION)

            # Prepare inputs
            inputs = self.processor.process_input_with_cached_prompt(
                text=text,
                cached_prompt=prefilled_outputs,
                padding=True,
                return_tensors="pt",
                return_attention_mask=True,
            )

            # Move tensors to target device
            target_device = str(self.device)
            for k, v in inputs.items():
                if torch.is_tensor(v):
                    inputs[k] = v.to(target_device)

            print(f"Generating audio with cfg_scale: {cfg_scale}, inference_steps: {inference_steps}", file=sys.stderr)

            # Generate audio
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=None,
                cfg_scale=cfg_scale,
                tokenizer=self.processor.tokenizer,
                generation_config={
                    'do_sample': do_sample,
                    'temperature': temperature if do_sample else 1.0,
                    'top_p': top_p if do_sample else 1.0,
                },
                verbose=False,
                # Pass a copy so the cached preset object is not handed to generate()
                all_prefilled_outputs=copy.deepcopy(prefilled_outputs),
            )

            if not outputs.speech_outputs or outputs.speech_outputs[0] is None:
                return backend_pb2.Result(
                    success=False,
                    message="No audio output generated"
                )

            # Save output
            self.processor.save_audio(
                outputs.speech_outputs[0],  # First (and only) batch item
                output_path=request.dst,
            )
            print(f"Saved output to {request.dst}", file=sys.stderr)

        except Exception as err:
            print(f"Error in TTS: {err}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            return backend_pb2.Result(success=False, message=f"Unexpected {err=}, {type(err)=}")

        return backend_pb2.Result(success=True)

    @staticmethod
    def _segment_field(seg, name, default):
        """Read ``name`` from a segment given as a dict or as an object."""
        if isinstance(seg, dict):
            return seg.get(name, default)
        return getattr(seg, name, default)

    def _parse_transcript_segments(self, generated_text):
        """Convert the processor's structured output into TranscriptSegments.

        Returns:
            A list of ``backend_pb2.TranscriptSegment``; empty when the
            processor yields no segments.

        Raises:
            Exception: Whatever post-processing or conversion raises; the
                caller falls back to a single full-text segment.
        """
        segments = []
        parsed = self.processor.post_process_transcription(generated_text)
        for idx, seg in enumerate(parsed or []):
            start_time = self._segment_field(seg, 'start_time', 0)
            end_time = self._segment_field(seg, 'end_time', 0)
            text = self._segment_field(seg, 'text', '')
            speaker_id = self._segment_field(seg, 'speaker_id', None)

            # Convert time to milliseconds (assuming seconds)
            start_ms = int(start_time * 1000) if isinstance(start_time, (int, float)) else 0
            end_ms = int(end_time * 1000) if isinstance(end_time, (int, float)) else 0

            # Add speaker info to text if available
            if speaker_id is not None:
                text = f"[Speaker {speaker_id}] {text}"

            segments.append(backend_pb2.TranscriptSegment(
                id=idx,
                start=start_ms,
                end=end_ms,
                text=text,
                tokens=[]  # Token IDs not extracted for now
            ))
        return segments

    def AudioTranscription(self, request, context):
        """Transcribe audio file to text using ASR model.

        Reads the audio file at ``request.dst``.

        Returns:
            A ``backend_pb2.TranscriptResult``. It is empty, rather than an
            error, when ASR mode is off, the file is missing, or
            transcription raises.
        """
        try:
            # Validate ASR mode is active
            if not self.asr_mode:
                # Note: We return empty result instead of error to match faster-whisper behavior
                print("Error: AudioTranscription called but ASR mode is not enabled", file=sys.stderr)
                return backend_pb2.TranscriptResult(segments=[], text="")

            # Get audio file path
            audio_path = request.dst
            if not audio_path or not os.path.exists(audio_path):
                print(f"Error: Audio file not found: {audio_path}", file=sys.stderr)
                return backend_pb2.TranscriptResult(segments=[], text="")

            print(f"Transcribing audio file: {audio_path}", file=sys.stderr)

            # Get context_info from options if available
            context_info = self.options.get("context_info", None)
            if context_info and isinstance(context_info, str) and context_info.strip():
                context_info = context_info.strip()
            else:
                context_info = None

            # Process audio with ASR processor
            inputs = self.processor(
                audio=audio_path,
                sampling_rate=None,
                return_tensors="pt",
                add_generation_prompt=True,
                context_info=context_info
            )

            # Move to device
            inputs = {k: v.to(self.device) if isinstance(v, torch.Tensor) else v
                      for k, v in inputs.items()}

            # Prepare generation config; None marks settings to leave out
            generation_config = {
                "max_new_tokens": self.max_new_tokens,
                "temperature": self.temperature if self.temperature > 0 else None,
                "top_p": self.top_p if self.do_sample else None,
                "do_sample": self.do_sample,
                "num_beams": self.num_beams,
                "repetition_penalty": self.repetition_penalty,
                "pad_token_id": self.processor.pad_id,
                "eos_token_id": self.processor.tokenizer.eos_token_id,
            }
            generation_config = {k: v for k, v in generation_config.items() if v is not None}

            print(f"Generating transcription with max_new_tokens: {self.max_new_tokens}, temperature: {self.temperature}, do_sample: {self.do_sample}, num_beams: {self.num_beams}, repetition_penalty: {self.repetition_penalty}", file=sys.stderr)

            # Generate transcription
            with torch.no_grad():
                output_ids = self.model.generate(
                    **inputs,
                    **generation_config
                )

            # Decode only the tokens generated after the prompt
            generated_ids = output_ids[0, inputs['input_ids'].shape[1]:]
            generated_text = self.processor.decode(generated_ids, skip_special_tokens=True)

            # Parse structured output to get segments
            try:
                result_segments = self._parse_transcript_segments(generated_text)
            except Exception as e:
                print(f"Warning: Failed to parse structured output: {e}", file=sys.stderr)
                print(traceback.format_exc(), file=sys.stderr)
                # Fallback: a single segment with the full text. Segments
                # parsed before the failure are dropped so their text is not
                # repeated alongside the full text.
                result_segments = []
                if generated_text:
                    result_segments.append(backend_pb2.TranscriptSegment(
                        id=0,
                        start=0,
                        end=0,
                        text=generated_text,
                        tokens=[]
                    ))

            # Combine all segment texts into full transcription
            if result_segments:
                full_text = " ".join(seg.text for seg in result_segments)
            else:
                full_text = generated_text if generated_text else ""

            print(f"Transcription completed: {len(result_segments)} segments", file=sys.stderr)

            return backend_pb2.TranscriptResult(
                segments=result_segments,
                text=full_text
            )

        except Exception as err:
            print(f"Error in AudioTranscription: {err}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            return backend_pb2.TranscriptResult(segments=[], text="")
|
|
|
|
def serve(address):
    """Start the gRPC backend server on ``address`` and block until stopped.

    Message size limits are set to 50MB. SIGINT and SIGTERM stop the server
    and exit the process with status 0.
    """
    size_limit = 50 * 1024 * 1024  # 50MB
    channel_options = [
        ('grpc.max_message_length', size_limit),
        ('grpc.max_send_message_length', size_limit),
        ('grpc.max_receive_message_length', size_limit),
    ]
    worker_pool = futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
    server = grpc.server(worker_pool, options=channel_options)
    backend_pb2_grpc.add_BackendServicer_to_server(BackendServicer(), server)
    server.add_insecure_port(address)
    server.start()
    print("Server started. Listening on: " + address, file=sys.stderr)

    def _shutdown(signum, stack_frame):
        """Stop the server immediately and exit."""
        print("Received termination signal. Shutting down...")
        server.stop(0)
        sys.exit(0)

    # Install the same handler for both termination signals
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _shutdown)

    # Keep the main thread alive; the server runs in worker threads
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the bind address and run the server
    cli = argparse.ArgumentParser(description="Run the gRPC server.")
    cli.add_argument(
        "--addr",
        default="localhost:50051",
        help="The address to bind the server to.",
    )
    cli_args = cli.parse_args()

    serve(cli_args.addr)
|