Source code for neuxus.nodes.generate

import random as rd
from time import time
import numpy as np

from neuxus.node import Node


[docs]class Generator(Node): """Generate signals for testing purpose Attributes: - output (Port): Output port Args: - generator (str): Type of generator used among ['random', 'oscillator', 'simulation'] - nb_channels (int): Number of channels to send in output - sampling_frequency (float): Sampling frequency of the output - min_chunk_size (int): Minimum number of rows per output chunk, default is 32 Example: Generator('random', 4, 500) Generator('oscillator', 16, 250) Generetor('simulation', 32, 500, 16) """ def __init__(self, generator, nb_channels, sampling_frequency, min_chunk_size=32): Node.__init__(self, None) assert generator in ['random', 'oscillator', 'simulation'] self._generator = generator self._sampling_frequency = sampling_frequency self._min_chunk_size = int(min_chunk_size) self._channels = [f'Ch{i}' for i in range(1, int(nb_channels) + 1)] self._min_period = self._min_chunk_size / self._sampling_frequency self.output.set_parameters( data_type='signal', channels=self._channels, sampling_frequency=self._sampling_frequency, meta='') Node.log_instance(self, { 'generator': self._generator, 'frequency': self._sampling_frequency, 'channels': self._channels}) self._last_t = None if self._generator == 'simulation': highest_freq = 45 lowest_freq = 0 mid_freq = 15 self._frequency = {} self._amplitude = {} self._baseline = {} for chan in self._channels: nb = rd.randint(1, 4) self._frequency[chan] = [rd.triangular(lowest_freq, highest_freq, mid_freq) for i in range(nb)] self._amplitude[chan] = [rd.triangular(10, 30) for i in range(nb)] self._baseline[chan] = rd.gauss(0, 10000) def update(self): t = time() if not self._last_t: self._last_t = t if t > self._min_period + self._last_t: nb_rows = int((t - self._last_t) * self._sampling_frequency) timestamps = np.array([self._last_t + i / self._sampling_frequency for i in range(1, nb_rows + 1)]) self._last_t = timestamps[-1] data = [] for i, chan in enumerate(self._channels): if self._generator == 'random': data.append([rd.random() * 20 - 10 for row in range(nb_rows)]) elif self._generator == 'oscillator': data.append(np.sin(2 * np.pi * (5 + i) * timestamps) * (5 + 5 / (i + 1))) elif self._generator == 'simulation': serie = np.zeros(len(timestamps)) for i, freq in enumerate(self._frequency[chan]): serie += np.sin(2 * np.pi * freq * timestamps) * self._amplitude[chan][i] serie += np.array([self._baseline[chan] + rd.random() * 20 for row in range(nb_rows)]) data.append(serie) self.output.set(np.array(data).transpose(), timestamps, self._channels)