Denoiser
This guide demonstrates how to use Denoiser with Phonexia Speech Platform 4. You can find a high-level description in the About Denoiser article.
Denoiser filters out various noises from audio to enhance human speech intelligibility. It is specifically designed to improve understanding for human listeners and does not enhance the performance of other speech technologies.
In the guide, we'll be using the following audio files. You can download them all together in the audio_files.zip archive:
filename |
---|
Carl_factory.wav |
Dina_pink_noise.wav |
Luka_train.wav |
At the end of this guide, you'll find the full Python code example that combines all the steps that will first be discussed separately. This guide should give you a comprehensive understanding of how to integrate Denoiser into your own projects.
Prerequisites
In this guide, we assume that the Virtual Appliance is running on port 8000 of http://localhost and contains the proper model and license for the technology. For more information on how to install and start the Virtual Appliance, please refer to the Virtual Appliance Installation chapter.
Environment Setup
We are using Python 3.9 and Python library requests 2.27 in this example. You can install the requests library with pip as follows:
pip install requests~=2.27
Basic Denoiser usage
To run Denoiser for a single media file, you should start by sending a POST
request to the /api/technology/denoiser endpoint. Denoiser
supports multi-channel audio files by denoising each channel separately, then
combining them back into a single multi-channel output file.
Currently, only WAV output format is supported.
In Python, you can do this as follows:
import requests

SPEECH_PLATFORM_SERVER = "http://localhost:8000"  # Replace with your actual server URL
ENDPOINT_URL = f"{SPEECH_PLATFORM_SERVER}/api/technology/denoiser"

audio_path = "Carl_factory.wav"

with open(audio_path, mode="rb") as file:
    files = {"file": file}
    response = requests.post(
        url=ENDPOINT_URL,
        files=files,
    )

print(response.status_code)  # Should print '202'
If the task has been successfully accepted, the 202 code will be returned together with a unique task ID in the response body. The task isn't processed immediately, but only scheduled for processing. You can check the current task status by polling for the result.
The URL for polling the result is returned in the Location header. Alternatively, you can assemble the polling URL on your own by appending a slash (/) and the task ID to the endpoint URL.
import json
import requests
import time

# Use the `response` from the previous step
polling_url = response.headers["Location"]
# Alternatively:
# polling_url = ENDPOINT_URL + "/" + response.json()["task"]["task_id"]

while True:
    response = requests.get(polling_url)
    data = response.json()
    task_status = data["task"]["state"]
    if task_status in {"done", "failed", "rejected"}:
        break
    time.sleep(5)

print(json.dumps(data, indent=2))
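The loop above polls until the task reaches a terminal state, with no upper time limit. The following is a minimal sketch, not part of the official example, that adds a timeout and builds the polling URL from either the Location header or the task ID. The helper names `resolve_polling_url` and `poll_until_finished`, the `fetch` callable, and the default timeout are assumptions made for illustration; only the terminal states `done`, `failed`, and `rejected` come from this guide.

```python
import time
from typing import Callable, Optional
from urllib.parse import urljoin

# Terminal task states as used in this guide's polling loop.
TERMINAL_STATES = {"done", "failed", "rejected"}


def resolve_polling_url(endpoint_url: str, location: Optional[str] = None,
                        task_id: Optional[str] = None) -> str:
    """Build the polling URL from the Location header or, failing that, the task ID."""
    if location:
        # urljoin returns absolute URLs unchanged and resolves relative
        # references against the endpoint URL.
        return urljoin(endpoint_url, location)
    if task_id:
        return endpoint_url.rstrip("/") + "/" + task_id
    raise ValueError("Either location or task_id is required.")


def poll_until_finished(fetch: Callable[[], dict], interval: float = 5.0,
                        timeout: float = 600.0,
                        sleep: Callable[[float], None] = time.sleep) -> dict:
    """Call fetch() until the task reaches a terminal state or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        data = fetch()
        if data["task"]["state"] in TERMINAL_STATES:
            return data
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task did not finish within {timeout} seconds.")
        sleep(interval)
```

With the requests library, `fetch` could be as simple as `lambda: requests.get(polling_url).json()`. Passing the fetching step in as a callable keeps the loop independent of the HTTP client.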
Once the polling finishes, data will contain the latest response from the server – either the result of denoising, or an error message with details, in case processing was not able to finish properly. The result should look as follows:
{
  "task": {
    "task_id": "8215c198-5876-4526-a4aa-b2558a8bd1cd",
    "state": "done"
  },
  "result": {
    "file_url": "http://localhost:8000/api/files/33d12bf5-c2ea-464d-b44f-27a138a49be7"
  }
}
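Because a finished task may also end in the `failed` or `rejected` state, it can help to check the state before reading the result. The sketch below is one possible way to do that; the function name `extract_file_url` is hypothetical, and it relies only on the `task` and `result` fields shown in the response above. The layout of an error response is not covered here, so the sketch simply reports the state.

```python
def extract_file_url(data: dict) -> str:
    """Return the download URL of the denoised file, or raise if the task did not succeed."""
    task = data["task"]
    if task["state"] != "done":
        # The structure of the error details is not assumed here.
        raise RuntimeError(
            f"Task {task['task_id']} ended in state {task['state']!r}."
        )
    return data["result"]["file_url"]
```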
The Denoiser result contains a file_url for downloading the denoised file. To save the output file locally, you need to specify a filename with its extension so it appears correctly in your file system. You can do so as follows:
import requests

response = requests.get(data["result"]["file_url"])
filename = "denoised_audio.wav"
with open(filename, "wb") as f:
    f.write(response.content)
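Since Denoiser processes each channel separately and returns a single multi-channel WAV file, one quick sanity check after downloading is to compare the channel count of the input and output files. The following sketch uses Python's standard `wave` module and assumes both files are PCM WAV files that this module can read. The names `WavInfo`, `wav_info`, and `same_channel_count` are illustrative only.

```python
import wave
from typing import NamedTuple


class WavInfo(NamedTuple):
    channels: int
    sample_rate: int
    frames: int


def wav_info(path: str) -> WavInfo:
    """Read basic header information from a PCM WAV file."""
    with wave.open(path, "rb") as wav:
        return WavInfo(wav.getnchannels(), wav.getframerate(), wav.getnframes())


def same_channel_count(original_path: str, denoised_path: str) -> bool:
    """Check that the denoised file has as many channels as the original."""
    return wav_info(original_path).channels == wav_info(denoised_path).channels
```

For example, `same_channel_count("Carl_factory.wav", "denoised_audio.wav")` would compare the file uploaded earlier with the one just saved.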
Tasks and their results automatically expire and are deleted after a configurable time. Because the Denoiser output is a file, each finished task occupies storage space until it is deleted. To avoid potential processing disruptions for subsequent Denoiser tasks, we recommend deleting completed tasks once you've successfully downloaded the denoised file.
task_id = data["task"]["task_id"]
response = requests.delete(f"{SPEECH_PLATFORM_SERVER}/api/tasks/{task_id}")
# Should print '204' (after successful deletion) or '404' (task not found, has
# probably already expired)
print(response.status_code)
Full Python Code
Here is the full example on how to run the Denoiser technology with media files. The code is slightly adjusted and wrapped into functions. The denoised files are saved with a denoised_ prefix. You can modify the filename logic to suit your requirements.
import requests
import time

SPEECH_PLATFORM_SERVER = "http://localhost:8000"  # Replace with your actual server URL
ENDPOINT_URL = f"{SPEECH_PLATFORM_SERVER}/api/technology/denoiser"


def poll_result(polling_url: str, sleep: int = 5):
    # Poll until the task reaches one of the terminal states
    while True:
        response = requests.get(polling_url)
        response.raise_for_status()
        data = response.json()
        task_status = data["task"]["state"]
        if task_status in {"done", "failed", "rejected"}:
            break
        time.sleep(sleep)
    return response


def run_denoiser(audio_path: str):
    with open(audio_path, mode="rb") as file:
        files = {"file": file}
        response = requests.post(
            url=ENDPOINT_URL,
            files=files,
        )
    response.raise_for_status()
    polling_url = response.headers["Location"]
    response_result = poll_result(polling_url)
    return response_result.json()


def download_and_save_file(url: str, filename: str):
    file = requests.get(url)
    file.raise_for_status()  # Do not save an error page as audio
    with open(filename, "wb") as f:
        f.write(file.content)


def delete_finished_task(task_id: str):
    response = requests.delete(f"{SPEECH_PLATFORM_SERVER}/api/tasks/{task_id}")
    return response.status_code


filenames = ["Carl_factory.wav", "Dina_pink_noise.wav", "Luka_train.wav"]

for filename in filenames:
    print(f"Running Denoiser for file {filename}.")
    data = run_denoiser(filename)
    task_id = data["task"]["task_id"]
    state = data["task"]["state"]
    if state != "done":
        # Failed or rejected tasks carry no file to download
        print(f"Denoising of {filename} ended in state '{state}'. Skipping download.")
        continue
    denoised_filename = f"denoised_{filename}"
    download_and_save_file(data["result"]["file_url"], denoised_filename)
    print(f"Denoising of {filename} finished. Denoised file saved as {denoised_filename}.")
    deletion_status = delete_finished_task(task_id)
    if deletion_status == 204:
        print(f"Task {task_id} has been deleted.")
    else:
        print(f"Failed to delete task {task_id}. Task may have expired already.")
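Prefixing the whole string with `denoised_` works for bare filenames, but a path such as `audio/Carl_factory.wav` would turn into `denoised_audio/Carl_factory.wav`. As one possible adjustment of the filename logic, the sketch below builds the output path with `pathlib` so that the prefix is applied to the filename only. The function name `denoised_path` and its `prefix` and `output_dir` parameters are hypothetical. It always uses the `.wav` extension, since WAV is the only supported output format.

```python
from pathlib import Path
from typing import Optional, Union


def denoised_path(audio_path: Union[str, Path], prefix: str = "denoised_",
                  output_dir: Optional[Union[str, Path]] = None) -> Path:
    """Return the output path for a denoised file, prefixing only the filename."""
    source = Path(audio_path)
    # Keep the file next to its source unless an output directory is given.
    target_dir = Path(output_dir) if output_dir is not None else source.parent
    # The output is always WAV, whatever the input extension was.
    return target_dir / f"{prefix}{source.stem}.wav"
```

In the loop above, `denoised_filename = denoised_path(filename)` could then replace the f-string, because `open()` accepts `Path` objects directly.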