Voice Biometrics
Introduction
Voice biometrics is a technology that uses the unique characteristics of a person’s voice to identify or authenticate them.
Use cases
Authentication: Verifies if the speaker matches a specific enrolled identity.
Identification: Determines which enrolled user is speaking.
Providers
| Feature | TSSV | IDVoice |
|---|---|---|
| Accuracy & Performance | Faster, but less accurate | Slower, but more accurate |
| Result Behavior | Returns results only if confidence ≥ threshold | Returns all results, regardless of confidence |
| Language Dependency | Language-agnostic | Language-agnostic |
| Enrollment Flow | Identical for both providers | Identical for both providers |
| Supported Modes | Text-dependent and text-independent | Text-dependent and text-independent |
Different SDKs produce different results. For example, vsdk-idvoice reports varying results as it analyzes the audio, while vsdk-tssv only sends a result if the engine considers it acceptable (depending on the confidence level you set).
We recommend testing the application in real conditions to choose the minimum score that meets your requirements for false rejection and false acceptance. By default, you can simply check whether the score is above 0.
Audio Format
The input audio data for enrollment and recognition is a 16-bit signed PCM buffer in little-endian format. It is always mono (1 channel), and the sample rate is 16 kHz.
Examples
Enroll
This example demonstrates how to perform user enrollment with voice biometrics.
The VoiceBiometrics class is responsible for:
Configuring the enrollment parameters.
Streaming audio data to the server via WebSocket.
Sending user data along with the audio stream.
Handling server messages to determine whether the enrollment succeeded or failed.
import asyncio
import base64
import json
import os
import requests
import websockets
class VoiceBiometrics:
    """Enrolls a user in a voice biometrics model.

    The flow is: POST the enrollment parameters to obtain a session token,
    open a WebSocket for that token, stream the raw audio file as
    Base64-encoded JSON chunks, and print every message the server sends back.
    """

    # Enrollment parameters sent in the HTTP request body.
    model_name = "Portal"
    model_type = "text_independent"
    username = "Mike"
    # Raw audio file streamed to the server.
    raw_audio_file = r"C:\Users\vivoka\Music\record.raw"
    enroll_uri = "http://localhost:39806/v1/voice-biometrics/enroll"
    # Prefix prepended to every Base64-encoded audio chunk.
    audio_data_header = "data:audio/pcm;base64,"

    def __init__(self):
        # Session token returned by the enroll request.
        self.token = ""
        # Whether the latest progress event reported a progress of at least 100.
        self.success = False

    async def enroll(self):
        """Request an enrollment session, then stream audio and read replies.

        Raises:
            requests.HTTPError: if the enroll request returns an error status.
        """
        # Send HTTP POST request to enroll
        request_data = {
            "model": self.model_name,
            "model_type": self.model_type,
            "user": self.username
        }
        response = requests.post(self.enroll_uri, json=request_data)
        response.raise_for_status()
        json_response = response.json()
        self.token = json_response["token"]

        # Connect to WebSocket
        web_socket_url = f"ws://localhost:39806/v1/ws/{self.token}"
        async with websockets.connect(web_socket_url) as websocket:
            # Send and receive concurrently so server events are handled
            # while audio is still being streamed.
            sending = asyncio.create_task(self._stream_audio_file(websocket))
            receiving = asyncio.create_task(self._on_msg_received(websocket))
            await asyncio.gather(sending, receiving)

    async def _on_msg_received(self, websocket):
        """Print each server message and track enrollment progress.

        A message that is not valid JSON is reported and skipped, so one
        malformed message does not stop the processing of later ones.
        """
        async for message in websocket:
            try:
                message_json = json.loads(message)
            except json.JSONDecodeError as ex:
                print(f"Failed to parse message: {ex}")
                continue

            if "event" in message_json:
                event = message_json["event"]
                if "progress" in event:
                    self.success = event["progress"] >= 100
                print(f"Event received: {json.dumps(event)}")
            elif "result" in message_json:
                print(f"Result received: {json.dumps(message_json['result'])}")
            elif "error" in message_json:
                print(f"Error received: {json.dumps(message_json['error'])}")
            else:
                print("Unknown message type received.")

    async def _stream_audio_file(self, websocket):
        """Send the raw audio file over the WebSocket in 1024-byte chunks."""
        # Read audio data from a file and send it through websocket
        file_size = os.path.getsize(self.raw_audio_file)
        with open(self.raw_audio_file, 'rb') as fs:
            while True:
                buffer = fs.read(1024)
                if not buffer:
                    break
                base64_data = base64.b64encode(buffer).decode('utf-8')
                audio_chunk = {
                    "data": f"{self.audio_data_header}{base64_data}",
                    # The chunk that reaches the end of the file is flagged as last.
                    "last": fs.tell() == file_size
                }
                await websocket.send(json.dumps(audio_chunk))
                await asyncio.sleep(0.01)  # To prevent overwhelming the server
