Source code for neuxus.nodes.epoching

import pandas as pd

from neuxus.node import Node


[docs]class TimeBasedEpoching(Node): """Generates signal 'slices' or 'blocks' having a specified duration and interval Attributes: - output: output Port used to share data to other nodes Args: - input_port (Port): Input signal (type 'signal' or 'epoch') - duration (float): Length of epoched signal - interval (float): Time interval between two consecutive epochs for signal """ def __init__(self, input_port, duration, interval): Node.__init__(self, input_port) self.duration = duration self.interval = interval self.output.set_parameters( data_type='epoch', channels=self.input.channels, sampling_frequency=self.input.sampling_frequency, meta=self.input.meta, epoching_frequency=1 / interval ) self.persistent = pd.DataFrame([], [], self.input.channels) self.trigger = None self.markers = [] Node.log_instance(self, { 'duration': self.duration, 'interval': self.interval }) def update_timing(self, min_time, max_time): if not self.trigger or self.input.data_type == 'epoch': self.trigger = min_time if self.input.data_type == 'epoch': while self.trigger + self.duration < max_time: self.markers.append((self.trigger, self.trigger + self.duration)) self.trigger += self.interval elif self.input.data_type == 'signal': while self.trigger < max_time: self.markers.append((self.trigger, self.trigger + self.duration)) self.trigger += self.interval def update(self): # update persistence for chunk in self.input: self.persistent = pd.concat([self.persistent, chunk]) # update markers received self.update_timing(float(chunk.index.values[0]), float(chunk.index.values[-1])) if len(self.persistent) > 0: for marker in self.markers.copy(): start_time, end_time = marker if float(self.persistent.index[-1]) > end_time: # get epoch self.persistent = self.persistent.iloc[lambda x: x.index >= start_time] epoch = self.persistent.iloc[lambda x: x.index < end_time] # update the output self.output.set_from_df(epoch) self.markers.remove(marker) if len(self.markers) == 0 or self.input.data_type == 'epoch': self.persistent = pd.DataFrame([], [], self.input.channels)
[docs]class MarkerBasedSeparation(Node): """Cut a continuous signal in epoch, separation of epoch are set each time a Marker come Attributes: - output (Port): epoched output port Args: - input_port (Port): input signal - marker_input_port (Port): input markers """ def __init__(self, input_port, marker_input_port): Node.__init__(self, input_port) self.marker_input = marker_input_port self.output.set_parameters( data_type='epoch', channels=self.input.channels, sampling_frequency=self.input.sampling_frequency, meta=self.input.meta, epoching_frequency=0) # persitent is data of a non-complete epoch self.persistent = pd.DataFrame([], [], self.input.channels) self.markers_received = pd.DataFrame([], [], self.marker_input.channels) # current_name is the name to add for current epoch self.current_name = 'first epoch' Node.log_instance(self, {'marker port': self.marker_input.id})
[docs] def get_end_time(self): """Test if a new marker arrived and return its timestamp as end of last epoch else return None """ if len(self.markers_received) > 0: # end_time of current epoch end_time = float(self.markers_received.index.values[0]) name = self.markers_received.loc[end_time, self.marker_input.channels].values[0] else: end_time = None name = None return (end_time, name)
def update(self): # concatenate all markers received for marker in self.marker_input: self.markers_received = pd.concat([self.markers_received, marker]) # initialize the end_time of current epoch and name of the next epoch end_time, next_name = self.get_end_time() for chunk in self.input: self.persistent = pd.concat([self.persistent, chunk]) while end_time and float(chunk.index[-1]) > end_time: # while the new chunk complete epochs do: # get epoch epoch = self.persistent[lambda x: x.index < end_time] # the rest is stored in persistence self.persistent = self.persistent.iloc[lambda x: x.index >= end_time] # update the output self.output.set_from_df(epoch) # update the new current epoch name self.current_name = next_name # drop marker from markers_recived and update end_time and next_name self.markers_received = self.markers_received.drop([end_time]) end_time, next_name = self.get_end_time()
[docs]class StimulationBasedEpoching(Node): """Slices signal into epoch of a desired length following a stimulation event Attributes: - output (Port): epoched output port Args: - input_port (Port): input signal - marker_input_port (Port): input markers - stimulation (int or float or str): stimulation that trigger a new epoching, the other event will be ignored, be carefull with the type of stimulation - offset (float): offset to wait before starting epoching when receiving the stimulation - duration (float): duration of epochs """ def __init__(self, input_port, marker_input_port, stimulation, offset, duration): Node.__init__(self, input_port) self.marker_input = marker_input_port self.output.set_parameters( data_type='epoch', channels=self.input.channels, sampling_frequency=self.input.sampling_frequency, meta=self.input.meta, epoching_frequency=0 ) self.stimulation = stimulation self.duration = duration self.offset = offset # persitent is data of a non-complete epoch self.persistent = pd.DataFrame([], [], self.input.channels) self.markers = [] Node.log_instance(self, { 'marker port': self.marker_input.id, 'stimulation': self.stimulation, 'type stimulation': type(self.stimulation), 'offset': self.offset, 'duration': self.duration}) # TO DO terminate def get_timing_from_marker(self): markers = [] for marker_df in self.marker_input: for marker_index in marker_df.index: name = marker_df.loc[marker_index].values[0] if name == self.stimulation: start_time = marker_index + self.offset end_time = start_time + self.duration markers.append((float(start_time), float(end_time))) return markers def update(self): # update persistence for chunk in self.input: self.persistent = pd.concat([self.persistent, chunk]) # update markers received self.markers = self.markers + self.get_timing_from_marker() if len(self.persistent) > 0: for marker in self.markers.copy(): start_time, end_time = marker if float(self.persistent.index[-1]) > end_time: # get epoch self.persistent = self.persistent.iloc[lambda x: x.index >= start_time] epoch = self.persistent.iloc[lambda x: x.index < end_time] # update the output self.output.set_from_df(epoch) self.markers.remove(marker) if len(self.markers) == 0: self.persistent = pd.DataFrame([], [], self.input.channels)