Source code for medkit.audio.preprocessing.downmixer
from __future__ import annotations
__all__ = ["Downmixer"]
import numpy as np
from medkit.core.audio import MemoryAudioBuffer, PreprocessingOperation, Segment
[docs]
class Downmixer(PreprocessingOperation):
"""Downmixing operation converting multichannel audio signals to mono."""
def __init__(
self,
output_label: str,
prevent_clipping: bool = True,
uid: str | None = None,
):
"""Parameters
----------
output_label : str
Label of output downmixed segments.
prevent_clipping : bool, default=True
If `True`, normalize downmixed signals by number of channels to prevent clipping.
uid : str, optional
Identifier of the downmixer.
"""
# Pass all arguments to super (remove self)
init_args = locals()
init_args.pop("self")
super().__init__(**init_args)
self.output_label = output_label
self.prevent_clipping = prevent_clipping
[docs]
def run(self, segments: list[Segment]) -> list[Segment]:
"""Return a downmixed segment for each segment in `segments`.
Parameters
----------
segments : list of Segment
Audio segments to downmix.
Returns
-------
list of Segment
Downmixed segments, one per segment in `segments`.
"""
return [self._downmix_segment(s) for s in segments]
def _downmix_segment(self, segment: Segment) -> Segment:
audio = segment.audio
if segment.audio.nb_channels == 1:
downmixed_audio = audio
else:
signal = segment.audio.read()
downmixed_signal = np.sum(signal, axis=0, keepdims=True)
if self.prevent_clipping:
downmixed_signal /= signal.shape[0]
downmixed_audio = MemoryAudioBuffer(downmixed_signal, sample_rate=audio.sample_rate)
downmixed_segment = Segment(
label=self.output_label,
span=segment.span,
audio=downmixed_audio,
)
if self._prov_tracer is not None:
self._prov_tracer.add_prov(downmixed_segment, self.description, [segment])
return downmixed_segment