# Script entry point: run one enrollment session to completion.
if __name__ == "__main__":
    asyncio.run(VoiceBiometrics().enroll())
using System.Text.Json;
using System.Net.WebSockets;
using System.Text;
// Await the enrollment instead of blocking on it: failures surface as the
// original exception rather than wrapped in an AggregateException.
await new VoiceBiometrics().Enroll();
/// <summary>
/// Enrolls a user in a voice biometrics model: requests a session token over HTTP,
/// then streams a raw audio file over a WebSocket while printing the server's messages.
/// </summary>
class VoiceBiometrics
{
    const string ModelName = "Portal";
    const string ModelType = "text_independent";
    const string UserName = "Mike";
    const string RawAudioFile = @"C:\Users\vivoka\Music\record.raw";
    const string EnrollUri = "http://localhost:39806/v1/voice-biometrics/enroll";
    const string AudioDataHeader = "data:audio/pcm;base64,";

    /// <summary>Session token returned by the enroll request; used to build the WebSocket URL.</summary>
    public string Token { get; private set; } = "";

    /// <summary>Whether the most recent progress event reported a progress of at least 100.</summary>
    public bool Success { get; private set; } = false;

    /// <summary>
    /// Requests an enrollment session, then streams the audio file and reads the
    /// server's replies concurrently until both sides are done.
    /// </summary>
    /// <exception cref="HttpRequestException">The enroll request returned an error status.</exception>
    public async Task Enroll()
    {
        using (var client = new HttpClient())
        {
            var request = new { model = ModelName, model_type = ModelType, user = UserName };
            var content = new StringContent(JsonSerializer.Serialize(request), Encoding.UTF8, "application/json");
            var response = await client.PostAsync(EnrollUri, content);
            response.EnsureSuccessStatusCode();

            var responseBody = await response.Content.ReadAsStringAsync();
            // JsonDocument is disposable; release it once the token has been copied out.
            using (var document = JsonDocument.Parse(responseBody))
            {
                Token = document.RootElement.GetProperty("token").ToString();
            }
        }

        using (var webSocket = new ClientWebSocket())
        {
            await webSocket.ConnectAsync(new Uri($"ws://localhost:39806/v1/ws/{Token}"), CancellationToken.None);
            var sending = Task.Run(() => StreamAudioFile(webSocket));
            var receiving = Task.Run(() => ReceivePackets(webSocket));
            await Task.WhenAll(sending, receiving);
        }
    }

    /// <summary>
    /// Prints one complete server message and updates <see cref="Success"/> from progress events.
    /// A message that is not valid JSON is reported and otherwise ignored.
    /// </summary>
    private void OnMsgReceived(string message)
    {
        try
        {
            using (var document = JsonDocument.Parse(message))
            {
                var root = document.RootElement;
                if (root.TryGetProperty("event", out JsonElement eventElement))
                {
                    // GetDouble accepts both integral and fractional progress values,
                    // whereas GetInt32 throws on a value such as 33.3.
                    if (eventElement.TryGetProperty("progress", out JsonElement progressElement))
                        Success = progressElement.GetDouble() >= 100;
                    Console.WriteLine($"Event received: {JsonSerializer.Serialize(eventElement)}");
                }
                else if (root.TryGetProperty("result", out JsonElement resultElement))
                    Console.WriteLine($"Result received: {JsonSerializer.Serialize(resultElement)}");
                else if (root.TryGetProperty("error", out JsonElement errorElement))
                    Console.WriteLine($"Error received: {JsonSerializer.Serialize(errorElement)}");
                else
                    Console.WriteLine("Unknown message type received.");
            }
        }
        catch (JsonException ex)
        {
            Console.WriteLine($"Failed to parse message: {ex.Message}");
        }
    }

