Source code for hnn_core.optimization

"""Parameter optimization functions."""

# Authors: Blake Caldwell <blake_caldwell@brown.edu>
#          Mainak Jas <mjas@mgh.harvard.edu>

from math import ceil, floor
from collections import OrderedDict

import numpy as np
import scipy.stats as stats
from scipy.optimize import fmin_cobyla

from .network_models import jones_2009_model
from .dipole import _rmse


def _get_range(val, multiplier):
    """Get range of values to sweep over."""
    range_min = max(0, val - val * multiplier / 100.)
    range_max = val + val * multiplier / 100.
    ranges = {'initial': val, 'minval': range_min, 'maxval': range_max}
    return ranges
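

# Hedged illustration (not part of the module API): a quick check of how
# _get_range clips the lower bound at zero. The 500 % multiplier mirrors the
# default synweight_range_multiplier used further below.
def _example_get_range():
    """Sketch: sweep range for a hypothetical synaptic weight of 0.05."""
    ranges = _get_range(0.05, 500.)
    # 0.05 - 0.05 * 5 is negative, so minval is clipped to 0, while
    # maxval is 0.05 + 0.05 * 5 = 0.3 (up to floating-point rounding)
    assert ranges['minval'] == 0
    assert np.isclose(ranges['maxval'], 0.3)
    return ranges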


def _split_by_evinput(params, sigma_range_multiplier, timing_range_multiplier,
                      synweight_range_multiplier):
    """ Sorts parameter ranges by evoked inputs into a dictionary

    Parameters
    ----------
    params: an instance of Params
        Full set of simulation parameters
    sigma_range_multiplier : float
        The scale of sigma values to sweep over.
    timing_range_multiplier : float
        The scale of timing values to sweep over.
    synweight_range_multiplier : float
        The scale of input synaptic weights to sweep over.

    Returns
    -------
    sorted_evinput_params: dict
        Dictionary with parameters grouped by evoked inputs.
        Keys for each evoked input are
        'mean', 'sigma', 'ranges', 'start', and 'end'.
        sorted_evinput_params['evprox1']['ranges'] is a dict
        with keys as the parameters to be optimized and values
        indicating the ranges over which they should be
        optimized. E.g.,
        sorted_evinput['evprox1']['ranges'] =
        {
            'gbar_evprox_1_L2Pyr_ampa':
                {
                    'initial': 0.05125,
                    'minval': 0,
                    'maxval': 0.0915
                }
        }
        Elements are sorted by their start time.
    """

    evinput_params = {}
    for evinput_t in params['t_*']:
        # strip the leading 't_' prefix (e.g. 't_evprox_1' -> 'evprox_1')
        id_str = evinput_t.lstrip('t_')
        timing_mean = float(params[evinput_t])

        if f'sigma_{evinput_t}' in params:
            timing_sigma = float(params['sigma_' + evinput_t])
        else:
            timing_sigma = 3.0
            print("Couldn't fing timing_sigma. Using default %f" %
                  timing_sigma)

        if timing_sigma == 0.0:
            # sigma of 0 will not produce a CDF
            timing_sigma = 0.01

        evinput_params[id_str] = {'mean': timing_mean, 'sigma': timing_sigma,
                                  'ranges': {}}

        evinput_params[id_str]['ranges']['sigma_' + evinput_t] = \
            _get_range(timing_sigma, sigma_range_multiplier)

        # calculate range for time
        timing_bound = timing_sigma * timing_range_multiplier
        range_min = max(0, timing_mean - timing_bound)
        range_max = min(float(params['tstop']), timing_mean + timing_bound)

        evinput_params[id_str]['start'] = range_min
        evinput_params[id_str]['end'] = range_max
        evinput_params[id_str]['ranges'][evinput_t] =  \
            {'initial': timing_mean, 'minval': range_min, 'maxval': range_max}

        # calculate ranges for syn. weights
        for label in params[f'gbar_{id_str}*']:
            value = params[label]
            ranges = _get_range(value, synweight_range_multiplier)
            if value == 0.0:
                ranges['minval'] = value
                ranges['maxval'] = 1.0
            evinput_params[id_str]['ranges'][label] = ranges

    sorted_evinput_params = OrderedDict(sorted(evinput_params.items(),
                                               key=lambda x: x[1]['start']))
    return sorted_evinput_params
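

# Hedged usage sketch (not part of the module API): how the evoked inputs of
# an HNN-style parameter file are grouped and sorted. The file path is
# supplied by the caller; read_params is the public hnn_core loader.
def _example_split_by_evinput(params_fname):
    """Sketch: group the evoked inputs of a params file by start time."""
    from hnn_core import read_params
    params = read_params(params_fname)
    evinputs = _split_by_evinput(params, sigma_range_multiplier=50.,
                                 timing_range_multiplier=3.,
                                 synweight_range_multiplier=500.)
    # keys are e.g. 'evprox1', 'evdist1', ..., ordered by earliest start
    return list(evinputs)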


def _generate_weights(evinput_params, params, decay_multiplier):
    """Calculation of weight function for wRMSE calcuation

    Returns
    -------
    evinput_params : dict
        Adds the keys 'weights', 'opt_start', 'opt_end'
        to evinput_params[input_name] and removes 'mean'
        and 'sigma' which were needed to compute 'weights'.
    """
    tstop, dt = params['tstop'], params['dt']
    num_step = ceil(tstop / dt) + 1
    times = np.linspace(0, tstop, num_step)

    for evinput_this in evinput_params.values():
        # calculate cdf using start time (minval of optimization range)
        evinput_this['cdf'] = stats.norm.cdf(
            times, evinput_this['start'], evinput_this['sigma'])

    for input_name, evinput_this in evinput_params.items():
        evinput_this['weights'] = evinput_this['cdf'].copy()

        for other_input, evinput_other in evinput_params.items():
            # check ordering to only use inputs after us
            # and don't subtract our own cdf(s)
            if (evinput_other['mean'] < evinput_this['mean'] or
                    input_name == other_input):
                continue

            decay_factor = decay_multiplier * \
                (evinput_other['mean'] - evinput_this['mean']) / tstop
            evinput_this['weights'] -= evinput_other['cdf'] * decay_factor

        # weights should not drop below 0
        np.clip(evinput_this['weights'], a_min=0, a_max=None,
                out=evinput_this['weights'])

        # start and stop optimization where the weights are insignificant
        indices = np.where(evinput_this['weights'] > 0.01)
        evinput_this['opt_start'] = min(evinput_this['start'],
                                        times[indices][0])
        evinput_this['opt_end'] = max(evinput_this['end'],
                                      times[indices][-1])

        # convert to multiples of dt
        evinput_this['opt_start'] = floor(evinput_this['opt_start'] / dt) * dt
        evinput_params[input_name]['opt_end'] = ceil(
            evinput_this['opt_end'] / dt) * dt

    for evinput_this in evinput_params.values():
        del evinput_this['mean'], evinput_this['sigma'], evinput_this['cdf']

    return evinput_params
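

# Hedged sketch of the weighting idea above, using hypothetical timings: each
# input's weight curve is its timing CDF, reduced by the CDFs of later inputs
# scaled by decay_multiplier, so errors long after the next input count less.
def _example_weight_curve():
    """Sketch: weight curve for an early input followed by a later one."""
    tstop, dt, decay_multiplier = 170., 0.025, 1.6
    times = np.linspace(0, tstop, ceil(tstop / dt) + 1)
    # CDFs built from each input's optimization-range start and sigma
    w_early = stats.norm.cdf(times, 18., 3.)
    cdf_late = stats.norm.cdf(times, 57., 6.)
    # the decay factor scales with the separation between the mean times
    decay_factor = decay_multiplier * (63.5 - 26.6) / tstop
    w_early = np.clip(w_early - cdf_late * decay_factor, 0, None)
    return times, w_early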


def _create_last_chunk(input_chunks):
    """ This creates a chunk that combines parameters for
    all chunks in input_chunks (final step)

    Parameters
    ----------
    input_chunks: List
        List of ordered chunks for optimization

    Returns
    -------
    chunk: dict
        Dictionary with the parameters for the combined
        chunk (final step)
    """
    chunk = {'inputs': [], 'ranges': {}, 'opt_start': 0.0,
             'opt_end': 0.0}

    for evinput in input_chunks:
        chunk['inputs'].extend(evinput['inputs'])
        chunk['ranges'].update(evinput['ranges'])
        if evinput['opt_end'] > chunk['opt_end']:
            chunk['opt_end'] = evinput['opt_end']

    # wRMSE with weights of 1's is the same as regular RMSE.
    chunk['weights'] = np.ones_like(input_chunks[-1]['weights'])

    return chunk


def _consolidate_chunks(inputs):
    """Consolidates inputs into optimization "chunks" defined by
    opt_start and opt_end

    Parameters
    ----------
    inputs: dict
        Sorted dictionary of inputs with their parameters
        and weight functions

    Returns
    -------
    chunks: list
        List of optimization chunks. Consecutive evinput_params are
        combined into a single chunk whenever their optimization
        windows overlap.
    """
    chunks = list()
    for input_name in inputs:
        input_dict = inputs[input_name].copy()
        input_dict['inputs'] = [input_name]

        if (len(chunks) > 0 and
                input_dict['start'] <= chunks[-1]['end']):
            # update previous chunk
            chunks[-1]['inputs'].extend(input_dict['inputs'])
            chunks[-1]['end'] = input_dict['end']
            chunks[-1]['ranges'].update(input_dict['ranges'])
            chunks[-1]['opt_end'] = max(chunks[-1]['opt_end'],
                                        input_dict['opt_end'])
            # average the weights
            chunks[-1]['weights'] = (chunks[-1]['weights'] +
                                     input_dict['weights']) / 2
        else:
            # new chunk
            chunks.append(input_dict)

    # add one last chunk to the end
    if len(chunks) > 1:
        last_chunk = _create_last_chunk(chunks)
        chunks.append(last_chunk)

    return chunks
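

# Hedged example of the chunking rule with hypothetical, hand-built inputs:
# windows that do not overlap stay separate chunks, and a final chunk with
# uniform weights (plain RMSE over all inputs) is appended.
def _example_consolidate_chunks():
    """Sketch: two non-overlapping inputs give two chunks plus a final one."""
    inputs = OrderedDict(
        evprox1=dict(start=18., end=26., opt_start=10., opt_end=45.,
                     ranges={'t_evprox_1': _get_range(20., 50.)},
                     weights=np.ones(5)),
        evdist1=dict(start=57., end=69., opt_start=50., opt_end=80.,
                     ranges={'t_evdist_1': _get_range(63., 50.)},
                     weights=np.ones(5)))
    chunks = _consolidate_chunks(inputs)
    # evdist1 starts after evprox1 ends, so the two are not merged; a third
    # chunk combining both ranges is appended as the final optimization step
    assert len(chunks) == 3
    assert chunks[-1]['inputs'] == ['evprox1', 'evdist1']
    return chunks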


def _optrun(new_params, opt_params, params, opt_dpls, scale_factor,
            smooth_window_len):
    """This is the function to run a simulation

    Parameters
    ----------
    new_params: array
        List or numpy array with the parameters chosen by the
        optimization engine. Order is consistent with
        opt_params['ranges'].
    opt_params : dict
        The optimization parameters
    params : dict
        The params dictionary.
    opt_dpls : dict
        Dictionary with keys 'target_dpl' and 'best_dpl' for
        the experimental dipole and the best simulated dipole so far.
    scale_factor : float
        Scales the simulated dipoles by scale_factor to match
        the experimental dipole (target_dpl).
    smooth_window_len : int
        The length of the hamming window (in samples) to smooth the
        simulated dipole waveform in each optimization step.

    Returns
    -------
    avg_rmse: float
        Weighted RMSE between the simulated dipole and the
        experimental (target) dipole.
    """
    print("Optimization step %d, iteration %d" % (opt_params['cur_step'] + 1,
                                                  opt_params['optiter'] + 1))

    from .parallel_backends import _BACKEND, JoblibBackend
    if _BACKEND is None:
        _BACKEND = JoblibBackend(n_jobs=1)

    # set parameters
    # tiny negative weights are possible. Clip them to 0.
    new_params[new_params < 0] = 0
    for param_name, test_value in zip(opt_params['ranges'].keys(), new_params):
        params[param_name] = test_value

    # run the simulation, but stop early if possible
    net = jones_2009_model(params, add_drives_from_params=True)
    tstop = params['tstop'] = opt_params['opt_end']
    net._instantiate_drives(n_trials=1, tstop=tstop)
    avg_dpl = _BACKEND.simulate(net, tstop=tstop, dt=0.025, n_trials=1)[0]
    avg_dpl = avg_dpl.scale(scale_factor)
    if smooth_window_len is not None:
        avg_dpl = avg_dpl.smooth(smooth_window_len)

    avg_rmse = _rmse(avg_dpl, opt_dpls['target_dpl'],
                     tstart=opt_params['opt_start'],
                     tstop=opt_params['opt_end'],
                     weights=opt_params['weights'])

    if avg_rmse < opt_params['stepminopterr']:
        best = "[best] "
        opt_params['stepminopterr'] = avg_rmse
        opt_dpls['best_dpl'] = avg_dpl
    else:
        best = ""

    print("%sweighted RMSE: %.2f over range [%3.3f-%3.3f] ms" %
          (best, avg_rmse, opt_params['opt_start'], opt_params['opt_end']))

    opt_params['optiter'] += 1

    return avg_rmse  # COBYLA minimizes this error
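

# Hedged sketch of the parameter hand-off above (names are hypothetical): the
# flat vector proposed by COBYLA is clipped at zero and written back into the
# params dict in the same order as the keys of opt_params['ranges'].
def _example_param_mapping():
    """Sketch: map a candidate vector back onto named parameters."""
    ranges = OrderedDict([
        ('t_evprox_1', _get_range(20., 50.)),
        ('gbar_evprox_1_L2Pyr_ampa', _get_range(0.05, 500.))])
    new_params = np.array([22.5, -1e-6])   # as proposed by the optimizer
    new_params[new_params < 0] = 0         # clip tiny negative weights
    params = {name: value for name, value in zip(ranges, new_params)}
    # params == {'t_evprox_1': 22.5, 'gbar_evprox_1_L2Pyr_ampa': 0.0}
    return params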


def _run_optimization(maxiter, param_ranges, optrun):
    """Run fmin_cobyla on optrun within the bounds given by param_ranges."""
    cons = list()
    x0 = list()
    for idx, param_name in enumerate(param_ranges):
        x0.append(param_ranges[param_name]['initial'])
        # bind param_name as a default argument too; otherwise every
        # constraint would use the bounds of the last parameter in the loop
        cons.append(
            lambda x, idx=idx, name=param_name:
                param_ranges[name]['maxval'] - x[idx])
        cons.append(
            lambda x, idx=idx, name=param_name:
                x[idx] - param_ranges[name]['minval'])
    result = fmin_cobyla(optrun, cons=cons, rhobeg=0.1, rhoend=1e-4,
                         x0=x0, maxfun=maxiter, catol=0.0)
    return result
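

# Hedged, self-contained sketch of how _run_optimization is used: the min/max
# bounds become COBYLA inequality constraints and the objective is any
# callable of the flat parameter vector. The quadratic objective here is made
# up purely for illustration.
def _example_run_optimization():
    """Sketch: recover a target value within [0, 1] using a toy objective."""
    param_ranges = {'toy_param': {'initial': 0.2, 'minval': 0.,
                                  'maxval': 1.}}
    result = _run_optimization(maxiter=50, param_ranges=param_ranges,
                               optrun=lambda x: (x[0] - 0.8) ** 2)
    # result is a numpy array; result[0] should end up close to 0.8
    return result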


def optimize_evoked(params, target_dpl, initial_dpl, maxiter=50,
                    timing_range_multiplier=3.0, sigma_range_multiplier=50.0,
                    synweight_range_multiplier=500.0, decay_multiplier=1.6,
                    scale_factor=1., smooth_window_len=None):
    """Optimize drives to generate evoked response.

    Parameters
    ----------
    params : dict
        The initial params.
    target_dpl : instance of Dipole
        The target experimental dipole.
    initial_dpl : instance of Dipole
        The initial dipole to start the optimization.
    maxiter : int
        The maximum number of simulations to run for optimizing
        one "chunk".
    timing_range_multiplier : float
        The scale of timing values to sweep over.
    sigma_range_multiplier : float
        The scale of sigma values to sweep over.
    synweight_range_multiplier : float
        The scale of input synaptic weights to sweep over.
    decay_multiplier : float
        The decay multiplier.
    scale_factor : float
        Scales the simulated dipoles by scale_factor to match
        target_dpl.
    smooth_window_len : int
        The length of the hamming window (in samples) to smooth the
        simulated dipole waveform in each optimization step.

    Returns
    -------
    params : dict
        The optimized params dictionary.

    Notes
    -----
    This optimization protocol utilizes the Constrained Optimization
    By Linear Approximation (COBYLA) method:
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.fmin_cobyla.html  # noqa
    """
    from .parallel_backends import _BACKEND, JoblibBackend

    if _BACKEND is None:
        _BACKEND = JoblibBackend(n_jobs=1)

    params = params.copy()

    # Create a sorted dictionary with the inputs and parameters
    # belonging to each.
    # Then, calculate the appropriate weight function to be used
    # in RMSE using the CDF defined by the input timing's mean and
    # std. deviation parameters.
    # Lastly, use the weights to define non-overlapping "chunks" of
    # the simulation timeframe to optimize. Chunks are consolidated
    # if more than one input should be optimized at a time.
    evinput_params = _split_by_evinput(params, sigma_range_multiplier,
                                       timing_range_multiplier,
                                       synweight_range_multiplier)
    evinput_params = _generate_weights(evinput_params, params,
                                       decay_multiplier)
    param_chunks = _consolidate_chunks(evinput_params)

    avg_rmse = _rmse(initial_dpl, target_dpl, tstop=params['tstop'])
    print("Initial RMSE: %.2f" % avg_rmse)

    opt_params = dict()
    for step in range(len(param_chunks)):
        opt_params['cur_step'] = step
        total_steps = len(param_chunks)

        # param_chunks is the optimization information for all steps.
        # opt_params is a pointer to the params for each step
        opt_params.update(param_chunks[step])

        if maxiter == 0:
            print("Skipping optimization step %d (0 simulations)" %
                  (step + 1))
            continue

        if (opt_params['cur_step'] > 0 and
                opt_params['cur_step'] == total_steps - 1):
            # update currently used params
            for var_name in opt_params['ranges']:
                params[var_name] = opt_params['ranges'][var_name]['initial']

            # For the last step (all inputs), recalculate ranges and update
            # param_chunks. If previous optimization steps increased
            # std. dev. this could result in fewer optimization steps as
            # inputs may be deemed too close together and be grouped in a
            # single optimization step.
            #
            # The purpose of the last step (with regular RMSE) is to clean up
            # overfitting introduced by local weighted RMSE optimization.
            evinput_params = _split_by_evinput(params,
                                               sigma_range_multiplier,
                                               timing_range_multiplier,
                                               synweight_range_multiplier)
            evinput_params = _generate_weights(evinput_params, params,
                                               decay_multiplier)
            param_chunks = _consolidate_chunks(evinput_params)

            # reload opt_params for the last step in case the number of
            # steps changed after re-chunking
            opt_params.update(param_chunks[total_steps - 1])

        print("Starting optimization step %d/%d" % (step + 1, total_steps))

        opt_params['optiter'] = 0
        opt_params['stepminopterr'] = 1e9  # min optimization error so far
        opt_dpls = dict(best_dpl=None, target_dpl=target_dpl)

        def _myoptrun(new_params):
            return _optrun(new_params, opt_params, params,
                           opt_dpls=opt_dpls,
                           scale_factor=scale_factor,
                           smooth_window_len=smooth_window_len)

        print('Optimizing from [%3.3f-%3.3f] ms' % (opt_params['opt_start'],
                                                    opt_params['opt_end']))
        opt_results = _run_optimization(maxiter=maxiter,
                                        param_ranges=opt_params['ranges'],
                                        optrun=_myoptrun)
        # tiny negative weights are possible. Clip them to 0.
        opt_results[opt_results < 0] = 0

        # update opt_params for the next round
        for var_name, value in zip(opt_params['ranges'], opt_results):
            opt_params['ranges'][var_name]['initial'] = value

    # save the optimized params
    for var_name in opt_params['ranges']:
        params[var_name] = opt_params['ranges'][var_name]['initial']

    avg_rmse = _rmse(opt_dpls['best_dpl'], target_dpl, tstop=params['tstop'])
    print("Final RMSE: %.2f" % avg_rmse)

    return params
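

# Hedged end-to-end usage sketch (illustrative only: the parameter file, the
# recorded dipole file, the scale factor and the smoothing length are
# placeholders that a caller would supply for their own data).
def _example_optimize_evoked(params_fname, dipole_fname):
    """Sketch: optimize the evoked drives of a params file against data."""
    from hnn_core import read_params, read_dipole, simulate_dipole
    params = read_params(params_fname)
    target_dpl = read_dipole(dipole_fname)
    net = jones_2009_model(params, add_drives_from_params=True)
    initial_dpl = simulate_dipole(net, tstop=params['tstop'], n_trials=1)[0]
    return optimize_evoked(params, target_dpl, initial_dpl, maxiter=50,
                           scale_factor=3000., smooth_window_len=30)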