Create a pipeline¶
A simple example¶
NeuXus loads and executes a py file (called the pipeline script) at the beginning of its execution, this file describes the pipeline. It initializes all Nodes and links between them.
Example:
from neuxus.nodes import *
# dir_path is the path to directory containing this file
stim_cfg = dir_path + '/stimulation_config_1.xml'
# Node 1
generated_markers = stimulator.Stimulator(
file=stim_cfg
)
# Node 2
sending = io.LslSend(
input_port=generated_markers.output,
name='my_simulation',
type='marker'
)
Try the following command to run this pipeline
$ neuxus -e basics/stimulate_send.py
This pipeline reads the stimulations config with the Stimulator Node first (refer to stimulator), and then send the stimulations via LSL with the second Node LslSend (refer to io). To link the Stimulator to the LslSend, LslSend takes as input_port
, the output
attribute of the Stimulator, generated_markers.output
.
Note
The output
attribute is of type Port, and enables to share data between Nodes. Not every Node has that attribute, read the API!
Warning
Do not name Nodes with the same name, else they will override each others.
A more complicated example¶
This pipeline sends by LSL a simple DSP feedback calculated in real-time:
from neuxus.nodes import *
def square(x):
# x is a matrix of a row
return x ** 2
lsl_reception = io.LslReceive(
prop='type',
value='EEG',
data_type='signal'
)
butter_filter = filter.ButterFilter(
input_port=lsl_reception.output,
lowcut=8,
highcut=12
)
apply_function = function.ApplyFunction(
input_port=butter_filter.output,
function=square
)
epoch = epoching.TimeBasedEpoching(
input_port=apply_function.output,
duration=1,
interval=0.5
)
average = epoch_function.UnvariateStat(
input_port=epoch.output,
stat='mean'
)
lsl_send = io.LslSend(
input_port=average.output,
name='mySignalEpoched'
)
It first receives the signal from LSL, then filters it with a bandpass ButterFilter, squares it with ApplyFunction. The signal is epoched every 0.5s on a range of 1s and averaged epoch by epoch. The DSP is then sent via LSL.
Note
The pipeline script is only executed once when running NeuXus, users can then define functions, constants that can be used to build Nodes.
Warning
The processing frequency of NeuXus is self-regulated, the software calculates all updates as quickly as possible and then waits for the next data entry.
If the RAM is overloaded (because of other running software or because there are too many nodes in the pipeline), the loop takes longer to complete. This may involve a significant (and exponentially increasing) delay.
If stimuli are used outside the software (via LSL for example), divide your pipeline, one with the independent stimulator, the other with the rest: this ensures that the pipeline does not slow down the sending of stimulation via LSL.
Write your pipeline¶
Get inspiration from the following template
to write new pipelines.
# description of the pipeline
"""
Detailed template for creating basic NeuXus scripts
Refer to the examples for more complex pipelines and possibilities.
author: ...
mail: ...
"""
# import usefull library
import numpy as np
import datetime
# import all nodes from neuxus
# refer to the API to see all available nodes
from neuxus.nodes import *
# import your customized nodes from a file located in the same directory as your pipeline script
from my_custom_node_file import MyCustomNode
# this script is executed at the very beginning, you can initialze some data, parameters
# and create all Nodes you need for your pipeline
# for example
date = datetime.datetime.now().strftime('%Y-%m-%d_%H%M%S')
print(date)
features_file = 'my_path' + date
# stimulation and visualization
lsl_markers = stimulator.Stimulator(
file=dir_path + 'config_ov.xml' # dir_path is the path to the directory containing the current script
)
# data aqcuisition from LSL stream
lsl_signal = generate.Generator(
generator='simulation',
nb_channels=32,
sampling_frequency=250
)
# or from simulation
lsl_signal = io.LslReceive(
prop='name',
value='LiveAmpSN-054207-0168',
data_type='signal',
sync='network'
)
# to link node, use the attribute output in input_port arg of another node
# channel selection
selected_chans = select.ChannelSelector(
input_port=lsl_signal.output,
mode='index',
selected=[8, 25]
)
# temporal filtering
butter_filter = filter.ButterFilter(
input_port=selected_chans.output,
lowcut=8,
highcut=12
)
# time-based epoching
time_epoch = epoching.TimeBasedEpoching(
input_port=butter_filter.output,
duration=1,
interval=0.25
)
# averaging
average_epoch = epoch_function.UnivariateStat(
input_port=time_epoch.output,
stat='mean'
)
# save features
tocsvL = store.ToCsv(
input_port=average_epoch.output,
file=features_file
)
# my custom node
my_node = MyCustomNode(
input_port=average_epoch.output
)
Note
Users can load their own Nodes in pipeline (see Create your own node).