"""
This file contains a wrapper for the python Vicon SDK.
"""
from .param import *
from typing import Union
from .generic_interface import GenericInterface
from ..enums import InverseKinematicsMethods, InterfaceType
try:
from vicon_dssdk import ViconDataStream as VDS
except ModuleNotFoundError:
pass
[docs]
class ViconClient(GenericInterface):
"""
Class for interfacing with the Vicon system.
"""
def __init__(self, system_rate: int, ip: str = "127.0.0.1", port: int = 801, init_now=True):
"""
Initialize the ViconClient class.
Parameters
----------
system_rate: int
Streaming rate of the nexus software.
ip: str
IP address of the nexus software.
port: int
Port of the nexus software.
init_now: bool
Whether to initialize the client now.
Usefull if you want to pickle the interface as the Vicon SDK is not pickable (swig).
"""
super(ViconClient, self).__init__(ip=ip, system_rate=system_rate, interface_type=InterfaceType.ViconClient)
self.address = f"{ip}:{port}"
self.vicon_client = None
self.acquisition_rate = None
self.system_rate = system_rate
self.devices = []
self.imu = []
self.marker_sets = []
self.is_frame = False
self.is_initialized = False
if init_now:
self._init_client()
def _init_client(self):
"""
Initialize the Vicon client.
"""
print(f"Connection to ViconDataStreamSDK at : {self.address} ...")
self.vicon_client = VDS.Client()
self.vicon_client.Connect(self.address)
print("Connected to Vicon.")
self.is_initialized = True
# Enable several data types
self.vicon_client.EnableSegmentData()
self.vicon_client.EnableDeviceData()
self.vicon_client.EnableMarkerData()
self.vicon_client.EnableUnlabeledMarkerData()
self.get_frame()
if self.system_rate != self.vicon_client.GetFrameRate():
raise ValueError(
f"Vicon system rate ({self.vicon_client.GetFrameRate()}) does not match the system rate "
f"({self.system_rate})."
)
[docs]
def add_device(
self,
nb_channels: int,
device_type: Union[DeviceType, str] = DeviceType.Emg,
data_buffer_size: int = None,
name: str = None,
rate: float = 2000,
device_range: tuple = None,
processing_method: Union[RealTimeProcessingMethod, OfflineProcessingMethod] = None,
**process_kwargs,
):
"""
Add a device to the Vicon system.
Parameters
----------
nb_channels: int
Number of channels of the device.
device_type: Union[DeviceType, str]
Type of the device.
data_buffer_size: int
Size of the buffer for the device.
name: str
Name of the device.
rate: float
Rate of the device.
device_range: tuple
Range of the device.
processing_method : Union[RealTimeProcessingMethod, OfflineProcessingMethod]
Method used to process the data.
**process_kwargs
Keyword arguments for the processing method.
"""
device_tmp = self._add_device(
nb_channels, device_type, name, rate, device_range, processing_method, **process_kwargs
)
device_tmp.interface = self.interface_type
if self.vicon_client:
device_tmp.infos = self.vicon_client.GetDeviceOutputDetails(name)
else:
device_tmp.infos = None
device_tmp.data_windows = data_buffer_size
self.devices.append(device_tmp)
[docs]
def add_marker_set(
self,
nb_markers: int,
name: str = None,
data_buffer_size: int = None,
marker_names: Union[str, list] = None,
rate: float = 100,
unlabeled: bool = False,
subject_name: str = None,
kinematics_method: InverseKinematicsMethods = None,
**kin_method_kwargs,
):
"""
Add markers set to stream from the Vicon system.
Parameters
----------
nb_markers: int
Number of markers.
name: str
Name of the markers set.
data_buffer_size: int
Size of the buffer for the markers set.
marker_names: Union[list, str]
List of markers names.
rate: int
Rate of the markers set.
unlabeled: bool
Whether the markers set is unlabeled.
subject_name: str
Name of the subject. If None, the subject will be the first one in Nexus.
kinematics_method: InverseKinematicsMethods
Method used to compute the kinematics.
**kin_method_kwargs
Keyword arguments for the kinematics method.
"""
if len(self.marker_sets) != 0:
raise ValueError("Only one marker set can be added for now.")
markers_tmp = self._add_marker_set(
nb_markers=nb_markers,
name=name,
marker_names=marker_names,
rate=rate,
unlabeled=unlabeled,
kinematics_method=kinematics_method,
**kin_method_kwargs,
)
if self.vicon_client:
markers_tmp.subject_name = subject_name if subject_name else self.vicon_client.GetSubjectNames()[0]
markers_tmp.marker_names = (
self.vicon_client.GetMarkerNames(markers_tmp.subject_name) if not marker_names else marker_names
)
markers_tmp.marker_names = [name[0] for name in markers_tmp.marker_names]
if markers_tmp.nb_channels != len(markers_tmp.marker_names):
raise RuntimeError("Nb of marker not the same than markers on vicon.")
else:
markers_tmp.subject_name = subject_name
markers_tmp.marker_names = marker_names
markers_tmp.data_windows = data_buffer_size
self.marker_sets.append(markers_tmp)
[docs]
@staticmethod
def get_force_plate_data():
raise NotImplementedError("Force plate streaming is not implemented yet.")
[docs]
def get_device_data(
self, device_name: Union[str, list] = "all", channel_idx: Union[int, list] = (), get_frame: bool = True
):
"""
Get the device data from Vicon.
Parameters
----------
device_name: str or list
Name of the device or list of devices names.
channel_idx: Union[int, str]
Index of the channel to return.
get_frame: bool
Whether to get a new frame from the Vicon system.
Returns
-------
device_data: list
All asked device data.
"""
if len(self.devices) == 0:
raise ValueError("No device has been added to the Vicon system.")
if not self.is_initialized:
raise RuntimeError("Vicon client is not initialized.")
if get_frame:
self.get_frame()
all_device_data = []
if not isinstance(device_name, list):
device_name = [device_name]
if channel_idx and not isinstance(channel_idx, list):
channel_idx = [channel_idx]
device_data = []
for d, device in enumerate(self.devices):
if device_name[0] == "all" or device.name in device_name:
device.new_data = np.zeros((device.nb_channels, device.sample))
count = 0
for output_name, channel_name, unit in device.infos:
data_tmp, _ = self.vicon_client.GetDeviceOutputValues(device.name, output_name, channel_name)
device.new_data[count, :] = data_tmp
device.channel_names.append(channel_name)
count += 1
if count == device.nb_channels:
break
if channel_idx:
device_data = np.zeros((len(channel_idx), device.sample))
for idx in range(device.nb_channels):
if idx in channel_idx:
device_data[channel_idx.index(idx), :] = device.new_data[idx, :]
device_data = device_data if channel_idx else device.new_data
device.append_data(device.new_data)
all_device_data.append(device_data)
if len(all_device_data) == 1:
return all_device_data[0]
return all_device_data
[docs]
def get_marker_set_data(
self, subject_name: Union[str, list] = None, marker_names: Union[str, list] = None, get_frame: bool = True
):
"""
Get the markers data from Vicon.
Parameters
----------
subject_name: Union[str, list]
Name of the subject. If None, the subject will be the first one in Nexus.
marker_names: Union[str, list]
List of markers names.
get_frame: bool
Whether to get a new frame or not.
Returns
-------
markers_data: list
All asked markers data.
"""
if len(self.marker_sets) == 0:
raise ValueError("No marker set has been added to the Vicon system.")
if not self.is_initialized:
raise RuntimeError("Vicon client is not initialized.")
if get_frame:
self.get_frame()
if subject_name and isinstance(subject_name, list):
subject_name = [subject_name]
if marker_names and isinstance(marker_names, list):
marker_names = [marker_names]
occluded = []
all_markers_data = []
all_occluded_data = []
if subject_name:
marker_sets = [None] * len(subject_name)
for s, marker_set in enumerate(self.marker_sets):
if marker_set.subject_name in subject_name:
marker_sets[subject_name.index(marker_set.subject_name)] = marker_set
if marker_sets == [None]:
raise RuntimeError("No subject of this name.")
else:
marker_sets = self.marker_sets
for markers in marker_sets:
markers.new_data = np.zeros((3, len(markers.marker_names), markers.sample))
count = 0
for m, marker_name in enumerate(markers.marker_names):
markers_data_tmp, occluded_tmp = self.vicon_client.GetMarkerGlobalTranslation(
markers.subject_name, marker_name
)
markers.new_data[:, count, :] = np.array(markers_data_tmp)[:, np.newaxis]
occluded.append(occluded_tmp)
if marker_names:
markers_data = np.zeros((3, len(marker_names), markers.sample))
for n, name in enumerate(markers.marker_names):
if name in marker_names:
markers_data[:, marker_names.index(name), :] = markers.new_data[:, n, :]
all_markers_data.append(markers_data)
else:
all_markers_data.append(markers.new_data)
markers.append_data(markers.new_data)
all_occluded_data.append(occluded)
if len(all_markers_data) == 1:
return all_markers_data[0], all_occluded_data[0]
return all_markers_data, all_occluded_data
[docs]
def init_client(self):
"""
Initialize the Vicon client if it is not already initialized.
This function has to be called before get frame from interface.
"""
if self.is_initialized:
raise RuntimeError("Vicon client is already initialized.")
else:
self._init_client()
for d, device in enumerate(self.devices):
if not device.infos:
device.infos = self.vicon_client.GetDeviceOutputDetails(device.name)
for m, marker_set in enumerate(self.marker_sets):
if not marker_set.markers_names:
marker_set.markers_names = self.vicon_client.GetMarkerNames(marker_set.subject_name)
if not marker_set.subject_name:
marker_set.subject_name = self.vicon_client.GetSubjectNames()[0]
[docs]
def get_latency(self) -> float:
"""
Get the latency between the Vicon system and the Vicon SDK.
Returns
-------
latency: float
Latency between the Vicon system and the Vicon SDK.
"""
if not self.is_initialized:
raise RuntimeError("Vicon client is not initialized.")
return self.vicon_client.GetLatencyTotal()
[docs]
def get_frame(self) -> bool:
"""
Get a new frame from the Vicon system.
Returns
-------
bool
True if there is a frame, False otherwise.
"""
if not self.is_initialized:
raise RuntimeError("Vicon client is not initialized.")
self.is_frame = self.vicon_client.GetFrame()
while self.is_frame is not True:
self.is_frame = self.vicon_client.GetFrame()
return self.is_frame
[docs]
def get_frame_number(self) -> int:
"""
Get the last frame number.
Returns
-------
frame_number: int
Last frame number.
"""
if not self.is_initialized:
raise RuntimeError("Vicon client is not initialized.")
return self.vicon_client.GetFrameNumber()
[docs]
def get_kinematics_from_markers(
self,
marker_set_name: str,
model_path: str = None,
method: Union[InverseKinematicsMethods, str] = InverseKinematicsMethods.BiorbdLeastSquare,
custom_func: callable = None,
**kwargs,
):
"""
Get the kinematics from markers.
Parameters
----------
marker_set_name: str
name of the markerset.
model_path: str
biorbd model of the kinematics.
method: str
Method to use to get the kinematics. Can be "kalman" or "custom".
custom_func: function
Custom function to get the kinematics.
Returns
-------
kinematics: list
List of kinematics.
"""
marker_set_idx = [i for i, m in enumerate(self.marker_sets) if m.name == marker_set_name][0]
return self.marker_sets[marker_set_idx].get_kinematics(model_path, method, custom_func=custom_func, **kwargs)