Source code for biosiglive.streaming.stream_data

"""
This file contains a class that allows to stream data from a source and start some multiprocess
 to process and disseminate. It is a work in progress so basic functions are available but need to be fine-tuned.
"""

from typing import Union
from time import time, sleep, strftime
import datetime
import numpy as np
import multiprocessing as mp
from biosiglive.streaming.server import Server
from ..file_io.save_and_load import save
from ..interfaces.generic_interface import GenericInterface
from ..interfaces.param import Device, MarkerSet
from ..gui.plot import LivePlot
from .utils import dic_merger


# TODO add enum for command type
[docs] class StreamData: def __init__(self, stream_rate: int = 100): """ Initialize the StreamData class. Careful this class do not return anything, you will have to turn the save option to True to save the data. Parameters ---------- stream_rate: int The stream rate of the data. """ self.process = mp.Process self.devices = [] self.marker_sets = [] self.plots = [] self.stream_rate = stream_rate self.interfaces_type = [] self.processes = [] self.interfaces = [] self.multiprocess_started = False # Multiprocessing stuff manager = mp.Manager() self.device_queue_in = [] self.device_queue_out = [] self.kin_queue_in = [] self.kin_queue_out = [] self.plots_queue = [] self.device_event = [] self.is_device_data = [] self.is_kin_data = [] self.interface_event = [] self.kin_event = [] self.custom_processes = [] self.custom_processes_kwargs = [] self.custom_processes_names = [] self.custom_queue_in = [] self.custom_queue_out = [] self.custom_event = [] self.save_data = None self.save_path = None self.save_frequency = None self.plots_multiprocess = False self.device_buffer_size = [] self.marker_set_buffer_size = [] self.raw_plot = None self.data_to_plot = None # Server stuff self.start_server = None self.server_ip = None self.ports = [] self.client_type = None self.count_server = 0 self.server_queue = [] self.device_decimals = 8 self.kin_decimals = 6 def _add_device(self, device: Device): """ Add a device to the stream. Parameters ---------- device: Device Device to add. """ self.devices.append(device) self.device_queue_in.append(mp.Manager().Queue()) self.device_queue_out.append(mp.Manager().Queue()) self.device_event.append(mp.Manager().Event())
[docs] def add_interface(self, interface: GenericInterface()): """ Add an interface to the stream. Parameters ---------- interface: GenericInterface Interface to add. Interface should inherit from the generic interface. """ if self.multiprocess_started: raise Exception("Cannot add interface after the stream has started.") self.interfaces.append(interface) self.interfaces_type.append(interface.interface_type) self.interface_event.append(mp.Manager().Event()) self.is_kin_data = mp.Manager().Event() self.is_device_data = mp.Manager().Event() for device in interface.devices: self._add_device(device) for marker in interface.marker_sets: self._add_marker_set(marker) if len(self.interfaces) > 1: raise ValueError("Only one interface can be added for now.")
[docs] def add_server( self, server_ip: str = "127.0.0.1", ports: Union[int, list] = 50000, client_type: str = "TCP", device_buffer_size: Union[int, list] = None, marker_set_buffer_size: [int, list] = None, ): """ Add a server to the stream. Parameters ---------- server_ip: str The ip address of the server. ports: int or list The port(s) of the server. client_type: str The type of client to use. Can be TCP. device_buffer_size: int or list The size of the buffer for the devices. marker_set_buffer_size: int or list The size of the buffer for the marker sets. """ if self.multiprocess_started: raise Exception("Cannot add interface after the stream has started.") self.server_ip = server_ip self.ports = ports if not isinstance(self.ports, list): self.ports = [self.ports] for p in range(len(self.ports)): self.server_queue.append(mp.Manager().Queue()) self.client_type = client_type if not device_buffer_size: device_buffer_size = [None] * len(self.devices) if isinstance(device_buffer_size, list): if len(device_buffer_size) != len(self.devices): raise ValueError("The device buffer size list should have the same length as the number of devices.") self.device_buffer_size = device_buffer_size elif isinstance(device_buffer_size, int): self.device_buffer_size = [device_buffer_size] * len(self.devices) if not marker_set_buffer_size: marker_set_buffer_size = [None] * len(self.marker_sets) if isinstance(marker_set_buffer_size, list): if len(marker_set_buffer_size) != len(self.marker_sets): raise ValueError( "The marker set buffer size list should have the same length as the number of marker sets." ) self.marker_set_buffer_size = marker_set_buffer_size elif isinstance(marker_set_buffer_size, int): self.marker_set_buffer_size = [marker_set_buffer_size] * len(self.marker_sets) if len(self.ports) > 1: raise ValueError("Only one server can be added for now.")
[docs] def start(self, save_streamed_data: bool = False, save_path: str = None, save_frequency: int = None): """ Start the stream. Parameters ---------- save_streamed_data: bool If True, the streamed data will be saved. save_path: str The path to save the streamed data. save_frequency: The frequency at which the data will be saved. """ self.save_data = save_streamed_data self.save_path = save_path if save_path else f"streamed_data_{strftime('%Y%m%d_%H%M%S')}.bio" self.save_frequency = save_frequency if save_frequency else self.stream_rate self._init_multiprocessing()
def _add_marker_set(self, marker: MarkerSet): """ Add a marker set to the stream. Parameters ---------- marker: MarkerSet Marker set to add from given interface. """ self.marker_sets.append(marker) self.kin_queue_in.append(mp.Manager().Queue()) self.kin_queue_out.append(mp.Manager().Queue()) self.kin_event.append(mp.Manager().Event()) # TODO : add buffer directly in the server
[docs] def device_processing(self, device_idx: int): """ Process the data from the device Parameters ---------- device_idx: int The index of the device in the list of devices. """ if self.device_buffer_size: if not self.device_buffer_size[device_idx]: self.device_buffer_size[device_idx] = self.devices[device_idx].rate buffer_size = self.device_buffer_size[device_idx] else: buffer_size = self.devices[device_idx].rate device_data = [] while True: self.is_device_data.wait() try: device_data = self.device_queue_in[device_idx].get_nowait() is_working = True except Exception: is_working = False if is_working: self.devices[device_idx].new_data = device_data self.devices[device_idx].append_data(device_data) processed_data = self.devices[device_idx].process(**self.devices[device_idx].processing_method_kwargs) self.device_queue_out[device_idx].put_nowait({"processed_data": processed_data[:, -buffer_size:]}) self.device_event[device_idx].set()
[docs] def recons_kin(self, marker_set_idx: int): """ Compute inverse kinematics from markers. Parameters ---------- marker_set_idx: int Index of the marker set in the list of markers. """ if self.marker_set_buffer_size: if not self.marker_set_buffer_size[marker_set_idx]: self.marker_set_buffer_size[marker_set_idx] = self.marker_sets[marker_set_idx].rate buffer_size = self.marker_set_buffer_size[marker_set_idx] else: buffer_size = self.marker_sets[marker_set_idx].rate if "model_path" not in self.marker_sets[marker_set_idx].kin_method_kwargs.keys(): raise ValueError("No model to compute the kinematics.") markers = [] while True: self.is_kin_data.wait() try: markers = self.kin_queue_in[marker_set_idx].get_nowait() is_working = True except Exception: is_working = False if is_working: self.marker_sets[marker_set_idx].new_data = markers self.marker_sets[marker_set_idx].append_data(markers) states, _ = self.marker_sets[marker_set_idx].get_kinematics( **self.marker_sets[marker_set_idx].kin_method_kwargs ) self.kin_queue_out[marker_set_idx].put_nowait({"kinematics_data": states[:, -buffer_size:]}) self.kin_event[marker_set_idx].set()
[docs] def open_server(self, server_idx: int): """ Open the server to send data from the devices. """ server = Server(self.server_ip, self.ports[server_idx], server_type=self.client_type) server.start() while True: connection, message = server.client_listening() data_queue = [] while len(data_queue) == 0: # use Try statement as the queue can be empty and is_empty function is not reliable. try: data_queue = self.server_queue[server_idx].get_nowait() is_working = True except Exception: is_working = False if is_working: # use this method to avoid blocking the server with Windows os. server.send_data(data_queue, connection, message)
def _init_multiprocessing(self): """ Initialize the multiprocessing. """ processes = [] for i in range(len(self.interfaces)): processes.append( self.process(name="reader", target=StreamData.save_streamed_data, args=(self, i), daemon=True) ) for d, device in enumerate(self.devices): if device.processing_method is not None: processes.append( self.process( name=f"process_{device.name}", target=StreamData.device_processing, args=( self, d, ), daemon=True, ) ) for i in range(len(self.ports)): processes.append( self.process(name="listen" + f"_{i}", target=StreamData.open_server, args=(self, i), daemon=True) ) for p, plot in enumerate(self.plots): for device in self.devices: for marker_set in self.marker_sets: if self.data_to_plot[p] not in device.name and self.data_to_plot[p] not in marker_set.name: raise ValueError(f"The name of the data to plot ({self.data_to_plot[p]}) is not correct.") if self.plots_multiprocess: processes.append(self.process(name="plot", target=StreamData.plot_update, args=(self, p), daemon=True)) else: processes.append(self.process(name="plot", target=StreamData.plot_update, args=(self, -1), daemon=True)) break for m, marker in enumerate(self.marker_sets): if marker.kin_method: processes.append( self.process( name=f"process_{marker.name}", target=StreamData.recons_kin, args=( self, m, ), daemon=True, ) ) for i, funct in enumerate(self.custom_processes): processes.append( self.process( name=self.custom_processes_names[i], target=funct, args=(self,), kwargs=self.custom_processes_kwargs[i], daemon=True, ) ) for p in processes: p.start() self.multiprocess_started = True for p in processes: p.join() def _check_nb_processes(self): """ compute the number of process. """ nb_processes = 0 for device in self.devices: if device.process_method is not None: nb_processes += 1 if self.start_server: nb_processes += len(self.ports) nb_processes += len(self.plots) nb_processes += len(self.interfaces) for marker in self.marker_sets: if marker.kin_method: nb_processes += 1 nb_processes += len(self.custom_processes) return nb_processes
[docs] def add_plot( self, plot: Union[LivePlot, list], data_to_plot: Union[str, list], raw: Union[bool, list] = None, multiprocess=False, ): """ Add a plot to the live data. Still Not working for now. Parameters ---------- plot: Union[LivePlot, list] Plot to add. data_to_plot: Union[str, list] Name of the data to plot. raw: Union[bool, list] If True, the raw data will be plotted. multiprocess: bool If True, if several plot each plot will be on a separate process. If False, each plot will be on the same one. """ raise NotImplementedError("Plot are not implemented yet with StreamData class.")
# if isinstance(data_to_plot, str): # data_to_plot = [data_to_plot] # if isinstance(raw, bool): # raw = [raw] # if len(data_to_plot) != len(raw): # raise ValueError("The length of the data to plot and the raw list must be the same.") # if not raw: # raw = [True] * len(data_to_plot) # self.plots_queue.append(self.queue()) # self.raw_plot = raw # self.data_to_plot = data_to_plot # if self.multiprocess_started: # raise Exception("Cannot add plot after the stream has started.") # self.plots_multiprocess = multiprocess # if not isinstance(plot, list): # plot = [plot] # for plt in plot: # if plt.rate: # if plt.rate > self.stream_rate: # raise ValueError("Plot rate cannot be higher than stream rate.") # self.plots.append(plt)
[docs] def plot_update(self, plot_idx: int = -1): """ Update the plots. Parameters ---------- plot_idx: int index of the plot to update. If -1, all plots will be updated. """ if plot_idx == -1: plots = self.plots queue = self.plots_queue[0] else: plots = self.plots[plot_idx] queue = self.plots_queue[plot_idx] data_to_plot = [] data = None device_names = [] marker_set_names = [] for device in self.devices: device_names.append(device.name) for marker in self.marker_sets: marker_set_names.append(marker.name) while True: try: data = queue.get_nowait() is_working = True except Exception: is_working = False if is_working: for p, plot in enumerate(plots): if self.data_to_plot[p] in device_names: if not self.raw_plot[p]: data_to_plot = data["proc_device_data"][device_names.index(self.data_to_plot[p])] else: data_to_plot = data["raw_device_data"][device_names.index(self.data_to_plot[p])] if self.data_to_plot[p] in marker_set_names: if not self.raw_plot[p]: data_to_plot = data["kinematics_data"][marker_set_names.index(self.data_to_plot[p])] else: data_to_plot = data["marker_set_data"][marker_set_names.index(self.data_to_plot[p])][ :, :, -1 ].T plot.update(data_to_plot)
[docs] def save_streamed_data(self, interface_idx: int): """ Stream, process and save the data. Parameters ---------- interface_idx: idx Interface index to use from the interface list. for now only one interface is supported. """ initial_time = 0 iteration = 0 dic_to_save = {} save_count = 0 self.save_frequency = self.save_frequency if self.save_frequency else self.stream_rate interface = self.interfaces[interface_idx] saving_time = None while True: data_dic = {} proc_device_data = [] raw_device_data = [] raw_markers_data = [] all_device_data = [] all_markers_tmp = [] kin_data = [] tic = time() if iteration == 0: initial_time = time() - tic interface_latency = interface.get_latency() is_frame = interface.get_frame() absolute_time_frame = datetime.datetime.now() absolute_time_frame_dic = { "day": absolute_time_frame.day, "hour": absolute_time_frame.hour, "hour_s": absolute_time_frame.hour * 3600, "minute": absolute_time_frame.minute, "minute_s": absolute_time_frame.minute * 60, "second": absolute_time_frame.second, "millisecond": int(absolute_time_frame.microsecond / 1000), "millisecond_s": int(absolute_time_frame.microsecond / 1000) * 0.001, } self.is_kin_data.clear() self.is_device_data.clear() if is_frame: if iteration == 0: print("Data start streaming") iteration = 1 if len(interface.devices) != 0: all_device_data = interface.get_device_data(device_name="all", get_frame=False) if not isinstance(all_device_data, list): all_device_data = [all_device_data] self.is_device_data.set() for i in range(len(all_device_data)): if self.devices[i].processing_method is not None: self.device_queue_in[i].put_nowait(all_device_data[i]) if len(interface.marker_sets) != 0: all_markers_tmp, _ = interface.get_marker_set_data(get_frame=False) if not isinstance(all_markers_tmp, list): all_markers_tmp = [all_markers_tmp] self.is_kin_data.set() for i in range(len(self.marker_sets)): if self.marker_sets[i].kin_method is not None: self.kin_queue_in[i].put_nowait(all_markers_tmp[i]) time_to_get_data = time() - tic tic_process = time() if len(interface.devices) != 0: for i in range(len(interface.devices)): if self.devices[i].processing_method is not None: self.device_event[i].wait() device_data = self.device_queue_out[i].get_nowait() self.device_event[i].clear() proc_device_data.append( np.around(device_data["processed_data"], decimals=self.device_decimals) ) raw_device_data.append(np.around(all_device_data[i], decimals=self.device_decimals)) data_dic["proc_device_data"] = proc_device_data data_dic["raw_device_data"] = raw_device_data if len(interface.marker_sets) != 0: for i in range(len(interface.marker_sets)): if self.marker_sets[i].kin_method is not None: self.kin_event[i].wait() kin_data_proc = self.kin_queue_out[i].get_nowait() self.kin_event[i].clear() kin_data.append(np.around(kin_data_proc["kinematics_data"], decimals=self.kin_decimals)) raw_markers_data.append(np.around(all_markers_tmp[i], decimals=self.kin_decimals)) data_dic["kinematics_data"] = kin_data data_dic["marker_set_data"] = raw_markers_data process_time = time() - tic_process # time to process all data for i in range(len(self.ports)): try: self.server_queue[i].get_nowait() except Exception: pass self.server_queue[i].put_nowait(data_dic) if len(self.plots) != 0: size = 1 if not self.plots_multiprocess else len(self.plots) for i in range(size): try: self.plots_queue[i].get_nowait() except Exception: pass self.plots_queue[i].put_nowait(data_dic) data_dic["absolute_time_frame"] = absolute_time_frame_dic data_dic["interface_latency"] = interface_latency data_dic["process_time"] = process_time data_dic["initial_time"] = initial_time data_dic["time_to_get_data"] = time_to_get_data # Save data if self.save_data is True: tic_save = time() data_dic["saving_time"] = saving_time dic_to_save = dic_merger(data_dic, dic_to_save) if save_count == int(self.stream_rate / self.save_frequency): save(data_dic, self.save_path) dic_to_save = {} save_count = 0 save_count += 1 saving_time = time() - tic_save if tic - time() < 1 / self.stream_rate: sleep(1 / self.stream_rate - (tic - time())) else: print( f"WARNING: Stream rate ({self.stream_rate}) is too high for the computer." f"The actual stream rate is {1 / (tic - time())}" )
[docs] def stop(self): """ Stop the stream """ for process in self.processes: process.terminate() process.join()