    /// <summary>
    /// Sends the raw audio file over the WebSocket as Base64-encoded JSON chunks of up to 1024 bytes.
    /// Stops early if the socket is no longer open.
    /// </summary>
    private async Task StreamAudioFile(ClientWebSocket webSocket)
    {
        // Read audio data from a file and send it through the websocket
        using (var fs = File.OpenRead(RawAudioFile))
        {
            int bytesRead;
            var buffer = new byte[1024];
            while ((bytesRead = await fs.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                var base64 = Convert.ToBase64String(new ArraySegment<byte>(buffer, 0, bytesRead));
                var audioChunk = new
                {
                    data = $"{AudioDataHeader}{base64}",
                    // The chunk that reaches the end of the file is flagged as last.
                    last = fs.Position == fs.Length
                };
                var json = JsonSerializer.Serialize(audioChunk);
                var bytes = Encoding.UTF8.GetBytes(json);

                // Sending on a socket that is not open throws, so stop streaming instead.
                if (webSocket.State == WebSocketState.Open)
                    await webSocket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, CancellationToken.None);
                else
                    break;
            }
        }
    }

    /// <summary>
    /// Reads WebSocket frames until the socket closes, reassembles fragmented messages
    /// and hands each complete message to <see cref="OnMsgReceived"/>.
    /// </summary>
    private async Task ReceivePackets(ClientWebSocket webSocket)
    {
        var buffer = new byte[1024];
        // Fragments are accumulated as bytes and decoded once per message: decoding each
        // fragment separately would corrupt a multi-byte UTF-8 character split across two fragments.
        using (var pending = new MemoryStream())
        {
            while (webSocket.State == WebSocketState.Open)
            {
                var packet = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                if (packet.MessageType == WebSocketMessageType.Close)
                {
                    await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
                    break;
                }

                pending.Write(buffer, 0, packet.Count);
                if (packet.EndOfMessage)
                {
                    OnMsgReceived(Encoding.UTF8.GetString(pending.ToArray()));
                    pending.SetLength(0);
                }
            }
        }
    }
}
Authentication
This example demonstrates how to perform voice biometrics authentication using a text-independent model.
The VoiceBiometrics class includes an authenticate method that:
Initializes the authentication process.
Establishes a WebSocket connection to the server.
Streams audio data in Base64 format.
Handles incoming server messages containing authentication results or error notifications.
import asyncio
import base64
import json
import os
import requests
import websockets
class VoiceBiometrics:
    """Authenticates a speaker against an enrolled user.

    The flow is: POST the model and user name to obtain a session token,
    open a WebSocket for that token, stream the raw audio file as
    Base64-encoded JSON chunks, and print every message the server sends back.
    """

    # Parameters sent in the HTTP request body.
    model_name = "Portal"
    # Not sent by authenticate().
    model_type = "text_independent"
    username = "Mike"
    # Raw audio file streamed to the server.
    raw_audio_file = r"C:\Users\vivoka\Music\record.raw"
    authenticate_uri = "http://localhost:39806/v1/voice-biometrics/authenticate"
    # Prefix prepended to every Base64-encoded audio chunk.
    audio_data_header = "data:audio/pcm;base64,"

    def __init__(self):
        # Session token returned by the authenticate request.
        self.token = ""
        # Initialised here but not updated by any method of this class.
        self.success = False

    async def authenticate(self):
        """Request an authentication session, then stream audio and read replies.

        Raises:
            requests.HTTPError: if the authenticate request returns an error status.
        """
        # Send HTTP POST request to authenticate
        request_data = {
            "model": self.model_name,
            "user": self.username
        }
        response = requests.post(self.authenticate_uri, json=request_data)
        response.raise_for_status()
        json_response = response.json()
        self.token = json_response["token"]

        # Connect to WebSocket
        web_socket_url = f"ws://localhost:39806/v1/ws/{self.token}"
        async with websockets.connect(web_socket_url) as websocket:
            # Send and receive concurrently so server messages are handled
            # while audio is still being streamed.
            sending = asyncio.create_task(self._stream_audio_file(websocket))
            receiving = asyncio.create_task(self._on_msg_received(websocket))
            await asyncio.gather(sending, receiving)

    async def _on_msg_received(self, websocket):
        """Print each server message (event, result or error).

        A message that is not valid JSON is reported and skipped, so one
        malformed message does not stop the processing of later ones.
        """
        async for message in websocket:
            try:
                message_json = json.loads(message)
            except json.JSONDecodeError as ex:
                print(f"Failed to parse message: {ex}")
                continue

            if "event" in message_json:
                print(f"Event received: {json.dumps(message_json['event'])}")
            elif "result" in message_json:
                print(f"Result received: {json.dumps(message_json['result'])}")
            elif "error" in message_json:
                print(f"Error received: {json.dumps(message_json['error'])}")
            else:
                print("Unknown message type received.")

    async def _stream_audio_file(self, websocket):
        """Send the raw audio file over the WebSocket in 1024-byte chunks."""
        # Read audio data from a file and send it through websocket
        file_size = os.path.getsize(self.raw_audio_file)
        with open(self.raw_audio_file, 'rb') as fs:
            while True:
                buffer = fs.read(1024)
                if not buffer:
                    break
                base64_data = base64.b64encode(buffer).decode('utf-8')
                audio_chunk = {
                    "data": f"{self.audio_data_header}{base64_data}",
                    # The chunk that reaches the end of the file is flagged as last.
                    "last": fs.tell() == file_size
                }
                await websocket.send(json.dumps(audio_chunk))
                await asyncio.sleep(0.01)  # To prevent overwhelming the server
