Source code for biosiglive.processing.compute_mvc

"""
This file is part of biosiglive. it is an example to see how to use biosiglive to compute the maximal voluntary
 contraction from EMG signals.
"""

from time import strftime
from ..interfaces.vicon_interface import ViconClient
from ..interfaces.pytrigno_interface import PytrignoClient
from ..interfaces.tcp_interface import TcpClient
from ..interfaces.generic_interface import GenericInterface
from .data_processing import OfflineProcessing
from ..gui.plot import LivePlot, OfflinePlot
from ..enums import InterfaceType
from ..file_io.save_and_load import save, load
from pathlib import Path
from time import time, sleep
import os
import numpy as np
from typing import Union


# TODO: Add the possibility to send a temporary file to append the data to it in case of former dysfunction.
[docs] class ComputeMvc: def __init__( self, interface_type: Union[str, InterfaceType] = InterfaceType.PytrignoClient, interface_ip: str = "127.0.0.1", interface_port: int = 801, # only for vicon output_file: str = None, muscle_names: list = None, emg_frequency: float = 2000, acquisition_rate: int = 100, mvc_windows: int = 2000, range_muscle: Union[tuple, int] = None, custom_interface: GenericInterface = None, ): """ Initialize the class. Parameters ---------- interface_type : Union[str, InterfaceType] The mode of the stream (describe in the InterfaceType enum). interface_ip : str The ip of the interface. interface_port : int The port of the interface. Only needed for the Vicon interface. output_file : str The path of the output file. muscle_names : list The list of the muscle names. emg_frequency : float The frequency of the device. acquisition_rate : float The acquisition rate of the acquisition. mvc_windows : int size of the window to compute the mvc. range_muscle : tuple The range of the muscle to compute the mvc. custom_interface: classmethod The custom interface to use """ if Path(output_file).suffix != ".bio": if Path(output_file).suffix == "": output_file += ".bio" else: raise ValueError("The file must be a .bio file.") if isinstance(interface_type, str): if interface_type not in [t.value for t in InterfaceType]: raise ValueError("The type of the interface is not valid.") interface_type = InterfaceType(interface_type) self.interface_type = interface_type self.range_muscle = range_muscle if range_muscle else (0, 16) self.nb_muscles = self.range_muscle[1] - self.range_muscle[0] if muscle_names: self.muscle_names = muscle_names else: self.muscle_names = [] for i in range(self.range_muscle[0], self.range_muscle[1]): self.muscle_names.append(f"Muscle_{i}") if self.nb_muscles != len(self.muscle_names): raise RuntimeError("number of muscle must have the same length than names.") self.frequency = emg_frequency self.acquisition_rate = acquisition_rate self.mvc_windows = mvc_windows current_time = strftime("%Y%m%d-%H%M") self.output_file = f"MVC_{current_time}.bio" if not output_file else output_file self.device_host = None self.interface_port = interface_port self.interface_ip = interface_ip self.device_name = None self.show_data = False self.is_processing_method = False self.try_number = 0 self.emg_plot = None self.emg_processing = None self.moving_average, self.low_pass, self.custom = None, None, None self.try_name = "" self.try_list = [] self.emg_interface = None if self.interface_type == InterfaceType.PytrignoClient: self.emg_interface = PytrignoClient(system_rate=self.acquisition_rate, ip=self.interface_ip) elif self.interface_type == InterfaceType.ViconClient: self.emg_interface = ViconClient( system_rate=self.acquisition_rate, ip=self.interface_ip, port=self.interface_port ) elif self.interface_type == InterfaceType.TcpClient: self.emg_interface = TcpClient( read_frequency=self.acquisition_rate, ip=self.interface_ip, port=self.interface_port ) elif self.interface_type == InterfaceType.Custom: self.emg_interface = custom_interface if self.interface_type != InterfaceType.Custom: self.emg_interface.add_device(nb_channels=self.nb_muscles, name="EMG", device_range=self.range_muscle) else: if len(self.emg_interface.devices) == 0: raise RuntimeError("Please add a device in the custom interface.")
[docs] def set_processing_method( self, moving_average: bool = True, low_pass: bool = False, custom: bool = False, custom_function: callable = None, bandpass_frequency: tuple = (10, 425), lowpass_frequency: float = 5, lowpass_order: int = 4, butterworth_order: int = 4, ma_window: int = 200, ): """ Set the emg processing method. This method allow to customize the processing of the emg signal, so it need to be called before the start of the acquisition. Parameters ---------- moving_average : bool If True, the emg data will be processed with a moving average. low_pass : bool If True, the emg data will be processed with a low pass filter. custom : bool If True, the emg data will be processed with a custom function. custom_function : callable The custom function. Input : raw data, device frequency Output : processed data. bandpass_frequency : tuple The frequency of the bandpass filter. lowpass_frequency : float The frequency of the low pass filter. lowpass_order : int The order of the low pass filter. butterworth_order : int The order of the butterworth filter. ma_window : int The size of the moving average window. """ self.moving_average = moving_average self.low_pass = low_pass self.custom = custom if [moving_average, custom, low_pass].count(True) > 1: raise ValueError("Only one processing method can be selected") if custom and not custom_function: raise ValueError("custom_function must be defined") if custom: self.emg_processing = custom_function else: emg_processing = OfflineProcessing() emg_processing.bpf_lcut = bandpass_frequency[0] emg_processing.bpf_hcut = bandpass_frequency[1] emg_processing.lpf_lcut = lowpass_frequency emg_processing.lp_butter_order = lowpass_order emg_processing.bp_butter_order = butterworth_order emg_processing.ma_win = ma_window self.emg_processing = emg_processing.process_emg self.is_processing_method = True
[docs] def run(self, show_data: bool = False) -> list: """ Run the MVC program. Parameters ---------- show_data: bool If True, the data will be displayed in live in a separate plot. Returns ------- list The list of the MVC value for each muscle. """ self.show_data = show_data self.try_number = 0 while True: if show_data: self.emg_plot = LivePlot(nb_subplots=self.nb_muscles, channel_names=self.muscle_names) self.emg_plot.init(plot_windows=int(self.frequency * 2)) nb_frame, var, duration = self._init_trial() trial_emg = self._mvc_trial(duration, nb_frame, var) processed_emg, raw_emg = self._process_emg(trial_emg, save_tmp=True) self._plot_trial(raw_emg, processed_emg) task = input( "Press 'c' to do an other MVC trial," " 'r' to do again the MVC trial or 'q' then enter to quit.\n" ) while task != "c" and task != "r" and task != "q": print(f"Invalid entry ({task}). Please press 'c', 'r', or 'q' (in lowercase).") task = input( "Press 'c' to do an other MVC trial," " 'r' to do again the MVC trial or 'q' then enter to quit.\n" ) if task == "c": pass elif task == "r": self.try_number -= 1 mat_content = load("_MVC_tmp.bio") mat_content.pop(f"{self.try_name}_processed", None) mat_content.pop(f"{self.try_name}_raw", None) save(mat_content, "_MVC_tmp.bio") self.try_list = self.try_list[:-1] elif task == "q": mvc_list = self._save_trial() break return mvc_list
def _init_trial(self): """ Initialize the trial. Returns ------- nb_frame : int The number of frames of the trial. var : float The current iteration. duration : float The duration of the trial. """ try_name = input("Please enter a name of your trial (string) then press enter or press enter.\n") while try_name in self.try_list: try_name = input("This name is already used. Please chose and other name.\n") if try_name == "": self.try_name = f"MVC_{self.try_number}" else: self.try_name = f"{try_name}" self.try_number += 1 self.try_list.append(self.try_name) t = input( f"Ready to start trial: {self.try_name}, with muscles :{self.muscle_names}. " f"Press enter to begin your MVC. or enter a number of seconds." ) nb_frame = 0 try: float(t) iter = float(t) * self.acquisition_rate var = iter duration = True except ValueError: var = -1 duration = False return nb_frame, var, duration def _mvc_trial(self, duration: float, nb_frame: int, var: float): """ Run the MVC trial. Parameters ---------- duration : float The duration of the trial. nb_frame : int The number of frames of the trial. var : float The current iteration. Returns ------- trial_emg : numpy.ndarray The EMG data of the trial. """ data = None while True: try: if nb_frame == 0: print("Trial is running please press 'Ctrl+C' when trial is ended (it will not end the program).") data_tmp = self.emg_interface.get_device_data() tic = time() data = data_tmp if nb_frame == 0 else np.append(data, data_tmp, axis=1) if self.show_data: self.emg_plot.update(data_tmp) nb_frame += 1 time_to_sleep = (1 / self.acquisition_rate) - (time() - tic) if time_to_sleep > 0: sleep(time_to_sleep) else: print(f"Delay of {abs(time_to_sleep)}.") if duration: if nb_frame == var: return data except KeyboardInterrupt: if self.show_data is True: self.emg_plot.disconnect() return data def _plot_trial(self, raw_data: np.ndarray = None, processed_data: np.ndarray = None): """ Plot the trial. Parameters ---------- raw_data : numpy.ndarray The raw EMG data of the trial. processed_data : numpy.ndarray The processed EMG data of the trial. """ data = raw_data legend = ["Raw"] nb_column = 4 if raw_data.shape[0] > 4 else raw_data.shape[0] n_p = 0 plot_comm = "y" print(f"Trial {self.try_name} terminated. ") while plot_comm != "n": if n_p != 0: plot_comm = input(f"Would you like to plot again ? 'y'/'n'") if plot_comm == "y": plot = input( f"Press 'pr' to plot your raw trial," f" 'p' to plot your processed trial, 'b' to plot both or 'c' to continue," f" then press enter." ) while plot != "p" and plot != "pr" and plot != "c" and plot != "b": print(f"Invalid entry ({plot}). Please press 'p', 'pr', 'b', or 'c' (in lowercase).") plot = input( f"Press 'pr' to plot your raw trial," f"'p' to plot your processed trial or 'c' to continue then press enter." ) if plot != "c": if plot == "p": data = processed_data legend = ["Processed"] elif plot == "b": data = [raw_data, processed_data] legend = ["Raw data", "Processed"] legend = legend * raw_data.shape[0] x = np.linspace(0, raw_data.shape[1] / self.frequency, raw_data.shape[1]) print("Close the plot windows to continue.") OfflinePlot().multi_plot( data, nb_column=nb_column, y_label="Activation level (v)", x_label="Time (s)", legend=legend, subplot_title=self.muscle_names, figure_name=self.try_name, x=x, ) else: pass n_p += 1 def _process_emg(self, data, save_tmp=True): """ Process the EMG data. Parameters ---------- data : numpy.ndarray The raw EMG data of the trial. save_tmp : bool If True, the processed data is saved in a temporary file. Returns ------- numpy.ndarray The processed EMG data of the trial. """ if not self.is_processing_method: self.set_processing_method() emg_processed = self.emg_processing( data, data_rate=self.frequency, moving_average=self.moving_average, low_pass_filter=self.low_pass ) file_name = "_MVC_tmp.bio" # save tmp_file if save_tmp: if os.path.isfile(file_name): mat = load(file_name) mat[f"{self.try_name}_processed"] = emg_processed mat[f"{self.try_name}_raw"] = data data_to_save = mat else: data_to_save = {f"{self.try_name}_processed": emg_processed, f"{self.try_name}_raw": data} save(data_to_save, file_name) return emg_processed, data def _save_trial(self) -> list: """ Save and end the trial. Returns ------- list The list of the mvc for each muscle. """ print("Concatenate data for all trials.") mat_content = load("_MVC_tmp.bio") data_final = [] for i in range(len(self.try_list)): if i == 0: data_final = mat_content[f"{self.try_list[i]}_processed"] else: data_final_tmp = mat_content[f"{self.try_list[i]}_processed"] data_final = np.append(data_final, data_final_tmp, axis=1) save = input("Press 's' to save your data, other key to just return a list of MVC.\n") if save != "s": save = input("Data will not be saved. " "If you want to save press 's', if not, press enter.\n") print("Please wait during data processing (it could take some time)...") emg_processed, data_raw = self._process_emg(data_final, save_tmp=False) mvc_trials = emg_processed save = True if save == "s" else False mvc = OfflineProcessing.compute_mvc( self.nb_muscles, mvc_trials, self.mvc_windows, "_MVC_tmp.bio", self.output_file, save ) return mvc