import numpy as np
from pydantic import Field
from py_neuromodulation.utils.types import NMBaseModel
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from py_neuromodulation import NMSettings
import pandas as pd
class ProjectionSettings(NMBaseModel):
max_dist_mm: float = Field(default=20.0, gt=0.0)
[docs]
class Projection:
def __init__(
self,
settings: "NMSettings",
grid_cortex: "pd.DataFrame",
grid_subcortex: "pd.DataFrame",
coords: dict,
channels: "pd.DataFrame",
plot_projection: bool = False,
) -> None:
self.grid_cortex = grid_cortex
self.grid_subcortex = grid_subcortex
self.coords = coords
self.channels = channels
self.project_cortex = settings.postprocessing.project_cortex
self.project_subcortex = settings.postprocessing.project_subcortex
self.max_dist_cortex = settings.project_cortex_settings.max_dist_mm
self.max_dist_subcortex = settings.project_subcortex_settings.max_dist_mm
self.ecog_channels: list # None case never handled, no need for default value
self.lfp_channels: list # None case never handled, no need for default value
self.idx_chs_ecog: list = [] # feature series indexes for ecog channels
self.names_chs_ecog: list = [] # feature series name of ecog features
self.idx_chs_lfp: list = [] # feature series indexes for lfp channels
self.names_chs_lfp: list = [] # feature series name of lfp features
self.feature_names: list | None = None
self.initialized: bool = False
self.remove_not_used_ch_from_coords() # remove beforehand non used channels from coords
if len(self.coords["cortex_left"]["positions"]) == 0:
self.sess_right = True
self.ecog_strip = self.coords["cortex_right"]["positions"]
self.ecog_strip_names = self.coords["cortex_right"]["ch_names"]
elif len(self.coords["cortex_right"]["positions"]) == 0:
self.sess_right = False
self.ecog_strip = self.coords["cortex_left"]["positions"]
self.ecog_strip_names = self.coords["cortex_left"]["ch_names"]
if self.sess_right and len(self.coords["subcortex_right"]["positions"]) > 0:
self.lfp_elec = self.coords["subcortex_right"]["positions"]
self.lfp_elec_names = self.coords["subcortex_right"]["ch_names"]
elif (
self.sess_right is False
and len(self.coords["subcortex_left"]["positions"]) > 0
):
self.lfp_elec = self.coords["subcortex_left"]["positions"]
self.lfp_elec_names = self.coords["subcortex_left"]["ch_names"]
self._initialize_channels()
(
self.proj_matrix_cortex,
self.proj_matrix_subcortex,
) = self.calc_projection_matrix()
if self.project_cortex:
self.active_cortex_gridpoints = np.nonzero(
self.proj_matrix_cortex.sum(axis=1)
)[0]
if self.project_subcortex:
self.active_subcortex_gridpoints = np.nonzero(
self.proj_matrix_subcortex.sum(axis=1)
)[0]
if plot_projection:
from py_neuromodulation.analysis.plots import NM_Plot
nmplotter = NM_Plot(
ecog_strip=self.ecog_strip,
grid_cortex=self.grid_cortex.to_numpy(),
grid_subcortex=self.grid_subcortex.to_numpy(),
sess_right=self.sess_right,
proj_matrix_cortex=self.proj_matrix_cortex,
)
nmplotter.plot_cortex()
def remove_not_used_ch_from_coords(self):
ch_not_used = self.channels.query('(used==0) or (status=="bad")').name
if len(ch_not_used) > 0:
for ch in ch_not_used:
for key_ in self.coords:
for idx, ch_coords in enumerate(self.coords[key_]["ch_names"]):
if ch.startswith(ch_coords):
# delete index
self.coords[key_]["positions"] = np.delete(
self.coords[key_]["positions"], idx, axis=0
)
self.coords[key_]["ch_names"].remove(ch)
[docs]
def calc_proj_matrix(
self,
max_dist: float,
grid: np.ndarray,
coord_array: np.ndarray,
) -> np.ndarray:
"""Calculate projection matrix."""
channels = coord_array.shape[0]
distance_matrix = np.zeros([grid.shape[1], channels])
for project_point in range(grid.shape[1]):
for channel in range(coord_array.shape[0]):
distance_matrix[project_point, channel] = np.linalg.norm(
grid[:, project_point] - coord_array[channel, :]
)
proj_matrix = np.zeros(distance_matrix.shape)
for grid_point in range(distance_matrix.shape[0]):
used_channels = np.where(distance_matrix[grid_point, :] < max_dist)[0]
rec_distances = distance_matrix[grid_point, used_channels]
sum_distances: float = np.sum(1 / rec_distances)
for _, used_channel in enumerate(used_channels):
proj_matrix[grid_point, used_channel] = (
1 / distance_matrix[grid_point, used_channel]
) / sum_distances
return proj_matrix
[docs]
def calc_projection_matrix(self):
"""Calculates a projection matrix based on the used coordinate arrays
Returns
-------
proj_matrix_cortex (np.array)
cortical projection_matrix in shape [grid contacts, channel contact] defaults to None
proj_matrix_subcortex (np.array)
subcortical projection_matrix in shape [grid contacts, channel contact] defaults to None
"""
proj_matrix_run = np.empty(2, dtype=object)
if self.sess_right:
if self.project_cortex:
cortex_grid_right = np.copy(self.grid_cortex)
cortex_grid_right[:, 0] = cortex_grid_right[:, 0] * -1
self.cortex_grid_right = np.array(cortex_grid_right.T)
else:
self.cortex_grid_right = None
if self.project_subcortex:
subcortex_grid_right = np.copy(self.grid_subcortex)
subcortex_grid_right[:, 0] = subcortex_grid_right[:, 0] * -1
self.subcortex_grid_right = np.array(subcortex_grid_right).T
else:
self.subcortex_grid_right = None
grid_session = [self.cortex_grid_right, self.subcortex_grid_right]
else:
if self.project_cortex:
self.cortex_grid_left = np.array(self.grid_cortex.T)
else:
self.cortex_grid_left = None
if self.project_subcortex:
self.subcortex_grid_left = np.array(self.grid_subcortex.T)
else:
self.subcortex_grid_left = None
grid_session = [self.cortex_grid_left, self.subcortex_grid_left]
coord_array = [
self.ecog_strip if grid_session[0] is not None else None,
self.lfp_elec if grid_session[1] is not None else None,
]
for loc_, grid in enumerate(grid_session):
if loc_ == 0: # cortex
max_dist = self.max_dist_cortex
elif loc_ == 1: # subcortex
max_dist = self.max_dist_subcortex
if grid_session[loc_] is not None:
proj_matrix_run[loc_] = self.calc_proj_matrix(
max_dist, grid, coord_array[loc_]
)
return proj_matrix_run[0], proj_matrix_run[1] # cortex, subcortex
def _initialize_channels(self) -> None:
"""Initialize channel names via nm_channel new_name column"""
if self.project_cortex:
self.ecog_channels = self.channels.query(
'(type=="ecog") and (used == 1) and (status=="good")'
).name.to_list()
chs_ecog = self.ecog_channels.copy()
for ecog_channel in chs_ecog:
if ecog_channel not in self.ecog_strip_names:
self.ecog_channels.remove(ecog_channel)
# write ecog_channels to be new_name
self.ecog_channels = list(
self.channels.query("name == @self.ecog_channels").new_name
)
if self.project_subcortex:
self.lfp_channels = self.channels.query(
'(type=="lfp" or type=="seeg" or type=="dbs") \
and (used == 1) and (status=="good")'
).name.to_list()
# project only channels that are in the coords
# this also deletes channels of the other hemisphere
chs_lfp = self.lfp_channels.copy()
for lfp_channel in chs_lfp:
if lfp_channel not in self.lfp_elec_names:
self.lfp_channels.remove(lfp_channel)
# write lfp_channels to be new_name
self.lfp_channels = list(
self.channels.query("name == @self.lfp_channels").new_name
)
[docs]
def init_projection_run(self, feature_dict: dict) -> None:
"""Initialize indexes for respective channels in feature series computed by features.py"""
# here it is assumed that only one hemisphere is recorded at a time!
if self.project_cortex:
for ecog_channel in self.ecog_channels:
self.idx_chs_ecog.append(
[
ch_idx
for ch_idx, ch in enumerate(feature_dict.keys())
if ch.startswith(ecog_channel)
]
)
self.names_chs_ecog.append(
[
ch
for _, ch in enumerate(feature_dict.keys())
if ch.startswith(ecog_channel)
]
)
if self.names_chs_ecog:
# get feature_names; given by ECoG sequency of features
self.feature_names = [
feature_name[len(self.ecog_channels[0]) + 1 :]
for feature_name in self.names_chs_ecog[0]
]
if self.project_subcortex:
# for lfp_channels select here only the ones from the correct hemisphere!
for lfp_channel in self.lfp_channels:
self.idx_chs_lfp.append(
[
ch_idx
for ch_idx, ch in enumerate(feature_dict.keys())
if ch.startswith(lfp_channel)
]
)
self.names_chs_lfp.append(
[
ch
for _, ch in enumerate(feature_dict.keys())
if ch.startswith(lfp_channel)
]
)
if not self.feature_names and self.names_chs_lfp:
# get feature_names; given by LFP sequency of features
self.feature_names = [
feature_name[len(self.lfp_channels[0]) + 1 :]
for feature_name in self.names_chs_lfp[0]
]
self.initialized = True
[docs]
def project_features(self, feature_dict: dict) -> None:
"""Project data, given idx_chs_ecog/stn"""
if not self.initialized:
self.init_projection_run(feature_dict)
dat_cortex = (
np.array(
[
np.array([feature_dict[ch] for ch in ch_names])
for ch_names in self.names_chs_ecog
]
)
if self.project_cortex
else None
)
dat_subcortex = (
np.array(
[
np.array([feature_dict[ch] for ch in ch_names])
for ch_names in self.names_chs_lfp
]
)
if self.project_subcortex
else None
)
# project data
# get_projected_cortex_subcortex_data can return None
# but None is not handled and will throw error in the code below
proj_cortex_array: np.ndarray
proj_subcortex_array: np.ndarray
(proj_cortex_array, proj_subcortex_array) = (
self.get_projected_cortex_subcortex_data(dat_cortex, dat_subcortex)
) # type: ignore # Ignore None return
features_new: dict = {}
# proj_cortex_array has shape grid_points x feature_number
if self.project_cortex:
features_new = features_new | {
"gridcortex_"
+ str(act_grid_point)
+ "_"
+ feature_name: proj_cortex_array[act_grid_point, feature_idx]
for feature_idx, feature_name in enumerate(self.feature_names) # type: ignore # Empty list handled above
for act_grid_point in self.active_cortex_gridpoints
}
if self.project_subcortex:
features_new = features_new | {
"gridsubcortex_"
+ str(act_grid_point)
+ "_"
+ feature_name: proj_subcortex_array[act_grid_point, feature_idx]
for feature_idx, feature_name in enumerate(self.feature_names) # type: ignore # Empty list handled above
for act_grid_point in self.active_subcortex_gridpoints
}
feature_dict.update(features_new)
[docs]
def get_projected_cortex_subcortex_data(
self,
dat_cortex: np.ndarray | None = None,
dat_subcortex: np.ndarray | None = None,
) -> tuple[np.ndarray | None, np.ndarray | None]:
"""Project cortical and subcortical data to predefined projection matrices
Parameters
----------
dat_cortex : np.ndarray, optional
cortical features, by default None
dat_subcortex : np.ndarray, optional
subcortical features, by default None
Returns
-------
proj_cortex : np.ndarray
projected cortical features, by detault None
proj_subcortex : np.ndarray
projected subcortical features, by detault None
"""
proj_cortex = None
proj_subcortex = None
if dat_cortex is not None:
proj_cortex = self.proj_matrix_cortex @ dat_cortex
if dat_subcortex is not None:
proj_subcortex = self.proj_matrix_subcortex @ dat_subcortex
return proj_cortex, proj_subcortex