# Script entry point: run one authentication session to completion.
if __name__ == "__main__":
    asyncio.run(VoiceBiometrics().authenticate())
using System.Text.Json;
using System.Net.WebSockets;
using System.Text;
// Await the authentication instead of blocking on it: failures surface as the
// original exception rather than wrapped in an AggregateException.
await new VoiceBiometrics().Authenticate();

/// <summary>
/// Authenticates a speaker against an enrolled user: requests a session token over HTTP,
/// then streams a raw audio file over a WebSocket while printing the server's messages.
/// </summary>
class VoiceBiometrics
{
    const string ModelName = "Portal";
    const string UserName = "Mike";
    const string RawAudioFile = @"C:\Users\vivoka\Music\record.raw";
    const string AuthenticateUri = "http://localhost:39806/v1/voice-biometrics/authenticate";
    const string AudioDataHeader = "data:audio/pcm;base64,";

    /// <summary>Session token returned by the authenticate request; used to build the WebSocket URL.</summary>
    public string Token { get; private set; } = "";

    /// <summary>
    /// Requests an authentication session, then streams the audio file and reads the
    /// server's replies concurrently until both sides are done.
    /// </summary>
    /// <exception cref="HttpRequestException">The authenticate request returned an error status.</exception>
    public async Task Authenticate()
    {
        using (var client = new HttpClient())
        {
            var request = new { model = ModelName, user = UserName };
            var content = new StringContent(JsonSerializer.Serialize(request), Encoding.UTF8, "application/json");
            var response = await client.PostAsync(AuthenticateUri, content);
            response.EnsureSuccessStatusCode();

            var responseBody = await response.Content.ReadAsStringAsync();
            // JsonDocument is disposable; release it once the token has been copied out.
            using (var document = JsonDocument.Parse(responseBody))
            {
                Token = document.RootElement.GetProperty("token").ToString();
            }
        }

        using (var webSocket = new ClientWebSocket())
        {
            await webSocket.ConnectAsync(new Uri($"ws://localhost:39806/v1/ws/{Token}"), CancellationToken.None);
            var sending = Task.Run(() => StreamAudioFile(webSocket));
            var receiving = Task.Run(() => ReceivePackets(webSocket));
            await Task.WhenAll(sending, receiving);
        }
    }

