"""
This file contains the Parameter class that define the device and markers classes.
"""
from math import ceil
from ..enums import DeviceType, MarkerType, InverseKinematicsMethods, RealTimeProcessingMethod, OfflineProcessingMethod
from ..processing.data_processing import RealTimeProcessing, OfflineProcessing, GenericProcessing
from ..processing.msk_functions import MskFunctions
from typing import Union
import numpy as np
[docs]
class Param:
def __init__(
self,
nb_channels: int,
name: str = None,
rate: float = None,
system_rate: float = 100,
data_window: int = None,
):
"""
initialize the parameter class
Parameters
----------
nb_channels : int
number of channels of the parameter
name : str
name of the parameter
rate : float
rate of the parameter
system_rate : float
rate of the system
data_window : int
size of the data window
"""
self.nb_channels = nb_channels
self.name = name
self.rate = rate
self.system_rate = system_rate
self.sample = ceil(rate / self.system_rate)
self.range = None
self.raw_data = []
self.data_window = data_window if data_window else int(rate)
self.new_data = None
[docs]
def append_data(self, new_data: np.ndarray):
"""
Append new data to the parameter to the raw data
Parameters
----------
new_data: np.ndarray
new data to append to the buffer
"""
if len(self.raw_data) == 0:
self.raw_data = new_data
elif self.raw_data.shape[-1] < self.data_window:
self.raw_data = np.append(self.raw_data, new_data, axis=-1)
else:
self.raw_data = np.append(self.raw_data[..., new_data.shape[-1] :], new_data, axis=-1)
[docs]
class Device(Param):
"""
This class is used to store the available devices.
"""
def __init__(
self,
device_type: DeviceType = DeviceType.Emg,
nb_channels: int = 1,
name: str = None,
rate: float = 2000,
system_rate: float = 100,
channel_names: Union[list, str] = None,
):
"""
Initialize the device class. A device is an electronic device that can be used to measure a parameter
(e.g. EMG, treadmill, etc.).
Parameters
----------
device_type: DeviceType
Type of the device.
nb_channels: int
Number of channels of the device.
name: str
Name of the device.
rate: float
Rate of the device.
system_rate: float
Rate of the system.
channel_names: list
Name of the channels of the device.
"""
super().__init__(nb_channels, name, rate, system_rate)
if isinstance(channel_names, str):
channel_names = [channel_names]
if channel_names:
if nb_channels != len(channel_names):
raise ValueError("The number of channels is not equal to the number of channel names.")
else:
channel_names = []
self.device_range = None
self.infos = None
self.channel_names = channel_names
self.interface = None
self.device_type = device_type
self.processed_data = None
self.processing_method = None
self.processing_function = None
self.processing_method_kwargs = {}
self.processing_method_changed = False
self.processing_window = None
[docs]
def process(
self,
method: Union[str, RealTimeProcessingMethod, OfflineProcessingMethod] = None,
custom_function: callable = None,
**kwargs,
):
"""
Process the data of the device. The raw data are stored in a buffer fill by the append data method.
This method should be called after any get_data method, otherwise there will be not available data.
Parameters
----------
method: GenericProcessing
Method to process the data.
custom_function: callable
Custom function to process the data.
kwargs:
Keyword arguments to pass to the method.
"""
self.processing_method_kwargs.update(kwargs)
if "processing_method" in self.processing_method_kwargs.keys():
if method and method != self.processing_method_kwargs["processing_method"]:
raise ValueError("You have enter two different type of method for the same function.")
method = self.processing_method_kwargs["processing_method"]
if (
not method
and not self.processing_method
and "processing_method" not in self.processing_method_kwargs.keys()
):
raise RuntimeError(
"No method to process the data. Please specify a method with the argument 'processing_method'."
)
has_changed = self._check_if_has_changed(method, self.processing_method_kwargs)
if "custom_function" in self.processing_method_kwargs.keys():
custom_function = custom_function if custom_function else self.processing_method_kwargs["custom_function"]
self.processing_method_kwargs.pop("custom_function")
if not self.processing_function or has_changed:
self._init_processing_method()
if method == RealTimeProcessingMethod.Custom:
if not custom_function:
raise ValueError("No custom function to process the data.")
self.processed_data = self.processing_function(
custom_function, self.new_data, **self.processing_method_kwargs
)
else:
self.processed_data = self.processing_function(self.new_data, **self.processing_method_kwargs)
return self.processed_data
def _init_processing_method(self):
"""
Initialize the processing method.
"""
self.processing_window = self.processing_window if self.processing_window else self.data_window
if self.processing_method == RealTimeProcessingMethod.ProcessEmg:
self.processing_function = RealTimeProcessing(self.rate, self.processing_window).process_emg
elif self.processing_method == RealTimeProcessingMethod.ProcessGenericSignal:
self.processing_function = RealTimeProcessing(self.rate, self.processing_window).process_generic_signal
elif self.processing_method == RealTimeProcessingMethod.ProcessImu:
self.processing_function = RealTimeProcessing(self.rate, self.processing_window).process_imu
elif self.processing_method == RealTimeProcessingMethod.GetPeaks:
self.processing_function = RealTimeProcessing(self.rate, self.processing_window).get_peaks
elif self.processing_method == OfflineProcessingMethod.ProcessEmg:
self.processing_function = OfflineProcessing(self.rate, self.processing_window).process_emg
elif self.processing_method == OfflineProcessingMethod.ComputeMvc:
self.processing_function = OfflineProcessing(self.rate, self.processing_window).compute_mvc
elif (
self.processing_method == RealTimeProcessingMethod.CalibrationMatrix
or self.processing_method == OfflineProcessingMethod.CalibrationMatrix
):
self.processing_function = GenericProcessing().calibration_matrix
elif self.processing_method == RealTimeProcessingMethod.Custom:
self.processing_function = RealTimeProcessing(self.rate, self.processing_window).custom_processing
def _check_if_has_changed(self, method: GenericProcessing(), kwargs: dict) -> bool:
"""
Check if the processing method has changed.
Parameters
----------
method: GenericProcessing
Method to process the data.
kwargs: dict
Keyword arguments to pass to the method.
Returns
-------
has_changed: bool
True if the method has changed, False otherwise.
"""
if isinstance(method, str):
if method in [t.value for t in RealTimeProcessingMethod]:
self.processing_method = RealTimeProcessingMethod(method)
elif method not in [t.value for t in OfflineProcessingMethod]:
self.processing_method = OfflineProcessingMethod(method)
else:
raise ValueError("The method is not valid.")
has_changed = False
if method and method != self.processing_method:
has_changed = True
self.processing_method = method
if "processing_window" in kwargs:
if kwargs["processing_window"] > self.data_window:
raise ValueError("The processing windows is higher than the data buffer windows.")
if kwargs["processing_window"] != self.processing_window:
has_changed = True
self.processing_window = kwargs["processing_window"]
self.processing_method_kwargs.pop("processing_window")
if self.new_data is None:
raise RuntimeError("No data to process. Please run first the function get_device_data.")
return has_changed
[docs]
class MarkerSet(Param):
"""
This class is used to store the available markers.
"""
def __init__(
self,
nb_channels: int = 1,
name: str = None,
marker_names: Union[str, list] = None,
rate: float = None,
unlabeled: bool = False,
system_rate: float = 100,
unit: str = "m",
):
"""
Initialize a marker set.
Parameters
----------
nb_channels: int
Number of channels of the marker set (number of markers).
name: str
Name of the marker set.
marker_names: str or list
Name of the markers.
rate: float
Rate of the marker set.
unlabeled: bool
True if the marker set is unlabeled, False otherwise.
system_rate: float
Rate of the system.
unit: str
Unit of the marker set.
"""
marker_type = MarkerType.Unlabeled if unlabeled else MarkerType.Labeled
super(MarkerSet, self).__init__(nb_channels, name, rate, system_rate)
if isinstance(marker_names, str):
marker_names = [marker_names]
if marker_names:
if nb_channels != len(marker_names):
raise ValueError("The number of channels and the number of markers names are not the same.")
self.marker_names = marker_names
self.subject_name = None
self.interface = None
self.marker_type = marker_type
self.kin_data = None
self.kin_method = None
self.kin_method_kwargs = {}
self.biorbd_model_path = None
self.kalman = None
self.msk_class = None
self.unit = unit
[docs]
def get_kinematics(
self,
model_path: str = None,
method: Union[InverseKinematicsMethods, str] = None,
custom_function: callable = None,
kin_data_window: int = None,
**kwargs,
) -> tuple:
"""
Function to apply the Kalman filter to the markers.
The raw data are stored in a buffer fill by the append data method.
This method should be called after any get_data method, otherwise there will be not available data.
Parameters
----------
model_path : str
The biomod model used to compute the kinematics.
method : Union[InverseKinematicsMethods, str]
The method to use to compute the inverse kinematics.
custom_function : callable
Custom function to use.
kin_data_window : int
The size of the window to use to compute the kinematics.
**kwargs : dict
Keyword arguments to pass to the method.
Returns
-------
tuple
The joint angle and velocity.
"""
if len(self.raw_data) == 0:
raise RuntimeError(
"No markers data to compute the kinematics." " Please run first the function get_markers_data."
)
method = method if method else self.kin_method
self.kin_method_kwargs.update(kwargs)
if "custom_function" in self.kin_method_kwargs.keys():
custom_function = custom_function if custom_function else self.kin_method_kwargs["custom_function"]
if self.new_data is None:
raise RuntimeError("No data to process. Please run first the function get_markers_data.")
if not method:
raise RuntimeError("No method to compute the kinematics. Please specify a method.")
if isinstance(method, str):
if method in [t.value for t in InverseKinematicsMethods]:
method = InverseKinematicsMethods(method)
else:
raise ValueError("The method is not valid.")
kin_data_window = kin_data_window if kin_data_window else self.data_window
if "model_path" in self.kin_method_kwargs.keys():
model_path = self.kin_method_kwargs["model_path"]
model_path = model_path if model_path else self.biorbd_model_path
if model_path is None and not "model_path" in self.kin_method_kwargs.keys():
raise ValueError("No model to compute the kinematics.")
if not self.msk_class:
self.msk_class = MskFunctions(model_path, kin_data_window)
if method == InverseKinematicsMethods.Custom:
if not custom_function:
raise ValueError("No custom function to process the data.")
self.kin_data = self.msk_class.compute_inverse_kinematics(
self.new_data, method, self.rate, custom_function=custom_function, **self.kin_method_kwargs
)
else:
self.kin_data = self.msk_class.compute_inverse_kinematics(
self.new_data, method, self.rate, **self.kin_method_kwargs
)
return self.kin_data