Source code for stream.mnelsl_stream

from collections.abc import Iterator
import time
from typing import TYPE_CHECKING
import numpy as np
from py_neuromodulation import logger
from mne_lsl.lsl import resolve_streams
import os

if TYPE_CHECKING:
    from py_neuromodulation import NMSettings


[docs] class LSLStream: """ Class is used to create and connect to a LSL stream and pull data from it. """ def __init__(self, settings: "NMSettings", stream_name: str | None = None) -> None: """ Initialize the LSL stream. Parameters: ----------- settings : settings.NMSettings object stream_name : str, optional Name of the stream to connect to. If not provided, the first available stream is used. Raises: ------- RuntimeError If no stream is running under the provided name or if there are multiple streams running under the same name. """ from mne_lsl.stream import StreamLSL self.stream: StreamLSL self.keyboard_interrupt = False self.settings = settings self._n_seconds_wait_before_disconnect = 3 try: if stream_name is None: stream_name = resolve_streams()[0].name logger.info( f"Stream name not provided. Using first available stream: {stream_name}" ) self.stream = StreamLSL(name=stream_name, bufsize=2).connect(timeout=2) except Exception as e: msg = f"Could not connect to stream: {e}. Either no stream is running under the name {stream_name} or there is several streams under this name." logger.exception(msg) raise RuntimeError(msg) if self.stream.sinfo is None: raise RuntimeError("Stream info is None. Check if the stream is running.") self.winsize = settings.segment_length_features_ms / self.stream.sinfo.sfreq self.sampling_interval = 1 / self.settings.sampling_rate_features_hz # If not running the generator when the escape key is pressed. self.headless: bool = not os.environ.get("DISPLAY") if not self.headless: from py_neuromodulation.utils.keyboard import KeyboardListener self.listener = KeyboardListener(("esc", self.set_keyboard_interrupt)) self.listener.start() def get_next_batch(self) -> Iterator[tuple[np.ndarray, np.ndarray]]: self.last_time = time.time() check_data = None data = None stream_start_time = None while self.stream.connected: time_diff = time.time() - self.last_time # in s time.sleep(0.005) if time_diff >= self.sampling_interval: self.last_time = time.time() logger.debug(f"Pull data - current time: {self.last_time}") logger.debug(f"time since last data pull {time_diff} seconds") if time_diff >= 2 * self.sampling_interval: logger.warning( "Feature computation time between two consecutive samples" "was twice the feature sampling interval" ) if data is not None: check_data = data data, timestamp = self.stream.get_data(winsize=self.winsize) if stream_start_time is None: stream_start_time = timestamp[0] for i in range(self._n_seconds_wait_before_disconnect): if ( data is not None and check_data is not None and np.allclose(data, check_data, atol=1e-7, rtol=1e-7) ): logger.warning( f"No new data incoming. Disconnecting stream in {3-i} seconds." ) time.sleep(1) i += 1 if i == self._n_seconds_wait_before_disconnect: self.stream.disconnect() logger.warning("Stream disconnected.") break yield timestamp, data logger.info(f"Stream time: {timestamp[-1] - stream_start_time}") if not self.headless and self.keyboard_interrupt: logger.info("Keyboard interrupt") self.listener.stop() self.stream.disconnect() def set_keyboard_interrupt(self): self.keyboard_interrupt = True