"""Parameter optimization."""
# Authors: Carolina Fernandez <cxf418@miami.edu>
# Nick Tolley <nicholas_tolley@brown.edu>
# Ryan Thorpe <ryan_thorpe@brown.edu>
# Mainak Jas <mjas@mgh.harvard.edu>
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import fmin_cobyla
from .objective_functions import _rmse_evoked, _anticorr_evoked, _maximize_psd
from ..externals.mne import _validate_type
[docs]
class Optimizer:
def __init__(
self,
initial_net,
tstop,
constraints,
set_params,
initial_params=None,
solver="bayesian",
obj_fun="dipole_rmse",
max_iter=200,
):
"""Parameter optimization.
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : dict
The user-defined constraints.
set_params : func
User-defined function that sets parameters in network drives.
``set_params(net, params) -> None``
where ``net`` is a Network object and ``params`` is a dictionary of the
parameters that will be set inside the function.
initial_params : dict, optional
Initial parameters for the objective function. Keys are parameter names,
values are initial parameters. The default is None. If None, the parameters
will be set to the midpoints of parameter ranges.
solver : str
The optimizer, 'bayesian', 'cobyla', or 'cma'.
obj_fun : str | func
The objective function to be minimized. Can be 'dipole_rmse',
'maximize_psd', or a user-defined function. The default is 'dipole_rmse'. If
a user-defined function is provided, it must have the same function
signature as the existing objective functions (i.e. `_rmse_evoked` and
`_maximize_psd` in objective_functions.py).
max_iter : int, optional
The max number of calls to the objective function. The default is 200.
Attributes
----------
constraints : dict
The user-defined constraints.
initial_params : dict, None
Initial parameters for the objective function. If None, initial_params
is set to the midpoint of upper/lower bounds defined in constraints.
max_iter : int
The max number of calls to the objective function.
solver : func
The optimization function.
obj_fun : func
The objective function to be minimized.
obj_fun_name : str
The name of the template objective function.
tstop : float
The simulated dipole's duration.
net_ : instance of Network
The network object with optimized drives.
obj_ : list
The objective function values.
opt_params_ : list
The list of optimized parameter values.
"""
self._initial_net = initial_net
self.constraints = constraints
self._set_params = set_params
self.max_iter = max_iter
# Optimizer method
if solver == "bayesian":
self.solver = "bayesian"
self._assemble_constraints = _assemble_constraints_bayesian
self._run_opt = _run_opt_bayesian
elif solver == "cobyla":
self.solver = "cobyla"
self._assemble_constraints = _assemble_constraints_cobyla
self._run_opt = _run_opt_cobyla
elif solver == "cma":
self.solver = "cma"
self._assemble_constraints = _assemble_constraints_cma
self._run_opt = _run_opt_cma
else:
raise ValueError("solver must be 'bayesian' or 'cobyla'")
# Response to be optimized
if obj_fun == "dipole_rmse":
self.obj_fun = _rmse_evoked
self.obj_fun_name = "dipole_rmse"
elif obj_fun == "maximize_psd":
self.obj_fun = _maximize_psd
self.obj_fun_name = "maximize_psd"
elif obj_fun == "dipole_corr":
self.obj_fun = _anticorr_evoked
self.obj_fun_name = "dipole_corr"
else:
self.obj_fun = obj_fun # user-defined function
self.obj_fun_name = None
# Initial weights for the objective function
if initial_params is not None:
# Check if initial_params is not a dict
_validate_type(initial_params, dict)
# Check if initial_params keys match constraints keys
if set(initial_params.keys()) != set(self.constraints.keys()):
raise ValueError(
"The keys of initial_params must match the keys of constraints."
)
# Check if initial_params values contain only int or float
for val in initial_params.values():
_validate_type(val, (int, float))
# Check if initial_params values are within constraints
for key in initial_params:
if not (
self.constraints[key][0]
<= initial_params[key]
<= self.constraints[key][1]
):
raise ValueError(
f"Initial parameter '{key}' with value {initial_params[key]} is out of bounds {self.constraints[key]}."
)
self.initial_params = initial_params
else:
self.initial_params = _get_initial_params(self.constraints)
self.tstop = tstop
self.net_ = None
self.obj_ = list()
self.opt_params_ = None
[docs]
def __repr__(self):
is_fit = False
if self.net_ is not None:
is_fit = True
name = self.__class__.__name__
return f"<{name}\nsolver={self.solver}\nfit={is_fit}>"
[docs]
def fit(self, **obj_fun_kwargs):
"""Runs optimization routine.
Parameters
----------
target : instance of Dipole (Required if obj_fun='dipole_rmse' or 'dipole_corr')
A dipole object with experimental data.
n_trials : int (Optional if obj_fun='dipole_rmse' or 'dipole_corr')
Number of trials to simulate and average.
f_bands : list of tuples (Required if obj_fun='maximize_psd')
Lower and higher limit for each frequency band in Hz.
relative_bandpower : list of float | float (Required if obj_fun='maximize_psd')
Weight for each frequency band in f_bands. If a single float is provided,
the same weight is applied to all frequency bands.
n_jobs : int (Only used if solver='cma')
The number of jobs to start in parallel. If None, then 1 trial will be
started without parallelism.
popsize : int (Only used if solver='cma')
Number of parameter samples simulated per epoch. Default: 16
seed : int, optional (Only used if solver='cma')
Optional seed for random number generator of optimizer.
sigma0 : float | array-like (Only used if solver='cma')
Initial standard deviation of CME-ES algorithm. If float, sigma0 is scaled
by bounds defined in the constraints for each parameter. If array-like, The
length of sigma0 must equal the length of constraints. Default: 0.25
tolfun : float (Only used if solver='cma')
Termination criteria. Stops if the range of the best objective function
values of the last 10 + ((30 * n_parameters) / popsize) generations and all
function values of the recent generation is below tolfun. Default 0.01.
dt : float, default=0.025
The integration time step of h.CVode (in ms)
scale_factor : float, optional
The dipole scale factor to use after every optimization iteration before
data comparison. There is no scaling applied by default, so you must pass a
value if you want any scaling.
smooth_window_len : float, optional
The smooth window length to use after every optimization iteration before
data comparison. There is no smoothing applied by default, so you must pass
a value if you want any smoothing.
verbose : bool, default=True
If True, print build steps and simulation progress to console.
Notes
-----
When defining sigma0 for CMA-ES as a float, the sigma0 applied to each parameter
is calculated as sigma0 * (upper_bound - lower_bound) based on the constraints.
It is recommended to choose a sigma0 such that the optimum is expected to lie
within about initial_params +- 3*sigma0. A smaller sigma0 searches closer to
initial_params.
When defining popsize for CMA-ES, it is recommended to increase popsize relative
to the number of parameters being optimized (N). 4+3*log(N)
"""
if self.obj_fun_name == "dipole_rmse" or self.obj_fun_name == "dipole_corr":
if "target" not in obj_fun_kwargs:
raise Exception("target must be specified")
elif "n_trials" not in obj_fun_kwargs:
obj_fun_kwargs["n_trials"] = 1
elif self.obj_fun_name == "maximize_psd" and (
"f_bands" not in obj_fun_kwargs
or "relative_bandpower" not in obj_fun_kwargs
):
raise Exception("f_bands and relative_bandpower must be specified")
constraints = self._assemble_constraints(self.constraints)
opt_params, obj, net_ = self._run_opt(
self._initial_net,
self.tstop,
constraints,
self._set_params,
self.initial_params,
self.obj_fun,
self.max_iter,
obj_fun_kwargs,
)
self.net_ = net_
self.obj_ = obj
self.opt_params_ = opt_params
[docs]
def plot_convergence(self, ax=None, show=True):
"""Convergence plot.
Parameters
----------
ax : instance of matplotlib figure, optional
The matplotlib axis. The default is None.
show : bool
If True, show the figure. The default is True.
Returns
-------
fig : instance of plt.fig
The matplotlib figure handle.
"""
if ax is None:
fig, ax = plt.subplots(constrained_layout=True)
axis = ax if isinstance(ax, matplotlib.axes._axes.Axes) else ax
x = list(range(1, self.max_iter + 1))
y_min = min(self.obj_) - 0.01
y_max = max(self.obj_) + 0.01
axis.plot(x, self.obj_, color="black")
axis.set_ylim([y_min, y_max])
axis.set_title("Convergence")
axis.set_xlabel("Number of calls")
axis.set_ylabel("Objective value")
axis.grid(visible=True)
if ax is None:
fig.show(show)
return axis.get_figure()
else:
return ax.get_figure()
def _get_initial_params(constraints):
"""Gets initial parameters as midpoints of parameter ranges.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
initial_params : dict
Keys are parameter names, values are initial parameters.
"""
initial_params = dict()
for cons_key in constraints:
initial_params.update(
{cons_key: (constraints[cons_key][0] + constraints[cons_key][1]) / 2}
)
return initial_params
def _assemble_constraints_bayesian(constraints):
"""Assembles constraints in format required by gp_minimize.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
cons_bayesian : list of tuples
Lower and higher limit for each parameter.
"""
# assemble constraints in solver-specific format
cons_bayesian = list(constraints.values())
return cons_bayesian
def _assemble_constraints_cobyla(constraints):
"""Assembles constraints in format required by fmin_cobyla.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
cons_bayesian : dict
Set of functions.
"""
# assemble constraints in solver-specific format
cons_cobyla = list()
for idx, cons_key in enumerate(constraints):
cons_cobyla.append(lambda x, idx=idx: float(constraints[cons_key][1]) - x[idx])
cons_cobyla.append(lambda x, idx=idx: x[idx] - float(constraints[cons_key][0]))
return cons_cobyla
def _assemble_constraints_cma(constraints):
"""Assembles constraints in format required by cma.
Parameters
----------
constraints : dict
The user-defined constraints.
Returns
-------
cons_cma : list of lists
Lower and higher limit for each parameter.
"""
# assemble constraints in solver-specific format
lower_bounds = [bound[0] for bound in constraints.values()]
upper_bounds = [bound[1] for bound in constraints.values()]
cons_cma = [lower_bounds, upper_bounds]
return cons_cma
def _update_params(initial_params, predicted_params):
"""Update params with predicted parameters.
Parameters
----------
initial_params : dict
Keys are parameter names, values are initial parameters.
predicted_params : list
Parameters selected by the optimizer.
Returns
-------
params : dict
Keys are parameter names, values are parameters.
"""
params = dict()
for param_key, param_name in enumerate(initial_params):
params.update({param_name: predicted_params[param_key]})
return params
def _run_opt_bayesian(
initial_net,
tstop,
constraints,
set_params,
initial_params,
obj_fun,
max_iter,
obj_fun_kwargs,
):
"""Runs optimization routine with gp_minimize optimizer.
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : list of tuples
Parameter constraints in solver-specific format.
set_params : func
User-defined function that sets parameters in network drives.
initial_params : dict
Keys are parameter names, values are initial parameters.
obj_fun : func
The objective function.
max_iter : int
Number of calls the optimizer makes.
obj_fun_kwargs : dict
Additional arguments along with their respective values to be passed
to the objective function.
Returns
-------
opt_params : list
Optimized parameters.
obj : list
Objective values.
net_ : instance of Network
Optimized network object.
"""
from ..externals.bayesopt import bayes_opt, expected_improvement
# `obj_values` tracks optimizer loss over all epochs
# the list is passed to `_obj_func()` and updated in place
obj_values = list()
def _obj_func(predicted_params):
return obj_fun(
initial_net=initial_net,
initial_params=initial_params,
set_params=set_params,
predicted_params=predicted_params,
update_params=_update_params,
obj_values=obj_values,
tstop=tstop,
obj_fun_kwargs=obj_fun_kwargs,
)
opt_results, _ = bayes_opt(
func=_obj_func,
x0=list(initial_params.values()),
cons=constraints,
acquisition=expected_improvement,
maxfun=max_iter,
)
# get optimized params
opt_params = opt_results
# get objective values
obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)]
# get optimized net
params = _update_params(initial_params, opt_params)
net_ = initial_net.copy()
set_params(net_, params)
return opt_params, obj, net_
def _run_opt_cma(
initial_net,
tstop,
constraints,
set_params,
initial_params,
obj_fun,
max_iter,
obj_fun_kwargs,
):
"""Runs optimization routine with CMA-ES
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : list of tuples
Parameter constraints in solver-specific format.
set_params : func
User-defined function that sets parameters in network drives.
initial_params : dict
Keys are parameter names, values are initial parameters.
obj_fun : func
The objective function.
max_iter : int
Number of calls the optimizer makes.
obj_fun_kwargs : dict
Additional arguments along with their respective values to be passed
to the objective function.
Returns
-------
opt_params : list
Optimized parameters.
obj : list
Objective values.
net_ : instance of Network
Optimized network object.
"""
import cma
# `obj_values` tracks optimizer loss over all epochs
# the list is passed to `_obj_func()` and updated in place
obj_values = list()
def _obj_func(predicted_params):
return obj_fun(
initial_net=initial_net,
initial_params=initial_params,
set_params=set_params,
predicted_params=predicted_params,
update_params=_update_params,
obj_values=obj_values,
tstop=tstop,
obj_fun_kwargs=obj_fun_kwargs,
)
default_sigma0 = 0.25
sigma0 = obj_fun_kwargs.get("sigma0", default_sigma0)
_validate_type(sigma0, (int, float, list, np.ndarray))
if isinstance(sigma0, (float, int)):
if sigma0 < 0:
raise ValueError("sigma0 must be greater than zero")
sigma0 = sigma0 * (np.array(constraints[1]) - np.array(constraints[0]))
elif isinstance(sigma0, (list, np.ndarray)):
if len(np.array(sigma0).shape) > 1:
raise ValueError(
f"When sigma0 is array-like, it must be shape (N,), got shape {np.array(sigma0).shape}"
)
if len(sigma0) != len(constraints):
raise ValueError(
"When sigma0 is array-like, the length must be the same as the constraints."
f"Expected {len(constraints)} elements, got {len(sigma0)} elements)"
)
es = cma.CMAEvolutionStrategy(
list(initial_params.values()),
1,
{
"bounds": constraints,
"tolfun": obj_fun_kwargs.get("tolfun", 0.01),
"maxiter": max_iter,
"popsize": obj_fun_kwargs.get("popsize", 16),
"CMA_stds": sigma0,
"verbose": 0,
"seed": obj_fun_kwargs.get("seed", 123),
},
)
while not es.stop():
solutions = es.ask()
es.tell(solutions, _obj_func(solutions))
es.result_pretty()
# get best params
best_params = es.result.xbest
# get objective values
obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)]
# get optimized net
params = _update_params(initial_params, best_params)
net_ = initial_net.copy()
set_params(net_, params)
return best_params, obj, net_
def _run_opt_cobyla(
initial_net,
tstop,
constraints,
set_params,
initial_params,
obj_fun,
max_iter,
obj_fun_kwargs,
):
"""Runs optimization routine with fmin_cobyla optimizer.
Parameters
----------
initial_net : instance of Network
The network object.
tstop : float
The simulated dipole's duration.
constraints : list of tuples
Parameter constraints in solver-specific format.
set_params : func
User-defined function that sets parameters in network drives.
initial_params : dict
Keys are parameter names, values are initial parameters.
obj_fun : func
The objective function.
max_iter : int
Number of calls the optimizer makes.
obj_fun_kwargs : dict
Additional arguments along with their respective values to be passed
to the objective function.
Returns
-------
opt_params : list
Optimized parameters.
obj : list
Objective values.
net_ : instance of Network
Optimized network object.
"""
# `obj_values` tracks optimizer loss over all epochs
# the list is passed to `_obj_func()` and updated in place
obj_values = list()
best = {
"obj": np.inf,
"params": None,
}
def _obj_func(predicted_params):
return obj_fun(
initial_net=initial_net,
initial_params=initial_params,
set_params=set_params,
predicted_params=predicted_params,
update_params=_update_params,
obj_values=obj_values,
tstop=tstop,
obj_fun_kwargs=obj_fun_kwargs,
best=best,
)
fmin_cobyla(
_obj_func,
cons=constraints,
rhobeg=0.1,
rhoend=1e-4,
x0=list(initial_params.values()),
maxfun=max_iter,
catol=0.0,
)
# get optimized params
opt_params = best["params"]
# get objective values
obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)]
# get optimized net
params = _update_params(initial_params, opt_params)
net_ = initial_net.copy()
set_params(net_, params)
return opt_params, obj, net_
def generate_opt_history_table(opt_results, report_timestamp):
"""Generate a human-readable text table of optimization history.
Parameters
----------
opt_results : list of dict
List of optimization results, where each dict contains the following key-value pairs:
"initial_params" : dict
Initial parameter values
"opt_params" : list
Optimized parameter values
"obj_values" : list
Objective function values at each iteration
"solver" : str
Solver name used
"obj_fun" : str
Objective function name
"max_iter" : int
Maximum iterations
"target_data" : str, optional
Target dataset name (for dipole_rmse)
"n_trials" : int, optional
Number of trials (for dipole_rmse)
"psd_f_bands" : list of tuple, optional
Frequency bands (for maximize_psd)
"psd_relative_bandpower" : list of float, optional
Relative bandpower weights (for maximize_psd)
report_timestamp : str
A timestamp which will be used to indicate the time when the report
was generated. This will represent the same time as the time used to
generate the filename for the report.
Returns
-------
str
A formatted text table documenting the initial and final values
of optimized parameters and the first/final objective function values
for each full optimization run.
"""
if not opt_results:
return "No optimization runs have been completed yet."
lines = []
lines.append("=" * 100)
lines.append("OPTIMIZATION HISTORY")
lines.append(f"Generated: {report_timestamp}")
lines.append("=" * 100)
lines.append("")
for run_idx, opt_result in enumerate(opt_results, start=1):
solver = opt_result.get("solver", "N/A")
obj_fun = opt_result.get("obj_fun", "unknown")
max_iter = opt_result.get("max_iter", "N/A")
lines.append(f"Run #{run_idx}")
lines.append("-" * 100)
lines.append(f"Solver: {solver}")
lines.append(f"Objective Function: {obj_fun}")
lines.append(f"Max Iterations: {max_iter}")
# Add target dataset info if using dipole_rmse
if obj_fun == "dipole_rmse" and "target_data" in opt_result:
lines.append(f"Target Dataset: {opt_result['target_data']}")
if "n_trials" in opt_result:
lines.append(f"Number of Trials: {opt_result['n_trials']}")
# Add frequency band info if using maximize_psd
if obj_fun == "maximize_psd" and "psd_f_bands" in opt_result:
f_bands = opt_result["psd_f_bands"]
rel_bandpower = opt_result["psd_relative_bandpower"]
for i, (band, weight) in enumerate(zip(f_bands, rel_bandpower), start=1):
lines.append(
f" Frequency Band {i}: {band[0]:.1f}-{band[1]:.1f} Hz (weight: {weight:.3f})"
)
lines.append("")
# Get parameter names from initial_params dict
param_names = list(opt_result["initial_params"].keys())
# Create header
header = f"{'Parameter':<40} {'Initial Value':>15} {'Final Value':>15} {'Change':>15}"
lines.append(header)
lines.append("-" * 100)
# Add each parameter
for param_idx, param_name in enumerate(param_names):
initial_val = opt_result["initial_params"][param_name]
# opt_params should be a list in the same order as "initial_params"
final_val = opt_result["opt_params"][param_idx]
change = final_val - initial_val
# Format the values with appropriate precision
param_str = f"{param_name:<40}"
initial_str = f"{initial_val:>15.6f}"
final_str = f"{final_val:>15.6f}"
change_str = f"{change:>+15.6f}"
line = f"{param_str} {initial_str} {final_str} {change_str}"
lines.append(line)
lines.append("")
lines.append(f"Objective Function ({obj_fun}) Values:")
for obj_idx, obj_out in enumerate(opt_result["obj_values"]):
lines.append(f" Iteration {obj_idx}: {obj_out:>15.6f}")
obj_diff = opt_result["obj_values"][-1] - opt_result["obj_values"][0]
lines.append(f"Total Change: {obj_diff:>+15.6f}")
lines.append("")
lines.append("=" * 100)
lines.append("")
return "\n".join(lines)