Source code for medkit.audio.preprocessing.resampler
"""This module needs extra-dependencies not installed as core dependencies of medkit.
To install them, use `pip install medkit-lib[resampler]`.
"""
from __future__ import annotations
__all__ = ["Resampler"]
import resampy
from medkit.core.audio import MemoryAudioBuffer, PreprocessingOperation, Segment
[docs]
class Resampler(PreprocessingOperation):
"""Resampling operation relying on the resampy package."""
def __init__(
self,
output_label: str,
sample_rate: int,
fast: bool = False,
uid: str | None = None,
):
"""Parameters
----------
output_label : str
Label of output resampled segments.
sample_rate : int
Target sample rate to resample to, in samples per second.
fast : bool, default=False
If `True`, prefer speed over quality and use resampy's "kaiser_fast" filter
instead of "kaiser_best".
uid : str, optional
Identifier of the resampler.
"""
# Pass all arguments to super (remove self)
init_args = locals()
init_args.pop("self")
super().__init__(**init_args)
self.output_label = output_label
self.sample_rate = sample_rate
self.fast = fast
[docs]
def run(self, segments: list[Segment]) -> list[Segment]:
"""Return a resampled segment for each segment in `segments`.
Parameters
----------
segments : list of Segment
Audio segments to resample.
Returns
-------
list of Segment
Resampled segments, one per segment in `segments`.
"""
return [self._resample_segment(s) for s in segments]
def _resample_segment(self, segment: Segment) -> Segment:
audio = segment.audio
if audio.sample_rate == self.sample_rate:
resampled_audio = audio
else:
signal = audio.read()
resampled_signal = resampy.resample(signal, audio.sample_rate, self.sample_rate, axis=1)
resampled_audio = MemoryAudioBuffer(resampled_signal, sample_rate=self.sample_rate)
resampled_segment = Segment(
label=self.output_label,
span=segment.span,
audio=resampled_audio,
)
if self._prov_tracer is not None:
self._prov_tracer.add_prov(resampled_segment, self.description, [segment])
return resampled_segment