"""Parameter optimization."""
# Authors: Carolina Fernandez <cxf418@miami.edu>
# Nick Tolley <nicholas_tolley@brown.edu>
# Ryan Thorpe <ryan_thorpe@brown.edu>
# Mainak Jas <mjas@mgh.harvard.edu>
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import fmin_cobyla
from .objective_functions import _rmse_evoked, _anticorr_evoked, _maximize_psd
from ..externals.mne import _validate_type
[docs]
class Optimizer:
def __init__(
self,
initial_net,
tstop,
constraints,
set_params,
initial_params=None,
solver="bayesian",
obj_fun="dipole_rmse",
max_iter=200,
):
"""Parameter optimization.
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : dict
The user-defined constraints.
set_params : func
User-defined function that sets parameters in network drives.
``set_params(net, params) -> None``
where ``net`` is a Network object and ``params`` is a dictionary
of the parameters that will be set inside the function.
initial_params : dict, optional
Initial parameters for the objective function. Keys are parameter
names, values are initial parameters. The default is None.
If None, the parameters will be set to the midpoints of parameter ranges.
solver : str
The optimizer, 'bayesian', 'cobyla', or 'cma'.
obj_fun : str | func
The objective function to be minimized. Can be 'dipole_rmse',
'maximize_psd', "dipole_corr", or a user-defined function. The default is
'dipole_rmse'.
max_iter : int, optional
The max number of calls to the objective function. The default is
200.
Attributes
----------
constraints : dict
The user-defined constraints.
initial_params : dict, None
Initial parameters for the objective function. If None, initial_params
is set to the midpoint of upper/lower bounds defined in constraints.
max_iter : int
The max number of calls to the objective function.
solver : func
The optimization function.
obj_fun : func
The objective function to be minimized.
obj_fun_name : str
The name of the template objective function.
tstop : float
The simulated dipole's duration.
net_ : instance of Network
The network object with optimized drives.
obj_ : list
The objective function values.
opt_params_ : list
The list of optimized parameter values.
"""
self._initial_net = initial_net
self.constraints = constraints
self._set_params = set_params
self.max_iter = max_iter
# Optimizer method
if solver == "bayesian":
self.solver = "bayesian"
self._assemble_constraints = _assemble_constraints_bayesian
self._run_opt = _run_opt_bayesian
elif solver == "cobyla":
self.solver = "cobyla"
self._assemble_constraints = _assemble_constraints_cobyla
self._run_opt = _run_opt_cobyla
elif solver == "cma":
self.solver = "cma"
self._assemble_constraints = _assemble_constraints_cma
self._run_opt = _run_opt_cma
else:
raise ValueError("solver must be 'bayesian' or 'cobyla'")
# Response to be optimized
if obj_fun == "dipole_rmse":
self.obj_fun = _rmse_evoked
self.obj_fun_name = "dipole_rmse"
elif obj_fun == "maximize_psd":
self.obj_fun = _maximize_psd
self.obj_fun_name = "maximize_psd"
elif obj_fun == "dipole_corr":
self.obj_fun = _anticorr_evoked
self.obj_fun_name = "dipole_corr"
else:
self.obj_fun = obj_fun # user-defined function
self.obj_fun_name = None
# Initial weights for the objective function
if initial_params is not None:
# Check if initial_params is not a dict
_validate_type(initial_params, dict)
# Check if initial_params keys match constraints keys
if set(initial_params.keys()) != set(self.constraints.keys()):
raise ValueError(
"The keys of initial_params must match the keys of constraints."
)
# Check if initial_params values contain only int or float
for val in initial_params.values():
_validate_type(val, (int, float))
# Check if initial_params values are within constraints
for key in initial_params:
if not (
self.constraints[key][0]
<= initial_params[key]
<= self.constraints[key][1]
):
raise ValueError(
f"Initial parameter '{key}' with value {initial_params[key]} is out of bounds {self.constraints[key]}."
)
self.initial_params = initial_params
else:
self.initial_params = _get_initial_params(self.constraints)
self.tstop = tstop
self.net_ = None
self.obj_ = list()
self.opt_params_ = None
[docs]
def __repr__(self):
is_fit = False
if self.net_ is not None:
is_fit = True
name = self.__class__.__name__
return f"<{name}\nsolver={self.solver}\nfit={is_fit}>"
[docs]
def fit(self, **obj_fun_kwargs):
"""Runs optimization routine.
Parameters
----------
target : instance of Dipole (Required if obj_fun='dipole_rmse')
A dipole object with experimental data.
n_trials : int (Optional if obj_fun='dipole_rmse')
Number of trials to simulate and average.
f_bands : list of tuples (Required if obj_fun='maximize_psd')
Lower and higher limit for each frequency band.
relative_bandpower : list of float | float (Required if obj_fun='maximize_psd')
Weight for each frequency band in f_bands. If a single float is provided,
the same weight is applied to all frequency bands.
sigma0 : float| array-like (Only used if solver='cma')
Initial standard deviation of CME-ES algorithm. If float,
sigma0 is scaled by bounds defined in the constraints for each parameter.
If array-like, The length of sigma0 must equal the length of constraints.
Default: 0.25
popsize : int (Only used if solver='cma')
Number of parameter samples simulated per epoch. Default: 16
n_jobs : int (Only used if solver='cma')
The number of jobs to start in parallel. If None, then 1 trial will be
started without parallelism.
tolfun : float
Termination criteria. Stops if the range of the best objective function
values of the last 10 + ((30 * n_parameters) / popsize) generations and all
function values of the recent generation is below tolfun. Default 0.01.
dt : float
The integration time step of h.CVode (ms). Default: 0.025 ms
scale_factor : float, optional
The dipole scale factor.
smooth_window_len : float, optional
The smooth window length.
seed : int, optional (Only used if solver='cma')
Optional seed for random number generator of optimizer.
verbose : bool
If True, print build steps and simulation progress to console. Default: True.
Notes
-----
When defining sigma0 for CMA-ES as a float, the sigma0 applied to each parameter
is calculated as sigma0 * (upper_bound - lower_bound) based on the constraints.
It is recommended to choose a sigma0 such that the optimum is expected to lie within
about initial_params +- 3*sigma0. A smaller sigma0 searches closer to initial_params.
When defining popsize for CMA-ES, it is recommended to increase popsize relative to
the number of parameters being optimized (N). 4+3*log(N)
"""
if self.obj_fun_name == "dipole_rmse" or self.obj_fun_name == "dipole_corr":
if "target" not in obj_fun_kwargs:
raise Exception("target must be specified")
elif "n_trials" not in obj_fun_kwargs:
obj_fun_kwargs["n_trials"] = 1
elif self.obj_fun_name == "maximize_psd" and (
"f_bands" not in obj_fun_kwargs
or "relative_bandpower" not in obj_fun_kwargs
):
raise Exception("f_bands and relative_bandpower must be specified")
constraints = self._assemble_constraints(self.constraints)
opt_params, obj, net_ = self._run_opt(
self._initial_net,
self.tstop,
constraints,
self._set_params,
self.initial_params,
self.obj_fun,
self.max_iter,
obj_fun_kwargs,
)
self.net_ = net_
self.obj_ = obj
self.opt_params_ = opt_params
[docs]
def plot_convergence(self, ax=None, show=True):
"""Convergence plot.
Parameters
----------
ax : instance of matplotlib figure, optional
The matplotlib axis. The default is None.
show : bool
If True, show the figure. The default is True.
Returns
-------
fig : instance of plt.fig
The matplotlib figure handle.
"""
if ax is None:
fig, ax = plt.subplots(constrained_layout=True)
axis = ax if isinstance(ax, matplotlib.axes._axes.Axes) else ax
x = list(range(1, self.max_iter + 1))
y_min = min(self.obj_) - 0.01
y_max = max(self.obj_) + 0.01
axis.plot(x, self.obj_, color="black")
axis.set_ylim([y_min, y_max])
axis.set_title("Convergence")
axis.set_xlabel("Number of calls")
axis.set_ylabel("Objective value")
axis.grid(visible=True)
if ax is None:
fig.show(show)
return axis.get_figure()
else:
return ax.get_figure()
def _get_initial_params(constraints):
"""Gets initial parameters as midpoints of parameter ranges.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
initial_params : dict
Keys are parameter names, values are initial parameters.
"""
initial_params = dict()
for cons_key in constraints:
initial_params.update(
{cons_key: (constraints[cons_key][0] + constraints[cons_key][1]) / 2}
)
return initial_params
def _assemble_constraints_bayesian(constraints):
"""Assembles constraints in format required by gp_minimize.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
cons_bayesian : list of tuples
Lower and higher limit for each parameter.
"""
# assemble constraints in solver-specific format
cons_bayesian = list(constraints.values())
return cons_bayesian
def _assemble_constraints_cobyla(constraints):
"""Assembles constraints in format required by fmin_cobyla.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
cons_bayesian : dict
Set of functions.
"""
# assemble constraints in solver-specific format
cons_cobyla = list()
for idx, cons_key in enumerate(constraints):
cons_cobyla.append(lambda x, idx=idx: float(constraints[cons_key][1]) - x[idx])
cons_cobyla.append(lambda x, idx=idx: x[idx] - float(constraints[cons_key][0]))
return cons_cobyla
def _assemble_constraints_cma(constraints):
"""Assembles constraints in format required by cma.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
cons_cma : list of lists
Lower and higher limit for each parameter.
"""
# assemble constraints in solver-specific format
lower_bounds = [bound[0] for bound in constraints.values()]
upper_bounds = [bound[1] for bound in constraints.values()]
cons_cma = [lower_bounds, upper_bounds]
return cons_cma
def _update_params(initial_params, predicted_params):
"""Update params with predicted parameters.
Parameters
----------
initial_params : dict
Keys are parameter names, values are initial parameters.
predicted_params : list
Parameters selected by the optimizer.
Returns
-------
params : dict
Keys are parameter names, values are parameters.
"""
params = dict()
for param_key, param_name in enumerate(initial_params):
params.update({param_name: predicted_params[param_key]})
return params
def _run_opt_bayesian(
initial_net,
tstop,
constraints,
set_params,
initial_params,
obj_fun,
max_iter,
obj_fun_kwargs,
):
"""Runs optimization routine with gp_minimize optimizer.
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : list of tuples
Parameter constraints in solver-specific format.
set_params : func
User-defined function that sets parameters in network drives.
initial_params : dict
Keys are parameter names, values are initial parameters.
obj_fun : func
The objective function.
max_iter : int
Number of calls the optimizer makes.
obj_fun_kwargs : dict
Additional arguments along with their respective values to be passed
to the objective function.
Returns
-------
opt_params : list
Optimized parameters.
obj : list
Objective values.
net_ : instance of Network
Optimized network object.
"""
from ..externals.bayesopt import bayes_opt, expected_improvement
# `obj_values` tracks optimizer loss over all epochs
# the list is passed to `_obj_func()` and updated in place
obj_values = list()
def _obj_func(predicted_params):
return obj_fun(
initial_net=initial_net,
initial_params=initial_params,
set_params=set_params,
predicted_params=predicted_params,
update_params=_update_params,
obj_values=obj_values,
tstop=tstop,
obj_fun_kwargs=obj_fun_kwargs,
)
opt_results, _ = bayes_opt(
func=_obj_func,
x0=list(initial_params.values()),
cons=constraints,
acquisition=expected_improvement,
maxfun=max_iter,
)
# get optimized params
opt_params = opt_results
# get objective values
obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)]
# get optimized net
params = _update_params(initial_params, opt_params)
net_ = initial_net.copy()
set_params(net_, params)
return opt_params, obj, net_
def _run_opt_cma(
initial_net,
tstop,
constraints,
set_params,
initial_params,
obj_fun,
max_iter,
obj_fun_kwargs,
):
"""Runs optimization routine with CMA-ES
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : list of tuples
Parameter constraints in solver-specific format.
set_params : func
User-defined function that sets parameters in network drives.
initial_params : dict
Keys are parameter names, values are initial parameters.
obj_fun : func
The objective function.
max_iter : int
Number of calls the optimizer makes.
obj_fun_kwargs : dict
Additional arguments along with their respective values to be passed
to the objective function.
Returns
-------
opt_params : list
Optimized parameters.
obj : list
Objective values.
net_ : instance of Network
Optimized network object.
"""
import cma
# `obj_values` tracks optimizer loss over all epochs
# the list is passed to `_obj_func()` and updated in place
obj_values = list()
def _obj_func(predicted_params):
return obj_fun(
initial_net=initial_net,
initial_params=initial_params,
set_params=set_params,
predicted_params=predicted_params,
update_params=_update_params,
obj_values=obj_values,
tstop=tstop,
obj_fun_kwargs=obj_fun_kwargs,
)
default_sigma0 = 0.25
sigma0 = obj_fun_kwargs.get("sigma0", default_sigma0)
_validate_type(sigma0, (int, float, list, np.ndarray))
if isinstance(sigma0, (float, int)):
if sigma0 < 0:
raise ValueError("sigma0 must be greater than zero")
sigma0 = sigma0 * (np.array(constraints[1]) - np.array(constraints[0]))
elif isinstance(sigma0, (list, np.ndarray)):
if len(np.array(sigma0).shape) > 1:
raise ValueError(
f"When sigma0 is array-like, it must be shape (N,), got shape {np.array(sigma0).shape}"
)
if len(sigma0) != len(constraints):
raise ValueError(
"When sigma0 is array-like, the length must be the same as the constraints."
f"Expected {len(constraints)} elements, got {len(sigma0)} elements)"
)
es = cma.CMAEvolutionStrategy(
list(initial_params.values()),
1,
{
"bounds": constraints,
"tolfun": obj_fun_kwargs.get("tolfun", 0.01),
"maxiter": max_iter,
"popsize": obj_fun_kwargs.get("popsize", 16),
"CMA_stds": sigma0,
"verbose": 0,
"seed": obj_fun_kwargs.get("seed", 123),
},
)
while not es.stop():
solutions = es.ask()
es.tell(solutions, _obj_func(solutions))
es.result_pretty()
# get best params
best_params = es.result.xbest
# get objective values
obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)]
# get optimized net
params = _update_params(initial_params, best_params)
net_ = initial_net.copy()
set_params(net_, params)
return best_params, obj, net_
def _run_opt_cobyla(
initial_net,
tstop,
constraints,
set_params,
initial_params,
obj_fun,
max_iter,
obj_fun_kwargs,
):
"""Runs optimization routine with fmin_cobyla optimizer.
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : list of tuples
Parameter constraints in solver-specific format.
set_params : func
User-defined function that sets parameters in network drives.
initial_params : dict
Keys are parameter names, values are initial parameters.
obj_fun : func
The objective function.
max_iter : int
Number of calls the optimizer makes.
obj_fun_kwargs : dict
Additional arguments along with their respective values to be passed
to the objective function.
Returns
-------
opt_params : list
Optimized parameters.
obj : list
Objective values.
net_ : instance of Network
Optimized network object.
"""
# `obj_values` tracks optimizer loss over all epochs
# the list is passed to `_obj_func()` and updated in place
obj_values = list()
def _obj_func(predicted_params):
return obj_fun(
initial_net=initial_net,
initial_params=initial_params,
set_params=set_params,
predicted_params=predicted_params,
update_params=_update_params,
obj_values=obj_values,
tstop=tstop,
obj_fun_kwargs=obj_fun_kwargs,
)
opt_results = fmin_cobyla(
_obj_func,
cons=constraints,
rhobeg=0.1,
rhoend=1e-4,
x0=list(initial_params.values()),
maxfun=max_iter,
catol=0.0,
)
# get optimized params
opt_params = opt_results
# get objective values
obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)]
# get optimized net
params = _update_params(initial_params, opt_params)
net_ = initial_net.copy()
set_params(net_, params)
return opt_params, obj, net_