    /// <summary>
    /// Prints one complete server message (event, result or error).
    /// A message that is not valid JSON is reported and otherwise ignored.
    /// </summary>
    private void OnMsgReceived(string message)
    {
        try
        {
            using (var document = JsonDocument.Parse(message))
            {
                var root = document.RootElement;
                if (root.TryGetProperty("event", out JsonElement eventElement))
                    Console.WriteLine($"Event received: {JsonSerializer.Serialize(eventElement)}");
                else if (root.TryGetProperty("result", out JsonElement resultElement))
                    Console.WriteLine($"Result received: {JsonSerializer.Serialize(resultElement)}");
                else if (root.TryGetProperty("error", out JsonElement errorElement))
                    Console.WriteLine($"Error received: {JsonSerializer.Serialize(errorElement)}");
                else
                    Console.WriteLine("Unknown message type received.");
            }
        }
        catch (JsonException ex)
        {
            Console.WriteLine($"Failed to parse message: {ex.Message}");
        }
    }

    /// <summary>
    /// Sends the raw audio file over the WebSocket as Base64-encoded JSON chunks of up to 1024 bytes.
    /// Stops early if the socket is no longer open.
    /// </summary>
    private async Task StreamAudioFile(ClientWebSocket webSocket)
    {
        // Read audio data from a file and send it through the websocket
        using (var fs = File.OpenRead(RawAudioFile))
        {
            int bytesRead;
            var buffer = new byte[1024];
            while ((bytesRead = await fs.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                var base64 = Convert.ToBase64String(new ArraySegment<byte>(buffer, 0, bytesRead));
                var audioChunk = new
                {
                    data = $"{AudioDataHeader}{base64}",
                    // The chunk that reaches the end of the file is flagged as last.
                    last = fs.Position == fs.Length
                };
                var json = JsonSerializer.Serialize(audioChunk);
                var bytes = Encoding.UTF8.GetBytes(json);
                if (webSocket.State == WebSocketState.Open)
                    await webSocket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, CancellationToken.None);
                else
                    break;
            }
        }
    }

