Speaker Search

This guide demonstrates how to perform Speaker Search with Phonexia Speech Platform 4 Virtual Appliance with Voiceprint Extraction and Voiceprint Comparison.

The objective of the Speaker Search use case is to find known speakers in a large database of speakers, essentially answering the question: where are these known speakers speaking?

Let's say we already have a media file with the known voice of John Doe. We also have a database of speakers, and we would like to discover which media files from the database likely contain John Doe's voice.

The Speaker Search flow consists of Voiceprint Extraction followed by an N-to-M Voiceprint Comparison, where N is the number of known speakers and M is the size of the speaker database. Both Voiceprint Extraction and Voiceprint Comparison are explained in the following sections.

At the end of this guide, you'll find a full Python code example that combines all the steps, which are first discussed separately. For simplicity, N is 1 in this example, but you can use an analogous approach to search for multiple known speakers in the speaker database.

For testing, we'll use the following media files. You can download them together in the recordings.zip archive. It contains a mono-channel media file of John Doe speaking (john_doe.wav) and a speaker database of 8 mono-channel media files (unknown_01.wav to unknown_08.wav), so M = 8 in this case.

Prerequisites

Follow the prerequisites for setting up the Virtual Appliance and the Python environment as described in the Task lifecycle code examples.

Run Voiceprint Extraction

To run Voiceprint Extraction for a single media file, start by sending a POST request to the /api/technology/speaker-identification-voiceprint-extraction endpoint. The only mandatory parameter is file. In Python, you can do this as follows:

import requests

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000" # Replace with your address
MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-extraction"

media_file = "john_doe.wav"

with open(media_file, mode="rb") as file:
    files = {"file": file}
    start_task_response = requests.post(
        url=MEDIA_FILE_BASED_ENDPOINT_URL,
        files=files,
    )
print(start_task_response.status_code)  # Should print '202'

If the task has been successfully accepted, the 202 code will be returned together with a unique task ID in the response body. The task isn't processed immediately, but only scheduled for processing. You can check the current task status by polling for the result.

Polling

To obtain the final result, periodically query the task status until the task state changes to done, failed or rejected. The general polling procedure is described in detail in the Task lifecycle code examples.
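The polling loop described above can be sketched as follows. This is a minimal illustration, not the reference implementation: the fetch_task callable, the timeout, and the function name wait_for_task are assumptions introduced here, and only the three final states come from this guide.

```python
import time


def wait_for_task(fetch_task, polling_interval=5.0, timeout=600.0, sleep=time.sleep):
    """Call fetch_task() until the task reaches a final state or the timeout expires.

    fetch_task is a hypothetical zero-argument callable that returns the task
    JSON as a dict, for example: lambda: requests.get(polling_url).json()
    """
    final_states = {"done", "failed", "rejected"}
    deadline = time.monotonic() + timeout
    while True:
        task_json = fetch_task()
        state = task_json["task"]["state"]
        if state in final_states:
            return task_json
        # Assumption: give up after `timeout` seconds instead of polling forever.
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task still in state '{state}' after {timeout} s")
        sleep(polling_interval)
```

Passing the fetch function in as an argument keeps the loop independent of the HTTP client, so it can be exercised without a running Virtual Appliance.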

Result for Voiceprint Extraction

The result field of the task contains a channels list with an independent result for each channel, identified by its channel_number. Each channel contains:

  • voiceprint: A Base64-encoded string of the extracted voiceprint.
  • speech_length: Length of the speech in seconds used for extraction.
  • model: A string representing the model used for extraction.

Example task result of a successful Voiceprint Extraction from a stereo file (shortened for readability):

{
  "task": {
    "task_id": "fb9de4e5-a768-4069-aff3-c74c826f3ddf",
    "state": "done"
  },
  "result": {
    "channels": [
      {
        "channel_number": 0,
        "voiceprint": "e2kDY3JjbDAWiyhpCWVtYmVkZGluZ1tkO/QWvmS8JkuGZDyv+F5kvJQzJ...",
        "speech_length": 49.08,
        "model": "sid-xl5"
      },
      {
        "channel_number": 1,
        "voiceprint": "e2kDY3JjbFL6NSxpCWVtYmVkZGluZ1tkO2ygcGS8CduAZDxa6ZBkPHrYf...",
        "speech_length": 116.35,
        "model": "sid-xl5"
      }
    ]
  }
}
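For multi-channel files, it can help to index the voiceprints by channel. The sketch below assumes the task JSON has exactly the shape shown in the example above. The helper name voiceprints_by_channel is hypothetical, and the short placeholder strings stand in for real Base64-encoded voiceprints.

```python
def voiceprints_by_channel(task_json):
    """Return {channel_number: voiceprint} for a finished extraction task."""
    state = task_json["task"]["state"]
    if state != "done":
        raise RuntimeError(f"Voiceprint extraction did not finish successfully: {state}")
    return {
        channel["channel_number"]: channel["voiceprint"]
        for channel in task_json["result"]["channels"]
    }


# Illustrative input with placeholder voiceprints (not real data).
example_task = {
    "task": {"task_id": "example", "state": "done"},
    "result": {
        "channels": [
            {"channel_number": 0, "voiceprint": "AAA", "speech_length": 49.08, "model": "sid-xl5"},
            {"channel_number": 1, "voiceprint": "BBB", "speech_length": 116.35, "model": "sid-xl5"},
        ]
    },
}

print(voiceprints_by_channel(example_task))
```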

Let's get back to our example with the mono-channel media files. In our case, the target voiceprint can be accessed as follows (from the polling step):

known_audio_voiceprint = polling_task_response_json["result"]["channels"][0]["voiceprint"]

Now, you can repeat the same process for the entire database and collect the extracted voiceprints in a list, so it's ready for the comparison. Let's name the list unknown_audios_voiceprints. The easiest way to do so is to repeat the same steps for each unknown audio file in a for loop and append each extracted voiceprint to the unknown_audios_voiceprints list. Note that this list should have the length of M. Please refer to the full Python code to see how it's done.
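One possible way to structure this loop is sketched below. It keeps the file names aligned with the collected voiceprints and sets aside files whose task did not finish in the done state. The collect_voiceprints helper and its extract argument are assumptions made for illustration; extract stands for any function that takes a file name and returns the finished task JSON, such as run_media_based_task from the full Python code.

```python
def collect_voiceprints(media_files, extract):
    """Extract the first-channel voiceprint of every file, keeping names aligned.

    extract is a hypothetical callable: file name -> finished task JSON (dict).
    Returns (names, voiceprints, failed), where names[i] belongs to voiceprints[i].
    """
    names, voiceprints, failed = [], [], []
    for media_file in media_files:
        task_json = extract(media_file)
        if task_json["task"]["state"] != "done":
            # Skip files that could not be processed so the lists stay aligned.
            failed.append(media_file)
            continue
        names.append(media_file)
        voiceprints.append(task_json["result"]["channels"][0]["voiceprint"])
    return names, voiceprints, failed
```

If any file ends up in failed, the comparison result has fewer than M columns, so the scores should be mapped back to names rather than to the original file list.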

Run Voiceprint Comparison

When all the voiceprints are extracted, we can move on to the actual comparison. Two non-empty lists of voiceprints are the mandatory fields in the request body. Each voiceprint is expected to be a Base64-encoded string, but you don't have to worry about it: the voiceprints are already returned in this format by Voiceprint Extraction.

Running Voiceprint Comparison is analogous to running Voiceprint Extraction: we start by requesting a voiceprint comparison task to be scheduled for our two voiceprint lists. Notice that known_audio_voiceprint is just a string, so it must be placed inside a one-element list in the request body. On the other hand, unknown_audios_voiceprints is already a list and can be used directly.

import requests

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000" # Replace with your address
VOICEPRINT_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-comparison"

body = {
    "voiceprints_a": [known_audio_voiceprint],
    "voiceprints_b": unknown_audios_voiceprints,
}

start_task_response = requests.post(
    url=VOICEPRINT_BASED_ENDPOINT_URL,
    json=body,
)
print(start_task_response.status_code)  # Should print '202'

Follow the polling process as described in Task lifecycle code examples to get the result of the Voiceprint Comparison.

Result for Voiceprint Comparison

The result field contains a scores object. Its values field is a row-major flat list of comparison scores representing a matrix with rows_count rows and columns_count columns, determined by the sizes of the two input voiceprint lists. In our Speaker Search example, the list has a length of M and represents a 1xM matrix, where M is the size of the speaker database.

For our sample voiceprints, the task result should look as follows:

{
  "task": {
    "task_id": "4178f672-20b4-4f79-b7eb-a871bbae4456",
    "state": "done"
  },
  "result": {
    "scores": {
      "rows_count": 1,
      "columns_count": 8,
      "values": [
        -4.726645469665527, 9.340583801269531, 6.426426887512207,
        -5.342464447021484, 4.384160041809082, 7.261765480041504,
        -5.660372257232666, -4.433615684509277
      ]
    }
  }
}
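When searching for more than one known speaker (N > 1), the flat values list has to be split back into rows. The following sketch shows one way to do that. It assumes that row i corresponds to the i-th entry of voiceprints_a and column j to the j-th entry of voiceprints_b, which matches the 1xM shape above but is not spelled out further in this guide. The helper name scores_to_matrix and the 2x3 numbers are purely illustrative.

```python
def scores_to_matrix(scores):
    """Split the flat row-major 'values' list into rows_count rows."""
    rows = scores["rows_count"]
    columns = scores["columns_count"]
    values = scores["values"]
    if len(values) != rows * columns:
        raise ValueError(
            f"Expected {rows * columns} values for a {rows}x{columns} matrix, got {len(values)}"
        )
    # Row r occupies the slice [r * columns, (r + 1) * columns) of the flat list.
    return [values[r * columns:(r + 1) * columns] for r in range(rows)]


# Made-up scores for 2 known speakers compared against 3 database files.
example_scores = {
    "rows_count": 2,
    "columns_count": 3,
    "values": [1.0, -2.0, 3.0, -4.0, 5.0, -6.0],
}

matrix = scores_to_matrix(example_scores)
print(matrix[1][2])  # Score of the second known speaker against the third file
```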

Let's map the scores to their corresponding media files for better readability:

media_files_with_scores = zip(
    unknown_audios, voiceprint_comparison_response["result"]["scores"]["values"]
)
for media_file, score in media_files_with_scores:
    print(f"{media_file}\t{score}")

It should result in the following output:

unknown_01.wav	-4.726645469665527
unknown_02.wav	9.340583801269531
unknown_03.wav	6.426426887512207
unknown_04.wav	-5.342464447021484
unknown_05.wav	4.384160041809082
unknown_06.wav	7.261765480041504
unknown_07.wav	-5.660372257232666
unknown_08.wav	-4.433615684509277

This result shows that John Doe is very likely speaking in the following files: unknown_02.wav, unknown_03.wav, unknown_05.wav, unknown_06.wav, but not in the rest of the speaker database.
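To turn this reading of the scores into code, you could filter and sort them. The sketch below is only an illustration: treating a score above 0 as a match is an assumption that happens to fit this sample, not a documented decision threshold, and the rank_matches helper is hypothetical.

```python
# Scores copied from the sample output in this guide.
scores_by_file = {
    "unknown_01.wav": -4.726645469665527,
    "unknown_02.wav": 9.340583801269531,
    "unknown_03.wav": 6.426426887512207,
    "unknown_04.wav": -5.342464447021484,
    "unknown_05.wav": 4.384160041809082,
    "unknown_06.wav": 7.261765480041504,
    "unknown_07.wav": -5.660372257232666,
    "unknown_08.wav": -4.433615684509277,
}

# Assumption: a score above 0 is treated as a match. Tune this for your data.
MATCH_THRESHOLD = 0.0


def rank_matches(scores, threshold=MATCH_THRESHOLD):
    """Return (media_file, score) pairs above the threshold, best match first."""
    matches = [(name, score) for name, score in scores.items() if score > threshold]
    return sorted(matches, key=lambda pair: pair[1], reverse=True)


for media_file, score in rank_matches(scores_by_file):
    print(f"{media_file}\t{score:.2f}")
```

With the sample scores, this lists unknown_02.wav, unknown_06.wav, unknown_03.wav, and unknown_05.wav, in that order.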

For more details on how scoring is handled, refer to the Scoring and conversion to percentage section in our Speaker Identification technology guide.

Full Python code

Here is the full example of how to perform Speaker Search. The code is slightly adjusted and wrapped into functions for better readability. Refer to the Task lifecycle code examples for a generic code template applicable to all technologies.

import json
import requests
import time

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000" # Replace with your address

MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-extraction"
VOICEPRINT_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-comparison"


def poll_result(polling_url, polling_interval=5):
    """Poll the task endpoint until processing completes."""
    while True:
        polling_task_response = requests.get(polling_url)
        polling_task_response.raise_for_status()
        polling_task_response_json = polling_task_response.json()
        task_state = polling_task_response_json["task"]["state"]
        if task_state in {"done", "failed", "rejected"}:
            break
        time.sleep(polling_interval)
    return polling_task_response


def run_media_based_task(media_file, params=None, config=None):
    """Create a media-based task and wait for results."""
    if params is None:
        params = {}
    if config is None:
        config = {}

    with open(media_file, mode="rb") as file:
        files = {"file": file}
        start_task_response = requests.post(
            url=MEDIA_FILE_BASED_ENDPOINT_URL,
            files=files,
            params=params,
            data={"config": json.dumps(config)},
        )
    start_task_response.raise_for_status()
    polling_url = start_task_response.headers["Location"]
    task_result = poll_result(polling_url)
    return task_result.json()


def run_voiceprint_based_task(json_payload):
    """Create a voiceprint-based task and wait for results."""
    start_task_response = requests.post(
        url=VOICEPRINT_BASED_ENDPOINT_URL,
        json=json_payload,
    )
    start_task_response.raise_for_status()
    polling_url = start_task_response.headers["Location"]
    task_result = poll_result(polling_url)
    return task_result.json()


known_audio = "john_doe.wav"
unknown_audios = [
    "unknown_01.wav",
    "unknown_02.wav",
    "unknown_03.wav",
    "unknown_04.wav",
    "unknown_05.wav",
    "unknown_06.wav",
    "unknown_07.wav",
    "unknown_08.wav",
]
unknown_audios_voiceprints = []

# Extract voiceprint for the known audio
known_audio_response = run_media_based_task(known_audio)
known_audio_voiceprint = known_audio_response["result"]["channels"][0]["voiceprint"]

# Extract voiceprints for the unknown audios
for unknown_audio in unknown_audios:
    response = run_media_based_task(unknown_audio)
    voiceprint = response["result"]["channels"][0]["voiceprint"]
    unknown_audios_voiceprints.append(voiceprint)

# Run Voiceprint Comparison for the extracted voiceprints
voiceprint_comparison_response = run_voiceprint_based_task(
    json_payload={
        "voiceprints_a": [known_audio_voiceprint],
        "voiceprints_b": unknown_audios_voiceprints,
    }
)

# Map the scores to their corresponding media files
media_files_with_scores = zip(
    unknown_audios, voiceprint_comparison_response["result"]["scores"]["values"]
)
for media_file, score in media_files_with_scores:
    print(f"{media_file}\t{score}")