Source code for biosiglive.processing.data_processing

"""
This file contains the functions for data processing (offline and in real-time). Both class herites
 from the GenericProcessing class.
"""

from scipy.signal import butter, lfilter, filtfilt, convolve
import numpy as np
import os
import time
from typing import Union
from ..file_io.save_and_load import save, load


[docs] class GenericProcessing: def __init__(self): """ Initialize the GenericProcessing class. """ self.bpf_lcut = 10 self.bpf_hcut = 425 self.lpf_lcut = 5 self.lp_butter_order = 4 self.bp_butter_order = 2 self.data_rate = None self.process_time = [] @staticmethod def _butter_bandpass(lowcut: float, highcut: float, fs: float, order: int = 5) -> tuple: """ Create a butter bandpass filter. Parameters ---------- lowcut : float Low cut frequency. highcut: float High cut frequency. fs: float Sampling frequency. order: int Order of the filter. Returns ------- tuple Filter coefficients. """ nyq = 0.5 * fs low = lowcut / nyq high = highcut / nyq b, a = butter(order, [low, high], btype="band") return b, a @staticmethod def _butter_lowpass(lowcut, fs, order=4) -> tuple: """ Create a butter lowpass filter. Parameters ---------- lowcut : float Low cut frequency. fs: float Sampling frequency. order: int Order of the filter. Returns ------- tuple Filter coefficients. """ nyq = 0.5 * fs low = lowcut / nyq b, a = butter(order, [low], btype="low") return b, a def _butter_bandpass_filter(self, data: np.ndarray, lowcut: float, highcut: float, fs: float, order: int = 5): """ Apply a butter bandpass filter. Parameters ---------- data: numpy.ndarray Data to filter. lowcut: float Low cut frequency. highcut: float High cut frequency. fs: float Sampling frequency. order: int Order of the filter. Returns ------- numpy.ndarray Filtered data. """ b, a = self._butter_bandpass(lowcut, highcut, fs, order=order) y = lfilter(b, a, data) return y
[docs] def butter_lowpass_filter(self, data: np.ndarray, lowcut: float, fs: float, order: int = 4) -> np.ndarray: """ Apply a butter lowpass filter. Parameters ---------- data: numpy.ndarray Data to filter. lowcut: float Low cut frequency. fs: float Sampling frequency. order: int Order of the filter. Returns ------- numpy.ndarray Filtered data. """ b, a = self._butter_lowpass(lowcut, fs, order=order) y = filtfilt(b, a, data) return y
@staticmethod def _moving_average(data: np.ndarray, window: np.ndarray, empty_ma: np.ndarray) -> np.ndarray: """ Apply moving average. Parameters ---------- data: numpy.ndarray Data to process. window: numpy.ndarray window to apply. empty_ma: numpy.ndarray Empty array to store the moving average. Returns ------- numpy.ndarray Moving average. """ for i in range(data.shape[0]): empty_ma[i, :] = convolve(data[i, :], window, mode="same", method="fft") return empty_ma
[docs] @staticmethod def center(emg_data: np.ndarray, center_value: float = None) -> np.ndarray: """ Center the EMG data. Parameters ---------- emg_data : numpy.ndarray EMG data. center_value: int Value to center the data. Returns ------- numpy.ndarray Centered EMG data. """ center_value = center_value if center_value else emg_data.mean(axis=1) emg_centered = np.copy(emg_data) for i in range(emg_data.shape[0]): emg_centered[i, :] = emg_data[i, :] - center_value[i] return emg_centered
[docs] @staticmethod def normalize_emg(emg_data: np.ndarray, mvc_list: list) -> np.ndarray: """ Normalize EMG data. Parameters ---------- emg_data : numpy.ndarray EMG data. mvc_list : list List of MVC values. Returns ------- numpy.ndarray Normalized EMG data. """ if len(mvc_list) == 0: raise RuntimeError("Please give a list of mvc to normalize the emg signal.") norm_emg = np.zeros((emg_data.shape[0], emg_data.shape[1])) for emg in range(emg_data.shape[0]): norm_emg[emg, :] = emg_data[emg, :] / mvc_list[emg] return norm_emg
[docs] def calibration_matrix(self, data: np.ndarray, matrix: np.ndarray) -> np.ndarray: """ Apply a calibration matrix to the data. Parameters ---------- data: numpy.ndarray Data to calibrate. matrix: numpy.ndarray Calibration matrix. Returns ------- numpy.ndarray Calibrated data. """ tic = time.time() data_cal = np.dot(matrix, data) self.process_time.append(time.time() - tic) return data_cal
[docs] def process_generic_signal( self, data: np.ndarray, norm_values: Union[list, tuple] = None, band_pass_filter=True, low_pass_filter=False, moving_average=True, centering=True, absolute_value=True, normalization=False, moving_average_window=200, **kwargs, ) -> np.ndarray: """ Process EMG data. Parameters ---------- data : numpy.ndarray EMG data. norm_values : list Values to normalize the signal. band_pass_filter : bool Apply band pass filter. low_pass_filter : bool Apply low pass filter. moving_average : bool Apply moving average. centering : bool Apply centering. absolute_value : bool Apply absolute value. normalization : bool Apply normalization. moving_average_window : int Moving average window. Returns ------- numpy.ndarray Processed EMG data. """ self.update_signal_processing_parameters(**kwargs) data_proc = data if band_pass_filter: data_proc = self._butter_bandpass_filter(data, self.bpf_lcut, self.bpf_hcut, self.data_rate) if centering: data_proc = self.center(data_proc) if absolute_value: data_proc = np.abs(data_proc) if low_pass_filter and moving_average: raise RuntimeError("Please choose between low-pass filter and moving average.") if low_pass_filter: data_proc = self.butter_lowpass_filter(data_proc, self.lpf_lcut, self.data_rate, order=self.lp_butter_order) elif moving_average: w = np.repeat(1, moving_average_window) / moving_average_window empty_ma = np.ndarray((data.shape[0], data.shape[1])) data_proc = self._moving_average(data_proc, w, empty_ma) if normalization: data_proc = self.normalize_emg(data_proc, norm_values) return data_proc
[docs] def update_signal_processing_parameters(self, **kwargs): """ update the signal processing parameters. Parameters ---------- kwargs: dict Dictionary of parameters. """ for key, value in kwargs.items(): if key in self.__dict__: self.__dict__[key] = value
[docs] class RealTimeProcessing(GenericProcessing): def __init__(self, data_rate: Union[int, float], processing_window: int = None): """ Initialize the class for real time processing. Parameters ---------- data_rate : int Data rate. processing_window : int The window on which data will be processed. """ super().__init__() self.data_rate = data_rate self.processing_window = processing_window if processing_window else data_rate self.raw_data_buffer = [] self.processed_data_buffer = [] self._is_one = None
[docs] def process_emg( self, emg_data: np.ndarray, mvc_list: Union[list, tuple] = None, band_pass_filter=True, low_pass_filter=False, moving_average=True, centering=True, absolute_value=True, normalization=False, moving_average_window=200, **kwargs, ) -> np.ndarray: """ Process EMG data in real-time. Parameters ---------- emg_data : numpy.ndarray Temporary EMG data (nb_emg, emg_sample). mvc_list : list MVC values. band_pass_filter : bool True if apply band pass filter. low_pass_filter : bool True if apply low pass filter. moving_average : bool True if apply moving average. centering : bool True if apply centering. absolute_value : bool True if apply absolute value. normalization : bool True if apply normalization. moving_average_window : int Moving average window. Returns ------- np.ndarray processed EMG data. """ self.update_signal_processing_parameters(**kwargs) tic = time.time() if low_pass_filter and moving_average: raise RuntimeError("Please choose between low-pass filter and moving average.") ma_win = moving_average_window if ma_win > self.processing_window: raise RuntimeError(f"Moving average windows ({ma_win}) higher than emg windows ({self.processing_window}).") emg_sample = emg_data.shape[1] if emg_sample == 0: raise RuntimeError("EMG data are empty.") if normalization: if not mvc_list: raise RuntimeError("Please give a list of mvc to normalize the emg signal.") if isinstance(mvc_list, np.ndarray) is True: if len(mvc_list.shape) == 1: quot = mvc_list.reshape(-1, 1) else: quot = mvc_list else: quot = np.array(mvc_list).reshape(-1, 1) else: quot = [1] if len(self.raw_data_buffer) == 0: self.raw_data_buffer = emg_data processed_shape = self.raw_data_buffer.shape[1] if not moving_average else 1 self.processed_data_buffer = np.zeros((emg_data.shape[0], processed_shape)) elif self.raw_data_buffer.shape[1] < self.processing_window: self.raw_data_buffer = np.append(self.raw_data_buffer, emg_data, axis=1) if not moving_average: self.processed_data_buffer = np.append( self.processed_data_buffer, np.zeros((emg_data.shape[0], emg_sample)), axis=1 ) else: self.processed_data_buffer = np.append( self.processed_data_buffer, np.zeros((emg_data.shape[0], 1)), axis=1 ) else: self.raw_data_buffer = np.append( self.raw_data_buffer[:, -self.processing_window + emg_sample :], emg_data, axis=1 ) emg_proc_tmp = self.process_generic_signal( self.raw_data_buffer, band_pass_filter=band_pass_filter, centering=centering, absolute_value=absolute_value, low_pass_filter=False, moving_average=False, normalization=False, ) self.processed_data_buffer = emg_proc_tmp / quot if low_pass_filter: self.processed_data_buffer = ( self.butter_lowpass_filter(emg_proc_tmp, self.lpf_lcut, self.data_rate, order=self.lp_butter_order) / quot ) elif moving_average: average = np.median(emg_proc_tmp[:, -ma_win:], axis=1).reshape(-1, 1) self.processed_data_buffer = np.append(self.processed_data_buffer[:, 1:], average / quot, axis=1) self.process_time.append(time.time() - tic) return self.processed_data_buffer
[docs] def process_imu( self, im_data: np.ndarray, accel: bool = False, squared: bool = False, norm_min_bound: int = None, norm_max_bound: int = None, **kwargs, ) -> np.ndarray: """ Process IMU data in real-time. Parameters ---------- im_data : numpy.ndarray Temporary IMU data (nb_imu, im_sample). accel : bool True if current data is acceleration data to adapt the processing. squared : bool True if apply squared. norm_min_bound : int Normalization minimum bound. norm_max_bound : int Normalization maximum bound. Returns ------- np.ndarray processed IMU data. """ self.update_signal_processing_parameters(**kwargs) tic = time.time() if len(self.raw_data_buffer) == 0: if squared is not True: self.processed_data_buffer = np.zeros((im_data.shape[0], im_data.shape[1], 1)) else: self.processed_data_buffer = np.zeros((im_data.shape[0], 1)) self.raw_data_buffer = im_data elif self.raw_data_buffer.shape[2] < self.processing_window: self.raw_data_buffer = np.append(self.raw_data_buffer, im_data, axis=2) if squared is not True: self.processed_data_buffer = np.zeros( (im_data.shape[0], im_data.shape[1], self.raw_data_buffer.shape[2]) ) else: self.processed_data_buffer = np.zeros((im_data.shape[0], self.raw_data_buffer.shape[2])) else: self.raw_data_buffer = np.append( self.raw_data_buffer[:, :, -self.processing_window + im_data.shape[2] :], im_data, axis=2 ) im_proc_tmp = self.raw_data_buffer average = np.mean(im_proc_tmp[:, :, -self.processing_window :], axis=2).reshape(-1, 3, 1) if squared: if accel: average = abs(np.linalg.norm(average, axis=1) - 9.81) else: average = np.linalg.norm(average, axis=1) if len(average.shape) == 3: if norm_min_bound or norm_max_bound: for i in range(self.raw_data_buffer.shape[0]): for j in range(self.raw_data_buffer.shape[1]): if average[i, j, :] < 0: average[i, j, :] = average[i, j, :] / abs(norm_min_bound) elif average[i, j, :] >= 0: average[i, j, :] = average[i, j, :] / norm_max_bound self.processed_data_buffer = np.append(self.processed_data_buffer[:, :, 1:], average, axis=2) else: if norm_min_bound or norm_max_bound: for i in range(self.raw_data_buffer.shape[0]): for j in range(self.raw_data_buffer.shape[1]): if average[i, :] < 0: average[i, :] = average[i, :] / abs(norm_min_bound) elif average[i, :] >= 0: average[i, :] = average[i, :] / norm_max_bound self.processed_data_buffer = np.append(self.processed_data_buffer[:, 1:], average, axis=1) self.process_time.append(time.time() - tic) return self.processed_data_buffer
[docs] def get_peaks( self, new_sample: np.ndarray, threshold: float, min_peaks_interval=None, ) -> tuple: """ Allow to get the number of peaks for an analog signal (to get cadence from treadmill for instance). Parameters ---------- new_sample : numpy.ndarray New sample to add to the signal. threshold : float Threshold to detect peaks. min_peaks_interval : float Minimum interval between two peaks. Returns ------- tuple Number of peaks and the processed signal. """ tic = time.time() nb_peaks = [] if len(new_sample.shape) == 1: new_sample = np.expand_dims(new_sample, 0) sample_proc = np.copy(new_sample) if not self._is_one: self._is_one = [False] * new_sample.shape[0] for i in range(new_sample.shape[0]): for j in range(new_sample.shape[1]): if new_sample[i, j] < threshold: sample_proc[i, j] = 0 self._is_one[i] = False elif new_sample[i, j] >= threshold: if not self._is_one[i]: sample_proc[i, j] = 1 self._is_one[i] = True else: sample_proc[i, j] = 0 if len(self.raw_data_buffer) == 0: self.raw_data_buffer = new_sample self.processed_data_buffer = sample_proc nb_peaks = None elif self.raw_data_buffer.shape[1] < self.processing_window: self.raw_data_buffer = np.append(self.raw_data_buffer, new_sample, axis=1) self.processed_data_buffer = np.append(self.processed_data_buffer, sample_proc, axis=1) nb_peaks = None else: self.raw_data_buffer = np.append(self.raw_data_buffer[:, new_sample.shape[1] :], new_sample, axis=1) self.processed_data_buffer = np.append( self.processed_data_buffer[:, new_sample.shape[1] :], sample_proc, axis=1 ) if min_peaks_interval: self.processed_data_buffer = RealTimeProcessing._check_and_adjust_interval( self.processed_data_buffer, min_peaks_interval ) if isinstance(nb_peaks, list): for i in range(self.processed_data_buffer.shape[0]): nb_peaks.append(np.count_nonzero(self.processed_data_buffer[i, :])) self.process_time.append(time.time() - tic) return nb_peaks, self.processed_data_buffer
@staticmethod def _check_and_adjust_interval(signal, interval): for j in range(signal.shape[0]): if np.count_nonzero(signal[j, -interval:] == 1) not in [0, 1]: idx = np.where(signal[j, -interval:] == 1)[0] for i in idx[1:]: signal[j, -interval:][i] = 0 return signal
[docs] def custom_processing(self, funct: callable, data_tmp: np.ndarray, **kwargs) -> np.ndarray: """ Allow to apply a custom processing function to the data. Parameters ---------- funct: callable Function to apply to the data. data_tmp: numpy.ndarray Data to process. Returns ------- numpy.ndarray Processed data. """ tic = time.time() data_tmp = funct(data_tmp, **kwargs) self.process_time.append(time.time() - tic) return data_tmp
[docs] def get_mean_process_time(self): return np.mean(self.process_time)
[docs] class OfflineProcessing(GenericProcessing): def __init__(self, data_rate: float = None, processing_window: int = None): """ Offline processing. Parameters ---------- data_rate : float Data rate of the signal. processing_window : int Number of samples to process at once. """ super(OfflineProcessing, self).__init__() self.data_rate = data_rate self.processing_window = processing_window
[docs] def process_emg(self, data: np.ndarray, mvc_list: list = None, **kwargs) -> np.ndarray: """ Process EMG data. Parameters ---------- data : np.ndarray Raw EMG data. mvc_list: list List of MVC for each muscle. Returns ------- np.ndarray Processed EMG data. """ return self.process_generic_signal(data, mvc_list, **kwargs)
[docs] @staticmethod def compute_mvc( nb_muscles: int, mvc_trials: np.ndarray, window_size: int, tmp_file: str = None, output_file: str = None, save_file: bool = False, ) -> list: """ Compute MVC from several mvc_trials. Parameters ---------- nb_muscles : int Number of muscles. mvc_trials : numpy.ndarray EMG data for all trials. window_size : int Size of the window to compute MVC. Usually it is 1 second so the data rate. tmp_file : str Name of the temporary file. output_file : str Name of the output file. save_file : bool If true, save the results. Returns ------- list MVC for each muscle. """ mvc_list_max = [] for i in range(nb_muscles): mvc_temp = -np.sort(-mvc_trials, axis=1) if i == 0: mvc_list_max = mvc_temp[:, :window_size] else: mvc_list_max = np.concatenate((mvc_list_max, mvc_temp[:, :window_size]), axis=1) mvc_list_max = -np.sort(-mvc_list_max, axis=1)[:, :window_size] mvc_list_max = np.median(mvc_list_max, axis=1) if tmp_file: mat_content = load(tmp_file) mat_content["MVC_list_max"] = mvc_list_max else: mat_content = {"MVC_list_max": mvc_list_max, "MVC_trials": mvc_trials} if save_file: save(mat_content, output_file) if tmp_file: os.remove(tmp_file) return mvc_list_max