Source code for stream.mnelsl_player

import numpy as np
import mne
from pathlib import Path
import multiprocessing as mp
import atexit
import time
import signal

from py_neuromodulation.utils.types import _PathLike
from py_neuromodulation.utils.io import read_BIDS_data
from py_neuromodulation import logger


[docs] class LSLOfflinePlayer: _instances: set["LSLOfflinePlayer"] = set() # Keep track of initialized players _atexit_registered: bool = False # Flag to register atexit def __init__( self, stream_name: str | None = "lsl_offline_player", f_name: str | _PathLike = None, raw: mne.io.Raw | None = None, sfreq: int | float | None = None, data: np.ndarray | None = None, ch_type: str | None = "dbs", chunk_size: int = 10, n_repeat: int = 1, ) -> None: """Initialization of MNE-LSL offline player. Either a filename (PathLike) is provided, or data and sampling frequency to initialize an example mock-up stream. Parameters ---------- stream_name : str, optional LSL stream name, by default "example_stream" f_name : str | None, optional file name used for streaming, by default None sfreq : int | float | None, optional sampling rate, by default None data : np.ndarray | None, optional data used for streaming, by default None ch_type: str | None, optional channel type to select for streaming, by default "dbs" Raises ------ ValueError _description_ """ self.sfreq = sfreq self.stream_name = stream_name self.chunk_size = chunk_size self.n_repeat = n_repeat if f_name: (self._path_raw, data, sfreq, line_noise, coord_list, coord_names) = ( read_BIDS_data(f_name) ) elif raw: self._path_raw = raw elif sfreq and data: info = mne.create_info( ch_names=[f"ch{i}" for i in range(data.shape[0])], ch_types=[ch_type for _ in range(data.shape[0])], sfreq=sfreq, ) raw = mne.io.RawArray(data, info) self._path_raw = Path.cwd() / "temp_raw.fif" raw.save(self._path_raw, overwrite=True) else: error_msg = "Either f_name or raw or sfreq and data must be provided." logger.critical(error_msg) raise ValueError(error_msg) # Flags to control the player subprocess self._streaming_complete = mp.Event() self._player_process = None self._stop_flag = mp.Event() LSLOfflinePlayer._instances.add(self) # Register instancwe if LSLOfflinePlayer._atexit_registered: atexit.register(LSLOfflinePlayer._stop_all_players) LSLOfflinePlayer._atexit_registered = True
[docs] def start_player( self, chunk_size: int | None = None, n_repeat: int | None = None, block: bool = False, ): """Start MNE-LSL Player Parameters ---------- chunk_size : int, optional Number of samples to stream at once, by default 10 n_repeat : int, optional Number of times to repeat the stream, by default 1 block : bool, optional If True, block until streaming is complete, by default False """ if chunk_size: self.chunk_size = chunk_size if n_repeat: self.n_repeat = n_repeat self._stop_flag.clear() self._streaming_complete.clear() self._player_process = mp.Process( target=self._run_player, args=( self.chunk_size, self.n_repeat, self._stop_flag, self._streaming_complete, ), ) self._player_process.start() if block: try: self.wait_for_completion() except KeyboardInterrupt: logger.info("\nKeyboard interrupt received. Stopping the player...") self.stop_player()
def _run_player(self, chunk_size, n_repeat, stop_flag, streaming_complete): from mne_lsl.player import PlayerLSL signal.signal(signal.SIGINT, lambda: stop_flag.set()) player = PlayerLSL( self._path_raw, name=self.stream_name, chunk_size=chunk_size, n_repeat=n_repeat, ) player = player.start() try: while not stop_flag.is_set() and not player._end_streaming: time.sleep(0.1) finally: try: player.stop() except RuntimeError: # player already stopped pass streaming_complete.set()
[docs] def wait_for_completion(self): """Block until streaming is complete""" while self._player_process and self._player_process.is_alive(): try: self._streaming_complete.wait(timeout=1.0) if self._streaming_complete.is_set(): break except KeyboardInterrupt: logger.info("\nKeyboard interrupt received. Stopping the player...") self.stop_player() break
[docs] def stop_player(self): """Stop MNE-LSL Player""" if self._player_process and self._player_process.is_alive(): self._stop_flag.set() self._player_process.join(timeout=5) if self._player_process.is_alive(): self._player_process.terminate() self._player_process.join(timeout=1) if self._player_process.is_alive(): self._player_process.kill() self._player_process = None logger.info(f"Player stopped: {self.stream_name}") LSLOfflinePlayer._instances.discard(self)
@classmethod def _stop_all_players(cls): """Stop all player instances (used for atexit)""" for player in cls._instances: player.stop_player() # Enable use as a context manager def __enter__(self): self.start_player() return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop_player()