feat(kokoro): complete kokoro integration (#5978)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2025-08-06 15:23:29 +02:00
committed by GitHub
parent 20a70e1244
commit 9c2840ac38
17 changed files with 258 additions and 1179 deletions

View File

@@ -1,9 +1,18 @@
.DEFAULT_GOAL := install
.PHONY: install
install:
.PHONY: kokoro
kokoro: protogen
bash install.sh
$(MAKE) protogen
.PHONY: run
run: protogen
@echo "Running kokoro..."
bash run.sh
@echo "kokoro run."
.PHONY: test
test: protogen
@echo "Testing kokoro..."
bash test.sh
@echo "kokoro tested."
.PHONY: protogen
protogen: backend_pb2_grpc.py backend_pb2.py
@@ -13,7 +22,7 @@ protogen-clean:
$(RM) backend_pb2_grpc.py backend_pb2.py
backend_pb2_grpc.py backend_pb2.py:
bash protogen.sh
python3 -m grpc_tools.protoc -I../.. -I./ --python_out=. --grpc_python_out=. backend.proto
.PHONY: clean
clean: protogen-clean

View File

@@ -0,0 +1,23 @@
# Kokoro TTS Backend for LocalAI
This is a gRPC server backend for LocalAI that uses the Kokoro TTS pipeline.
## Creating a separate environment for kokoro project
```bash
make kokoro
```
## Testing the gRPC server
```bash
make test
```
## Features
- Lightweight TTS model with 82 million parameters
- Apache-licensed weights
- Fast and cost-efficient
- Multi-language support
- Multiple voice options

115
backend/python/kokoro/backend.py Executable file → Normal file
View File

@@ -1,101 +1,92 @@
#!/usr/bin/env python3
"""
Extra gRPC server for Kokoro models.
This is an extra gRPC server of LocalAI for Kokoro TTS
"""
from concurrent import futures
import time
import argparse
import signal
import sys
import os
import time
import backend_pb2
import backend_pb2_grpc
import torch
from kokoro import KPipeline
import soundfile as sf
import grpc
from models import build_model
from kokoro import generate
import torch
SAMPLE_RATE = 22050
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# If MAX_WORKERS are specified in the environment use it, otherwise default to 1
MAX_WORKERS = int(os.environ.get('PYTHON_GRPC_MAX_WORKERS', '1'))
KOKORO_LANG_CODE = os.environ.get('KOKORO_LANG_CODE', 'a')
# Implement the BackendServicer class with the service methods
class BackendServicer(backend_pb2_grpc.BackendServicer):
"""
A gRPC servicer for the backend service.
This class implements the gRPC methods for the backend service, including Health, LoadModel, and Embedding.
BackendServicer is the class that implements the gRPC service
"""
def Health(self, request, context):
"""
A gRPC method that returns the health status of the backend service.
Args:
request: A HealthRequest object that contains the request parameters.
context: A grpc.ServicerContext object that provides information about the RPC.
Returns:
A Reply object that contains the health status of the backend service.
"""
return backend_pb2.Reply(message=bytes("OK", 'utf-8'))
def LoadModel(self, request, context):
"""
A gRPC method that loads a model into memory.
# Get device
if torch.cuda.is_available():
print("CUDA is available", file=sys.stderr)
device = "cuda"
else:
print("CUDA is not available", file=sys.stderr)
device = "cpu"
Args:
request: A LoadModelRequest object that contains the request parameters.
context: A grpc.ServicerContext object that provides information about the RPC.
if not torch.cuda.is_available() and request.CUDA:
return backend_pb2.Result(success=False, message="CUDA is not available")
Returns:
A Result object that contains the result of the LoadModel operation.
"""
model_name = request.Model
try:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
self.MODEL = build_model(request.ModelFile, device)
print("Preparing Kokoro TTS pipeline, please wait", file=sys.stderr)
# empty dict
self.options = {}
options = request.Options
# Find the voice from the options, options are a list of strings in this form optname:optvalue:
VOICE_NAME = None
# The options are a list of strings in this form optname:optvalue
# We are storing all the options in a dict so we can use it later when
# generating the images
for opt in options:
if opt.startswith("voice:"):
VOICE_NAME = opt.split(":")[1]
break
if VOICE_NAME is None:
return backend_pb2.Result(success=False, message=f"No voice specified in options")
MODELPATH = request.ModelPath
# If voice name contains a plus, split it and load the two models and combine them
if "+" in VOICE_NAME:
voice1, voice2 = VOICE_NAME.split("+")
voice1 = torch.load(f'{MODELPATH}/{voice1}.pt', weights_only=True).to(device)
voice2 = torch.load(f'{MODELPATH}/{voice2}.pt', weights_only=True).to(device)
self.VOICEPACK = torch.mean(torch.stack([voice1, voice2]), dim=0)
else:
self.VOICEPACK = torch.load(f'{MODELPATH}/{VOICE_NAME}.pt', weights_only=True).to(device)
if ":" not in opt:
continue
key, value = opt.split(":")
self.options[key] = value
self.VOICE_NAME = VOICE_NAME
print(f'Loaded voice: {VOICE_NAME}')
# Initialize Kokoro pipeline with language code
lang_code = self.options.get("lang_code", KOKORO_LANG_CODE)
self.pipeline = KPipeline(lang_code=lang_code)
print(f"Kokoro TTS pipeline loaded with language code: {lang_code}", file=sys.stderr)
except Exception as err:
return backend_pb2.Result(success=False, message=f"Unexpected {err=}, {type(err)=}")
return backend_pb2.Result(message="Model loaded successfully", success=True)
return backend_pb2.Result(message="Kokoro TTS pipeline loaded successfully", success=True)
def TTS(self, request, context):
model_name = request.model
if model_name == "":
return backend_pb2.Result(success=False, message="request.model is required")
try:
audio, out_ps = generate(self.MODEL, request.text, self.VOICEPACK, lang=self.VOICE_NAME)
print(out_ps)
sf.write(request.dst, audio, SAMPLE_RATE)
# Get voice from request, default to 'af_heart' if not specified
voice = request.voice if request.voice else 'af_heart'
# Generate audio using Kokoro pipeline
generator = self.pipeline(request.text, voice=voice)
# Get the first (and typically only) audio segment
for i, (gs, ps, audio) in enumerate(generator):
# Save audio to the destination file
sf.write(request.dst, audio, 24000)
print(f"Generated audio segment {i}: gs={gs}, ps={ps}", file=sys.stderr)
# For now, we only process the first segment
# If you need to handle multiple segments, you might want to modify this
break
except Exception as err:
return backend_pb2.Result(success=False, message=f"Unexpected {err=}, {type(err)=}")
return backend_pb2.Result(success=True)
def serve(address):
@@ -108,11 +99,11 @@ def serve(address):
backend_pb2_grpc.add_BackendServicer_to_server(BackendServicer(), server)
server.add_insecure_port(address)
server.start()
print("[Kokoro] Server started. Listening on: " + address, file=sys.stderr)
print("Server started. Listening on: " + address, file=sys.stderr)
# Define the signal handler function
def signal_handler(sig, frame):
print("[Kokoro] Received termination signal. Shutting down...")
print("Received termination signal. Shutting down...")
server.stop(0)
sys.exit(0)
@@ -132,5 +123,5 @@ if __name__ == "__main__":
"--addr", default="localhost:50051", help="The address to bind the server to."
)
args = parser.parse_args()
print(f"[Kokoro] startup: {args}", file=sys.stderr)
serve(args.addr)

