Speaker Search
This guide demonstrates how to perform Speaker Search with the Phonexia Speech Platform 4 Virtual Appliance, using Voiceprint Extraction and Voiceprint Comparison.
The objective of the Speaker Search use case is to find known speaker(s) inside a large database of speakers, and essentially answer the question: where are these known speakers speaking?
Let's say we already have a media file with the known voice of John Doe. We also have a database of speakers, and we would like to discover which media files from the database likely contain John Doe's voice.
The flow of Speaker Search consists of Voiceprint Extraction and N-to-M
Voiceprint Comparison, where N is the number of known speakers and M is the
size of the speaker database. Both Voiceprint Extraction and Comparison are
explained in the following sections. At the end of this guide, you'll find the
full Python code example that combines all the steps, which are first discussed
separately. For simplicity, our N will be 1 in this example, but you can use an
analogous approach to search for multiple known speakers in the speaker
database.
For testing, we'll be using the following media files.
You can download them all together in the recordings.zip
archive, which contains a mono-channel media file of John Doe speaking
(john_doe.wav) and a speaker database consisting of 8 mono-channel media files
(unknown_01.wav through unknown_08.wav); therefore, in this case, M = 8.
Prerequisites
Follow the prerequisites for setup of Virtual Appliance and Python environment as described in the Task lifecycle code examples.
Run Voiceprint Extraction
To run Voiceprint Extraction for a single media file, you should start by
sending a POST request to the
/api/technology/speaker-identification-voiceprint-extraction
endpoint. file is the only mandatory parameter. In Python, you can do this as
follows:
import requests

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000"  # Replace with your address
MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-extraction"

media_file = "john_doe.wav"

with open(media_file, mode="rb") as file:
    files = {"file": file}
    start_task_response = requests.post(
        url=MEDIA_FILE_BASED_ENDPOINT_URL,
        files=files,
    )

print(start_task_response.status_code)  # Should print '202'
If the task has been successfully accepted, the 202 code will be returned
together with a unique task ID in the response body. The task isn't processed
immediately, but only scheduled for processing. You can check the current task
status by polling for the result.
Polling
To obtain the final result, periodically query the task status until the task
state changes to done, failed or rejected. The general polling procedure
is described in detail in the
Task lifecycle code examples.
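The polling loop itself can be sketched in isolation as follows. The `fetch_status` callable is a hypothetical stand-in for the HTTP GET against the task's polling URL (returned in the `Location` header of the 202 response); the terminal states match those listed above.

```python
import time

TERMINAL_STATES = {"done", "failed", "rejected"}


def poll_until_finished(fetch_status, polling_interval=5, max_attempts=60):
    """Call fetch_status() repeatedly until the task reaches a terminal state."""
    for _ in range(max_attempts):
        response_json = fetch_status()
        if response_json["task"]["state"] in TERMINAL_STATES:
            return response_json
        time.sleep(polling_interval)
    raise TimeoutError("Task did not finish within the polling budget")
```

The `max_attempts` guard is an optional addition so the loop can't run forever if a task gets stuck; the full example at the end of this guide uses a plain `while` loop instead.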
Result for Voiceprint Extraction
The result field of the task contains a channels list of independent results
for each channel, identified by its channel_number. Each channel contains:
- voiceprint: A Base64-encoded string of the extracted voiceprint.
- speech_length: Length of the speech, in seconds, used for the extraction.
- model: A string representing the model used for the extraction.
Example task result of a successful Voiceprint Extraction from a stereo file (shortened for readability):
{
  "task": {
    "task_id": "fb9de4e5-a768-4069-aff3-c74c826f3ddf",
    "state": "done"
  },
  "result": {
    "channels": [
      {
        "channel_number": 0,
        "voiceprint": "e2kDY3JjbDAWiyhpCWVtYmVkZGluZ1tkO/QWvmS8JkuGZDyv+F5kvJQzJ...",
        "speech_length": 49.08,
        "model": "sid-xl5"
      },
      {
        "channel_number": 1,
        "voiceprint": "e2kDY3JjbFL6NSxpCWVtYmVkZGluZ1tkO2ygcGS8CduAZDxa6ZBkPHrYf...",
        "speech_length": 116.35,
        "model": "sid-xl5"
      }
    ]
  }
}
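To illustrate how this structure is traversed, here is a small sketch that walks the channels list of a task result like the one above (voiceprints shortened) and collects each channel's voiceprint by its channel_number:

```python
# A shortened version of the stereo task result shown above
task_result = {
    "result": {
        "channels": [
            {
                "channel_number": 0,
                "voiceprint": "e2kDY3JjbDAWiyhp...",
                "speech_length": 49.08,
                "model": "sid-xl5",
            },
            {
                "channel_number": 1,
                "voiceprint": "e2kDY3JjbFL6NSxp...",
                "speech_length": 116.35,
                "model": "sid-xl5",
            },
        ]
    }
}

# Collect each channel's voiceprint, keyed by its channel number
voiceprints_by_channel = {
    channel["channel_number"]: channel["voiceprint"]
    for channel in task_result["result"]["channels"]
}

print(sorted(voiceprints_by_channel))  # [0, 1]
```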
Let's get back to our example with the mono-channel media files. In our case, the target voiceprint can be accessed as follows (from the polling step):
known_audio_voiceprint = polling_task_response_json["result"]["channels"][0]["voiceprint"]
Now, you can repeat the same process for the entire database and collect the
extracted voiceprints in a list, so it's ready for the comparison. Let's name
the list unknown_audios_voiceprints. The easiest way to do so is to repeat the
same steps for each unknown audio file in a for loop and append each extracted
voiceprint to the unknown_audios_voiceprints list. Note that this list should
have a length of M. Please refer to the
full Python code to see how it's done.
Run Voiceprint Comparison
When all the voiceprints are extracted, we can move on to the actual comparison. Two non-empty lists of voiceprints are the mandatory fields in the request body. Each voiceprint is expected to be a Base64-encoded string, but you don't have to worry about this: the voiceprints are already returned in this format by the Voiceprint Extraction.
Running the Voiceprint Comparison is analogous to Voiceprint Extraction: we
start by requesting that a voiceprint comparison task be scheduled for our two
voiceprint lists. Notice that known_audio_voiceprint is just a string, so it
must be placed inside a one-element list in the request body. On the other
hand, unknown_audios_voiceprints is already a list, and can therefore be
used directly.
import requests

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000"  # Replace with your address
VOICEPRINT_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-comparison"

body = {
    "voiceprints_a": [known_audio_voiceprint],
    "voiceprints_b": unknown_audios_voiceprints,
}

start_task_response = requests.post(
    url=VOICEPRINT_BASED_ENDPOINT_URL,
    json=body,
)

print(start_task_response.status_code)  # Should print '202'
Follow the polling process as described in Task lifecycle code examples to get the result of the Voiceprint Comparison.
Result for Voiceprint Comparison
The result field is a row-major flat list of comparison scores, representing a
matrix with shape based on the sizes of both input voiceprint lists. In case of
our current Speaker Search, the resulting list has a length of M, representing
a matrix with shape of 1xM, where M is the size of the speaker database.
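In other words, the score of voiceprints_a[i] against voiceprints_b[j] sits at index i * columns_count + j of the flat values list. A minimal sketch of this indexing, using a made-up 2x3 score matrix:

```python
# Made-up scores object in the same shape as the API result
scores = {
    "rows_count": 2,
    "columns_count": 3,
    "values": [0.1, 0.2, 0.3, 1.1, 1.2, 1.3],  # row 0, then row 1
}


def score_at(scores, row, column):
    """Return the score of voiceprints_a[row] vs voiceprints_b[column]."""
    return scores["values"][row * scores["columns_count"] + column]


print(score_at(scores, 1, 2))  # 1.3
```

For the 1xM case in this guide, row is always 0, so the flat list can be read directly from left to right.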
For our sample voiceprints, the task result should look as follows:
{
  "task": {
    "task_id": "4178f672-20b4-4f79-b7eb-a871bbae4456",
    "state": "done"
  },
  "result": {
    "scores": {
      "rows_count": 1,
      "columns_count": 8,
      "values": [
        -4.726645469665527, 9.340583801269531, 6.426426887512207,
        -5.342464447021484, 4.384160041809082, 7.261765480041504,
        -5.660372257232666, -4.433615684509277
      ]
    }
  }
}
Let's map the scores to their corresponding media files for better readability:
media_files_with_scores = zip(
    unknown_audios, voiceprint_comparison_response["result"]["scores"]["values"]
)
for media_file, score in media_files_with_scores:
    print(f"{media_file}\t{score}")
It should result in the following output:
unknown_01.wav -4.726645469665527
unknown_02.wav 9.340583801269531
unknown_03.wav 6.426426887512207
unknown_04.wav -5.342464447021484
unknown_05.wav 4.384160041809082
unknown_06.wav 7.261765480041504
unknown_07.wav -5.660372257232666
unknown_08.wav -4.433615684509277
This result shows that John Doe is very likely speaking in the following files:
unknown_02.wav, unknown_03.wav, unknown_05.wav, and unknown_06.wav, but not
in the rest of the speaker database.
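If you only need the list of likely matches, you can filter the mapped scores against a decision threshold. The threshold of 0 below is purely illustrative; choose it according to the Speaker Identification technology guide.

```python
# Scores from the output above, in the same order as the media files
scores = [
    -4.726645469665527, 9.340583801269531, 6.426426887512207,
    -5.342464447021484, 4.384160041809082, 7.261765480041504,
    -5.660372257232666, -4.433615684509277,
]
unknown_audios = [f"unknown_{i:02d}.wav" for i in range(1, 9)]

THRESHOLD = 0.0  # illustrative choice; tune it for your use case

# Keep only the files whose score exceeds the threshold
likely_matches = [
    media_file
    for media_file, score in zip(unknown_audios, scores)
    if score > THRESHOLD
]

print(likely_matches)
# ['unknown_02.wav', 'unknown_03.wav', 'unknown_05.wav', 'unknown_06.wav']
```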
For more details on how scoring is handled, refer to the Scoring and conversion to percentage section in our Speaker Identification technology guide.
Full Python code
Here is the full example on how to perform Speaker Search. The code is slightly adjusted and wrapped into functions for better readability. Refer to the Task lifecycle code examples for a generic code template, applicable to all technologies.
import json
import time

import requests

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000"  # Replace with your address
MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-extraction"
VOICEPRINT_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/speaker-identification-voiceprint-comparison"


def poll_result(polling_url, polling_interval=5):
    """Poll the task endpoint until processing completes."""
    while True:
        polling_task_response = requests.get(polling_url)
        polling_task_response.raise_for_status()
        polling_task_response_json = polling_task_response.json()
        task_state = polling_task_response_json["task"]["state"]
        if task_state in {"done", "failed", "rejected"}:
            break
        time.sleep(polling_interval)
    return polling_task_response


def run_media_based_task(media_file, params=None, config=None):
    """Create a media-based task and wait for results."""
    if params is None:
        params = {}
    if config is None:
        config = {}
    with open(media_file, mode="rb") as file:
        files = {"file": file}
        start_task_response = requests.post(
            url=MEDIA_FILE_BASED_ENDPOINT_URL,
            files=files,
            params=params,
            data={"config": json.dumps(config)},
        )
    start_task_response.raise_for_status()
    polling_url = start_task_response.headers["Location"]
    task_result = poll_result(polling_url)
    return task_result.json()


def run_voiceprint_based_task(json_payload):
    """Create a voiceprint-based task and wait for results."""
    start_task_response = requests.post(
        url=VOICEPRINT_BASED_ENDPOINT_URL,
        json=json_payload,
    )
    start_task_response.raise_for_status()
    polling_url = start_task_response.headers["Location"]
    task_result = poll_result(polling_url)
    return task_result.json()


known_audio = "john_doe.wav"
unknown_audios = [
    "unknown_01.wav",
    "unknown_02.wav",
    "unknown_03.wav",
    "unknown_04.wav",
    "unknown_05.wav",
    "unknown_06.wav",
    "unknown_07.wav",
    "unknown_08.wav",
]
unknown_audios_voiceprints = []

# Extract voiceprint for the known audio
known_audio_response = run_media_based_task(known_audio)
known_audio_voiceprint = known_audio_response["result"]["channels"][0]["voiceprint"]

# Extract voiceprints for the unknown audios
for unknown_audio in unknown_audios:
    response = run_media_based_task(unknown_audio)
    voiceprint = response["result"]["channels"][0]["voiceprint"]
    unknown_audios_voiceprints.append(voiceprint)

# Run Voiceprint Comparison for the extracted voiceprints
voiceprint_comparison_response = run_voiceprint_based_task(
    json_payload={
        "voiceprints_a": [known_audio_voiceprint],
        "voiceprints_b": unknown_audios_voiceprints,
    }
)

# Map the scores to their corresponding media files
media_files_with_scores = zip(
    unknown_audios, voiceprint_comparison_response["result"]["scores"]["values"]
)
for media_file, score in media_files_with_scores:
    print(f"{media_file}\t{score}")