import numpy as np
from struct import unpack
from socket import (AF_INET, SOCK_STREAM, SOCK_DGRAM, socket)
import logging
import uuid
from time import time
from pylsl import (StreamInfo, StreamOutlet,
StreamInlet, resolve_byprop, pylsl)
from neuxus.node import Node
from neuxus.chunks import Port
[docs]class LslSend(Node):
"""Send to a LSL stream.
Attributes:
- i (Port): Default data input, expects DataFrame.
Args:
- name (string): The name of the stream.
- type_ (string): The content type of the stream, .
- format (string): The format type for each channel. Currently, only ``double64`` and ``string`` are supported.
- uuid (string, None): The unique identifier for the stream. If ``None``, it will be auto-generated.
"""
_dtypes = {"double64": np.number, "string": np.object, 'float32': np.number, 'int32': np.number}
def __init__(self, input_port, name, type="signal", format="double64", uuid_=None):
Node.__init__(self, input_port, False)
self._name = name
assert self.input.data_type in ['epoch', 'signal', 'marker']
self._type = type
self._format = format
self.outlet = None
self._frequency = self.input.sampling_frequency
if not uuid_:
uuid_ = str(uuid.uuid4())
self.uuid = uuid_
self.connect()
Node.log_instance(self, {
'name': self._name,
'frequency': self._frequency,
'channels': self.input.channels
})
[docs] def connect(self):
'''Create an outlet for streaming data'''
if not self.outlet:
# metadata
info = StreamInfo(
self._name,
self._type,
len(self.input.channels),
self._frequency,
self._format,
self.uuid
)
channels = info.desc().append_child("channels")
for label in self.input.channels:
channels.append_child("channel")\
.append_child_value("label", str(label))\
.append_child_value("unit", "unknown")\
.append_child_value("type", "signal")
# create the outlet
self.outlet = StreamOutlet(info)
[docs] def update(self):
'''Send data found in input port'''
for chunk in self.input:
values = chunk.select_dtypes(
include=[self._dtypes[self._format]]).values
stamps = chunk.index.values.astype(np.float64)
for row, stamp in zip(values, stamps):
self.outlet.push_sample(row, stamp)
[docs]class LslReceive(Node):
"""Receive from a LSL stream.
Attributes:
- input: input port
Args:
- prop (str): property used to resolve the stream (for example 'name')
- value (str): value associated to prop for resolving stream
- data_type (str): type of output value among ['epoch', 'signal', 'vector', 'marker']
- sync (string, None): The method used to synchronize timestamps. Use ``local`` if
you receive the stream from another application on the same computer.
Use ``network`` if you receive from another computer.
- max_samples (int): The maximum number of samples to return per call. Default is 4096
- timeout (float): time for the software to wait the stream
"""
def __init__(self, prop, value, data_type, sync="local", max_samples=1024 * 4, timeout=10.0):
Node.__init__(self, None)
assert data_type in ['epoch', 'signal', 'vector', 'marker']
self._data_type = data_type
self.inlet = None
self._prop = prop
self._value = value
self.sync = sync
self.max_samples = max_samples
self.offset = time() - pylsl.local_clock()
self._timeout = timeout
self.connect()
Node.log_instance(self, {
'channels': self.channels,
'sampling frequency': self._frequency})
def connect(self):
if not self.inlet:
# resolve streams
logging.info(f'Resolving streams with {self._prop} {self._value}')
streams = resolve_byprop(
self._prop, self._value, timeout=self._timeout)
if not streams:
logging.info('No stream found')
raise Exception
logging.info(f'{len(streams)} stream(s) acquired')
# Stream acquired
self.inlet = StreamInlet(streams[0])
info = self.inlet.info()
self.meta = {
"name": info.name(),
"type": info.type(),
"frequency": info.nominal_srate(),
"info": str(info.as_xml()).replace("\n", "").replace("\t", ""),
}
channels = []
if not info.desc().child("channels").empty():
channel = info.desc().child("channels").child("channel")
for _ in range(info.channel_count()):
channel_name = channel.child_value("label")
channels.append(channel_name)
channel = channel.next_sibling()
if not channels:
channels = [f'Ch{i + 1}' for i in range(info.channel_count())]
self.channels = channels
self._frequency = info.nominal_srate()
self.output.set_parameters(
data_type=self._data_type,
channels=channels,
sampling_frequency=self._frequency,
meta=self.meta)
def update(self):
if self.inlet:
values, stamps = self.inlet.pull_chunk(
max_samples=self.max_samples)
if stamps:
stamps = np.array(stamps)
if self.sync == "local":
stamps += self.offset
elif self.sync == "network":
stamps = stamps + self.inlet.time_correction() + self.offset
elif self.sync == 'special':
stamps = stamps
# stamps = pd.to_datetime(stamps, format=None)
if len(stamps) > 0:
if len(self.channels) > 0:
self.output.set(values, stamps, self.channels)
else:
self.output.set(values, stamps)
else:
return
[docs]class UdpSend(Node):
"""Simple UDP client to send some data
Arguments:
- input_port (Port): inout data port
- ip (str): IP address of UDP server which receive data
- port (int): socket port
Example: UdpSend(port895, ip="127.0.0.1", port=20001)
"""
def __init__(self, input_port, ip, port):
Node.__init__(self, input_port, False)
self._server_address_port = (ip, port)
# Create the UDP socket
self._socket = socket(family=AF_INET, type=SOCK_DGRAM)
Node.log_instance(self, {
'IP': ip,
'port': port})
def update(self):
for chunk in self.input:
bytes_to_send = str.encode(chunk.to_string(header=False, index=False))
self._socket.sendto(bytes_to_send, self._server_address_port)
[docs]class RdaReceive(Node):
"""Receive a signal from RDA stream
Attributes:
- output (Port): output port
- marker_output (Port): output marker port
Args:
- rdaport (int): rda port to connect with
- offset (float): offset (in second) to apply to incoming data timestamps, default is 0
- host (str): RDA host, default is 'localhost', it could be the IP adress of the host
- timeout (float): timeout in sec t get the RDA stream, default is 10
Example:
RdaReceive()
RdaReceive(rdaport=52136, offset=.125, host='159.10.20')
"""
def __init__(self, rdaport, offset=.0, host="localhost", timeout=10.0):
Node.__init__(self, None)
self._buf_size_max = 2**15
self._rdaport = rdaport
self._offset = offset
self._timeout = timeout
self._host = host
self._connect()
self.output.set_parameters(
data_type='signal',
channels=self._channels,
sampling_frequency=self._frequency,
meta='')
self.marker_output = Port()
self.marker_output.set_parameters(
data_type='marker',
channels=['Markers'],
sampling_frequency=0,
meta='')
Node.log_instance(self, {
'marquers output': self.marker_output.id,
'channels': self._channels,
'sampling frequency': self._frequency
})
self._persistent = b''
self._last_block = -1
self._time = None
def _connect(self):
# Create a tcpip socket
self._my_socket = socket(AF_INET, SOCK_STREAM)
# RECView: 51254, Recorder: 51244, use 51234 to connect with 16Bit Port
starttime = time()
flag = True
while flag: # wait for the socket connection
try:
self._my_socket.connect((self._host, self._rdaport))
except ConnectionRefusedError:
if time() - starttime > self._timeout:
logging.info('No RDA stream found')
raise Exception
else:
flag = False
while True: # wait for the starting message
# Get message header as raw array of chars
rawhdr = self._receive_data(24)
# Split array into usefull information id1 to id4 are constants
(id1, id2, id3, id4, msgsize, msgtype) = unpack('<llllLL', rawhdr)
# Get data part of message, which is of variable size
rawdata = self._receive_data(msgsize - 24)
if msgtype == 1:
# Start message, extract eeg properties
(channel_count, sampling_interval, resolutions,
channel_names) = self._get_properties(rawdata)
self._frequency = 1000000 / sampling_interval
if not channel_names:
channel_names = [f'Ch{i}' for i in range(1, channel_count + 1)]
self._channels = channel_names
return
if time() - starttime > self._timeout:
logging.info('Timeout')
raise Exception
def update(self):
# receive all data from socket
raw = self._my_socket.recv(self._buf_size_max)
# concatenate with last value in persitence
raw = self._persistent + raw
# initialize loop
data_to_send = []
timestamps = []
flag = True
while flag:
if len(raw) >= 24:
# get info from 24 fisrt bytes
info = raw[:24]
(id1, id2, id3, id4, msgsize, msgtype) = unpack('<llllLL', info)
if len(raw) >= msgsize: # test if we already get the full message in buffer
# get data of current message
rawdata = raw[24:msgsize]
# store the rest in raw
raw = raw[msgsize:]
if msgtype == 4: # if the message contains data do:
(block, points, data, markers) = self._extract_data(rawdata)
# test overflow of block (ie if we do not receive a block)
if self._last_block != -1 and block > self._last_block + 1:
logging.warn(
"Overflow when getting data from RDA, clock is reset ")
self._time = None
# update last_block
self._last_block = block
# concatenate data
data_to_send += data
if not self._time: # set the local clock
self._time = time() - points / self._frequency
timestamps += [self._time - self._offset + i / self._frequency for i in range(points)]
for marker in markers:
self.marker_output.set([marker['message'][1]] * marker['points'], [timestamps[marker['position'] + i - 1] for i in range(int(marker['points']))])
# self._time points to the timestamp of first row from next block
self._time = timestamps[-1] + self._offset + 1 / self._frequency
else:
# add to persitence and stop iterations
self._persistent = raw
flag = False
else:
# add to persitence and stop iterations
self._persistent = raw
flag = False
if len(timestamps) > 0:
# send data in output
self.output.set(data_to_send, timestamps, self._channels)
def _receive_data(self, requestedSize):
"""Helper function for receiving a whole message"""
returnStream = b''
while len(returnStream) < requestedSize:
databytes = self._my_socket.recv(requestedSize - len(returnStream))
if databytes == '':
logging.info("connection broken")
returnStream += databytes
return returnStream
def _split_string(self, raw):
"""Helper function for splitting a raw array of
zero terminated strings (C) into an array of python strings"""
s = [i.decode("utf-8") for i in raw.split(b'\x00')]
s.remove('')
return s
def _get_properties(self, rawdata):
"""Function for extracting eeg properties from a raw data array
read from tcpip socket"""
# Extract numerical data
(channelCount, samplingInterval) = unpack('<Ld', rawdata[:12])
# Extract resolutions
resolutions = []
for c in range(channelCount):
index = 12 + c * 8
restuple = unpack('<d', rawdata[index:index + 8])
resolutions.append(restuple[0])
# Extract channel names
channelNames = self._split_string(rawdata[12 + 8 * channelCount:])
return (channelCount, samplingInterval, resolutions, channelNames)
def _extract_data(self, rawdata):
"""function for extracting data from message body"""
# Extract numerical data
(block, points, markerCount) = unpack('<LLL', rawdata[:12])
# Extract eeg data as array of floats
data = []
for point in range(points):
row = []
for chan in range(len(self._channels)):
index = 12 + 4 * len(self._channels) * point + 4 * chan
value = unpack('<f', rawdata[index:index + 4])
row.append(value[0])
data.append(row)
# Extract markers
markers = []
index = 12 + 4 * points * len(self._channels)
for m in range(markerCount):
markersize = unpack('<L', rawdata[index:index + 4])
(position, points2, channel) = unpack('<LLl', rawdata[index + 4:index + 16])
typedesc = self._split_string(rawdata[index + 16:index + markersize[0]])
markers.append({'position': position, 'points': points2, 'message': typedesc})
index = index + markersize[0]
return (block, points, data, markers)