Source code for biosiglive.streaming.server

"""
This file contains a wrapper for the socket server to send data to the server.
"""
import socket
import numpy as np
import struct

try:
    from pythonosc.udp_client import SimpleUDPClient
except ModuleNotFoundError:
    pass
import pickle


[docs] class Connection: """ This class is used to connect to the biosiglive server. """ def __init__(self, ip: str = "127.0.0.1", port: int = 50000): """ Initialize the connection. Parameters ---------- ip : str The ip address of the server. port : int The port of the server. """ self.ip = ip self.ports = port self.message_queues = None self.buff_size = 100000 self.acquisition_rate = 100 @staticmethod def _prepare_data(command: list, data: dict, down_sampling: dict = None, nb_frames_to_get: int = None): """ Prepare the data to send. Parameters ---------- command : dict The command received from the client. data : dict The data to prepared. down_sampling : dict The down sampling to apply to the data. nb_frames_to_get : int The number of frames to get. Returns ------- prepared data : dict The data prepared to be sent. """ data_tmp = {} if command == "all": command = list(data.keys()) for key in command: if key in data.keys(): data_tmp[key] = [] for d in data[key]: if down_sampling and key in down_sampling.keys(): if not isinstance(down_sampling[key], int): raise ValueError("The down sampling must be an integer.") d = d[..., :: down_sampling[key]] if isinstance(d, np.ndarray): if nb_frames_to_get: data_tmp[key].append(d[..., -nb_frames_to_get:]) else: data_tmp[key].append(d) else: data_tmp[key].append(d) else: raise ValueError(f"The asked data '{key}' is not in the data dictionary.") for key in data_tmp.keys(): if len(data_tmp[key]) == 1: data_tmp[key] = data_tmp[key][0] return data_tmp
[docs] class Server(Connection): """ Class to create a server. """ def __init__(self, ip: str = "127.0.0.1", port: int = 50000, server_type: str = "TCP"): """ Initialize the server. Parameters ---------- ip : str The ip of the server. port : int The port of the server. server_type : str The type of the server. """ self.ip = ip self.port = port self.server_type = server_type self.server = None self.inputs = None self.outputs = None self.buff_size = 100000 super().__init__(ip=ip, port=port)
[docs] def start(self): """ Start the server. """ if self.server_type == "TCP": self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) elif self.server_type == "UDP": raise RuntimeError(f"UDP server not implemented yet.") # self.servers.append(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) else: raise RuntimeError(f"Invalid type of connexion ({self.server_type}). Type must be 'TCP' or 'UDP'.") try: self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((self.ip, self.port)) if self.server_type != "UDP": self.server.listen(10) self.inputs = [self.server] self.outputs = [] self.message_queues = {} except ConnectionError: raise RuntimeError("Unknown error. Server is not listening.")
[docs] def client_listening(self): """ Waiting for the client connection. """ connection, ad = self.server.accept() message = pickle.loads(connection.recv(self.buff_size)) # Received message return connection, message
[docs] def send_data(self, data: dict, connection: socket.socket, message: dict = None): """ Send the data to the client. Parameters ---------- data : dict The data to send. connection : socket.socket The connection to send the data to. message : dict The message received from the client. """ if message: if ( "command" in message.keys() and "down_sampling" in message.keys() and "nb_frames_to_get" in message.keys() ): data = self._prepare_data( message["command"], data, message["down_sampling"], message["nb_frames_to_get"] ) else: raise ValueError( "The message should be a dictionary created from the Message class or contains the key" " : 'command', down_sampling, nb_frames_to_get." ) encoded_data = pickle.dumps(data) encoded_data = struct.pack(">I", len(encoded_data)) + encoded_data try: connection.sendall(encoded_data) except ConnectionError: raise RuntimeError("Unknown error. Data not sent.")
[docs] class OscClient(Connection): """ Class to create an OSC client. """ def __init__(self, ip: str = "127.0.0.1"): self.ports = [51337] self.osc = [] super().__init__(ip=ip, ports=self.ports)
[docs] def start(self): """ Start the client. """ for i in range(len(self.ports)): try: self.osc.append(SimpleUDPClient(self.ip, self.ports[i])) print(f"Streaming OSC {i} activated on '{self.ip}:{self.ports[i]}") except ConnectionError: raise RuntimeError("Unknown error. OSC client not open.")
@staticmethod def __adjust_dims(data: dict, device_to_send: dict): """ Adjust the dimensions of the data to send. Parameters ---------- data : dict The data to send. device_to_send : dict The device type to send the data to (emg or imu). Returns ------- The data to send. """ data_to_return = [] for key in device_to_send: if key == "emg": emg_proc = np.array(data["emg_proc"])[:, -1:] emg_proc = emg_proc.reshape(emg_proc.shape[0]) data_to_return.append(emg_proc.tolist()) elif key == "imu": imu = np.array(data["imu_proc"])[:, :, -1:] data_to_return.append(imu.tolist()) accel_proc = imu[:, :3, :] accel_proc = accel_proc.reshape(accel_proc.shape[0]) data_to_return.append(accel_proc.tolist()) gyro_proc = imu[:, 3:6, :] gyro_proc = gyro_proc.reshape(gyro_proc.shape[0]) data_to_return.append(gyro_proc.tolist()) else: raise RuntimeError(f"Unknown device ({key}) to send. Possible devices are 'emg' and 'imu'.") return data_to_return
[docs] def send_data(self, data: dict, device_to_send: dict): """ Send the data to the client. Parameters ---------- data : dict The data to send. device_to_send : dict The device type to send the data to (emg or imu). """ data = self.__adjust_dims(data, device_to_send) for key in device_to_send: if key == "emg": self.osc[0].send_message("/emg", data[0]) elif key == "imu": idx = 1 if key in "emg" in device_to_send else 0 self.osc[0].send_message("/imu/", data[idx]) self.osc[0].send_message("/accel/", data[idx + 1]) self.osc[0].send_message("/gyro/", data[idx + 2]) else: raise RuntimeError(f"Unknown device ({key}) to send. Possible devices are 'emg' and 'imu'.")