Denoiser

This guide demonstrates how to use Denoiser with Phonexia Speech Platform 4. You can find a high-level description in the Denoiser article.

Denoiser filters out various noises from audio to enhance human speech intelligibility. It is specifically designed to improve understanding for human listeners and does not enhance the performance of other speech technologies.

This guide uses the following media files. You can download them all together in the audio_files.zip archive:

filename
Carl_factory.wav
Dina_pink_noise.wav
Luka_train.wav

The steps are first discussed separately. At the end of this guide, you'll find a full Python code example that combines them. Together, they should give you a comprehensive understanding of how to integrate Denoiser into your own projects.

Prerequisites

Set up the Virtual Appliance and the Python environment by following the prerequisites described in the Task lifecycle code examples.

Run Denoiser

To run Denoiser for a single media file, start by sending a POST request to the /api/technology/denoiser endpoint. The only mandatory parameter is file. Denoiser supports multi-channel media files: it denoises each channel separately and then combines the channels back into a single multi-channel output file.

Note: Currently, only the WAV output format is supported.

In Python, you can do this as follows:

import requests

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000" # Replace with your address
MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/denoiser"

media_file = "Carl_factory.wav"

with open(media_file, mode="rb") as file:
    files = {"file": file}
    start_task_response = requests.post(
        url=MEDIA_FILE_BASED_ENDPOINT_URL,
        files=files,
    )
print(start_task_response.status_code)  # Should print '202'

If the task has been successfully accepted, the 202 code will be returned together with a unique task ID in the response body. The task isn't processed immediately, but only scheduled for processing. You can check the current task status by polling for the result.
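The accepted response can be unpacked as in the following minimal sketch. It assumes that the response body nests the task ID under task.task_id (mirroring the task object shown in the result later in this guide) and that the polling URL arrives in the Location header, as the full example at the end of this guide expects. The helper name describe_accepted_task, the example state value, and the example Location URL are illustrative, not part of the API.

```python
def describe_accepted_task(status_code, headers, body):
    """Return (task_id, polling_url) for an accepted task, or raise."""
    if status_code != 202:
        raise RuntimeError(f"Task was not accepted (HTTP {status_code}).")
    task_id = body["task"]["task_id"]  # assumed body layout
    polling_url = headers["Location"]  # header used by the full example
    return task_id, polling_url


# Stand-in values for illustration. With requests, you would pass
# start_task_response.status_code, start_task_response.headers
# and start_task_response.json() instead.
example_headers = {
    "Location": "http://<virtual-appliance-address>:8000/api/tasks/"
                "8215c198-5876-4526-a4aa-b2558a8bd1cd"  # illustrative URL
}
example_body = {
    "task": {
        "task_id": "8215c198-5876-4526-a4aa-b2558a8bd1cd",
        "state": "pending",  # placeholder state name
    }
}

task_id, polling_url = describe_accepted_task(202, example_headers, example_body)
print(task_id)
```

Failing early on any status other than 202 keeps a rejected upload from surfacing later as a confusing missing-key error.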

Polling

To obtain the final result, periodically query the task status until the task state changes to done, failed or rejected. The general polling procedure is described in detail in the Task lifecycle code examples.
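As a rough sketch of that procedure, the loop below stops on any of the three terminal states and additionally gives up after a timeout. The timeout, the helper name poll_until_terminal, and the injected fetch_task_json callable are assumptions made for illustration. The intermediate state names in the demonstration are placeholders, not confirmed API values.

```python
import time

TERMINAL_STATES = {"done", "failed", "rejected"}


def poll_until_terminal(fetch_task_json, polling_interval=5.0, timeout=600.0):
    """Call `fetch_task_json()` until the task reaches a terminal state.

    Raises TimeoutError if no terminal state is seen within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        task_json = fetch_task_json()
        if task_json["task"]["state"] in TERMINAL_STATES:
            return task_json
        if time.monotonic() >= deadline:
            raise TimeoutError("Task did not finish within the timeout.")
        time.sleep(polling_interval)


# Offline demonstration with a fake status sequence (placeholder state names).
# Against a real appliance, the callable could be:
#   lambda: requests.get(polling_url).json()
fake_states = iter(["pending", "running", "done"])
final_json = poll_until_terminal(
    lambda: {"task": {"state": next(fake_states)}},
    polling_interval=0,
    timeout=5,
)
print(final_json["task"]["state"])  # done
```

Passing the fetch step in as a callable keeps the loop testable without a running Virtual Appliance.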

Result for Denoiser

The result field of the task contains the file_id of the output file and the full file_url for downloading the denoised file.

For our sample data, the task result should look as follows:

{
  "task": {
    "task_id": "8215c198-5876-4526-a4aa-b2558a8bd1cd",
    "state": "done"
  },
  "result": {
    "file_id": "26706ec6-87f6-4f55-84a1-25294b17f334",
    "file_url": "<VIRTUAL_APPLIANCE_ADDRESS>/api/files/26706ec6-87f6-4f55-84a1-25294b17f334"
  }
}
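A small sketch of reading this response follows. It checks the task state before touching the result, on the assumption that a failed or rejected task may not carry a result field. The helper name extract_denoised_file is illustrative.

```python
import json


def extract_denoised_file(task_response_json):
    """Return (file_id, file_url) from a finished Denoiser task response."""
    state = task_response_json["task"]["state"]
    if state != "done":
        # Assumption: only tasks in the "done" state carry a usable result.
        raise RuntimeError(f"Task ended in state '{state}', no output file.")
    result = task_response_json["result"]
    return result["file_id"], result["file_url"]


# The sample response from this guide, parsed from its JSON text.
sample_response = json.loads("""
{
  "task": {
    "task_id": "8215c198-5876-4526-a4aa-b2558a8bd1cd",
    "state": "done"
  },
  "result": {
    "file_id": "26706ec6-87f6-4f55-84a1-25294b17f334",
    "file_url": "<VIRTUAL_APPLIANCE_ADDRESS>/api/files/26706ec6-87f6-4f55-84a1-25294b17f334"
  }
}
""")

file_id, file_url = extract_denoised_file(sample_response)
print(file_id)  # 26706ec6-87f6-4f55-84a1-25294b17f334
```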

Downloading the output file

To save the output file locally, you need to specify a filename with its extension so it appears correctly in your file system. Use the file_url from the result to download the file. Alternatively, you can construct the download URL by appending a slash (/) and the file_id to the files endpoint URL. You can do so as follows:

import requests

file_url = polling_task_response_json["result"]["file_url"]

# Alternatively
# file_id = polling_task_response_json["result"]["file_id"]
# file_url = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/files/{file_id}"

download_file_response = requests.get(file_url)

filename = "denoised_audio.wav"

with open(filename, "wb") as f:
    f.write(download_file_response.content)
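For long recordings, holding the whole response in memory may be undesirable. The sketch below writes the file chunk by chunk instead. The helper save_chunks and the 64 KiB chunk size are illustrative choices. The commented lines show how it could be fed from a streamed requests response, while the demonstration itself uses in-memory chunks so that it runs without a Virtual Appliance.

```python
import tempfile
from pathlib import Path


def save_chunks(chunks, destination):
    """Write an iterable of byte chunks to `destination`; return bytes written."""
    written = 0
    with open(destination, "wb") as f:
        for chunk in chunks:
            if chunk:  # skip empty chunks
                f.write(chunk)
                written += len(chunk)
    return written


# With requests, the chunks could come from a streamed download:
#   with requests.get(file_url, stream=True) as response:
#       response.raise_for_status()
#       save_chunks(response.iter_content(chunk_size=64 * 1024),
#                   "denoised_audio.wav")

# Offline demonstration with in-memory chunks instead of a real download.
demo_path = Path(tempfile.mkdtemp()) / "denoised_audio.wav"
demo_written = save_chunks([b"RIFF", b"", b"data"], demo_path)
print(demo_written)  # 8
```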

Since the Denoiser output is a file, it takes up space in the storage for output files. The file is automatically removed after the task expires (see the task lifecycle article), but if you want to be extra careful and avoid potential processing disruptions for subsequent Denoiser tasks, you can delete completed tasks once you've successfully downloaded the denoised file.

Full Python Code

Here is the full example of how to run the Denoiser technology with media files. The code is slightly adjusted and wrapped into functions. The denoised files are saved with a denoised_ prefix, and you can modify the filename logic to suit your requirements. The code includes the optional step of deleting completed tasks. Refer to the Task lifecycle code examples for a generic code template that applies to all technologies.

import json
import requests
import time

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000" # Replace with your address

MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/denoiser"


def poll_result(polling_url, polling_interval=5):
    """Poll the task endpoint until processing completes."""
    while True:
        polling_task_response = requests.get(polling_url)
        polling_task_response.raise_for_status()
        polling_task_response_json = polling_task_response.json()
        task_state = polling_task_response_json["task"]["state"]
        if task_state in {"done", "failed", "rejected"}:
            break
        time.sleep(polling_interval)
    return polling_task_response


def run_media_based_task(media_file, params=None, config=None):
    """Create a media-based task and wait for results."""
    if params is None:
        params = {}
    if config is None:
        config = {}

    with open(media_file, mode="rb") as file:
        files = {"file": file}
        start_task_response = requests.post(
            url=MEDIA_FILE_BASED_ENDPOINT_URL,
            files=files,
            params=params,
            data={"config": json.dumps(config)},
        )
    start_task_response.raise_for_status()
    polling_url = start_task_response.headers["Location"]
    task_result = poll_result(polling_url)
    return task_result.json()


def download_file(download_file_url, media_file_name):
    """Download and save the output file as `media_file_name`."""
    response = requests.get(download_file_url)
    # Fail early instead of saving an error page as a WAV file.
    response.raise_for_status()
    with open(media_file_name, "wb") as f:
        f.write(response.content)


def delete_task(task_id):
    """Delete task with `task_id`."""
    response = requests.delete(f"{VIRTUAL_APPLIANCE_ADDRESS}/api/tasks/{task_id}")
    return response.status_code


# Run Denoiser
media_files = ["Carl_factory.wav", "Dina_pink_noise.wav", "Luka_train.wav"]

for media_file in media_files:
    print(f"Running Denoiser for file {media_file}.")
    media_file_based_task = run_media_based_task(media_file)

    denoised_media_file = f"denoised_{media_file}"
    download_file(media_file_based_task["result"]["file_url"], denoised_media_file)
    print(f"Denoising of {media_file} finished. Denoised file saved as {denoised_media_file}.")

    task_id = media_file_based_task["task"]["task_id"]
    if delete_task(task_id) == 204:
        print(f"Task {task_id} has been deleted.")
    else:
        print(f"Failed to delete task {task_id}. Task may have expired already.")
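The simple f"denoised_{media_file}" naming works for bare filenames but produces an invalid path when the input includes a directory, such as recordings/Carl_factory.wav. One possible adjustment is sketched below. The helper denoised_path and its prefix and output_dir parameters are hypothetical, and the .wav extension is forced because WAV is currently the only supported output format.

```python
from pathlib import Path


def denoised_path(media_file, prefix="denoised_", output_dir=None):
    """Build the output path for a denoised copy of `media_file`.

    The prefix is applied to the filename only, so inputs located in
    subdirectories keep a valid path.
    """
    source = Path(media_file)
    target_dir = Path(output_dir) if output_dir is not None else source.parent
    return target_dir / f"{prefix}{source.stem}.wav"


print(denoised_path("Carl_factory.wav").name)  # denoised_Carl_factory.wav
print(denoised_path("recordings/Luka_train.wav").as_posix())
# recordings/denoised_Luka_train.wav
```

In the loop above, the result could be passed to download_file in place of the f-string, for example download_file(url, denoised_path(media_file)).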