View File

@@ -1,524 +0,0 @@
# https://huggingface.co/hexgrad/Kokoro-82M/blob/main/istftnet.py
# https://github.com/yl4579/StyleTTS2/blob/main/Modules/istftnet.py
from scipy.signal import get_window
from torch.nn import Conv1d, ConvTranspose1d
from torch.nn.utils import weight_norm, remove_weight_norm
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# https://github.com/yl4579/StyleTTS2/blob/main/Modules/utils.py
def init_weights(m, mean=0.0, std=0.01):
classname = m.__class__.__name__
if classname.find("Conv") != -1:
m.weight.data.normal_(mean, std)
def get_padding(kernel_size, dilation=1):
return int((kernel_size*dilation - dilation)/2)
LRELU_SLOPE = 0.1
class AdaIN1d(nn.Module):
def __init__(self, style_dim, num_features):
super().__init__()
self.norm = nn.InstanceNorm1d(num_features, affine=False)
self.fc = nn.Linear(style_dim, num_features*2)
def forward(self, x, s):
h = self.fc(s)
h = h.view(h.size(0), h.size(1), 1)
gamma, beta = torch.chunk(h, chunks=2, dim=1)
return (1 + gamma) * self.norm(x) + beta
class AdaINResBlock1(torch.nn.Module):
def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5), style_dim=64):
super(AdaINResBlock1, self).__init__()
self.convs1 = nn.ModuleList([
weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=dilation[0],
padding=get_padding(kernel_size, dilation[0]))),
weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=dilation[1],
padding=get_padding(kernel_size, dilation[1]))),
weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=dilation[2],
padding=get_padding(kernel_size, dilation[2])))
])
self.convs1.apply(init_weights)
self.convs2 = nn.ModuleList([
weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=1,
padding=get_padding(kernel_size, 1))),
weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=1,
padding=get_padding(kernel_size, 1))),
weight_norm(Conv1d(channels, channels, kernel_size, 1, dilation=1,
padding=get_padding(kernel_size, 1)))
])
self.convs2.apply(init_weights)
self.adain1 = nn.ModuleList([
AdaIN1d(style_dim, channels),
AdaIN1d(style_dim, channels),
AdaIN1d(style_dim, channels),
])
self.adain2 = nn.ModuleList([
AdaIN1d(style_dim, channels),
AdaIN1d(style_dim, channels),
AdaIN1d(style_dim, channels),
])
self.alpha1 = nn.ParameterList([nn.Parameter(torch.ones(1, channels, 1)) for i in range(len(self.convs1))])
self.alpha2 = nn.ParameterList([nn.Parameter(torch.ones(1, channels, 1)) for i in range(len(self.convs2))])
def forward(self, x, s):
for c1, c2, n1, n2, a1, a2 in zip(self.convs1, self.convs2, self.adain1, self.adain2, self.alpha1, self.alpha2):
xt = n1(x, s)
xt = xt + (1 / a1) * (torch.sin(a1 * xt) ** 2) # Snake1D
xt = c1(xt)
xt = n2(xt, s)
xt = xt + (1 / a2) * (torch.sin(a2 * xt) ** 2) # Snake1D
xt = c2(xt)
x = xt + x
return x
def remove_weight_norm(self):
for l in self.convs1:
remove_weight_norm(l)
for l in self.convs2:
remove_weight_norm(l)
class TorchSTFT(torch.nn.Module):
def __init__(self, filter_length=800, hop_length=200, win_length=800, window='hann'):
super().__init__()
self.filter_length = filter_length
self.hop_length = hop_length
self.win_length = win_length
self.window = torch.from_numpy(get_window(window, win_length, fftbins=True).astype(np.float32))
def transform(self, input_data):
forward_transform = torch.stft(
input_data,
self.filter_length, self.hop_length, self.win_length, window=self.window.to(input_data.device),
return_complex=True)
return torch.abs(forward_transform), torch.angle(forward_transform)
def inverse(self, magnitude, phase):
inverse_transform = torch.istft(
magnitude * torch.exp(phase * 1j),
self.filter_length, self.hop_length, self.win_length, window=self.window.to(magnitude.device))
return inverse_transform.unsqueeze(-2) # unsqueeze to stay consistent with conv_transpose1d implementation
def forward(self, input_data):
self.magnitude, self.phase = self.transform(input_data)
reconstruction = self.inverse(self.magnitude, self.phase)
return reconstruction
class SineGen(torch.nn.Module):
""" Definition of sine generator
SineGen(samp_rate, harmonic_num = 0,
sine_amp = 0.1, noise_std = 0.003,
voiced_threshold = 0,
flag_for_pulse=False)
samp_rate: sampling rate in Hz
harmonic_num: number of harmonic overtones (default 0)
sine_amp: amplitude of sine-wavefrom (default 0.1)
noise_std: std of Gaussian noise (default 0.003)
voiced_thoreshold: F0 threshold for U/V classification (default 0)
flag_for_pulse: this SinGen is used inside PulseGen (default False)
Note: when flag_for_pulse is True, the first time step of a voiced
segment is always sin(np.pi) or cos(0)
"""
def __init__(self, samp_rate, upsample_scale, harmonic_num=0,
sine_amp=0.1, noise_std=0.003,
voiced_threshold=0,
flag_for_pulse=False):
super(SineGen, self).__init__()
self.sine_amp = sine_amp
self.noise_std = noise_std
self.harmonic_num = harmonic_num
self.dim = self.harmonic_num + 1
self.sampling_rate = samp_rate
self.voiced_threshold = voiced_threshold
self.flag_for_pulse = flag_for_pulse
self.upsample_scale = upsample_scale
def _f02uv(self, f0):
# generate uv signal
uv = (f0 > self.voiced_threshold).type(torch.float32)
return uv
def _f02sine(self, f0_values):
""" f0_values: (batchsize, length, dim)
where dim indicates fundamental tone and overtones
"""
# convert to F0 in rad. The integer part n can be ignored
# because 2 * np.pi * n doesn't affect phase
rad_values = (f0_values / self.sampling_rate) % 1
# initial phase noise (no noise for fundamental component)
rand_ini = torch.rand(f0_values.shape[0], f0_values.shape[2], \
device=f0_values.device)
rand_ini[:, 0] = 0
rad_values[:, 0, :] = rad_values[:, 0, :] + rand_ini
# instantanouse phase sine[t] = sin(2*pi \sum_i=1 ^{t} rad)
if not self.flag_for_pulse:
# # for normal case
# # To prevent torch.cumsum numerical overflow,
# # it is necessary to add -1 whenever \sum_k=1^n rad_value_k > 1.
# # Buffer tmp_over_one_idx indicates the time step to add -1.
# # This will not change F0 of sine because (x-1) * 2*pi = x * 2*pi
# tmp_over_one = torch.cumsum(rad_values, 1) % 1
# tmp_over_one_idx = (padDiff(tmp_over_one)) < 0
# cumsum_shift = torch.zeros_like(rad_values)
# cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0
# phase = torch.cumsum(rad_values, dim=1) * 2 * np.pi
rad_values = torch.nn.functional.interpolate(rad_values.transpose(1, 2),
scale_factor=1/self.upsample_scale,
mode="linear").transpose(1, 2)
# tmp_over_one = torch.cumsum(rad_values, 1) % 1
# tmp_over_one_idx = (padDiff(tmp_over_one)) < 0
# cumsum_shift = torch.zeros_like(rad_values)
# cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0
phase = torch.cumsum(rad_values, dim=1) * 2 * np.pi
phase = torch.nn.functional.interpolate(phase.transpose(1, 2) * self.upsample_scale,
scale_factor=self.upsample_scale, mode="linear").transpose(1, 2)
sines = torch.sin(phase)
else:
# If necessary, make sure that the first time step of every
# voiced segments is sin(pi) or cos(0)
# This is used for pulse-train generation
# identify the last time step in unvoiced segments
uv = self._f02uv(f0_values)
uv_1 = torch.roll(uv, shifts=-1, dims=1)
uv_1[:, -1, :] = 1
u_loc = (uv < 1) * (uv_1 > 0)
# get the instantanouse phase
tmp_cumsum = torch.cumsum(rad_values, dim=1)
# different batch needs to be processed differently
for idx in range(f0_values.shape[0]):
temp_sum = tmp_cumsum[idx, u_loc[idx, :, 0], :]
temp_sum[1:, :] = temp_sum[1:, :] - temp_sum[0:-1, :]
# stores the accumulation of i.phase within
# each voiced segments
tmp_cumsum[idx, :, :] = 0
tmp_cumsum[idx, u_loc[idx, :, 0], :] = temp_sum
# rad_values - tmp_cumsum: remove the accumulation of i.phase
# within the previous voiced segment.
i_phase = torch.cumsum(rad_values - tmp_cumsum, dim=1)
# get the sines
sines = torch.cos(i_phase * 2 * np.pi)
return sines
def forward(self, f0):
""" sine_tensor, uv = forward(f0)
input F0: tensor(batchsize=1, length, dim=1)
f0 for unvoiced steps should be 0
output sine_tensor: tensor(batchsize=1, length, dim)
output uv: tensor(batchsize=1, length, 1)
"""
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim,
device=f0.device)
# fundamental component
fn = torch.multiply(f0, torch.FloatTensor([[range(1, self.harmonic_num + 2)]]).to(f0.device))
# generate sine waveforms
sine_waves = self._f02sine(fn) * self.sine_amp
# generate uv signal
# uv = torch.ones(f0.shape)
# uv = uv * (f0 > self.voiced_threshold)
uv = self._f02uv(f0)
# noise: for unvoiced should be similar to sine_amp
# std = self.sine_amp/3 -> max value ~ self.sine_amp
# . for voiced regions is self.noise_std
noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3
noise = noise_amp * torch.randn_like(sine_waves)
# first: set the unvoiced part to 0 by uv
# then: additive noise
sine_waves = sine_waves * uv + noise
return sine_waves, uv, noise
class SourceModuleHnNSF(torch.nn.Module):
""" SourceModule for hn-nsf
SourceModule(sampling_rate, harmonic_num=0, sine_amp=0.1,
add_noise_std=0.003, voiced_threshod=0)
sampling_rate: sampling_rate in Hz
harmonic_num: number of harmonic above F0 (default: 0)
sine_amp: amplitude of sine source signal (default: 0.1)
add_noise_std: std of additive Gaussian noise (default: 0.003)
note that amplitude of noise in unvoiced is decided
by sine_amp
voiced_threshold: threhold to set U/V given F0 (default: 0)
Sine_source, noise_source = SourceModuleHnNSF(F0_sampled)
F0_sampled (batchsize, length, 1)
Sine_source (batchsize, length, 1)
noise_source (batchsize, length 1)
uv (batchsize, length, 1)
"""
def __init__(self, sampling_rate, upsample_scale, harmonic_num=0, sine_amp=0.1,
add_noise_std=0.003, voiced_threshod=0):
super(SourceModuleHnNSF, self).__init__()
self.sine_amp = sine_amp
self.noise_std = add_noise_std
# to produce sine waveforms
self.l_sin_gen = SineGen(sampling_rate, upsample_scale, harmonic_num,
sine_amp, add_noise_std, voiced_threshod)
# to merge source harmonics into a single excitation
self.l_linear = torch.nn.Linear(harmonic_num + 1, 1)
self.l_tanh = torch.nn.Tanh()
def forward(self, x):
"""
Sine_source, noise_source = SourceModuleHnNSF(F0_sampled)
F0_sampled (batchsize, length, 1)
Sine_source (batchsize, length, 1)
noise_source (batchsize, length 1)
"""
# source for harmonic branch
with torch.no_grad():
sine_wavs, uv, _ = self.l_sin_gen(x)
sine_merge = self.l_tanh(self.l_linear(sine_wavs))
# source for noise branch, in the same shape as uv
noise = torch.randn_like(uv) * self.sine_amp / 3
return sine_merge, noise, uv
def padDiff(x):
return F.pad(F.pad(x, (0,0,-1,1), 'constant', 0) - x, (0,0,0,-1), 'constant', 0)
class Generator(torch.nn.Module):
def __init__(self, style_dim, resblock_kernel_sizes, upsample_rates, upsample_initial_channel, resblock_dilation_sizes, upsample_kernel_sizes, gen_istft_n_fft, gen_istft_hop_size):
super(Generator, self).__init__()
self.num_kernels = len(resblock_kernel_sizes)
self.num_upsamples = len(upsample_rates)
resblock = AdaINResBlock1
self.m_source = SourceModuleHnNSF(
sampling_rate=24000,
upsample_scale=np.prod(upsample_rates) * gen_istft_hop_size,
harmonic_num=8, voiced_threshod=10)
self.f0_upsamp = torch.nn.Upsample(scale_factor=np.prod(upsample_rates) * gen_istft_hop_size)
self.noise_convs = nn.ModuleList()
self.noise_res = nn.ModuleList()
self.ups = nn.ModuleList()
for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)):
self.ups.append(weight_norm(
ConvTranspose1d(upsample_initial_channel//(2**i), upsample_initial_channel//(2**(i+1)),
k, u, padding=(k-u)//2)))
self.resblocks = nn.ModuleList()
for i in range(len(self.ups)):
ch = upsample_initial_channel//(2**(i+1))
for j, (k, d) in enumerate(zip(resblock_kernel_sizes,resblock_dilation_sizes)):
self.resblocks.append(resblock(ch, k, d, style_dim))
c_cur = upsample_initial_channel // (2 ** (i + 1))
if i + 1 < len(upsample_rates): #
stride_f0 = np.prod(upsample_rates[i + 1:])
self.noise_convs.append(Conv1d(
gen_istft_n_fft + 2, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=(stride_f0+1) // 2))
self.noise_res.append(resblock(c_cur, 7, [1,3,5], style_dim))
else:
self.noise_convs.append(Conv1d(gen_istft_n_fft + 2, c_cur, kernel_size=1))
self.noise_res.append(resblock(c_cur, 11, [1,3,5], style_dim))
self.post_n_fft = gen_istft_n_fft
self.conv_post = weight_norm(Conv1d(ch, self.post_n_fft + 2, 7, 1, padding=3))
self.ups.apply(init_weights)
self.conv_post.apply(init_weights)
self.reflection_pad = torch.nn.ReflectionPad1d((1, 0))
self.stft = TorchSTFT(filter_length=gen_istft_n_fft, hop_length=gen_istft_hop_size, win_length=gen_istft_n_fft)
def forward(self, x, s, f0):
with torch.no_grad():
f0 = self.f0_upsamp(f0[:, None]).transpose(1, 2) # bs,n,t
har_source, noi_source, uv = self.m_source(f0)
har_source = har_source.transpose(1, 2).squeeze(1)
har_spec, har_phase = self.stft.transform(har_source)
har = torch.cat([har_spec, har_phase], dim=1)
for i in range(self.num_upsamples):
x = F.leaky_relu(x, LRELU_SLOPE)
x_source = self.noise_convs[i](har)
x_source = self.noise_res[i](x_source, s)
x = self.ups[i](x)
if i == self.num_upsamples - 1:
x = self.reflection_pad(x)
x = x + x_source
xs = None
for j in range(self.num_kernels):
if xs is None:
xs = self.resblocks[i*self.num_kernels+j](x, s)
else:
xs += self.resblocks[i*self.num_kernels+j](x, s)
x = xs / self.num_kernels
x = F.leaky_relu(x)
x = self.conv_post(x)
spec = torch.exp(x[:,:self.post_n_fft // 2 + 1, :])
phase = torch.sin(x[:, self.post_n_fft // 2 + 1:, :])
return self.stft.inverse(spec, phase)
def fw_phase(self, x, s):
for i in range(self.num_upsamples):
x = F.leaky_relu(x, LRELU_SLOPE)
x = self.ups[i](x)
xs = None
for j in range(self.num_kernels):
if xs is None:
xs = self.resblocks[i*self.num_kernels+j](x, s)
else:
xs += self.resblocks[i*self.num_kernels+j](x, s)
x = xs / self.num_kernels
x = F.leaky_relu(x)
x = self.reflection_pad(x)
x = self.conv_post(x)
spec = torch.exp(x[:,:self.post_n_fft // 2 + 1, :])
phase = torch.sin(x[:, self.post_n_fft // 2 + 1:, :])
return spec, phase
def remove_weight_norm(self):
print('Removing weight norm...')
for l in self.ups:
remove_weight_norm(l)
for l in self.resblocks:
l.remove_weight_norm()
remove_weight_norm(self.conv_pre)
remove_weight_norm(self.conv_post)
class AdainResBlk1d(nn.Module):
def __init__(self, dim_in, dim_out, style_dim=64, actv=nn.LeakyReLU(0.2),
upsample='none', dropout_p=0.0):
super().__init__()
self.actv = actv
self.upsample_type = upsample
self.upsample = UpSample1d(upsample)
self.learned_sc = dim_in != dim_out
self._build_weights(dim_in, dim_out, style_dim)
self.dropout = nn.Dropout(dropout_p)
if upsample == 'none':
self.pool = nn.Identity()
else:
self.pool = weight_norm(nn.ConvTranspose1d(dim_in, dim_in, kernel_size=3, stride=2, groups=dim_in, padding=1, output_padding=1))
def _build_weights(self, dim_in, dim_out, style_dim):
self.conv1 = weight_norm(nn.Conv1d(dim_in, dim_out, 3, 1, 1))
self.conv2 = weight_norm(nn.Conv1d(dim_out, dim_out, 3, 1, 1))
self.norm1 = AdaIN1d(style_dim, dim_in)
self.norm2 = AdaIN1d(style_dim, dim_out)
if self.learned_sc:
self.conv1x1 = weight_norm(nn.Conv1d(dim_in, dim_out, 1, 1, 0, bias=False))
def _shortcut(self, x):
x = self.upsample(x)
if self.learned_sc:
x = self.conv1x1(x)
return x
def _residual(self, x, s):
x = self.norm1(x, s)
x = self.actv(x)
x = self.pool(x)
x = self.conv1(self.dropout(x))
x = self.norm2(x, s)
x = self.actv(x)
x = self.conv2(self.dropout(x))
return x
def forward(self, x, s):
out = self._residual(x, s)
out = (out + self._shortcut(x)) / np.sqrt(2)
return out
class UpSample1d(nn.Module):
def __init__(self, layer_type):
super().__init__()
self.layer_type = layer_type
def forward(self, x):
if self.layer_type == 'none':
return x
else:
return F.interpolate(x, scale_factor=2, mode='nearest')
class Decoder(nn.Module):
def __init__(self, dim_in=512, F0_channel=512, style_dim=64, dim_out=80,
resblock_kernel_sizes = [3,7,11],
upsample_rates = [10, 6],
upsample_initial_channel=512,
resblock_dilation_sizes=[[1,3,5], [1,3,5], [1,3,5]],
upsample_kernel_sizes=[20, 12],
gen_istft_n_fft=20, gen_istft_hop_size=5):
super().__init__()
self.decode = nn.ModuleList()
self.encode = AdainResBlk1d(dim_in + 2, 1024, style_dim)
self.decode.append(AdainResBlk1d(1024 + 2 + 64, 1024, style_dim))
self.decode.append(AdainResBlk1d(1024 + 2 + 64, 1024, style_dim))
self.decode.append(AdainResBlk1d(1024 + 2 + 64, 1024, style_dim))
self.decode.append(AdainResBlk1d(1024 + 2 + 64, 512, style_dim, upsample=True))
self.F0_conv = weight_norm(nn.Conv1d(1, 1, kernel_size=3, stride=2, groups=1, padding=1))
self.N_conv = weight_norm(nn.Conv1d(1, 1, kernel_size=3, stride=2, groups=1, padding=1))
self.asr_res = nn.Sequential(
weight_norm(nn.Conv1d(512, 64, kernel_size=1)),
)
self.generator = Generator(style_dim, resblock_kernel_sizes, upsample_rates,
upsample_initial_channel, resblock_dilation_sizes,
upsample_kernel_sizes, gen_istft_n_fft, gen_istft_hop_size)
def forward(self, asr, F0_curve, N, s):
F0 = self.F0_conv(F0_curve.unsqueeze(1))
N = self.N_conv(N.unsqueeze(1))
x = torch.cat([asr, F0, N], axis=1)
x = self.encode(x, s)
asr_res = self.asr_res(asr)
res = True
for block in self.decode:
if res:
x = torch.cat([x, asr_res, F0, N], axis=1)
x = block(x, s)
if block.upsample_type != "none":
res = False
x = self.generator(x, s, F0_curve)
return x

View File

@@ -1,166 +0,0 @@
# https://huggingface.co/hexgrad/Kokoro-82M/blob/main/kokoro.py
import phonemizer
import re
import torch
import numpy as np
def split_num(num):
num = num.group()
if '.' in num:
return num
elif ':' in num:
h, m = [int(n) for n in num.split(':')]
if m == 0:
return f"{h} o'clock"
elif m < 10:
return f'{h} oh {m}'
return f'{h} {m}'
year = int(num[:4])
if year < 1100 or year % 1000 < 10:
return num
left, right = num[:2], int(num[2:4])
s = 's' if num.endswith('s') else ''
if 100 <= year % 1000 <= 999:
if right == 0:
return f'{left} hundred{s}'
elif right < 10:
return f'{left} oh {right}{s}'
return f'{left} {right}{s}'
def flip_money(m):
m = m.group()
bill = 'dollar' if m[0] == '$' else 'pound'
if m[-1].isalpha():
return f'{m[1:]} {bill}s'
elif '.' not in m:
s = '' if m[1:] == '1' else 's'
return f'{m[1:]} {bill}{s}'
b, c = m[1:].split('.')
s = '' if b == '1' else 's'
c = int(c.ljust(2, '0'))
coins = f"cent{'' if c == 1 else 's'}" if m[0] == '$' else ('penny' if c == 1 else 'pence')
return f'{b} {bill}{s} and {c} {coins}'
def point_num(num):
a, b = num.group().split('.')
return ' point '.join([a, ' '.join(b)])
def normalize_text(text):
text = text.replace(chr(8216), "'").replace(chr(8217), "'")
text = text.replace('«', chr(8220)).replace('»', chr(8221))
text = text.replace(chr(8220), '"').replace(chr(8221), '"')
text = text.replace('(', '«').replace(')', '»')
for a, b in zip('、。!,:;?', ',.!,:;?'):
text = text.replace(a, b+' ')
text = re.sub(r'[^\S \n]', ' ', text)
text = re.sub(r' +', ' ', text)
text = re.sub(r'(?<=\n) +(?=\n)', '', text)
text = re.sub(r'\bD[Rr]\.(?= [A-Z])', 'Doctor', text)
text = re.sub(r'\b(?:Mr\.|MR\.(?= [A-Z]))', 'Mister', text)
text = re.sub(r'\b(?:Ms\.|MS\.(?= [A-Z]))', 'Miss', text)
text = re.sub(r'\b(?:Mrs\.|MRS\.(?= [A-Z]))', 'Mrs', text)
text = re.sub(r'\betc\.(?! [A-Z])', 'etc', text)
text = re.sub(r'(?i)\b(y)eah?\b', r"\1e'a", text)
text = re.sub(r'\d*\.\d+|\b\d{4}s?\b|(?<!:)\b(?:[1-9]|1[0-2]):[0-5]\d\b(?!:)', split_num, text)
text = re.sub(r'(?<=\d),(?=\d)', '', text)
text = re.sub(r'(?i)[$£]\d+(?:\.\d+)?(?: hundred| thousand| (?:[bm]|tr)illion)*\b|[$£]\d+\.\d\d?\b', flip_money, text)
text = re.sub(r'\d*\.\d+', point_num, text)
text = re.sub(r'(?<=\d)-(?=\d)', ' to ', text)
text = re.sub(r'(?<=\d)S', ' S', text)
text = re.sub(r"(?<=[BCDFGHJ-NP-TV-Z])'?s\b", "'S", text)
text = re.sub(r"(?<=X')S\b", 's', text)
text = re.sub(r'(?:[A-Za-z]\.){2,} [a-z]', lambda m: m.group().replace('.', '-'), text)
text = re.sub(r'(?i)(?<=[A-Z])\.(?=[A-Z])', '-', text)
return text.strip()
def get_vocab():
_pad = "$"
_punctuation = ';:,.!?¡¿—…"«»“” '
_letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
_letters_ipa = "ɑɐɒæɓʙβɔɕçɗɖðʤəɘɚɛɜɝɞɟʄɡɠɢʛɦɧħɥʜɨɪʝɭɬɫɮʟɱɯɰŋɳɲɴøɵɸθœɶʘɹɺɾɻʀʁɽʂʃʈʧʉʊʋⱱʌɣɤʍχʎʏʑʐʒʔʡʕʢǀǁǂǃˈˌːˑʼʴʰʱʲʷˠˤ˞↓↑→↗↘'̩'"
symbols = [_pad] + list(_punctuation) + list(_letters) + list(_letters_ipa)
dicts = {}
for i in range(len((symbols))):
dicts[symbols[i]] = i
return dicts
VOCAB = get_vocab()
def tokenize(ps):
return [i for i in map(VOCAB.get, ps) if i is not None]
phonemizers = dict(
a=phonemizer.backend.EspeakBackend(language='en-us', preserve_punctuation=True, with_stress=True),
b=phonemizer.backend.EspeakBackend(language='en-gb', preserve_punctuation=True, with_stress=True),
)
def phonemize(text, lang, norm=True):
if norm:
text = normalize_text(text)
ps = phonemizers[lang].phonemize([text])
ps = ps[0] if ps else ''
# https://en.wiktionary.org/wiki/kokoro#English
ps = ps.replace('kəkˈoːɹoʊ', 'kˈoʊkəɹoʊ').replace('kəkˈɔːɹəʊ', 'kˈəʊkəɹəʊ')
ps = ps.replace('ʲ', 'j').replace('r', 'ɹ').replace('x', 'k').replace('ɬ', 'l')
ps = re.sub(r'(?<=[a-zɹː])(?=hˈʌndɹɪd)', ' ', ps)
ps = re.sub(r' z(?=[;:,.!?¡¿—…"«»“” ]|$)', 'z', ps)
if lang == 'a':
ps = re.sub(r'(?<=nˈaɪn)ti(?!ː)', 'di', ps)
ps = ''.join(filter(lambda p: p in VOCAB, ps))
return ps.strip()
def length_to_mask(lengths):
mask = torch.arange(lengths.max()).unsqueeze(0).expand(lengths.shape[0], -1).type_as(lengths)
mask = torch.gt(mask+1, lengths.unsqueeze(1))
return mask
@torch.no_grad()
def forward(model, tokens, ref_s, speed):
device = ref_s.device
tokens = torch.LongTensor([[0, *tokens, 0]]).to(device)
input_lengths = torch.LongTensor([tokens.shape[-1]]).to(device)
text_mask = length_to_mask(input_lengths).to(device)
bert_dur = model.bert(tokens, attention_mask=(~text_mask).int())
d_en = model.bert_encoder(bert_dur).transpose(-1, -2)
s = ref_s[:, 128:]
d = model.predictor.text_encoder(d_en, s, input_lengths, text_mask)
x, _ = model.predictor.lstm(d)
duration = model.predictor.duration_proj(x)
duration = torch.sigmoid(duration).sum(axis=-1) / speed
pred_dur = torch.round(duration).clamp(min=1).long()
pred_aln_trg = torch.zeros(input_lengths, pred_dur.sum().item())
c_frame = 0
for i in range(pred_aln_trg.size(0)):
pred_aln_trg[i, c_frame:c_frame + pred_dur[0,i].item()] = 1
c_frame += pred_dur[0,i].item()
en = d.transpose(-1, -2) @ pred_aln_trg.unsqueeze(0).to(device)
F0_pred, N_pred = model.predictor.F0Ntrain(en, s)
t_en = model.text_encoder(tokens, input_lengths, text_mask)
asr = t_en @ pred_aln_trg.unsqueeze(0).to(device)
return model.decoder(asr, F0_pred, N_pred, ref_s[:, :128]).squeeze().cpu().numpy()
def generate(model, text, voicepack, lang='a', speed=1, ps=None):
ps = ps or phonemize(text, lang)
tokens = tokenize(ps)
if not tokens:
return None
elif len(tokens) > 510:
tokens = tokens[:510]
print('Truncated to 510 tokens')
ref_s = voicepack[len(tokens)]
out = forward(model, tokens, ref_s, speed)
ps = ''.join(next(k for k, v in VOCAB.items() if i == v) for i in tokens)
return out, ps
def generate_full(model, text, voicepack, lang='a', speed=1, ps=None):
ps = ps or phonemize(text, lang)
tokens = tokenize(ps)
if not tokens:
return None
outs = []
loop_count = len(tokens)//510 + (1 if len(tokens) % 510 != 0 else 0)
for i in range(loop_count):
ref_s = voicepack[len(tokens[i*510:(i+1)*510])]
out = forward(model, tokens[i*510:(i+1)*510], ref_s, speed)
outs.append(out)
outs = np.concatenate(outs)
ps = ''.join(next(k for k, v in VOCAB.items() if i == v) for i in tokens)
return outs, ps

View File

@@ -1,373 +0,0 @@
# https://github.com/yl4579/StyleTTS2/blob/main/models.py
# https://huggingface.co/hexgrad/Kokoro-82M/blob/main/models.py
from istftnet import AdaIN1d, Decoder
from munch import Munch
from pathlib import Path
from plbert import load_plbert
from torch.nn.utils import weight_norm, spectral_norm
import json
import numpy as np
import os
import os.path as osp
import torch
import torch.nn as nn
import torch.nn.functional as F
class LinearNorm(torch.nn.Module):
def __init__(self, in_dim, out_dim, bias=True, w_init_gain='linear'):
super(LinearNorm, self).__init__()
self.linear_layer = torch.nn.Linear(in_dim, out_dim, bias=bias)
torch.nn.init.xavier_uniform_(
self.linear_layer.weight,
gain=torch.nn.init.calculate_gain(w_init_gain))
def forward(self, x):
return self.linear_layer(x)
class LayerNorm(nn.Module):
def __init__(self, channels, eps=1e-5):
super().__init__()
self.channels = channels
self.eps = eps
self.gamma = nn.Parameter(torch.ones(channels))
self.beta = nn.Parameter(torch.zeros(channels))
def forward(self, x):
x = x.transpose(1, -1)
x = F.layer_norm(x, (self.channels,), self.gamma, self.beta, self.eps)
return x.transpose(1, -1)
class TextEncoder(nn.Module):
def __init__(self, channels, kernel_size, depth, n_symbols, actv=nn.LeakyReLU(0.2)):
super().__init__()
self.embedding = nn.Embedding(n_symbols, channels)
padding = (kernel_size - 1) // 2
self.cnn = nn.ModuleList()
for _ in range(depth):
self.cnn.append(nn.Sequential(
weight_norm(nn.Conv1d(channels, channels, kernel_size=kernel_size, padding=padding)),
LayerNorm(channels),
actv,
nn.Dropout(0.2),
))
# self.cnn = nn.Sequential(*self.cnn)
self.lstm = nn.LSTM(channels, channels//2, 1, batch_first=True, bidirectional=True)
def forward(self, x, input_lengths, m):
x = self.embedding(x) # [B, T, emb]
x = x.transpose(1, 2) # [B, emb, T]
m = m.to(input_lengths.device).unsqueeze(1)
x.masked_fill_(m, 0.0)
for c in self.cnn:
x = c(x)
x.masked_fill_(m, 0.0)
x = x.transpose(1, 2) # [B, T, chn]
input_lengths = input_lengths.cpu().numpy()
x = nn.utils.rnn.pack_padded_sequence(
x, input_lengths, batch_first=True, enforce_sorted=False)
self.lstm.flatten_parameters()
x, _ = self.lstm(x)
x, _ = nn.utils.rnn.pad_packed_sequence(
x, batch_first=True)
x = x.transpose(-1, -2)
x_pad = torch.zeros([x.shape[0], x.shape[1], m.shape[-1]])
x_pad[:, :, :x.shape[-1]] = x
x = x_pad.to(x.device)
x.masked_fill_(m, 0.0)
return x
def inference(self, x):
x = self.embedding(x)
x = x.transpose(1, 2)
x = self.cnn(x)
x = x.transpose(1, 2)
self.lstm.flatten_parameters()
x, _ = self.lstm(x)
return x
def length_to_mask(self, lengths):
mask = torch.arange(lengths.max()).unsqueeze(0).expand(lengths.shape[0], -1).type_as(lengths)
mask = torch.gt(mask+1, lengths.unsqueeze(1))
return mask
class UpSample1d(nn.Module):
def __init__(self, layer_type):
super().__init__()
self.layer_type = layer_type
def forward(self, x):
if self.layer_type == 'none':
return x
else:
return F.interpolate(x, scale_factor=2, mode='nearest')
class AdainResBlk1d(nn.Module):
def __init__(self, dim_in, dim_out, style_dim=64, actv=nn.LeakyReLU(0.2),
upsample='none', dropout_p=0.0):
super().__init__()
self.actv = actv
self.upsample_type = upsample
self.upsample = UpSample1d(upsample)
self.learned_sc = dim_in != dim_out
self._build_weights(dim_in, dim_out, style_dim)
self.dropout = nn.Dropout(dropout_p)
if upsample == 'none':
self.pool = nn.Identity()
else:
self.pool = weight_norm(nn.ConvTranspose1d(dim_in, dim_in, kernel_size=3, stride=2, groups=dim_in, padding=1, output_padding=1))
def _build_weights(self, dim_in, dim_out, style_dim):
self.conv1 = weight_norm(nn.Conv1d(dim_in, dim_out, 3, 1, 1))
self.conv2 = weight_norm(nn.Conv1d(dim_out, dim_out, 3, 1, 1))
self.norm1 = AdaIN1d(style_dim, dim_in)
self.norm2 = AdaIN1d(style_dim, dim_out)
if self.learned_sc:
self.conv1x1 = weight_norm(nn.Conv1d(dim_in, dim_out, 1, 1, 0, bias=False))
def _shortcut(self, x):
x = self.upsample(x)
if self.learned_sc:
x = self.conv1x1(x)
return x
def _residual(self, x, s):
x = self.norm1(x, s)
x = self.actv(x)
x = self.pool(x)
x = self.conv1(self.dropout(x))
x = self.norm2(x, s)
x = self.actv(x)
x = self.conv2(self.dropout(x))
return x
def forward(self, x, s):
out = self._residual(x, s)
out = (out + self._shortcut(x)) / np.sqrt(2)
return out
class AdaLayerNorm(nn.Module):
def __init__(self, style_dim, channels, eps=1e-5):
super().__init__()
self.channels = channels
self.eps = eps
self.fc = nn.Linear(style_dim, channels*2)
def forward(self, x, s):
x = x.transpose(-1, -2)
x = x.transpose(1, -1)
h = self.fc(s)
h = h.view(h.size(0), h.size(1), 1)
gamma, beta = torch.chunk(h, chunks=2, dim=1)
gamma, beta = gamma.transpose(1, -1), beta.transpose(1, -1)
x = F.layer_norm(x, (self.channels,), eps=self.eps)
x = (1 + gamma) * x + beta
return x.transpose(1, -1).transpose(-1, -2)
class ProsodyPredictor(nn.Module):
def __init__(self, style_dim, d_hid, nlayers, max_dur=50, dropout=0.1):
super().__init__()
self.text_encoder = DurationEncoder(sty_dim=style_dim,
d_model=d_hid,
nlayers=nlayers,
dropout=dropout)
self.lstm = nn.LSTM(d_hid + style_dim, d_hid // 2, 1, batch_first=True, bidirectional=True)
self.duration_proj = LinearNorm(d_hid, max_dur)
self.shared = nn.LSTM(d_hid + style_dim, d_hid // 2, 1, batch_first=True, bidirectional=True)
self.F0 = nn.ModuleList()
self.F0.append(AdainResBlk1d(d_hid, d_hid, style_dim, dropout_p=dropout))
self.F0.append(AdainResBlk1d(d_hid, d_hid // 2, style_dim, upsample=True, dropout_p=dropout))
self.F0.append(AdainResBlk1d(d_hid // 2, d_hid // 2, style_dim, dropout_p=dropout))
self.N = nn.ModuleList()
self.N.append(AdainResBlk1d(d_hid, d_hid, style_dim, dropout_p=dropout))
self.N.append(AdainResBlk1d(d_hid, d_hid // 2, style_dim, upsample=True, dropout_p=dropout))
self.N.append(AdainResBlk1d(d_hid // 2, d_hid // 2, style_dim, dropout_p=dropout))
self.F0_proj = nn.Conv1d(d_hid // 2, 1, 1, 1, 0)
self.N_proj = nn.Conv1d(d_hid // 2, 1, 1, 1, 0)
def forward(self, texts, style, text_lengths, alignment, m):
d = self.text_encoder(texts, style, text_lengths, m)
batch_size = d.shape[0]
text_size = d.shape[1]
# predict duration
input_lengths = text_lengths.cpu().numpy()
x = nn.utils.rnn.pack_padded_sequence(
d, input_lengths, batch_first=True, enforce_sorted=False)
m = m.to(text_lengths.device).unsqueeze(1)
self.lstm.flatten_parameters()
x, _ = self.lstm(x)
x, _ = nn.utils.rnn.pad_packed_sequence(
x, batch_first=True)
x_pad = torch.zeros([x.shape[0], m.shape[-1], x.shape[-1]])
x_pad[:, :x.shape[1], :] = x
x = x_pad.to(x.device)
duration = self.duration_proj(nn.functional.dropout(x, 0.5, training=self.training))
en = (d.transpose(-1, -2) @ alignment)
return duration.squeeze(-1), en
def F0Ntrain(self, x, s):
x, _ = self.shared(x.transpose(-1, -2))
F0 = x.transpose(-1, -2)
for block in self.F0:
F0 = block(F0, s)
F0 = self.F0_proj(F0)
N = x.transpose(-1, -2)
for block in self.N:
N = block(N, s)
N = self.N_proj(N)
return F0.squeeze(1), N.squeeze(1)
def length_to_mask(self, lengths):
mask = torch.arange(lengths.max()).unsqueeze(0).expand(lengths.shape[0], -1).type_as(lengths)
mask = torch.gt(mask+1, lengths.unsqueeze(1))
return mask
class DurationEncoder(nn.Module):
def __init__(self, sty_dim, d_model, nlayers, dropout=0.1):
super().__init__()
self.lstms = nn.ModuleList()
for _ in range(nlayers):
self.lstms.append(nn.LSTM(d_model + sty_dim,
d_model // 2,
num_layers=1,
batch_first=True,
bidirectional=True,
dropout=dropout))
self.lstms.append(AdaLayerNorm(sty_dim, d_model))
self.dropout = dropout
self.d_model = d_model
self.sty_dim = sty_dim
def forward(self, x, style, text_lengths, m):
masks = m.to(text_lengths.device)
x = x.permute(2, 0, 1)
s = style.expand(x.shape[0], x.shape[1], -1)
x = torch.cat([x, s], axis=-1)
x.masked_fill_(masks.unsqueeze(-1).transpose(0, 1), 0.0)
x = x.transpose(0, 1)
input_lengths = text_lengths.cpu().numpy()
x = x.transpose(-1, -2)
for block in self.lstms:
if isinstance(block, AdaLayerNorm):
x = block(x.transpose(-1, -2), style).transpose(-1, -2)
x = torch.cat([x, s.permute(1, -1, 0)], axis=1)
x.masked_fill_(masks.unsqueeze(-1).transpose(-1, -2), 0.0)
else:
x = x.transpose(-1, -2)
x = nn.utils.rnn.pack_padded_sequence(
x, input_lengths, batch_first=True, enforce_sorted=False)
block.flatten_parameters()
x, _ = block(x)
x, _ = nn.utils.rnn.pad_packed_sequence(
x, batch_first=True)
x = F.dropout(x, p=self.dropout, training=self.training)
x = x.transpose(-1, -2)
x_pad = torch.zeros([x.shape[0], x.shape[1], m.shape[-1]])
x_pad[:, :, :x.shape[-1]] = x
x = x_pad.to(x.device)
return x.transpose(-1, -2)
def inference(self, x, style):
x = self.embedding(x.transpose(-1, -2)) * np.sqrt(self.d_model)
style = style.expand(x.shape[0], x.shape[1], -1)
x = torch.cat([x, style], axis=-1)
src = self.pos_encoder(x)
output = self.transformer_encoder(src).transpose(0, 1)
return output
def length_to_mask(self, lengths):
mask = torch.arange(lengths.max()).unsqueeze(0).expand(lengths.shape[0], -1).type_as(lengths)
mask = torch.gt(mask+1, lengths.unsqueeze(1))
return mask
# https://github.com/yl4579/StyleTTS2/blob/main/utils.py
def recursive_munch(d):
if isinstance(d, dict):
return Munch((k, recursive_munch(v)) for k, v in d.items())
elif isinstance(d, list):
return [recursive_munch(v) for v in d]
else:
return d
def build_model(path, device):
config = Path(__file__).parent / 'config.json'
assert config.exists(), f'Config path incorrect: config.json not found at {config}'
with open(config, 'r') as r:
args = recursive_munch(json.load(r))
assert args.decoder.type == 'istftnet', f'Unknown decoder type: {args.decoder.type}'
decoder = Decoder(dim_in=args.hidden_dim, style_dim=args.style_dim, dim_out=args.n_mels,
resblock_kernel_sizes = args.decoder.resblock_kernel_sizes,
upsample_rates = args.decoder.upsample_rates,
upsample_initial_channel=args.decoder.upsample_initial_channel,
resblock_dilation_sizes=args.decoder.resblock_dilation_sizes,
upsample_kernel_sizes=args.decoder.upsample_kernel_sizes,
gen_istft_n_fft=args.decoder.gen_istft_n_fft, gen_istft_hop_size=args.decoder.gen_istft_hop_size)
text_encoder = TextEncoder(channels=args.hidden_dim, kernel_size=5, depth=args.n_layer, n_symbols=args.n_token)
predictor = ProsodyPredictor(style_dim=args.style_dim, d_hid=args.hidden_dim, nlayers=args.n_layer, max_dur=args.max_dur, dropout=args.dropout)
bert = load_plbert()
bert_encoder = nn.Linear(bert.config.hidden_size, args.hidden_dim)
for parent in [bert, bert_encoder, predictor, decoder, text_encoder]:
for child in parent.children():
if isinstance(child, nn.RNNBase):
child.flatten_parameters()
model = Munch(
bert=bert.to(device).eval(),
bert_encoder=bert_encoder.to(device).eval(),
predictor=predictor.to(device).eval(),
decoder=decoder.to(device).eval(),
text_encoder=text_encoder.to(device).eval(),
)
for key, state_dict in torch.load(path, map_location='cpu', weights_only=True)['net'].items():
assert key in model, key
try:
model[key].load_state_dict(state_dict)
except:
state_dict = {k[7:]: v for k, v in state_dict.items()}
model[key].load_state_dict(state_dict, strict=False)
return model

View File

@@ -1,16 +0,0 @@
# https://huggingface.co/hexgrad/Kokoro-82M/blob/main/plbert.py
# https://github.com/yl4579/StyleTTS2/blob/main/Utils/PLBERT/util.py
from transformers import AlbertConfig, AlbertModel
class CustomAlbert(AlbertModel):
def forward(self, *args, **kwargs):
# Call the original forward method
outputs = super().forward(*args, **kwargs)
# Only return the last_hidden_state
return outputs.last_hidden_state
def load_plbert():
plbert_config = {'vocab_size': 178, 'hidden_size': 768, 'num_attention_heads': 12, 'intermediate_size': 2048, 'max_position_embeddings': 512, 'num_hidden_layers': 12, 'dropout': 0.1}
albert_base_configuration = AlbertConfig(**plbert_config)
bert = CustomAlbert(albert_base_configuration)
return bert

View File

@@ -1,11 +0,0 @@
#!/bin/bash
set -e
backend_dir=$(dirname $0)
if [ -d $backend_dir/common ]; then
source $backend_dir/common/libbackend.sh
else
source $backend_dir/../common/libbackend.sh
fi
python3 -m grpc_tools.protoc -I../.. -I./ --python_out=. --grpc_python_out=. backend.proto

View File

@@ -1,2 +1,6 @@
torch==2.4.1
transformers
--extra-index-url https://download.pytorch.org/whl/cpu
transformers
accelerate
torch
kokoro
soundfile

View File

@@ -1,3 +1,7 @@
--extra-index-url https://download.pytorch.org/whl/cu118
torch==2.4.1+cu118
transformers
torch==2.7.1+cu118
torchaudio==2.7.1+cu118
transformers
accelerate
kokoro
soundfile

View File

@@ -1,2 +1,6 @@
torch==2.4.1
transformers
torch==2.7.1
torchaudio==2.7.1
transformers
accelerate
kokoro
soundfile

View File

@@ -1,3 +1,7 @@
--extra-index-url https://download.pytorch.org/whl/rocm6.0
torch==2.4.1+rocm6.0
transformers
--extra-index-url https://download.pytorch.org/whl/rocm6.3
torch==2.7.1+rocm6.3
torchaudio==2.7.1+rocm6.3
transformers
accelerate
kokoro
soundfile

View File

@@ -1,5 +1,11 @@
--extra-index-url https://pytorch-extension.intel.com/release-whl/stable/xpu/us/
intel-extension-for-pytorch==2.3.110+xpu
torch==2.3.1+cxx11.abi
oneccl_bind_pt==2.3.100+xpu
transformers
intel-extension-for-pytorch==2.8.10+xpu
torch==2.5.1+cxx11.abi
oneccl_bind_pt==2.8.0+xpu
torchaudio==2.5.1+cxx11.abi
optimum[openvino]
setuptools
transformers==4.48.3
accelerate
kokoro
soundfile

View File

@@ -1,7 +1,6 @@
grpcio==1.71.0
protobuf
phonemizer
scipy
munch
setuptools
soundfile
certifi
packaging==24.1
pip
chardet

View File

@@ -0,0 +1,87 @@
"""
A test script to test the gRPC service
"""
import unittest
import subprocess
import time
import backend_pb2
import backend_pb2_grpc
import grpc
class TestBackendServicer(unittest.TestCase):
"""
TestBackendServicer is the class that tests the gRPC service
"""
def setUp(self):
"""
This method sets up the gRPC service by starting the server
"""
self.service = subprocess.Popen(["python3", "backend.py", "--addr", "localhost:50051"])
time.sleep(30)
def tearDown(self) -> None:
"""
This method tears down the gRPC service by terminating the server
"""
self.service.terminate()
self.service.wait()
def test_server_startup(self):
"""
This method tests if the server starts up successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.Health(backend_pb2.HealthMessage())
self.assertEqual(response.message, b'OK')
except Exception as err:
print(err)
self.fail("Server failed to start")
finally:
self.tearDown()
def test_load_model(self):
"""
This method tests if the Kokoro pipeline is loaded successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(language="a"))
print(response)
self.assertTrue(response.success)
self.assertEqual(response.message, "Kokoro TTS pipeline loaded successfully")
except Exception as err:
print(err)
self.fail("LoadModel service failed")
finally:
self.tearDown()
def test_tts(self):
"""
This method tests if the TTS generation works successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(language="a"))
self.assertTrue(response.success)
tts_request = backend_pb2.TTSRequest(
text="Kokoro is an open-weight TTS model with 82 million parameters.",
voice="af_heart",
dst="test_output.wav"
)
tts_response = stub.TTS(tts_request)
self.assertIsNotNone(tts_response)
self.assertTrue(tts_response.success)
except Exception as err:
print(err)
self.fail("TTS service failed")
finally:
self.tearDown()