Source code for hnn_core.optimization.general_optimization

"""Parameter optimization."""

# Authors: Carolina Fernandez <cxf418@miami.edu>
#          Nick Tolley <nicholas_tolley@brown.edu>
#          Ryan Thorpe <ryan_thorpe@brown.edu>
#          Mainak Jas <mjas@mgh.harvard.edu>

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import fmin_cobyla

from .objective_functions import _rmse_evoked, _anticorr_evoked, _maximize_psd
from ..externals.mne import _validate_type


[docs] class Optimizer: def __init__( self, initial_net, tstop, constraints, set_params, initial_params=None, solver="bayesian", obj_fun="dipole_rmse", max_iter=200, ): """Parameter optimization. Parameters ---------- initial_net : instance of Network The network object. tstop : float The simulated dipole's duration. constraints : dict The user-defined constraints. set_params : func User-defined function that sets parameters in network drives. ``set_params(net, params) -> None`` where ``net`` is a Network object and ``params`` is a dictionary of the parameters that will be set inside the function. initial_params : dict, optional Initial parameters for the objective function. Keys are parameter names, values are initial parameters. The default is None. If None, the parameters will be set to the midpoints of parameter ranges. solver : str The optimizer, 'bayesian', 'cobyla', or 'cma'. obj_fun : str | func The objective function to be minimized. Can be 'dipole_rmse', 'maximize_psd', "dipole_corr", or a user-defined function. The default is 'dipole_rmse'. max_iter : int, optional The max number of calls to the objective function. The default is 200. Attributes ---------- constraints : dict The user-defined constraints. initial_params : dict, None Initial parameters for the objective function. If None, initial_params is set to the midpoint of upper/lower bounds defined in constraints. max_iter : int The max number of calls to the objective function. solver : func The optimization function. obj_fun : func The objective function to be minimized. obj_fun_name : str The name of the template objective function. tstop : float The simulated dipole's duration. net_ : instance of Network The network object with optimized drives. obj_ : list The objective function values. opt_params_ : list The list of optimized parameter values. """ self._initial_net = initial_net self.constraints = constraints self._set_params = set_params self.max_iter = max_iter # Optimizer method if solver == "bayesian": self.solver = "bayesian" self._assemble_constraints = _assemble_constraints_bayesian self._run_opt = _run_opt_bayesian elif solver == "cobyla": self.solver = "cobyla" self._assemble_constraints = _assemble_constraints_cobyla self._run_opt = _run_opt_cobyla elif solver == "cma": self.solver = "cma" self._assemble_constraints = _assemble_constraints_cma self._run_opt = _run_opt_cma else: raise ValueError("solver must be 'bayesian' or 'cobyla'") # Response to be optimized if obj_fun == "dipole_rmse": self.obj_fun = _rmse_evoked self.obj_fun_name = "dipole_rmse" elif obj_fun == "maximize_psd": self.obj_fun = _maximize_psd self.obj_fun_name = "maximize_psd" elif obj_fun == "dipole_corr": self.obj_fun = _anticorr_evoked self.obj_fun_name = "dipole_corr" else: self.obj_fun = obj_fun # user-defined function self.obj_fun_name = None # Initial weights for the objective function if initial_params is not None: # Check if initial_params is not a dict _validate_type(initial_params, dict) # Check if initial_params keys match constraints keys if set(initial_params.keys()) != set(self.constraints.keys()): raise ValueError( "The keys of initial_params must match the keys of constraints." ) # Check if initial_params values contain only int or float for val in initial_params.values(): _validate_type(val, (int, float)) # Check if initial_params values are within constraints for key in initial_params: if not ( self.constraints[key][0] <= initial_params[key] <= self.constraints[key][1] ): raise ValueError( f"Initial parameter '{key}' with value {initial_params[key]} is out of bounds {self.constraints[key]}." ) self.initial_params = initial_params else: self.initial_params = _get_initial_params(self.constraints) self.tstop = tstop self.net_ = None self.obj_ = list() self.opt_params_ = None
[docs] def __repr__(self): is_fit = False if self.net_ is not None: is_fit = True name = self.__class__.__name__ return f"<{name}\nsolver={self.solver}\nfit={is_fit}>"
[docs] def fit(self, **obj_fun_kwargs): """Runs optimization routine. Parameters ---------- target : instance of Dipole (Required if obj_fun='dipole_rmse') A dipole object with experimental data. n_trials : int (Optional if obj_fun='dipole_rmse') Number of trials to simulate and average. f_bands : list of tuples (Required if obj_fun='maximize_psd') Lower and higher limit for each frequency band. relative_bandpower : list of float | float (Required if obj_fun='maximize_psd') Weight for each frequency band in f_bands. If a single float is provided, the same weight is applied to all frequency bands. sigma0 : float| array-like (Only used if solver='cma') Initial standard deviation of CME-ES algorithm. If float, sigma0 is scaled by bounds defined in the constraints for each parameter. If array-like, The length of sigma0 must equal the length of constraints. Default: 0.25 popsize : int (Only used if solver='cma') Number of parameter samples simulated per epoch. Default: 16 n_jobs : int (Only used if solver='cma') The number of jobs to start in parallel. If None, then 1 trial will be started without parallelism. tolfun : float Termination criteria. Stops if the range of the best objective function values of the last 10 + ((30 * n_parameters) / popsize) generations and all function values of the recent generation is below tolfun. Default 0.01. dt : float The integration time step of h.CVode (ms). Default: 0.025 ms scale_factor : float, optional The dipole scale factor. smooth_window_len : float, optional The smooth window length. seed : int, optional (Only used if solver='cma') Optional seed for random number generator of optimizer. verbose : bool If True, print build steps and simulation progress to console. Default: True. Notes ----- When defining sigma0 for CMA-ES as a float, the sigma0 applied to each parameter is calculated as sigma0 * (upper_bound - lower_bound) based on the constraints. It is recommended to choose a sigma0 such that the optimum is expected to lie within about initial_params +- 3*sigma0. A smaller sigma0 searches closer to initial_params. When defining popsize for CMA-ES, it is recommended to increase popsize relative to the number of parameters being optimized (N). 4+3*log(N) """ if self.obj_fun_name == "dipole_rmse" or self.obj_fun_name == "dipole_corr": if "target" not in obj_fun_kwargs: raise Exception("target must be specified") elif "n_trials" not in obj_fun_kwargs: obj_fun_kwargs["n_trials"] = 1 elif self.obj_fun_name == "maximize_psd" and ( "f_bands" not in obj_fun_kwargs or "relative_bandpower" not in obj_fun_kwargs ): raise Exception("f_bands and relative_bandpower must be specified") constraints = self._assemble_constraints(self.constraints) opt_params, obj, net_ = self._run_opt( self._initial_net, self.tstop, constraints, self._set_params, self.initial_params, self.obj_fun, self.max_iter, obj_fun_kwargs, ) self.net_ = net_ self.obj_ = obj self.opt_params_ = opt_params
[docs] def plot_convergence(self, ax=None, show=True): """Convergence plot. Parameters ---------- ax : instance of matplotlib figure, optional The matplotlib axis. The default is None. show : bool If True, show the figure. The default is True. Returns ------- fig : instance of plt.fig The matplotlib figure handle. """ if ax is None: fig, ax = plt.subplots(constrained_layout=True) axis = ax if isinstance(ax, matplotlib.axes._axes.Axes) else ax x = list(range(1, self.max_iter + 1)) y_min = min(self.obj_) - 0.01 y_max = max(self.obj_) + 0.01 axis.plot(x, self.obj_, color="black") axis.set_ylim([y_min, y_max]) axis.set_title("Convergence") axis.set_xlabel("Number of calls") axis.set_ylabel("Objective value") axis.grid(visible=True) if ax is None: fig.show(show) return axis.get_figure() else: return ax.get_figure()
def _get_initial_params(constraints): """Gets initial parameters as midpoints of parameter ranges. Parameters ---------- constraints : dict The user-defined constraints. Returns ------- initial_params : dict Keys are parameter names, values are initial parameters. """ initial_params = dict() for cons_key in constraints: initial_params.update( {cons_key: (constraints[cons_key][0] + constraints[cons_key][1]) / 2} ) return initial_params def _assemble_constraints_bayesian(constraints): """Assembles constraints in format required by gp_minimize. Parameters ---------- constraints : dict The user-defined constraints. Returns ------- cons_bayesian : list of tuples Lower and higher limit for each parameter. """ # assemble constraints in solver-specific format cons_bayesian = list(constraints.values()) return cons_bayesian def _assemble_constraints_cobyla(constraints): """Assembles constraints in format required by fmin_cobyla. Parameters ---------- constraints : dict The user-defined constraints. Returns ------- cons_bayesian : dict Set of functions. """ # assemble constraints in solver-specific format cons_cobyla = list() for idx, cons_key in enumerate(constraints): cons_cobyla.append(lambda x, idx=idx: float(constraints[cons_key][1]) - x[idx]) cons_cobyla.append(lambda x, idx=idx: x[idx] - float(constraints[cons_key][0])) return cons_cobyla def _assemble_constraints_cma(constraints): """Assembles constraints in format required by cma. Parameters ---------- constraints : dict The user-defined constraints. Returns ------- cons_cma : list of lists Lower and higher limit for each parameter. """ # assemble constraints in solver-specific format lower_bounds = [bound[0] for bound in constraints.values()] upper_bounds = [bound[1] for bound in constraints.values()] cons_cma = [lower_bounds, upper_bounds] return cons_cma def _update_params(initial_params, predicted_params): """Update params with predicted parameters. Parameters ---------- initial_params : dict Keys are parameter names, values are initial parameters. predicted_params : list Parameters selected by the optimizer. Returns ------- params : dict Keys are parameter names, values are parameters. """ params = dict() for param_key, param_name in enumerate(initial_params): params.update({param_name: predicted_params[param_key]}) return params def _run_opt_bayesian( initial_net, tstop, constraints, set_params, initial_params, obj_fun, max_iter, obj_fun_kwargs, ): """Runs optimization routine with gp_minimize optimizer. Parameters ---------- initial_net : instance of Network The network object. tstop : float The simulated dipole's duration. constraints : list of tuples Parameter constraints in solver-specific format. set_params : func User-defined function that sets parameters in network drives. initial_params : dict Keys are parameter names, values are initial parameters. obj_fun : func The objective function. max_iter : int Number of calls the optimizer makes. obj_fun_kwargs : dict Additional arguments along with their respective values to be passed to the objective function. Returns ------- opt_params : list Optimized parameters. obj : list Objective values. net_ : instance of Network Optimized network object. """ from ..externals.bayesopt import bayes_opt, expected_improvement # `obj_values` tracks optimizer loss over all epochs # the list is passed to `_obj_func()` and updated in place obj_values = list() def _obj_func(predicted_params): return obj_fun( initial_net=initial_net, initial_params=initial_params, set_params=set_params, predicted_params=predicted_params, update_params=_update_params, obj_values=obj_values, tstop=tstop, obj_fun_kwargs=obj_fun_kwargs, ) opt_results, _ = bayes_opt( func=_obj_func, x0=list(initial_params.values()), cons=constraints, acquisition=expected_improvement, maxfun=max_iter, ) # get optimized params opt_params = opt_results # get objective values obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)] # get optimized net params = _update_params(initial_params, opt_params) net_ = initial_net.copy() set_params(net_, params) return opt_params, obj, net_ def _run_opt_cma( initial_net, tstop, constraints, set_params, initial_params, obj_fun, max_iter, obj_fun_kwargs, ): """Runs optimization routine with CMA-ES Parameters ---------- initial_net : instance of Network The network object. tstop : float The simulated dipole's duration. constraints : list of tuples Parameter constraints in solver-specific format. set_params : func User-defined function that sets parameters in network drives. initial_params : dict Keys are parameter names, values are initial parameters. obj_fun : func The objective function. max_iter : int Number of calls the optimizer makes. obj_fun_kwargs : dict Additional arguments along with their respective values to be passed to the objective function. Returns ------- opt_params : list Optimized parameters. obj : list Objective values. net_ : instance of Network Optimized network object. """ import cma # `obj_values` tracks optimizer loss over all epochs # the list is passed to `_obj_func()` and updated in place obj_values = list() def _obj_func(predicted_params): return obj_fun( initial_net=initial_net, initial_params=initial_params, set_params=set_params, predicted_params=predicted_params, update_params=_update_params, obj_values=obj_values, tstop=tstop, obj_fun_kwargs=obj_fun_kwargs, ) default_sigma0 = 0.25 sigma0 = obj_fun_kwargs.get("sigma0", default_sigma0) _validate_type(sigma0, (int, float, list, np.ndarray)) if isinstance(sigma0, (float, int)): if sigma0 < 0: raise ValueError("sigma0 must be greater than zero") sigma0 = sigma0 * (np.array(constraints[1]) - np.array(constraints[0])) elif isinstance(sigma0, (list, np.ndarray)): if len(np.array(sigma0).shape) > 1: raise ValueError( f"When sigma0 is array-like, it must be shape (N,), got shape {np.array(sigma0).shape}" ) if len(sigma0) != len(constraints): raise ValueError( "When sigma0 is array-like, the length must be the same as the constraints." f"Expected {len(constraints)} elements, got {len(sigma0)} elements)" ) es = cma.CMAEvolutionStrategy( list(initial_params.values()), 1, { "bounds": constraints, "tolfun": obj_fun_kwargs.get("tolfun", 0.01), "maxiter": max_iter, "popsize": obj_fun_kwargs.get("popsize", 16), "CMA_stds": sigma0, "verbose": 0, "seed": obj_fun_kwargs.get("seed", 123), }, ) while not es.stop(): solutions = es.ask() es.tell(solutions, _obj_func(solutions)) es.result_pretty() # get best params best_params = es.result.xbest # get objective values obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)] # get optimized net params = _update_params(initial_params, best_params) net_ = initial_net.copy() set_params(net_, params) return best_params, obj, net_ def _run_opt_cobyla( initial_net, tstop, constraints, set_params, initial_params, obj_fun, max_iter, obj_fun_kwargs, ): """Runs optimization routine with fmin_cobyla optimizer. Parameters ---------- initial_net : instance of Network The network object. tstop : float The simulated dipole's duration. constraints : list of tuples Parameter constraints in solver-specific format. set_params : func User-defined function that sets parameters in network drives. initial_params : dict Keys are parameter names, values are initial parameters. obj_fun : func The objective function. max_iter : int Number of calls the optimizer makes. obj_fun_kwargs : dict Additional arguments along with their respective values to be passed to the objective function. Returns ------- opt_params : list Optimized parameters. obj : list Objective values. net_ : instance of Network Optimized network object. """ # `obj_values` tracks optimizer loss over all epochs # the list is passed to `_obj_func()` and updated in place obj_values = list() def _obj_func(predicted_params): return obj_fun( initial_net=initial_net, initial_params=initial_params, set_params=set_params, predicted_params=predicted_params, update_params=_update_params, obj_values=obj_values, tstop=tstop, obj_fun_kwargs=obj_fun_kwargs, ) opt_results = fmin_cobyla( _obj_func, cons=constraints, rhobeg=0.1, rhoend=1e-4, x0=list(initial_params.values()), maxfun=max_iter, catol=0.0, ) # get optimized params opt_params = opt_results # get objective values obj = [np.min(obj_values[:idx]) for idx in range(1, max_iter + 1)] # get optimized net params = _update_params(initial_params, opt_params) net_ = initial_net.copy() set_params(net_, params) return opt_params, obj, net_