    /// <summary>
    /// Reads WebSocket frames until the socket closes, reassembles fragmented messages
    /// and hands each complete message to <see cref="OnMsgReceived"/>.
    /// </summary>
    private async Task ReceivePackets(ClientWebSocket webSocket)
    {
        var buffer = new byte[1024];
        // Fragments are accumulated as bytes and decoded once per message: decoding each
        // fragment separately would corrupt a multi-byte UTF-8 character split across two fragments.
        using (var pending = new MemoryStream())
        {
            while (webSocket.State == WebSocketState.Open)
            {
                var packet = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                if (packet.MessageType == WebSocketMessageType.Close)
                {
                    await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
                    break;
                }

                pending.Write(buffer, 0, packet.Count);
                if (packet.EndOfMessage)
                {
                    OnMsgReceived(Encoding.UTF8.GetString(pending.ToArray()));
                    pending.SetLength(0);
                }
            }
        }
    }
}
Identification
This example demonstrates how to perform voice biometrics identification, enabling users to be identified from a raw audio file of their voice.
The VoiceBiometrics class is responsible for:
Managing user data and server connection details.
Requesting a token via an HTTP request to initiate identification.
Establishing a WebSocket connection with the server.
Streaming raw audio data for processing.
Handling server messages to retrieve identification results or errors.
import asyncio
import base64
import json
import os
import requests
import websockets
class VoiceBiometrics:
    """Identifies which enrolled user is speaking in a raw audio file.

    The flow is: POST the model name to obtain a session token, open a
    WebSocket for that token, stream the raw audio file as Base64-encoded
    JSON chunks, and print every message the server sends back.
    """

    # Model name sent in the HTTP request body.
    model_name = "Portal"
    # Not sent by identify().
    model_type = "text_independent"
    # Not sent by identify().
    username = "Mike"
    # Raw audio file streamed to the server.
    raw_audio_file = r"C:\Users\vivoka\Music\record.raw"
    identify_uri = "http://localhost:39806/v1/voice-biometrics/identify"
    # Prefix prepended to every Base64-encoded audio chunk.
    audio_data_header = "data:audio/pcm;base64,"

    def __init__(self):
        # Session token returned by the identify request.
        self.token = ""
        # Initialised here but not updated by any method of this class.
        self.success = False

    async def identify(self):
        """Request an identification session, then stream audio and read replies.

        Raises:
            requests.HTTPError: if the identify request returns an error status.
        """
        # Send HTTP POST request to identify
        request_data = {"model": self.model_name}
        response = requests.post(self.identify_uri, json=request_data)
        response.raise_for_status()
        json_response = response.json()
        self.token = json_response["token"]

        # Connect to WebSocket
        web_socket_url = f"ws://localhost:39806/v1/ws/{self.token}"
        async with websockets.connect(web_socket_url) as websocket:
            # Send and receive concurrently so server messages are handled
            # while audio is still being streamed.
            sending = asyncio.create_task(self._stream_audio_file(websocket))
            receiving = asyncio.create_task(self._on_msg_received(websocket))
            await asyncio.gather(sending, receiving)

    async def _on_msg_received(self, websocket):
        """Print each server message (event, result or error).

        A message that is not valid JSON is reported and skipped, so one
        malformed message does not stop the processing of later ones.
        """
        async for message in websocket:
            try:
                message_json = json.loads(message)
            except json.JSONDecodeError as ex:
                print(f"Failed to parse message: {ex}")
                continue

            if "event" in message_json:
                print(f"Event received: {json.dumps(message_json['event'])}")
            elif "result" in message_json:
                print(f"Result received: {json.dumps(message_json['result'])}")
            elif "error" in message_json:
                print(f"Error received: {json.dumps(message_json['error'])}")
            else:
                print("Unknown message type received.")

    async def _stream_audio_file(self, websocket):
        """Send the raw audio file over the WebSocket in 1024-byte chunks."""
        # Read audio data from a file and send it through websocket
        file_size = os.path.getsize(self.raw_audio_file)
        with open(self.raw_audio_file, 'rb') as fs:
            while True:
                buffer = fs.read(1024)
                if not buffer:
                    break
                base64_data = base64.b64encode(buffer).decode('utf-8')
                audio_chunk = {
                    "data": f"{self.audio_data_header}{base64_data}",
                    # The chunk that reaches the end of the file is flagged as last.
                    "last": fs.tell() == file_size
                }
                await websocket.send(json.dumps(audio_chunk))
                await asyncio.sleep(0.01)  # To prevent overwhelming the server
# Script entry point: run one identification session to completion.
if __name__ == "__main__":
    asyncio.run(VoiceBiometrics().identify())
using System.Text.Json;
using System.Net.WebSockets;
using System.Text;
// Await the identification instead of blocking on it: failures surface as the
// original exception rather than wrapped in an AggregateException.
await new VoiceBiometrics().Identify();
/// <summary>
/// Identifies which enrolled user is speaking: requests a session token over HTTP,
/// then streams a raw audio file over a WebSocket while printing the server's messages.
/// </summary>
class VoiceBiometrics
{
    const string ModelName = "Portal";
    const string RawAudioFile = @"C:\Users\vivoka\Music\record.raw";
    const string IdentifyUri = "http://localhost:39806/v1/voice-biometrics/identify";
    const string AudioDataHeader = "data:audio/pcm;base64,";

    /// <summary>Session token returned by the identify request; used to build the WebSocket URL.</summary>
    public string Token { get; private set; } = "";

