Denoiser
This guide demonstrates how to use Denoiser with Phonexia Speech Platform 4. You can find a high-level description in the Denoiser article.
Denoiser filters out various noises from audio to enhance human speech intelligibility. It is specifically designed to improve understanding for human listeners and does not enhance the performance of other speech technologies.
In this guide, we'll use the following media files, which you can download together in the audio_files.zip archive:
| filename |
|---|
| Carl_factory.wav |
| Dina_pink_noise.wav |
| Luka_train.wav |
At the end of this guide, you'll find a full Python code example that combines all the steps, which are first discussed separately. After working through it, you should understand how to integrate Denoiser into your own projects.
Prerequisites
Follow the prerequisites for setting up the Virtual Appliance and the Python environment as described in the Task lifecycle code examples.
Run Denoiser
To run Denoiser for a single media file, start by sending a POST request to
the /api/technology/denoiser endpoint. The only mandatory parameter is file.
Denoiser supports multi-channel media files: it denoises each channel
separately and then combines the channels back into a single multi-channel
output file.
Currently, only WAV output format is supported.
In Python, you can do this as follows:
import requests

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000"  # Replace with your address
MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/denoiser"

media_file = "Carl_factory.wav"

with open(media_file, mode="rb") as file:
    files = {"file": file}
    start_task_response = requests.post(
        url=MEDIA_FILE_BASED_ENDPOINT_URL,
        files=files,
    )

print(start_task_response.status_code)  # Should print '202'
If the task has been successfully accepted, the 202 code will be returned
together with a unique task ID in the response body. The task isn't processed
immediately, but only scheduled for processing. You can check the current task
status by polling for the result.
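As a minimal sketch of reading the accepted response, the helper below returns the task ID from the response body and the polling URL from the Location header (the header used by the full example at the end of this guide). The helper name read_accepted_task is hypothetical, and the body shape {"task": {"task_id": ...}} is an assumption based on the task object shown in the result section.

```python
def read_accepted_task(status_code, headers, body):
    """Return (task_id, polling_url) for an accepted task.

    Assumption: the body looks like {"task": {"task_id": ...}} and the
    polling URL is provided in the `Location` response header.
    """
    if status_code != 202:
        raise RuntimeError(f"Task was not accepted (HTTP {status_code}).")
    task_id = body["task"]["task_id"]
    polling_url = headers["Location"]
    return task_id, polling_url


# Possible usage with the response from the previous step:
# task_id, polling_url = read_accepted_task(
#     start_task_response.status_code,
#     start_task_response.headers,
#     start_task_response.json(),
# )
```

Taking plain values rather than the response object keeps the helper easy to try out without a running Virtual Appliance.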
Polling
To obtain the final result, periodically query the task status until the task
state changes to done, failed or rejected. The general polling procedure
is described in detail in the
Task lifecycle code examples.
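The polling procedure could be sketched as follows. This is an illustration under stated assumptions, not the reference implementation: the names poll_until_finished and fetch_task are hypothetical, and the default interval and timeout are arbitrary values. Unlike a plain loop, the sketch gives up after a timeout so that a stuck task does not block the caller forever.

```python
import time

# Final task states named in this guide.
FINAL_STATES = {"done", "failed", "rejected"}


def poll_until_finished(fetch_task, interval=5.0, timeout=600.0, sleep=time.sleep):
    """Call `fetch_task()` repeatedly until the task reaches a final state.

    `fetch_task` is any callable returning the parsed task JSON, for example
    `lambda: requests.get(polling_url).json()`.
    Raises TimeoutError if no final state is seen within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        task_json = fetch_task()
        if task_json["task"]["state"] in FINAL_STATES:
            return task_json
        if time.monotonic() >= deadline:
            raise TimeoutError("Task did not finish within the timeout.")
        sleep(interval)
```

Passing the fetch step in as a callable keeps the loop independent of the HTTP client, which also makes it straightforward to exercise with canned responses.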
Result for Denoiser
The result field of the task contains the file_id of the output file and the
full file_url for downloading the denoised file.
For our sample data, the task result should look as follows:
{
  "task": {
    "task_id": "8215c198-5876-4526-a4aa-b2558a8bd1cd",
    "state": "done"
  },
  "result": {
    "file_id": "26706ec6-87f6-4f55-84a1-25294b17f334",
    "file_url": "<VIRTUAL_APPLIANCE_ADDRESS>/api/files/26706ec6-87f6-4f55-84a1-25294b17f334"
  }
}
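Since polling can also end in the failed or rejected state, it may be worth checking the state before reading the result. The following is a minimal sketch; the function name get_denoised_file_url is hypothetical, and it assumes only the fields shown in the sample result above.

```python
def get_denoised_file_url(task_json):
    """Return the download URL of the denoised file from a finished task.

    Raises RuntimeError if the task did not end in the "done" state.
    """
    state = task_json["task"]["state"]
    if state != "done":
        raise RuntimeError(f"Denoiser task ended in state '{state}'.")
    return task_json["result"]["file_url"]
```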
Downloading the output file
To save the output file locally, you need to specify a filename with its
extension so it appears correctly in your file system. Use the file_url from
the result to download the file. Alternatively, you can construct the download
URL by appending a slash (/) and the file_id to the files endpoint URL.
You can do so as follows:
import requests

file_url = polling_task_response_json["result"]["file_url"]
# Alternatively
# file_id = polling_task_response_json["result"]["file_id"]
# file_url = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/files/{file_id}"

download_file_response = requests.get(file_url)

filename = "denoised_audio.wav"
with open(filename, "wb") as f:
    f.write(download_file_response.content)
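Because Denoiser processes each channel separately and combines the channels back into one multi-channel file, a quick sanity check after downloading is to compare the channel counts of the input and output files. The sketch below uses Python's standard wave module; the function names are hypothetical, and it assumes both files are PCM WAV files that the wave module can read.

```python
import wave


def count_channels(wav_path):
    """Return the number of channels in a WAV file readable by `wave`."""
    with wave.open(wav_path, "rb") as wav_file:
        return wav_file.getnchannels()


def has_same_channel_count(input_path, output_path):
    """Return True if both WAV files have the same number of channels."""
    return count_channels(input_path) == count_channels(output_path)


# Possible usage with the files from this guide:
# has_same_channel_count("Carl_factory.wav", "denoised_audio.wav")
```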
Since the Denoiser output is a file, it takes up space in the storage for output files. The file is automatically removed after the task expires (see the task lifecycle article). To be extra careful and avoid potential processing disruptions for subsequent Denoiser tasks, you can delete completed tasks once you've successfully downloaded the denoised file.
Full Python Code
Here is the full example of how to run the Denoiser technology with media files.
The code is slightly adjusted and wrapped into functions. The denoised files are
saved with a denoised_ prefix. You can modify the filename logic to suit your
requirements. The code includes the optional step of deleting completed tasks.
Refer to the
Task lifecycle code examples
for a generic code template, applicable to all technologies.
import json
import requests
import time

VIRTUAL_APPLIANCE_ADDRESS = "http://<virtual-appliance-address>:8000"  # Replace with your address
MEDIA_FILE_BASED_ENDPOINT_URL = f"{VIRTUAL_APPLIANCE_ADDRESS}/api/technology/denoiser"

def poll_result(polling_url, polling_interval=5):
    """Poll the task endpoint until processing completes."""
    while True:
        polling_task_response = requests.get(polling_url)
        polling_task_response.raise_for_status()
        polling_task_response_json = polling_task_response.json()
        task_state = polling_task_response_json["task"]["state"]
        if task_state in {"done", "failed", "rejected"}:
            break
        time.sleep(polling_interval)
    return polling_task_response

def run_media_based_task(media_file, params=None, config=None):
    """Create a media-based task and wait for results."""
    if params is None:
        params = {}
    if config is None:
        config = {}

    with open(media_file, mode="rb") as file:
        files = {"file": file}
        start_task_response = requests.post(
            url=MEDIA_FILE_BASED_ENDPOINT_URL,
            files=files,
            params=params,
            data={"config": json.dumps(config)},
        )
    start_task_response.raise_for_status()
    polling_url = start_task_response.headers["Location"]
    task_result = poll_result(polling_url)
    return task_result.json()

def download_file(download_file_url, media_file_name):
    """Download and save the output file as `media_file_name`."""
    response = requests.get(download_file_url)
    response.raise_for_status()  # Do not save an error page as audio
    with open(media_file_name, "wb") as f:
        f.write(response.content)

def delete_task(task_id):
    """Delete task with `task_id`."""
    response = requests.delete(f"{VIRTUAL_APPLIANCE_ADDRESS}/api/tasks/{task_id}")
    return response.status_code

# Run Denoiser
media_files = ["Carl_factory.wav", "Dina_pink_noise.wav", "Luka_train.wav"]

for media_file in media_files:
    print(f"Running Denoiser for file {media_file}.")
    media_file_based_task = run_media_based_task(media_file)

    # Polling also stops on "failed" or "rejected"; only "done" has an output file.
    task_state = media_file_based_task["task"]["state"]
    if task_state != "done":
        print(f"Denoising of {media_file} ended in state '{task_state}'. Skipping download.")
        continue

    denoised_media_file = f"denoised_{media_file}"
    download_file(media_file_based_task["result"]["file_url"], denoised_media_file)
    print(f"Denoising of {media_file} finished. Denoised file saved as {denoised_media_file}.")

    # Optional: delete the completed task to free the output file storage.
    task_id = media_file_based_task["task"]["task_id"]
    if delete_task(task_id) == 204:
        print(f"Task {task_id} has been deleted.")
    else:
        print(f"Failed to delete task {task_id}. Task may have expired already.")