feat(backends): add moonshine backend for faster transcription (#7833)

* feat(backends): add moonshine backend for faster transcription

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Add backend to CI, update AGENTS.md from this exercise

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-01-07 21:44:35 +01:00
committed by GitHub
parent 0b26669d0b
commit b964b3d53e
13 changed files with 582 additions and 2 deletions

View File

@@ -0,0 +1,16 @@
.DEFAULT_GOAL := install
.PHONY: install
install:
bash install.sh
.PHONY: protogen-clean
protogen-clean:
$(RM) backend_pb2_grpc.py backend_pb2.py
.PHONY: clean
clean: protogen-clean
rm -rf venv __pycache__
test: install
bash test.sh

View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
This is an extra gRPC server of LocalAI for Moonshine transcription
"""
from concurrent import futures
import time
import argparse
import signal
import sys
import os
import backend_pb2
import backend_pb2_grpc
import moonshine_onnx
import grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# If MAX_WORKERS are specified in the environment use it, otherwise default to 1
MAX_WORKERS = int(os.environ.get('PYTHON_GRPC_MAX_WORKERS', '1'))
# Implement the BackendServicer class with the service methods
class BackendServicer(backend_pb2_grpc.BackendServicer):
"""
BackendServicer is the class that implements the gRPC service
"""
def Health(self, request, context):
return backend_pb2.Reply(message=bytes("OK", 'utf-8'))
def LoadModel(self, request, context):
try:
print("Preparing models, please wait", file=sys.stderr)
# Store the model name for use in transcription
# Model name format: e.g., "moonshine/tiny"
self.model_name = request.Model
print(f"Model name set to: {self.model_name}", file=sys.stderr)
except Exception as err:
return backend_pb2.Result(success=False, message=f"Unexpected {err=}, {type(err)=}")
return backend_pb2.Result(message="Model loaded successfully", success=True)
def AudioTranscription(self, request, context):
resultSegments = []
text = ""
try:
# moonshine_onnx.transcribe returns a list of strings
transcriptions = moonshine_onnx.transcribe(request.dst, self.model_name)
# Combine all transcriptions into a single text
if isinstance(transcriptions, list):
text = " ".join(transcriptions)
# Create segments for each transcription in the list
for id, trans in enumerate(transcriptions):
# Since moonshine doesn't provide timing info, we'll create a single segment
# with id and text, using approximate timing
resultSegments.append(backend_pb2.TranscriptSegment(
id=id,
start=0,
end=0,
text=trans
))
else:
# Handle case where it's not a list (shouldn't happen, but be safe)
text = str(transcriptions)
resultSegments.append(backend_pb2.TranscriptSegment(
id=0,
start=0,
end=0,
text=text
))
except Exception as err:
print(f"Unexpected {err=}, {type(err)=}", file=sys.stderr)
return backend_pb2.TranscriptResult(segments=[], text="")
return backend_pb2.TranscriptResult(segments=resultSegments, text=text)
def serve(address):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=MAX_WORKERS),
options=[
('grpc.max_message_length', 50 * 1024 * 1024), # 50MB
('grpc.max_send_message_length', 50 * 1024 * 1024), # 50MB
('grpc.max_receive_message_length', 50 * 1024 * 1024), # 50MB
])
backend_pb2_grpc.add_BackendServicer_to_server(BackendServicer(), server)
server.add_insecure_port(address)
server.start()
print("Server started. Listening on: " + address, file=sys.stderr)
# Define the signal handler function
def signal_handler(sig, frame):
print("Received termination signal. Shutting down...")
server.stop(0)
sys.exit(0)
# Set the signal handlers for SIGINT and SIGTERM
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run the gRPC server.")
parser.add_argument(
"--addr", default="localhost:50051", help="The address to bind the server to."
)
args = parser.parse_args()
serve(args.addr)

View File

@@ -0,0 +1,12 @@
#!/bin/bash
set -e
backend_dir=$(dirname $0)
if [ -d $backend_dir/common ]; then
source $backend_dir/common/libbackend.sh
else
source $backend_dir/../common/libbackend.sh
fi
installRequirements

View File

@@ -0,0 +1,12 @@
#!/bin/bash
set -e
backend_dir=$(dirname $0)
if [ -d $backend_dir/common ]; then
source $backend_dir/common/libbackend.sh
else
source $backend_dir/../common/libbackend.sh
fi
python3 -m grpc_tools.protoc -I../.. -I./ --python_out=. --grpc_python_out=. backend.proto

View File

@@ -0,0 +1,4 @@
grpcio==1.71.0
protobuf
grpcio-tools
useful-moonshine-onnx@git+https://git@github.com/moonshine-ai/moonshine.git#subdirectory=moonshine-onnx

10
backend/python/moonshine/run.sh Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/bash
backend_dir=$(dirname $0)
if [ -d $backend_dir/common ]; then
source $backend_dir/common/libbackend.sh
else
source $backend_dir/../common/libbackend.sh
fi
startBackend $@

View File

@@ -0,0 +1,139 @@
"""
A test script to test the gRPC service for Moonshine transcription
"""
import unittest
import subprocess
import time
import os
import tempfile
import shutil
import backend_pb2
import backend_pb2_grpc
import grpc
class TestBackendServicer(unittest.TestCase):
"""
TestBackendServicer is the class that tests the gRPC service
"""
def setUp(self):
"""
This method sets up the gRPC service by starting the server
"""
self.service = subprocess.Popen(["python3", "backend.py", "--addr", "localhost:50051"])
time.sleep(10)
def tearDown(self) -> None:
"""
This method tears down the gRPC service by terminating the server
"""
self.service.terminate()
self.service.wait()
def test_server_startup(self):
"""
This method tests if the server starts up successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.Health(backend_pb2.HealthMessage())
self.assertEqual(response.message, b'OK')
except Exception as err:
print(err)
self.fail("Server failed to start")
finally:
self.tearDown()
def test_load_model(self):
"""
This method tests if the model is loaded successfully
"""
try:
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
response = stub.LoadModel(backend_pb2.ModelOptions(Model="moonshine/tiny"))
self.assertTrue(response.success)
self.assertEqual(response.message, "Model loaded successfully")
except Exception as err:
print(err)
self.fail("LoadModel service failed")
finally:
self.tearDown()
def test_audio_transcription(self):
"""
This method tests if audio transcription works successfully
"""
# Create a temporary directory for the audio file
temp_dir = tempfile.mkdtemp()
audio_file = os.path.join(temp_dir, 'audio.wav')
try:
# Download the audio file to the temporary directory
print(f"Downloading audio file to {audio_file}...")
url = "https://cdn.openai.com/whisper/draft-20220913a/micro-machines.wav"
result = subprocess.run(
["wget", "-q", url, "-O", audio_file],
capture_output=True,
text=True
)
if result.returncode != 0:
self.fail(f"Failed to download audio file: {result.stderr}")
# Verify the file was downloaded
if not os.path.exists(audio_file):
self.fail(f"Audio file was not downloaded to {audio_file}")
self.setUp()
with grpc.insecure_channel("localhost:50051") as channel:
stub = backend_pb2_grpc.BackendStub(channel)
# Load the model first
load_response = stub.LoadModel(backend_pb2.ModelOptions(Model="moonshine/tiny"))
self.assertTrue(load_response.success)
# Perform transcription
transcript_request = backend_pb2.TranscriptRequest(dst=audio_file)
transcript_response = stub.AudioTranscription(transcript_request)
# Print the transcribed text for debugging
print(f"Transcribed text: {transcript_response.text}")
print(f"Number of segments: {len(transcript_response.segments)}")
# Verify response structure
self.assertIsNotNone(transcript_response)
self.assertIsNotNone(transcript_response.text)
# Protobuf repeated fields return a sequence, not a list
self.assertIsNotNone(transcript_response.segments)
# Check if segments is iterable (has length)
self.assertGreaterEqual(len(transcript_response.segments), 0)
# Verify the transcription contains the expected text
expected_text = "This is the micro machine man presenting the most midget miniature"
self.assertIn(
expected_text.lower(),
transcript_response.text.lower(),
f"Expected text '{expected_text}' not found in transcription: '{transcript_response.text}'"
)
# If we got segments, verify they have the expected structure
if len(transcript_response.segments) > 0:
segment = transcript_response.segments[0]
self.assertIsNotNone(segment.text)
self.assertIsInstance(segment.id, int)
else:
# Even if no segments, we should have text
self.assertIsNotNone(transcript_response.text)
self.assertGreater(len(transcript_response.text), 0)
except Exception as err:
print(err)
self.fail("AudioTranscription service failed")
finally:
self.tearDown()
# Clean up the temporary directory
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)

View File

@@ -0,0 +1,12 @@
#!/bin/bash
set -e
backend_dir=$(dirname $0)
if [ -d $backend_dir/common ]; then
source $backend_dir/common/libbackend.sh
else
source $backend_dir/../common/libbackend.sh
fi
runUnittests