EMG streaming
This example shows how to disseminate EMG data using biosiglive and a dedicated interface. First an interface is created, then a device is added to the interface. For EMG data, two interfaces are available: ViconClient to stream data from a vicon Nexus software via the Vicon Data Stream SDK (https://www.vicon.com/software/datastream-sdk/). The Pytrigno interface for streaming data from the Delsys trigno community SDK (https://delsys.com/sdk/). If you want to try this example offline, you can use the provided custom interface named ‘MyInterface’ which you can use as a standard interface.
The EMG device taking as input argument :
-the number of electrodes -the type of the device (the allowed types are listed in the DeviceType class) -the name of the device (for the Vicon system, it must be the same name as the device to stream). -the flow of the device. -the process method to use. Here, the process_emg method is used. -any other argument needed for the processing method. (you can see the possible arguments in the documentation of the process_emg method).
If you want to display the data in real time, you can use the biosiglive plot classes, please take a look at the live_plot.py example. After initializing the interface and the device, the data exchange is done in a loop. The data must first be received from the source by the get_device_data method. After that, the data can be used as is or processed using the process() method of the Device class. In this function you can pass a method if you want to use a different method than the default one and every needed argument for that function as well.
from custom_interface import MyInterface
from biosiglive.gui.plot import LivePlot
from biosiglive import (
ViconClient,
RealTimeProcessingMethod,
PlotType,
)
from time import sleep, time
if __name__ == "__main__":
try_offline = True
output_file_path = "trial_x.bio"
if try_offline:
interface = MyInterface(system_rate=100, data_path="abd.bio")
else:
interface = ViconClient(ip="localhost", system_rate=100)
n_electrodes = 4
muscle_names = [
"Pectoralis major",
"Deltoid anterior",
"Deltoid medial",
"Deltoid posterior",
]
interface.add_device(
nb_channels=n_electrodes,
device_type="emg",
name="emg",
rate=2000,
device_data_file_key="emg",
processing_method=RealTimeProcessingMethod.ProcessEmg,
moving_average=True,
)
emg_plot = LivePlot(
name="emg", rate=100, plot_type=PlotType.Curve, nb_subplots=n_electrodes, channel_names=muscle_names
)
emg_plot.init(plot_windows=500, y_labels="Processed EMG (mV)")
emg_raw_plot = LivePlot(
name="emg_raw", rate=100, plot_type=PlotType.Curve, nb_subplots=n_electrodes, channel_names=muscle_names
)
emg_raw_plot.init(plot_windows=10000, colors=(255, 0, 0), y_labels="EMG (mV)")
time_to_sleep = 1 / 100
count = 0
while True:
tic = time()
raw_emg = interface.get_device_data(device_name="emg")
emg_proc = interface.devices[0].process()
emg_plot.update(emg_proc[:, -1:])
emg_raw_plot.update(raw_emg)
count += 1
loop_time = time() - tic
real_time_to_sleep = time_to_sleep - loop_time
if real_time_to_sleep > 0:
sleep(real_time_to_sleep)