from sklearn import model_selection
from sklearn.linear_model import LinearRegression
from sklearn.base import clone
from sklearn.metrics import r2_score
import pandas as pd
import numpy as np
from copy import deepcopy
from pathlib import PurePath
import pickle
from py_neuromodulation import logger
from typing import Callable
class CV_res:
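    """Container accumulating per-fold cross-validation results."""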
def __init__(
self,
get_movement_detection_rate: bool = False,
RUN_BAY_OPT: bool = False,
mrmr_select: bool = False,
model_save: bool = False,
) -> None:
self.score_train: list = []
self.score_test: list = []
self.y_test: list = []
self.y_train: list = []
self.y_test_pr: list = []
self.y_train_pr: list = []
self.X_test: list = []
self.X_train: list = []
self.coef: list = []
if get_movement_detection_rate:
self.mov_detection_rates_test: list = []
self.tprate_test: list = []
self.fprate_test: list = []
self.mov_detection_rates_train: list = []
self.tprate_train: list = []
self.fprate_train: list = []
if RUN_BAY_OPT:
self.best_bay_opt_params: list = []
if mrmr_select:
self.mrmr_select: list = []
if model_save:
self.model_save: list = []
class Decoder:
class ClassMissingException(Exception):
def __init__(
self,
message="Only one class present.",
) -> None:
self.message = message
super().__init__(self.message)
def __str__(self):
return self.message
def __init__(
self,
        features: pd.DataFrame | None = None,
label: np.ndarray | None = None,
label_name: str | None = None,
used_chs: list[str] = [],
model=LinearRegression(),
eval_method: Callable = r2_score,
cv_method=model_selection.KFold(n_splits=3, shuffle=False),
use_nested_cv: bool = False,
threshold_score=True,
mov_detection_threshold: float = 0.5,
TRAIN_VAL_SPLIT: bool = False,
RUN_BAY_OPT: bool = False,
STACK_FEATURES_N_SAMPLES: bool = False,
time_stack_n_samples: int = 5,
save_coef: bool = False,
get_movement_detection_rate: bool = False,
min_consequent_count: int = 3,
bay_opt_param_space: list = [],
VERBOSE: bool = False,
sfreq: int | None = None,
undersampling: bool = False,
oversampling: bool = False,
mrmr_select: bool = False,
pca: bool = False,
cca: bool = False,
model_save: bool = False,
) -> None:
"""Initialize here a feature file for processing
Read settings.json channels.csv and features.csv
Read target label
Parameters
----------
model : machine learning model
model that utilizes fit and predict functions
eval_method : sklearn metrics
evaluation scoring method, will default to r2_score if not passed
cv_method : sklearm model_selection method
threshold_score : boolean
if True set lower threshold at zero (useful for r2),
mov_detection_threshold : float
if get_movement_detection_rate is True, find given minimum 'threshold' respective
consecutive movement blocks, by default 0.5
TRAIN_VAL_SPLIT (boolean):
if true split data into additinal validation, and run class weighted CV
save_coef (boolean):
if true, save model._coef trained coefficients
get_movement_detection_rate (boolean):
save detection rate and tpr / fpr as well
min_consequent_count (int):
if get_movement_detection_rate is True, find given 'min_consequent_count' respective
consecutive movement blocks with minimum size of 'min_consequent_count'
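        Examples
        --------
        A minimal sketch (channel and label names are illustrative)::

            decoder = Decoder(
                features=df_features,
                label=label,
                label_name="MOV",
                used_chs=["ECOG_1", "ECOG_2"],
            )
            decoder.set_data_ind_channels()
            ch_results = decoder.run_CV_caller("ind_channels")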
"""
self.model = model
self.eval_method = eval_method
self.cv_method = cv_method
self.use_nested_cv = use_nested_cv
self.threshold_score = threshold_score
self.mov_detection_threshold = mov_detection_threshold
self.TRAIN_VAL_SPLIT = TRAIN_VAL_SPLIT
self.RUN_BAY_OPT = RUN_BAY_OPT
self.save_coef = save_coef
self.sfreq = sfreq
self.get_movement_detection_rate = get_movement_detection_rate
self.min_consequent_count = min_consequent_count
self.STACK_FEATURES_N_SAMPLES = STACK_FEATURES_N_SAMPLES
self.time_stack_n_samples = time_stack_n_samples
self.bay_opt_param_space = bay_opt_param_space
self.VERBOSE = VERBOSE
self.undersampling = undersampling
self.oversampling = oversampling
self.mrmr_select = mrmr_select
self.used_chs = used_chs
self.label = label
self.label_name = label_name
self.cca = cca
self.pca = pca
self.model_save = model_save
self.set_data(features)
self.ch_ind_data = {}
self.grid_point_ind_data = {}
self.active_gridpoints = []
self.feature_names = []
self.ch_ind_results = {}
self.gridpoint_ind_results = {}
self.all_ch_results = {}
self.columns_names_single_ch = None
if undersampling:
from imblearn.under_sampling import RandomUnderSampler
self.rus = RandomUnderSampler(random_state=0)
if oversampling:
from imblearn.over_sampling import RandomOverSampler
self.ros = RandomOverSampler(random_state=0)
def set_data(self, features):
if features is not None:
self.features = features
self.feature_names = [
col
for col in self.features.columns
if any(col.startswith(used_ch) for used_ch in self.used_chs)
]
self.data = np.nan_to_num(np.array(self.features[self.feature_names]))
def set_data_ind_channels(self):
"""specified channel individual data"""
self.ch_ind_data = {}
for ch in self.used_chs:
self.ch_ind_data[ch] = np.nan_to_num(
np.array(
self.features[
[col for col in self.features.columns if col.startswith(ch)]
]
)
)
def set_CV_results(self, attr_name, contact_point=None):
"""set CV results in respectie nm_decode attributes
The reference is first stored in obj_set, and the used lateron
Parameters
----------
attr_name : string
is either all_ch_results, ch_ind_results, gridpoint_ind_results
contact_point : object, optional
usually an int specifying the grid_point or string, specifying the used channel,
by default None
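        Examples
        --------
        A sketch, assuming run_CV was executed for the (illustrative)
        channel "ECOG_1"::

            decoder.set_CV_results("ch_ind_results", contact_point="ECOG_1")
            test_scores = decoder.ch_ind_results["ECOG_1"]["score_test"]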
"""
if contact_point is not None:
getattr(self, attr_name)[contact_point] = {}
obj_set = getattr(self, attr_name)[contact_point]
else:
obj_set = getattr(self, attr_name)
def set_scores(cv_res: CV_res, set_inner_CV_res: bool = False):
"""
This function renames the CV_res keys for InnerCV
"""
def set_score(key_: str, val):
if set_inner_CV_res:
key_ = "InnerCV_" + key_
obj_set[key_] = val
set_score("score_train", cv_res.score_train)
set_score("score_test", cv_res.score_test)
set_score("y_test", cv_res.y_test)
set_score("y_train", cv_res.y_train)
set_score("y_test_pr", cv_res.y_test_pr)
set_score("y_train_pr", cv_res.y_train_pr)
set_score("X_train", cv_res.X_train)
set_score("X_test", cv_res.X_test)
if self.save_coef:
set_score("coef", cv_res.coef)
if self.get_movement_detection_rate:
set_score("mov_detection_rates_test", cv_res.mov_detection_rates_test)
set_score(
"mov_detection_rates_train",
cv_res.mov_detection_rates_train,
)
set_score("fprate_test", cv_res.fprate_test)
set_score("fprate_train", cv_res.fprate_train)
set_score("tprate_test", cv_res.tprate_test)
set_score("tprate_train", cv_res.tprate_train)
if self.RUN_BAY_OPT:
set_score("best_bay_opt_params", cv_res.best_bay_opt_params)
if self.mrmr_select:
set_score("mrmr_select", cv_res.mrmr_select)
if self.model_save:
set_score("model_save", cv_res.model_save)
return obj_set
obj_set = set_scores(self.cv_res)
if self.use_nested_cv:
obj_set = set_scores(self.cv_res_inner, set_inner_CV_res=True)
def run_CV_caller(self, feature_contacts: str = "ind_channels"):
"""Wrapper that call for all channels / grid points / combined channels the CV function
Parameters
----------
feature_contacts : str, optional
"grid_points", "ind_channels" or "all_channels_combined" , by default "ind_channels"
"""
valid_feature_contacts = [
"ind_channels",
"all_channels_combined",
"grid_points",
]
if feature_contacts not in valid_feature_contacts:
raise ValueError(f"{feature_contacts} not in {valid_feature_contacts}")
if feature_contacts == "grid_points":
for grid_point in self.active_gridpoints:
self.run_CV(self.grid_point_ind_data[grid_point], self.label)
self.set_CV_results("gridpoint_ind_results", contact_point=grid_point)
return self.gridpoint_ind_results
if feature_contacts == "ind_channels":
for ch in self.used_chs:
self.ch_name_tested = ch
self.run_CV(self.ch_ind_data[ch], self.label)
self.set_CV_results("ch_ind_results", contact_point=ch)
return self.ch_ind_results
if feature_contacts == "all_channels_combined":
dat_combined = np.array(self.data)
self.run_CV(dat_combined, self.label)
self.set_CV_results("all_ch_results", contact_point=None)
return self.all_ch_results
def set_data_grid_points(self, cortex_only=False, subcortex_only=False):
"""Read the run_analysis
Projected data has the shape (samples, grid points, features)
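        Examples
        --------
        A sketch, assuming projected feature columns named e.g.
        "gridcortex_3_fft_beta"::

            decoder.set_data_grid_points(cortex_only=True)
            grid_results = decoder.run_CV_caller("grid_points")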
"""
# activate_gridpoints stores cortex + subcortex data
self.active_gridpoints = np.unique(
[
i.split("_")[0] + "_" + i.split("_")[1]
for i in self.features.columns
if "grid" in i
]
)
if cortex_only:
self.active_gridpoints = [
i for i in self.active_gridpoints if i.startswith("gridcortex")
]
if subcortex_only:
self.active_gridpoints = [
i for i in self.active_gridpoints if i.startswith("gridsubcortex")
]
self.feature_names = [
i[len(self.active_gridpoints[0] + "_") :]
for i in self.features.columns
if self.active_gridpoints[0] + "_" in i
]
self.grid_point_ind_data = {}
self.grid_point_ind_data = {
grid_point: np.nan_to_num(
self.features[
[i for i in self.features.columns if grid_point + "_" in i]
]
)
for grid_point in self.active_gridpoints
}
def get_movement_grouped_array(
self, prediction, threshold=0.5, min_consequent_count=5
):
"""Return given a 1D numpy array, an array of same size with grouped consective blocks
Parameters
----------
prediction : np.ndarray
numpy array of either predictions or labels, that is going to be grouped
threshold : float, optional
threshold to be applied to 'prediction', by default 0.5
min_consequent_count : int, optional
            minimum required consecutive samples higher than 'threshold', by default 5
Returns
-------
labeled_array : np.ndarray
grouped vector with incrementing number for movement blocks
labels_count : int
count of individual movement blocks
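        Examples
        --------
        A sketch: with ``min_consequent_count=3``, a thresholded trace such as
        ``[0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0]`` keeps only the two runs of at
        least three supra-threshold samples; erosion removes shorter runs,
        dilation restores the surviving runs, and labeling numbers them 1 and 2.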
"""
from scipy.ndimage import label as label_ndimage
from scipy.ndimage import binary_dilation, binary_erosion
mask = prediction > threshold
structure = [True] * min_consequent_count # used for erosion and dilation
eroded = binary_erosion(mask, structure)
dilated = binary_dilation(eroded, structure)
labeled_array, labels_count = label_ndimage(dilated)
return labeled_array, labels_count
def calc_movement_detection_rate(
self, y_label, prediction, threshold=0.5, min_consequent_count=3
):
"""Given a label and prediction, return the movement detection rate on the basis of
movements classified in blocks of 'min_consequent_count'.
Parameters
----------
        y_label : np.ndarray
            ground truth binary label
        prediction : np.ndarray
            model prediction to be thresholded
threshold : float, optional
threshold to be applied to 'prediction', by default 0.5
min_consequent_count : int, optional
            minimum required consecutive samples higher than 'threshold', by default 3
Returns
-------
        mov_detection_rate : float
            movement detection rate, where at least 'min_consequent_count' samples were high in the prediction
fpr : np.ndarray
sklearn.metrics false positive rate np.ndarray
tpr : np.ndarray
sklearn.metrics true positive rate np.ndarray
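        Examples
        --------
        A minimal sketch with binary traces (values are illustrative)::

            rate, fpr, tpr = decoder.calc_movement_detection_rate(
                y_label=np.array([0, 1, 1, 1, 0, 1, 1, 1, 0]),
                prediction=np.array([0, 1, 1, 1, 0, 0, 0, 0, 0]),
                threshold=0.5,
                min_consequent_count=3,
            )
            # one of the two labeled movement blocks is hit -> rate == 0.5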
"""
from sklearn.metrics import confusion_matrix
pred_grouped, _ = self.get_movement_grouped_array(
prediction, threshold, min_consequent_count
)
y_grouped, labels_count = self.get_movement_grouped_array(
y_label, threshold, min_consequent_count
)
hit_rate = np.zeros(labels_count)
pred_group_bin = np.array(pred_grouped > 0)
for label_number in range(1, labels_count + 1): # labeling starts from 1
hit_rate[label_number - 1] = np.sum(
pred_group_bin[np.where(y_grouped == label_number)[0]]
)
try:
mov_detection_rate = np.where(hit_rate > 0)[0].shape[0] / labels_count
except ZeroDivisionError:
logger.warning("no movements in label")
return 0, 0, 0
# calculating TPR and FPR: https://stackoverflow.com/a/40324184/5060208
CM = confusion_matrix(y_label, prediction)
TN = CM[0][0]
FN = CM[1][0]
TP = CM[1][1]
FP = CM[0][1]
fpr = FP / (FP + TN)
tpr = TP / (TP + FN)
return mov_detection_rate, fpr, tpr
    def init_cv_res(self) -> CV_res:
return CV_res(
get_movement_detection_rate=self.get_movement_detection_rate,
RUN_BAY_OPT=self.RUN_BAY_OPT,
mrmr_select=self.mrmr_select,
model_save=self.model_save,
)
    @staticmethod
    # @jit(nopython=True)
    def append_previous_n_samples(X: np.ndarray, y: np.ndarray, n: int = 5):
"""
stack feature vector for n samples
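        A sketch: for ``X`` of shape ``(100, 8)`` and ``n=5``, the returned
        array has shape ``(95, 40)``; row ``t`` concatenates the feature
        vectors of samples ``t + n`` down to ``t + 1``, and the label is
        shortened to ``y[n:]`` accordingly.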
"""
TIME_DIM = X.shape[0] - n
FEATURE_DIM = int(n * X.shape[1])
time_arr = np.empty((TIME_DIM, FEATURE_DIM))
for time_idx, time_ in enumerate(np.arange(n, X.shape[0])):
for time_point in range(n):
time_arr[
time_idx,
time_point * X.shape[1] : (time_point + 1) * X.shape[1],
] = X[time_ - time_point, :]
return time_arr, y[n:]
@staticmethod
def append_samples_val(X_train, y_train, X_val, y_val, n):
X_train, y_train = Decoder.append_previous_n_samples(X_train, y_train, n=n)
X_val, y_val = Decoder.append_previous_n_samples(X_val, y_val, n=n)
return X_train, y_train, X_val, y_val
def fit_model(self, model, X_train, y_train):
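        """Fit the given model, optionally with a train/validation split and resampling.

        If TRAIN_VAL_SPLIT is set, the last 30% of the (non-shuffled) training
        data is held out as a validation set; a ClassMissingException is raised
        if either split lacks positive samples. If over- or undersampling is
        enabled, the training data is resampled before fitting.
        """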
if self.TRAIN_VAL_SPLIT:
X_train, X_val, y_train, y_val = model_selection.train_test_split(
X_train, y_train, train_size=0.7, shuffle=False
)
            if y_train.sum() == 0 or y_val.sum() == 0:
raise Decoder.ClassMissingException
# if type(model) is xgboost.sklearn.XGBClassifier:
# classes_weights = class_weight.compute_sample_weight(
# class_weight="balanced", y=y_train
# )
# model.set_params(eval_metric="logloss")
# model.fit(
# X_train,
# y_train,
# eval_set=[(X_val, y_val)],
# early_stopping_rounds=7,
# sample_weight=classes_weights,
# verbose=self.VERBOSE,
# )
# elif type(model) is xgboost.sklearn.XGBRegressor:
# # might be necessary to adapt for other classifiers
#
# def evalerror(preds, dtrain):
# labels = dtrain.get_label()
# # return a pair metric_name, result. The metric name must not contain a
# # colon (:) or a space since preds are margin(before logistic
# # transformation, cutoff at 0)
#
# r2 = metrics.r2_score(labels, preds)
#
# if r2 < 0:
# r2 = 0
#
# return "r2", -r2
#
# model.set_params(eval_metric=evalerror)
# model.fit(
# X_train,
# y_train,
# eval_set=[(X_val, y_val)],
# early_stopping_rounds=10,
# verbose=self.VERBOSE,
# )
# else:
# model.fit(X_train, y_train, eval_set=[(X_val, y_val)])
else:
# check for LDA; and apply rebalancing
if self.oversampling:
X_train, y_train = self.ros.fit_resample(X_train, y_train)
if self.undersampling:
X_train, y_train = self.rus.fit_resample(X_train, y_train)
# if type(model) is xgboost.sklearn.XGBClassifier:
# model.set_params(eval_metric="logloss")
# model.fit(X_train, y_train)
# else:
model.fit(X_train, y_train)
return model
def eval_model(
self,
model_train,
X_train,
X_test,
y_train,
y_test,
cv_res: CV_res,
save_data=True,
save_probabilities=False,
) -> CV_res:
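        """Predict the train and test set, score both with eval_method, and
        append scores, data, predictions and (optionally) coefficients, fitted
        models, class probabilities and movement detection rates to the given
        CV_res container.
        """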
if self.save_coef:
cv_res.coef.append(model_train.coef_)
y_test_pr = model_train.predict(X_test)
y_train_pr = model_train.predict(X_train)
sc_te = self.eval_method(y_test, y_test_pr)
sc_tr = self.eval_method(y_train, y_train_pr)
if self.threshold_score:
if sc_tr < 0:
sc_tr = 0
if sc_te < 0:
sc_te = 0
if self.get_movement_detection_rate:
self._set_movement_detection_rates(
y_test, y_test_pr, y_train, y_train_pr, cv_res
)
cv_res.score_train.append(sc_tr)
cv_res.score_test.append(sc_te)
if save_data:
cv_res.X_train.append(X_train)
cv_res.X_test.append(X_test)
if self.model_save:
cv_res.model_save.append(deepcopy(model_train)) # clone won't copy params
cv_res.y_train.append(y_train)
cv_res.y_test.append(y_test)
if not save_probabilities:
cv_res.y_train_pr.append(y_train_pr)
cv_res.y_test_pr.append(y_test_pr)
else:
cv_res.y_train_pr.append(model_train.predict_proba(X_train))
cv_res.y_test_pr.append(model_train.predict_proba(X_test))
return cv_res
def _set_movement_detection_rates(
self,
y_test: np.ndarray,
y_test_pr: np.ndarray,
y_train: np.ndarray,
y_train_pr: np.ndarray,
cv_res: CV_res,
) -> CV_res:
mov_detection_rate, fpr, tpr = self.calc_movement_detection_rate(
y_test,
y_test_pr,
self.mov_detection_threshold,
self.min_consequent_count,
)
cv_res.mov_detection_rates_test.append(mov_detection_rate)
cv_res.tprate_test.append(tpr)
cv_res.fprate_test.append(fpr)
mov_detection_rate, fpr, tpr = self.calc_movement_detection_rate(
y_train,
y_train_pr,
self.mov_detection_threshold,
self.min_consequent_count,
)
cv_res.mov_detection_rates_train.append(mov_detection_rate)
cv_res.tprate_train.append(tpr)
cv_res.fprate_train.append(fpr)
return cv_res
def wrapper_model_train(
self,
X_train,
y_train,
X_test=None,
y_test=None,
cv_res: CV_res | None = None,
return_fitted_model_only: bool = False,
save_data=True,
):
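        """Train and evaluate a single CV fold.

        Optionally stacks previous samples, runs Bayesian optimization, mRMR
        feature selection and PCA / CCA projections before fitting. Returns
        the updated CV_res, or only the fitted model if
        return_fitted_model_only is True.
        """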
if cv_res is None:
cv_res = CV_res(
get_movement_detection_rate=self.get_movement_detection_rate,
RUN_BAY_OPT=self.RUN_BAY_OPT,
mrmr_select=self.mrmr_select,
model_save=self.model_save,
)
model_train = clone(self.model)
if self.STACK_FEATURES_N_SAMPLES:
if X_test is not None:
X_train, y_train, X_test, y_test = Decoder.append_samples_val(
X_train,
y_train,
X_test,
y_test,
n=self.time_stack_n_samples,
)
else:
X_train, y_train = Decoder.append_previous_n_samples(
X_train, y_train, n=self.time_stack_n_samples
)
if y_train.sum() == 0 or (
y_test is not None and y_test.sum() == 0
): # only one class present
raise Decoder.ClassMissingException
if self.RUN_BAY_OPT:
model_train = self.bay_opt_wrapper(model_train, X_train, y_train)
if self.mrmr_select:
from mrmr import mrmr_classif
if len(self.feature_names) > X_train.shape[1]:
                # analyze individual channel
columns_names = [
col
for col in self.feature_names
if col.startswith(self.ch_name_tested)
]
if self.columns_names_single_ch is None:
self.columns_names_single_ch = [
f[len(self.ch_name_tested) + 1 :] for f in columns_names
]
else:
# analyze all_ch_combined
columns_names = self.feature_names
X_train = pd.DataFrame(X_train, columns=columns_names)
X_test = pd.DataFrame(X_test, columns=columns_names)
y_train = pd.Series(y_train)
selected_features = mrmr_classif(X=X_train, y=y_train, K=20, n_jobs=60)
X_train = X_train[selected_features]
X_test = X_test[selected_features]
if self.pca:
from sklearn.decomposition import PCA
pca = PCA(n_components=10)
pca.fit(X_train)
X_train = pca.transform(X_train)
X_test = pca.transform(X_test)
if self.cca:
from sklearn.cross_decomposition import CCA
cca = CCA(n_components=10)
cca.fit(X_train, y_train)
X_train = cca.transform(X_train)
X_test = cca.transform(X_test)
# fit model
model_train = self.fit_model(model_train, X_train, y_train)
if return_fitted_model_only:
return model_train
cv_res = self.eval_model(
model_train, X_train, X_test, y_train, y_test, cv_res, save_data
)
if self.mrmr_select:
cv_res.mrmr_select.append(selected_features)
return cv_res
def run_CV(self, data, label):
"""Evaluate model performance on the specified cross validation.
If no data and label is specified, use whole feature class attributes.
Parameters
----------
data (np.ndarray):
data to train and test with shape samples, features
label (np.ndarray):
label to train and test with shape samples, features
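        Examples
        --------
        A sketch::

            decoder.run_CV(data, label)
            mean_test_score = np.mean(decoder.cv_res.score_test)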
"""
def split_data(data):
if self.cv_method == "NonShuffledTrainTestSplit":
# set outer 10s set to train index
# test index is thus in the middle starting at random number
N_samples = data.shape[0]
test_area_points = (N_samples - self.sfreq * 10) - (self.sfreq * 10)
test_points = int(N_samples * 0.3)
if test_area_points > test_points:
start_index = np.random.randint(
int(self.sfreq * 10),
N_samples - self.sfreq * 10 - test_points,
)
test_index = np.arange(start_index, start_index + test_points)
train_index = np.concatenate(
(
np.arange(0, start_index),
np.arange(start_index + test_points, N_samples),
),
axis=0,
).flatten()
yield train_index, test_index
else:
cv_single_tr_te_split = model_selection.check_cv(
cv=[
model_selection.train_test_split(
np.arange(data.shape[0]),
test_size=0.3,
shuffle=False,
)
]
)
for (
train_index,
test_index,
) in cv_single_tr_te_split.split():
yield train_index, test_index
else:
for train_index, test_index in self.cv_method.split(data):
yield train_index, test_index
cv_res = self.init_cv_res()
if self.use_nested_cv:
cv_res_inner = self.init_cv_res()
for train_index, test_index in split_data(data):
X_train, y_train = data[train_index, :], label[train_index]
X_test, y_test = data[test_index], label[test_index]
try:
cv_res = self.wrapper_model_train(
X_train, y_train, X_test, y_test, cv_res
)
except Decoder.ClassMissingException:
continue
if self.use_nested_cv:
data_inner = data[train_index]
label_inner = label[train_index]
for train_index_inner, test_index_inner in split_data(data_inner):
X_train_inner = data_inner[train_index_inner, :]
y_train_inner = label_inner[train_index_inner]
X_test_inner = data_inner[test_index_inner]
y_test_inner = label_inner[test_index_inner]
try:
cv_res_inner = self.wrapper_model_train(
X_train_inner,
y_train_inner,
X_test_inner,
y_test_inner,
cv_res_inner,
)
except Decoder.ClassMissingException:
continue
self.cv_res = cv_res
if self.use_nested_cv:
self.cv_res_inner = cv_res_inner
def bay_opt_wrapper(self, model_train, X_train, y_train):
"""Run bayesian optimization and test best params to model_train
Save best params into self.best_bay_opt_params
"""
(
X_train_bo,
X_test_bo,
y_train_bo,
y_test_bo,
) = model_selection.train_test_split(
X_train, y_train, train_size=0.7, shuffle=False
)
if y_train_bo.sum() == 0 or y_test_bo.sum() == 0:
logger.critical("could not start Bay. Opt. with no labels > 0")
raise Decoder.ClassMissingException
params_bo = self.run_Bay_Opt(
X_train_bo, y_train_bo, X_test_bo, y_test_bo, rounds=10
)
# set bay. opt. obtained best params to model
params_bo_dict = {}
for i in range(len(params_bo)):
setattr(model_train, self.bay_opt_param_space[i].name, params_bo[i])
params_bo_dict[self.bay_opt_param_space[i].name] = params_bo[i]
self.best_bay_opt_params.append(params_bo_dict)
return model_train
def run_Bay_Opt(
self,
X_train,
y_train,
X_test,
y_test,
rounds=30,
base_estimator="GP",
acq_func="EI",
acq_optimizer="sampling",
initial_point_generator="lhs",
):
"""Run skopt bayesian optimization
skopt.Optimizer:
https://scikit-optimize.github.io/stable/modules/generated/skopt.Optimizer.html#skopt.Optimizer
example:
https://scikit-optimize.github.io/stable/auto_examples/ask-and-tell.html#sphx-glr-auto-examples-ask-and-tell-py
        Special attention needs to be paid to the run_CV output:
        some metrics are minimized (MAE), some are maximized (r^2)
Parameters
----------
X_train: np.ndarray
y_train: np.ndarray
X_test: np.ndarray
y_test: np.ndarray
rounds : int, optional
            optimizing rounds, by default 30
base_estimator : str, optional
surrogate model, used as optimization function instead of cross validation, by default "GP"
acq_func : str, optional
function to minimize over the posterior distribution, by default "EI"
acq_optimizer : str, optional
method to minimize the acquisition function, by default "sampling"
initial_point_generator : str, optional
            sets an initial point generator, by default "lhs"
Returns
-------
skopt result parameters
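        Examples
        --------
        A sketch using skopt's parameter space (bounds are illustrative)::

            from skopt.space import Real

            decoder.bay_opt_param_space = [Real(1e-3, 1e2, name="alpha")]
            best_params = decoder.run_Bay_Opt(X_train, y_train, X_test, y_test)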
"""
def get_f_val(model_bo):
try:
model_bo = self.fit_model(model_bo, X_train, y_train)
except Decoder.ClassMissingException:
pass
return self.eval_method(y_test, model_bo.predict(X_test))
from skopt import Optimizer
opt = Optimizer(
self.bay_opt_param_space,
base_estimator=base_estimator,
acq_func=acq_func,
acq_optimizer=acq_optimizer,
initial_point_generator=initial_point_generator,
)
for _ in range(rounds):
next_x = opt.ask()
# set model values
model_bo = clone(self.model)
for i in range(len(next_x)):
setattr(model_bo, self.bay_opt_param_space[i].name, next_x[i])
f_val = get_f_val(model_bo)
res = opt.tell(next_x, f_val)
if self.VERBOSE:
logger.info(f_val)
# res is here automatically appended by skopt
return res.x
def save(self, feature_path: str, feature_file: str, str_save_add=None) -> None:
"""Save decoder object to pickle"""
# why is the decoder not saved to a .json?
if str_save_add is None:
PATH_OUT = PurePath(feature_path, feature_file, feature_file + "_ML_RES.p")
else:
PATH_OUT = PurePath(
feature_path,
feature_file,
feature_file + "_" + str_save_add + "_ML_RES.p",
)
logger.info(f"model being saved to: {PATH_OUT}")
with open(PATH_OUT, "wb") as output: # Overwrites any existing file.
pickle.dump(self, output)