    /// <summary>
    /// Requests an identification session, then streams the audio file and reads the
    /// server's replies concurrently until both sides are done.
    /// </summary>
    /// <exception cref="HttpRequestException">The identify request returned an error status.</exception>
    public async Task Identify()
    {
        using (var client = new HttpClient())
        {
            var request = new { model = ModelName };
            var content = new StringContent(JsonSerializer.Serialize(request), Encoding.UTF8, "application/json");
            var response = await client.PostAsync(IdentifyUri, content);
            response.EnsureSuccessStatusCode();

            var responseBody = await response.Content.ReadAsStringAsync();
            // JsonDocument is disposable; release it once the token has been copied out.
            using (var document = JsonDocument.Parse(responseBody))
            {
                Token = document.RootElement.GetProperty("token").ToString();
            }
        }

        using (var webSocket = new ClientWebSocket())
        {
            await webSocket.ConnectAsync(new Uri($"ws://localhost:39806/v1/ws/{Token}"), CancellationToken.None);
            var sending = Task.Run(() => StreamAudioFile(webSocket));
            var receiving = Task.Run(() => ReceivePackets(webSocket));
            await Task.WhenAll(sending, receiving);
        }
    }

    /// <summary>
    /// Prints one complete server message (event, result or error).
    /// A message that is not valid JSON is reported and otherwise ignored.
    /// </summary>
    private void OnMsgReceived(string message)
    {
        try
        {
            using (var document = JsonDocument.Parse(message))
            {
                var root = document.RootElement;
                if (root.TryGetProperty("event", out JsonElement eventElement))
                    Console.WriteLine($"Event received: {JsonSerializer.Serialize(eventElement)}");
                else if (root.TryGetProperty("result", out JsonElement resultElement))
                    Console.WriteLine($"Result received: {JsonSerializer.Serialize(resultElement)}");
                else if (root.TryGetProperty("error", out JsonElement errorElement))
                    Console.WriteLine($"Error received: {JsonSerializer.Serialize(errorElement)}");
                else
                    Console.WriteLine("Unknown message type received.");
            }
        }
        catch (JsonException ex)
        {
            Console.WriteLine($"Failed to parse message: {ex.Message}");
        }
    }

    /// <summary>
    /// Sends the raw audio file over the WebSocket as Base64-encoded JSON chunks of up to 1024 bytes.
    /// Stops early if the socket is no longer open.
    /// </summary>
    private async Task StreamAudioFile(ClientWebSocket webSocket)
    {
        // Read audio data from a file and send it through the websocket
        using (var fs = File.OpenRead(RawAudioFile))
        {
            int bytesRead;
            var buffer = new byte[1024];
            while ((bytesRead = await fs.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                var base64 = Convert.ToBase64String(new ArraySegment<byte>(buffer, 0, bytesRead));
                var audioChunk = new
                {
                    data = $"{AudioDataHeader}{base64}",
                    // The chunk that reaches the end of the file is flagged as last.
                    last = fs.Position == fs.Length
                };
                var json = JsonSerializer.Serialize(audioChunk);
                var bytes = Encoding.UTF8.GetBytes(json);
                if (webSocket.State == WebSocketState.Open)
                    await webSocket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, CancellationToken.None);
                else
                    break;
            }
        }
    }

    /// <summary>
    /// Reads WebSocket frames until the socket closes, reassembles fragmented messages
    /// and hands each complete message to <see cref="OnMsgReceived"/>.
    /// </summary>
    private async Task ReceivePackets(ClientWebSocket webSocket)
    {
        var buffer = new byte[1024];
        // Fragments are accumulated as bytes and decoded once per message: decoding each
        // fragment separately would corrupt a multi-byte UTF-8 character split across two fragments.
        using (var pending = new MemoryStream())
        {
            while (webSocket.State == WebSocketState.Open)
            {
                var packet = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                if (packet.MessageType == WebSocketMessageType.Close)
                {
                    await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
                    break;
                }

                pending.Write(buffer, 0, packet.Count);
                if (packet.EndOfMessage)
                {
                    OnMsgReceived(Encoding.UTF8.GetString(pending.ToArray()));
                    pending.SetLength(0);
                }
            }
        }
    }
}