Source code for neuxus.nodes.select

import os

import pandas as pd
import logging
import yaml
from xml.dom import minidom

from neuxus.node import Node


[docs]class ChannelSelector(Node): """Select a subset of signal channels Attributes: - output (port): output port Args: - mode ('index' or 'name'): indicate the way to select data - selected (list): column to be selected Example: ChannelSelector(port1, port2, 'index', [2, 4, 5]) ChannelSelector(port1, port2, 'name', ['Channel 2', 'Channel 4']) """ def __init__(self, input_port, mode, selected): Node.__init__(self, input_port) assert mode in ['index', 'name'] # get channels if mode == 'index': self._channels = [self.input.channels[i - 1] for i in selected] elif mode == 'name': self._channels = selected self.output.set_parameters( data_type=self.input.data_type, channels=self._channels, sampling_frequency=self.input.sampling_frequency, meta=self.input.meta, epoching_frequency=self.input.epoching_frequency) Node.log_instance(self, {'selected channels': self._channels}) def update(self): for chunk in self.input: self.output.set_from_df(chunk[self._channels])
[docs]class SpatialFilter(Node): """Maps M inputs to N outputs by multiplying the each input vector with a matrix Attributes: - output (port): output GroupOfPorts Args: - input_port (port): input port - matrix (dict or str): path to the matrix yaml/xml/cfg file OR dictionnary with new channel name as keys and list of coefficients as values, list must be of the same length as input.channels Example: - SpatialFilter(input_port, '../example/my_matrix.yaml') where my_matrix is the following yaml file: --- # Matrix of coefficient for SpatialFilter OC1: [1, 1, 0, 4e-9] OC2: [4, 2, 4, -2] ... - SpatialFilter(input_port, '../example/my_matrix.cfg') where my_matrix is the following cfg file: <SettingValue>7.352412e-002 2.354113e-001 -1.212912e-001 -3.687871e-002 2.785947e-002 9.112804e-003 2.665961e-001 1.635848e-001 -1.540287e-002 -1.944730e-002 -7.130017e-002 -6.074913e-001</SettingValue> <SettingValue>2</SettingValue> <SettingValue>6</SettingValue> (first setting values are coefficients second one is the number of output channels last one is the number of input channels) - SpatialFilter(input_port, matrix) where matrix = { 'OC2': [4, 0, -1e-2, 0], 'OC3': [0, -1, 2, 4] } """ def __init__(self, input_port, matrix): Node.__init__(self, input_port) if isinstance(matrix, str): filename, file_extension = os.path.splitext(matrix) if file_extension == '.yaml': logging.debug(f'Got matrix from file {matrix}') with open(matrix, 'r') as file: matrix = yaml.load(file, Loader=yaml.FullLoader) logging.debug(f'{matrix}') elif file_extension in ['.xml', '.cfg']: file = minidom.parse(matrix) coefs = file.getElementsByTagName('SettingValue')[0].firstChild.data coefs = [float(coef) for coef in coefs.split()] output_ch_nb = int(file.getElementsByTagName('SettingValue')[1].firstChild.data) input_ch_nb = int(file.getElementsByTagName('SettingValue')[2].firstChild.data) assert len(coefs) == input_ch_nb * output_ch_nb matrix = {f'Ch{i + 1}': [coefs[i * input_ch_nb + j] for j in range(input_ch_nb)] for i in range(output_ch_nb)} self._matrix = matrix self._channels = [*self._matrix.keys()] str_matrix = f'' for chan in self._channels: # verify that the size of _matrix is correct assert len(self._matrix[chan]) == len(self.input.channels) # convert to float (for format '8e-4') self._matrix[chan] = [float(i) for i in self._matrix[chan]] str_matrix += f'\n {chan}: {self._matrix[chan]}' self.output.set_parameters( data_type=self.input.data_type, channels=self._channels, sampling_frequency=self.input.sampling_frequency, meta=self.input.meta, epoching_frequency=self.input.epoching_frequency) Node.log_instance(self, {'matrix': str_matrix}) def update(self): for chunk in self.input: df = pd.DataFrame([]) for chan in self._channels: flag = True # use to first create the serie for index, coef in enumerate(self._matrix[chan]): # print(f'{index} columns with coef {coef}') if flag: serie = chunk.iloc[:, index] * coef flag = False else: serie += chunk.iloc[:, index] * coef df[chan] = serie self.output.set_from_df(df)
[docs]class ReferenceChannel(Node): """Subtracts the value of the reference channel from all other channels Attributes: - output (port): output GroupOfPorts Args: - mode ('index' or 'name'): indicate the way to select data - reference channel (str or int): column to be substracted example: ReferenceChannel(input_port, 'index', 4) or ReferenceChannel(input_port, 'name', 'Cz') """ def __init__(self, input_port, mode, ref): Node.__init__(self, input_port) assert mode in ['index', 'name'] # get reference channel name if mode == 'index': self._ref = self.input.channels[ref - 1] elif mode == 'name': self._ref = ref self._channels = self.input.channels.copy() self._channels.remove(self._ref) self.output.set_parameters( data_type=self.input.data_type, channels=self._channels, sampling_frequency=self.input.sampling_frequency, meta=self.input.meta, epoching_frequency=self.input.epoching_frequency) Node.log_instance(self, {'reference': self._ref}) def update(self): for chunk in self.input: df = pd.DataFrame([]) to_substract = chunk.loc[:, self._ref] for chan in self._channels: df[chan] = chunk.loc[:, chan] - to_substract self.output.set_from_df(df)
[docs]class CommonAverageReference(Node): """Re-referencing the signal to common average reference consists in subtracting from each sample the average value of the samples of all electrodes at this time Attributes: - output (port): output GroupOfPorts example: CommonReferenceChannel(input_port) """ def __init__(self, input_port): Node.__init__(self, input_port) self.output.set_parameters( data_type=self.input.data_type, channels=self.input.channels, sampling_frequency=self.input.sampling_frequency, meta=self.input.meta, epoching_frequency=self.input.epoching_frequency) Node.log_instance(self, {}) def update(self): for chunk in self.input: to_substract = chunk.mean(axis=1) df = pd.DataFrame([]) for chan in self.input.channels: df[chan] = chunk.loc[:, chan] - to_substract self.output.set_from_df(df)