Skip to main content

Denoiser

This guide demonstrates how to use Denoiser with Phonexia Speech Platform 4. You can find a high-level description in the About Denoiser article.

Denoiser filters out various noises from audio to enhance human speech intelligibility. It is specifically designed to improve understanding for human listeners and does not enhance the performance of other speech technologies.

In the guide, we'll be using the following audio files. You can download them all together in the audio_files.zip archive:

filename
Carl_factory.wav
Dina_pink_noise.wav
Luka_train.wav

At the end of this guide, you'll find the full Python code example that combines all the steps that will first be discussed separately. This guide should give you a comprehensive understanding of how to integrate Denoiser into your own projects.

Prerequisites

In the guide, we assume that the Virtual Appliance is running on port 8000 of http://localhost and contains proper model and license for the technology. For more information on how to install and start the Virtual Appliance, please refer to the Virtual Appliance Installation chapter.

Environment Setup

We are using Python 3.9 and Python library requests 2.27 in this example. You can install the requests library with pip as follows:

pip install requests~=2.27

Basic Denoiser usage

To run Denoiser for a single media file, you should start by sending a POST request to the /api/technology/denoiser endpoint. Denoiser supports multi-channel audio files by denoising each channel separately, then combining them back into a single multi-channel output file.

info

Currently, only WAV output format is supported.

In Python, you can do this as follows:

import requests

SPEECH_PLATFORM_SERVER = "http://localhost:8000" # Replace with your actual server URL
ENDPOINT_URL = f"{SPEECH_PLATFORM_SERVER}/api/technology/denoiser"

audio_path = "Carl_factory.wav"

with open(audio_path, mode="rb") as file:
files = {"file": file}
response = requests.post(
url=ENDPOINT_URL,
files=files,
)

print(response.status_code) # Should print '202'

If the task has been successfully accepted, the 202 code will be returned together with a unique task ID in the response body. The task isn't processed immediately, but only scheduled for processing. You can check the current task status by polling for the result.

The URL for polling the result is returned in the Location header. Alternatively, you can assemble the polling URL on your own by appending a slash (/) and the task ID to the endpoint URL.

import json
import requests
import time

# Use the `response` from the previous step
polling_url = response.headers["Location"]
# Alternatively:
# polling_url = ENDPOINT_URL + "/" + response.json()["task"]["task_id"]

while True:
response = requests.get(polling_url)
data = response.json()
task_status = data["task"]["state"]
if task_status in {"done", "failed", "rejected"}:
break
time.sleep(5)

print(json.dumps(data, indent=2))

Once the polling finishes, data will contain the latest response from the server – either the result of denoising, or an error message with details, in case processing was not able to finish properly. The result should look as follows:

{
"task": {
"task_id": "8215c198-5876-4526-a4aa-b2558a8bd1cd",
"state": "done"
},
"result": {
"file_url": "http://localhost:8000/api/files/33d12bf5-c2ea-464d-b44f-27a138a49be7"
}
}

The Denoiser result contains a file_url for downloading the denoised file. To save the output file locally, you need to specify a filename with its extension so it appears correctly in your file system. You can do so as follows:

import requests

response = requests.get(data["result"]["file_url"])

filename = "denoised_audio.wav"

with open(filename, "wb") as f:
f.write(response.content)

Tasks and their results automatically expire and are deleted after a configurable time. Since the Denoiser output is a file, it consumes storage space for the output files. To avoid potential processing disruptions for subsequent Denoiser tasks, we recommend deleting completed tasks once you've successfully downloaded the denoised file.

task_id = data["task"]["task_id"]
response = requests.delete(f"{SPEECH_PLATFORM_SERVER}/api/tasks/{task_id}")

# Should print '204' (after successful deletion) or '404' (task not found, has
# probably already expired)
print(response.status_code)

Full Python Code

Here is the full example on how to run the Denoiser technology with media files. The code is slightly adjusted and wrapped into functions. The denoised files are saved with a denoised_ prefix. You can modify the filename logic to suit your requirements.

import requests
import time

SPEECH_PLATFORM_SERVER = "http://localhost:8000" # Replace with your actual server URL
ENDPOINT_URL = f"{SPEECH_PLATFORM_SERVER}/api/technology/denoiser"


def poll_result(polling_url: str, sleep: int = 5):
while True:
response = requests.get(polling_url)
response.raise_for_status()
data = response.json()
task_status = data["task"]["state"]
if task_status in {"done", "failed", "rejected"}:
break
time.sleep(sleep)
return response


def run_denoiser(audio_path: str):
with open(audio_path, mode="rb") as file:
files = {"file": file}
response = requests.post(
url=ENDPOINT_URL,
files=files,
)
response.raise_for_status()
polling_url = response.headers["Location"]
response_result = poll_result(polling_url)
return response_result.json()


def download_and_save_file(url: str, filename: str):
file = requests.get(url)
with open(filename, "wb") as f:
f.write(file.content)
return


def delete_finished_task(task_id: str):
response = requests.delete(f"{SPEECH_PLATFORM_SERVER}/api/tasks/{task_id}")
return response.status_code


filenames = ["Carl_factory.wav", "Dina_pink_noise.wav", "Luka_train.wav"]

for filename in filenames:
print(f"Running Denoiser for file {filename}.")
data = run_denoiser(filename)

denoised_filename = f"denoised_{filename}"
download_and_save_file(data["result"]["file_url"], denoised_filename)
print(f"Denoising of {filename} finished. Denoised file saved as {denoised_filename}.")

deletion_status = delete_finished_task(data["task"]["task_id"])
if deletion_status == 204:
print(f"Task {data['task']['task_id']} has been deleted.")
else:
print(f"Failed to delete task {data['task']['task_id']}. Task may have expired already.")