"""Implements multi-agent learning rules"""
import warnings
from copy import deepcopy
import math
from abc import ABC, abstractmethod
from typing import Tuple, Type, Callable
from time import perf_counter as timer
import sympy.ntheory as sympy
import torch
from torch.nn.utils import parameters_to_vector, vector_to_parameters
from bnelearn.environment import Environment
from bnelearn.strategy import Strategy, NeuralNetStrategy
class Learner(ABC):
    """A learning rule used to update a player's policy in self-play.

    Concrete learners must implement both abstract methods below.
    """

    @abstractmethod
    def update_strategy(self) -> None:
        """Updates the player's strategy."""
        raise NotImplementedError()

    @abstractmethod
    def update_strategy_and_evaluate_utility(self, closure=None) -> torch.Tensor:
        """Updates the model and returns the utility after the update.

        Params:
            closure: (optional) callable forwarded to learners whose update
                step accepts one; this base declaration does not use it.
        """
        # Intentionally empty: the base declaration returns None when reached
        # via ``super()``.
class GradientBasedLearner(Learner):
    """A learning rule that is based on computing some version of (pseudo-)
    gradient, then applying an SGD-like update via a ``torch.optim.Optimizer``.

    Subclasses implement ``_set_gradients``; this class owns the optimizer
    (and optional learning-rate scheduler) and performs the update step.

    Arguments:
        model: the ``torch.nn.Module`` whose parameters are optimized.
        environment: object used to evaluate the model via
            ``environment.get_strategy_reward``.
        optimizer_type: optimizer class, instantiated on the model parameters.
        optimizer_hyperparams: keyword arguments for ``optimizer_type``
            (must be a dict, may be empty).
        scheduler_type: (optional) scheduler class, instantiated on the
            optimizer.
        scheduler_hyperparams: (optional) keyword arguments for
            ``scheduler_type``; ``None`` is treated as an empty dict.
        strat_to_player_kwargs: (optional) keyword arguments forwarded to
            ``environment.get_strategy_reward``. If ``player_position`` is
            missing, it is set to 0 and a warning is emitted.

    Raises:
        ValueError: if ``optimizer_hyperparams`` is not a dict.
    """

    def __init__(self,
                 model: torch.nn.Module, environment: Environment,
                 optimizer_type: Type[torch.optim.Optimizer], optimizer_hyperparams: dict,
                 scheduler_type: Type[torch.optim.lr_scheduler._LRScheduler] = None,
                 scheduler_hyperparams: dict = None, strat_to_player_kwargs: dict = None):
        self.model = model
        # Keep the bound method (not a materialized list): ``parameters()``
        # returns a fresh iterator on every call.
        self.params = model.parameters
        self.n_parameters = sum(p.numel() for p in self.params())

        self.environment = environment

        # Work on a copy so that adding the default player position below
        # does not mutate the caller's dict.
        self.strat_to_player_kwargs = dict(strat_to_player_kwargs) if strat_to_player_kwargs else {}
        # warn if weird initialization
        if 'player_position' not in self.strat_to_player_kwargs:
            warnings.warn("You haven't specified a player_position to evaluate the model. Defaulting to position 0.")
            self.strat_to_player_kwargs['player_position'] = 0

        if not isinstance(optimizer_hyperparams, dict):
            raise ValueError('Optimizer hyperparams must be a dict (even if empty).')
        self.optimizer_hyperparams = optimizer_hyperparams
        self.optimizer: torch.optim.Optimizer = optimizer_type(self.params(), **self.optimizer_hyperparams)

        # ``None`` hyperparams would fail on ``**`` unpacking; treat as empty.
        self.scheduler_hyperparams = scheduler_hyperparams if scheduler_hyperparams is not None else {}
        if scheduler_type is None:
            self.scheduler = None
        else:
            self.scheduler: torch.optim.lr_scheduler._LRScheduler = \
                scheduler_type(self.optimizer, **self.scheduler_hyperparams)

    @abstractmethod
    def _set_gradients(self):
        """Calculate current (pseudo)gradient for all params."""

    def update_strategy(self, closure: Callable = None) -> torch.Tensor or None:  # pylint: disable=arguments-differ
        """Performs one model-update to the player's strategy.

        Params:
            closure: (optional) Callable that recomputes model loss.
                Required by some optimizers such as LBFGS. When given,
                optimizer.step() (and thus this function) return the last
                evaluated loss. (Usually evaluated BEFORE the model update).
                For correct usage see:
                https://pytorch.org/docs/stable/optim.html#optimizer-step-closure

        Returns: None or loss evaluated by closure. (See above.)
        """
        self.optimizer.zero_grad()
        self._set_gradients()
        step = self.optimizer.step(closure=closure)

        if self.scheduler is not None:
            # NOTE(review): the current reward is passed as the scheduler's
            # step argument, which fits metric-driven schedulers such as
            # ReduceLROnPlateau - confirm before using other scheduler types.
            reward = self.environment.get_strategy_reward(self.model, **self.strat_to_player_kwargs).detach()
            self.scheduler.step(reward)

        return step

    def update_strategy_and_evaluate_utility(self, closure=None):
        """Updates the model and returns the (detached) utility after the update."""
        self.update_strategy(closure)
        reward = self.environment.get_strategy_reward(self.model, **self.strat_to_player_kwargs)
        return reward.detach()
class ESPGLearner(GradientBasedLearner):
    """Neural Self-Play with Evolutionary Strategy Pseudo-PG as proposed in
    Bichler et al. (2021).

    Uses pseudo-policy gradients calculated as

        ``((rewards - baseline) * epsilons).mean(dim=0) / sigma**2``

    over a population of models perturbed by parameter noise epsilon yielding
    perturbed rewards.

    Arguments:
        model: bnelearn.bidder
        environment: bnelearn.Environment
        hyperparams: dict
            (required:)
                population_size: int
                sigma: float
                scale_sigma_by_model_size: bool
            (optional:)
                normalize_gradients: bool (default: False)
                    If true will scale rewards to N(0,1) in weighted-noise update:
                    (F - baseline).mean()/sigma/F.std() resulting in an (approximately)
                    normalized vector pointing in the same direction as the true
                    gradient. (normalization requires small enough sigma!)
                    If false or not provided, will approximate true gradient
                    using current utility as a baseline for variance reduction.
                baseline: ('current_reward', 'mean_reward' or a float.)
                    If 'current_reward', will use current utility before update as a baseline.
                    If 'mean_reward', will use mean of candidate rewards.
                        For small perturbations, 'mean_reward' is cheaper to compute
                        (one fewer game played) and yields slightly lower gradient
                        sample variance but yields a biased estimate of the true gradient:

                            Expect(ES_grad with mean) = (pop_size - 1) / pop_size * true_grad

                    If a float is given, will use that float as reward.
                    Defaults to 'current_reward' if normalize_gradients is False, or
                    to 'mean_reward' if normalize_gradients is True.
                regularization: dict of
                    initial_strength: float, initial penalization factor of bid value
                    regularize_decay: float, decay rate by which the regularization factor
                        is multiplied each iteration.
                symmetric_sampling: bool
                    whether or not we sample symmetric pairs of perturbed parameters, e.g.
                    p + eps and p - eps. With an odd ``population_size`` only
                    ``population_size - 1`` candidates are evaluated.
        optimizer_type: Type[torch.optim.Optimizer]
            A class implementing torch's optimizer interface used for parameter update step.
        strat_to_player_kwargs: dict
            dict of arguments provided to environment used for evaluating
            utility of current and candidate strategies.

    Raises:
        ValueError: if required hyperparams are missing, ``population_size``
            is not an int >= 2, or ``baseline`` is neither a float nor one of
            the supported strings.
    """

    def __init__(self,
                 model: torch.nn.Module, environment: Environment, hyperparams: dict,
                 optimizer_type: Type[torch.optim.Optimizer], optimizer_hyperparams: dict,
                 scheduler_type: Type[torch.optim.lr_scheduler._LRScheduler] = None,
                 scheduler_hyperparams: dict = None, strat_to_player_kwargs: dict = None):
        # Create and validate optimizer
        super().__init__(model, environment,
                         optimizer_type, optimizer_hyperparams,
                         scheduler_type, scheduler_hyperparams,
                         strat_to_player_kwargs)

        # Validate ES hyperparams
        required = {'population_size', 'sigma', 'scale_sigma_by_model_size'}
        if not required <= set(hyperparams):
            raise ValueError(
                'Missing hyperparams for ES. Provide at least, population size, sigma and scale_sigma_by_model_size.')
        population_size = hyperparams['population_size']
        if not isinstance(population_size, int) or population_size < 2:
            # one is invalid because there will be zero variance, leading to div by 0 errors
            raise ValueError('Please provide a valid `population_size` parameter >=2')

        # set hyperparams
        self.population_size = population_size
        self.sigma = float(hyperparams['sigma'])
        self.sigma_base = self.sigma  # unscaled sigma as provided by the caller
        if hyperparams['scale_sigma_by_model_size']:
            self.sigma = self.sigma / self.n_parameters

        self.normalize_gradients = bool(hyperparams.get('normalize_gradients', False))
        self.baseline = 'mean_reward' if self.normalize_gradients else 'current_reward'

        # overwrite baseline method if provided
        if 'baseline' in hyperparams:
            requested = hyperparams['baseline']
            if not isinstance(requested, float) \
                    and requested not in ['current_reward', 'mean_reward']:
                raise ValueError('Invalid baseline provided. Should be float or '
                                 + 'one of "mean_reward", "current_reward"')
            # ``_set_gradients`` reads ``self.baseline``, so the override has
            # to be stored there to take effect.
            self.baseline = requested
        # Kept for callers that read this attribute.
        self.baseline_method = self.baseline

        if 'regularization' in hyperparams:
            self.regularize = hyperparams['regularization']['initial_strength']
            self.regularize_decay = hyperparams['regularization']['regularize_decay']
        else:
            self.regularize = 0.0
            self.regularize_decay = 1.0

        self.symmetric_sampling = hyperparams.get('symmetric_sampling', False)

    def _set_gradients(self):
        """Calculates ES-pseudogradients and applies them to the model parameter
        gradient data.

        ES gradient is calculated as:
            mean( (rewards - baseline) * epsilons ) / sigma²
        and approximates the true gradient.

        In case of gradient normalization, the denominator is
        ``sigma * rewards.std()`` instead of ``sigma²``:
            mean( (rewards - baseline) * epsilons ) / sigma / rewards.std()
        For small sigma, this will yield a vector that points in the same
        direction as the gradient and has length (slightly smaller than) 1.
        Furthermore, the gradient samples will have low variance.
        NOTE(review): the original note on the behaviour for large sigma was
        truncated in the source ("this grad becomes smaller tha...").
        """
        ### 1. if required redraw valuations / perform random moves (determined by env)
        self.environment.prepare_iteration()

        ### 2. Create a population of perturbations of the original model
        if self.symmetric_sampling:
            half = self.population_size // 2
            positive = [self._perturb_model(self.model) for _ in range(half)]
            mirrored = [self._perturb_model(self.model, -eps) for _, eps in positive]
            population = positive + mirrored
            # For odd population sizes one candidate is dropped; reshape with
            # the number of candidates that actually exist.
            n_candidates = 2 * half
        else:
            # Lazy on purpose: only one perturbed copy is alive at a time.
            population = (self._perturb_model(self.model) for _ in range(self.population_size))
            n_candidates = self.population_size

        ### 3. let each candidate against the environment and get their utils ###
        # both of these as a row-matrix. i.e.
        # rewards: n_candidates x 1
        # epsilons: n_candidates x parameter_length
        self.regularize *= self.regularize_decay
        evaluated = (
            (
                self.environment.get_strategy_reward(
                    candidate, **self.strat_to_player_kwargs, regularize=self.regularize
                ).detach().view(1),
                epsilon
            )
            for candidate, epsilon in population
        )
        rewards, epsilons = (
            torch.cat(tensors).view(n_candidates, -1)
            for tensors in zip(*evaluated)
        )

        ### 4. calculate the ES-pseudogradients ####
        # See ES_Analysis notebook in repository for more information about where
        # these choices come from.
        if self.baseline == 'current_reward':
            baseline = self.environment.get_strategy_reward(
                self.model, regularize=self.regularize,
                **self.strat_to_player_kwargs
            ).detach().view(1)
        elif self.baseline == 'mean_reward':
            baseline = rewards.mean(dim=0)
        else:  # baseline is a float
            baseline = self.baseline

        denominator = self.sigma * rewards.std() if self.normalize_gradients else self.sigma**2

        if denominator == 0:
            # all candidates returned same reward and normalize is true --> stationary
            gradient_vector = torch.zeros_like(parameters_to_vector(self.params()))
        else:
            gradient_vector = ((rewards - baseline) * epsilons).mean(dim=0) / denominator

        ### 5. assign gradients to model gradient ####
        # We actually _add_ to existing gradient (as common in pytorch), to make it
        # possible to accumulate gradients over multiple batches.
        # When this is not desired (most of the time!), you need to flush the gradients
        # before calling this method.
        # NOTE: torch.optimizers minimize but we use a maximization formulation
        # in the rewards, thus we need to use the negative gradient here.
        offset = 0
        for p in self.params():
            n = p.numel()
            # slice of the flat gradient, shaped like the parameter
            d_p = gradient_vector[offset:offset + n].view_as(p)
            offset += n
            if p.grad is not None:
                p.grad.add_(-d_p)
            else:
                p.grad = -d_p

    def _perturb_model(self, model: torch.nn.Module,
                       noise: torch.Tensor = None) -> Tuple[torch.nn.Module, torch.Tensor]:
        """
        Returns a perturbed copy of a model [torch.nn.Module],
        as well as the noise vector used to generate the perturbation.

        If ``noise`` is None, it is drawn from N(0, self.sigma) with one entry
        per model parameter; otherwise the given noise vector is applied.
        """
        perturbed = deepcopy(model)

        params_flat = parameters_to_vector(model.parameters())
        if noise is None:
            noise = torch.zeros_like(params_flat).normal_(mean=0.0, std=self.sigma)
        # copy perturbed params into copy
        vector_to_parameters(params_flat + noise, perturbed.parameters())

        return perturbed, noise
class PGLearner(GradientBasedLearner):
    """Neural Self-Play with directly computed Policy Gradients.

    Arguments:
        hyperparams: dict
            (optional:)
                normalize_gradient: bool (default: False)
                    Stored on the learner; not used by ``_set_gradients``.
                baseline: 'current_reward' or a float
                    (default: 'current_reward').
        **kwargs: forwarded to ``GradientBasedLearner.__init__``.

    Raises:
        ValueError: if ``baseline`` is neither a float nor 'current_reward'.
    """

    def __init__(self, hyperparams: dict, **kwargs):
        # Create and validate optimizer
        super().__init__(**kwargs)

        # Both spellings are always set so that neither attribute can be
        # missing, whatever the hyperparameter value.
        self.normalize_gradients = bool(hyperparams.get('normalize_gradient', False))
        self.normalize_gradient = self.normalize_gradients

        if 'baseline' in hyperparams:
            self.baseline_method = hyperparams['baseline']
            if not isinstance(self.baseline_method, float) \
                    and self.baseline_method not in ['current_reward']:
                raise ValueError('Invalid baseline provided. Should be float or '
                                 + '"current_reward"')
            if isinstance(self.baseline_method, float):
                # constant, user-provided baseline
                self.baseline = self.baseline_method
                self.baseline_method = 'manual'
            else:
                self.baseline = 0  # initial baseline
        else:
            # standard baseline
            self.baseline_method = 'current_reward'
            self.baseline = 0  # init

    def _set_gradients(self):
        """Computes gradients of the negative reward via backpropagation."""
        self.environment.prepare_iteration()

        if self.baseline_method == 'current_reward':
            # Refresh the stored baseline with the current (detached) reward.
            # NOTE(review): the baseline is stored but does not enter the loss
            # below - confirm whether that is intended.
            self.baseline = self.environment.get_strategy_reward(
                self.model, **self.strat_to_player_kwargs
            ).detach().view(1)
        # otherwise the baseline is already a constant float

        # optimizers minimize, rewards are maximized --> negate
        loss = -self.environment.get_strategy_reward(
            self.model, **self.strat_to_player_kwargs
        )
        loss.backward()
class PSOLearner(Learner):
    """ Implements the Particle Swarm Optimization Algorithm as a Learner

    Particles represent possible solutions to the model parameters.
    Every update step they move one step in the search space to sample a new solution point.
    They are guided by their previously best found solution (personal best position)
    and the best solution found by the entire swarm (best position)

    NOTE: dim = number of parameters in the model to be optimized

    Arguments:
        model: bnelearn.bidder
        environment: bnelearn.Environment
        hyperparams: dict
            (required:)
                swarm_size: int
                    Number of particles in the swarm
                topology: str
                    Defines the communication network of the swarm
                    If 'global', particles are drawn to the global best position of the swarm.
                        Neighborhood size = swarm size
                    If 'ring', particles are drawn to the best position in their neighborhood.
                        Particles form a neighborhood based on their position in the population array.
                        The first and last particles are connected to form a ring structure.
                        Neighborhood size = 3. E.g., neighborhood of particle i: particle i-1, particle i, particle i+1
                    If 'von_neumann', particles are drawn to the best position in their neighborhood.
                        Particles form a neighborhood based on their position in the population matrix.
                        A particle is connected to its left, right, upper and lower neighbor in the matrix.
                        Neighborhood size = 5
                max_velocity: float
                    Max step size in each direction during one update step
                    If velocity_clamping == False then only used for initialization
                    NOTE(review): this key is not read by the implementation;
                    the max velocity is currently fixed to 1.0.
            (optional:)
                The default values for the inertia weight and the cognition & social ratio are commonly used values
                performing well for most problem settings. Based on: Clerc, M., & Kennedy, J. (2002)
                inertia_weight: float, List, Tuple (default: 0.729)
                    Scales the impact of the old velocity on the new one.
                    If float, will set value as constant
                    If List or Tuple, with length == 2, will take the first value as w_max and second as w_min for a
                    linear decreasing inertia weight
                    !!! max number of iterations is hardcoded to 2000; afterwards the weight stays at w_min !!!
                cognition_ratio: float (default: 1.49445)
                    Upper limit for the impact of the personal best solution on the velocity
                social_ratio: float (default: 1.49445)
                    Upper limit for the impact of the swarm's best solution on the velocity
                reevaluation_frequency: int (default: None)
                    Number of epochs after which the personal and overall bests are reevaluated
                    to prevent false memory introduced by varying batch data
                decrease_fitness: List or Tuple (default None)
                    The two evaporation constants are used to reduce the remembered fitness of the bests to prevent
                    false memory introduced by varying batch data.
                    !!! Use either 'reevaluation_frequency' or 'decrease_fitness' !!!
                    with length == 2, will take the first value as evaporation constant for personal best
                    and second as evaporation constant for global (neighborhood) best
                pretrain_deviation: float (default: 0)
                    If pretrain_deviation > 0 the positions will be initialized as:
                    model.parameters + N(mean=0.0, std=pretrain_deviation)
                    otherwise positions will be initialized randomly over the whole search space
                bound_handling: bool (default: False)
                    If true will clamp particle's positions in each dim to the interval [-max_position, max_position]
                velocity_clamping: bool (default: True)
                    If true will clamp particle's velocities in each dim to the interval [-max_velocity, max_velocity]
                    before adding to the positions
        optimizer_type: Type[torch.optim.Optimizer]
            A class implementing torch's optimizer interface used for parameter update step.
            PSO does not need a torch optimizer to compute a parameter update step.
            -> currently only used to have a consistent interface with other learners
        optimizer_hyperparams: dict
        strat_to_player_kwargs: dict
            Dict of arguments provided to environment used for evaluating utility of current and candidate strategies.

    Raises:
        ValueError: if required hyperparams are missing or invalid, or if
            ``swarm_size`` is unsuitable for a von Neumann topology.
    """

    # Number of epochs over which a linearly decreasing inertia weight moves
    # from w_max to w_min.
    _INERTIA_DECAY_EPOCHS = 2000

    def __init__(self,
                 model: torch.nn.Module, environment: Environment, hyperparams: dict,
                 optimizer_type: Type[torch.optim.Optimizer], optimizer_hyperparams: dict,
                 strat_to_player_kwargs: dict = None):
        self.model = model
        self.particle_evaluation_model = deepcopy(model)
        # PSO does not need gradient computation
        for param in self.particle_evaluation_model.parameters():
            param.requires_grad = False

        self.environment = environment
        self.cur_epoch = 0

        # for logging
        self.writer = None
        self.utility_eval_counter = 0

        # copy, so that adding the default below does not mutate the caller's dict
        self.strat_to_player_kwargs = dict(strat_to_player_kwargs) if strat_to_player_kwargs else {}
        # warn if weird initialization
        if 'player_position' not in self.strat_to_player_kwargs:
            warnings.warn("You haven't specified a player_position to evaluate the model. Defaulting to position 0.")
            self.strat_to_player_kwargs['player_position'] = 0

        # validate PSO hyperparams
        if not {'swarm_size', 'topology'} <= set(hyperparams):
            raise ValueError('Missing hyperparams for PSO. Provide at least, swarm_size, topology.')
        if not isinstance(hyperparams['swarm_size'], int) or hyperparams['swarm_size'] < 2:
            raise ValueError('Please provide a valid `swarm_size` parameter >=2')
        if hyperparams['topology'] not in ['global', 'Global', 'GLOBAL', 'von_neumann', 'von_Neumann', 'VON_NEUMANN',
                                           'ring', 'Ring', 'RING']:
            raise ValueError('Please provide a valid `topology`')
        # All later comparisons use the lower-case names, so normalize the
        # accepted spellings here.
        self.topology = hyperparams['topology'].lower()

        # params needed only for initialization
        swarm_size = hyperparams['swarm_size']
        n_parameters = sum(p.numel() for p in self.model.parameters())

        # search range
        pretrain_deviation = float(hyperparams.get('pretrain_deviation', 0.0))
        # model params are commonly initialized within max range [-1, 1]
        # i.e., pytorch linear: stdv = 1. / math.sqrt(input_length); self.weight.data.uniform_(-stdv,stdv)
        max_position_init = 1.0
        max_velocity = max_position_init
        max_position = max_position_init + pretrain_deviation

        # initialize non-required parameters
        if 'inertia_weight' in hyperparams:
            inertia_weight = hyperparams['inertia_weight']
            if isinstance(inertia_weight, (int, float)):
                self.inertia = float(inertia_weight)
                self.decrease_w = False
            else:
                # (w_max, w_min) for a linearly decreasing inertia weight
                self.inertia_max = float(inertia_weight[0])
                self.inertia_min = float(inertia_weight[1])
                self.decrease_w = True
        else:
            self.inertia = 0.729
            self.decrease_w = False

        self.cognition = float(hyperparams.get('cognition_ratio', 1.49445))
        self.social = float(hyperparams.get('social_ratio', 1.49445))

        # ``None`` is the documented default for both options below.
        if hyperparams.get('reevaluation_frequency') is not None:
            self.reevaluation_frequency = int(hyperparams['reevaluation_frequency'])
        else:
            self.reevaluation_frequency = False

        if hyperparams.get('decrease_fitness') is not None:
            self.decrease_pbest = float(hyperparams['decrease_fitness'][0])
            self.decrease_best = float(hyperparams['decrease_fitness'][1])
            self.decrease_fitness = True
        else:
            self.decrease_fitness = False

        self.bound_handling = bool(hyperparams.get('bound_handling', False))
        self.max_position = max_position

        # clamping is on unless explicitly switched off
        self.velocity_clamping = not ('velocity_clamping' in hyperparams
                                      and not hyperparams['velocity_clamping'])
        self.max_velocity = max_velocity

        #### --- initialize the swarm ---
        # Use the current CUDA device when available; otherwise fall back to
        # the model's device so the learner also works on CPU-only machines.
        if torch.cuda.is_available():
            device = torch.cuda.current_device()
        else:
            first_param = next(self.model.parameters(), None)
            device = first_param.device if first_param is not None else torch.device('cpu')

        # positions
        if pretrain_deviation > 0:
            # perturbation of pretrained model params
            self.position = torch.zeros(swarm_size, n_parameters, device=device).normal_(
                mean=0.0, std=pretrain_deviation)
            self.position.add_(parameters_to_vector(self.model.parameters()))
        else:
            # random positions, uniform in [-max_position, max_position]
            self.position = 2 * max_position * torch.rand(swarm_size, n_parameters, device=device) - max_position
        # velocities, uniform in [-max_velocity, max_velocity]
        self.velocity = 2 * max_velocity * torch.rand_like(self.position) - max_velocity
        # option for evaluation: zero velocities:
        # self.velocity = torch.zeros_like(self.position)

        # personal best fitness and positions
        self.pbest_fitness = torch.full((swarm_size,), float("Inf"),
                                        dtype=torch.float32, device=self.position.device)
        self.pbest_position = torch.empty_like(self.position)
        # the shape of swarm's best position and fitness depend on the topology structure
        self.best_fitness, self.best_position, self.neighborhood = self._calculate_neighborhood(swarm_size)

    def _calculate_neighborhood(self, swarm_size):
        """Initializes the swarm's best position and fitness
        and information structure (neighborhood) defining the social attractor for each particle

        Arguments:
            swarm_size: int
                Number of particles in the swarm
        Returns:
            best_fitness: Tensor
                The fitness value of the social attractor for each particle
            best_position: Tensor
                The position of the social attractor for each particle
            neighborhood: Tensor
                The indices of all particles part of the particle's neighborhood for each particle
        Raises:
            ValueError: for a von Neumann topology, if ``swarm_size`` is prime,
                smaller than 9, or has no suitable column count.

        If a global topology is used the neighborhood size = swarm_size
            all particles remember the same best position and fitness
            only one global attractor is used as social influence
            best_position: None (set in the first update step), best_fitness: single value, neighborhood: None
        If a local topology is used (ring, von Neumann) a neighborhood is defined for each particle
            each particle is attracted by the local best position and fitness of its neighborhood
            the neighborhood tensor holds the particle indices for each neighborhood
            best_position: swarm_size x n_params, best_fitness: 1 x swarm_size, neighborhood: swarm_size x neighborhood_size
        """
        if self.topology == 'global':
            # all particles use the same global position as reference -> no neighborhood indices necessary
            # the position will be set in step 0
            return torch.tensor([float("Inf")], device=self.position.device), None, None

        index = torch.unsqueeze(torch.arange(0, swarm_size, dtype=torch.long), 1)

        if self.topology == 'ring':
            # a neighborhood consists of 3 particles, the particle itself and its left and right index neighbor
            # neighborhood of particle i: particle i-1, particle i, particle i+1
            # first and last particle are connected to form a ring network
            # NOTE: torch.remainder: The remainder has the same sign as the divisor
            # neighborhood: swarm size x 3, structure: [left index, particle index, right index]
            neighborhood = index.repeat(1, 3)
            neighborhood.add_(torch.tensor([-1, 0, 1], dtype=torch.long)).remainder_(swarm_size)
        else:
            ### --- von Neumann ---
            # a neighborhood consists of 5 particles, the particle and its left, right, upper and lower index neighbor
            # particles are arranged as a matrix (n,m); size: swarm_size N = n x m ; with n,m >= 3
            # neighborhood of particle i in 1,...,N
            # Above neighbor: N_a = (i-column) mod N; if N_a == 0, N_a = N
            # Left neighbor:  N_l = i-1;  if (i-1) mod column == 0, N_l = i-1+column
            # Right neighbor: N_r = i+1;  if i mod column == 0, N_r = i+1-column
            # Below neighbor: N_b = (i+column) mod N; if N_b == 0, N_b = N
            # NOTE: torch.remainder: The remainder has the same sign as the divisor.
            # neighborhood: swarm size x 5
            # structure: [upper index, left index, particle index, right index, lower index]

            ### 1. calculate the size of the matrix (column length)
            if sympy.isprime(swarm_size) or swarm_size < 9:
                raise ValueError("{} is not a valid value for von neumann neighborhood size".format(swarm_size))
            root_ceil = math.ceil(math.sqrt(swarm_size))
            if root_ceil ** 2 == swarm_size:
                # perfect square -> square matrix
                column = root_ceil
            else:
                # otherwise take the largest divisor among 4 and the primes in [3, ceil(sqrt(swarm_size)))
                column_candidates = [4] + list(sympy.primerange(3, root_ceil))
                swarm_dividers = [c for c in column_candidates if swarm_size % c == 0]
                if not swarm_dividers:
                    raise ValueError("{} is not a valid value for von neumann neighborhood size".format(swarm_size))
                column = int(max(swarm_dividers))

            ### 2. initialize the neighborhood index tensor
            neighborhood = index.repeat(1, 5)
            neighborhood.add_(torch.tensor([-column, -1, 0, 1, column], dtype=torch.long))
            # wrap left neighbors of the first column and right neighbors of the last column
            neighborhood[::column, 1].add_(column)
            neighborhood[(column - 1)::column, 3].add_(-column)
            neighborhood.remainder_(swarm_size)

        # best_position: swarm_size x n_params, best_fitness: 1 x swarm size
        return (self.pbest_fitness.detach().clone(),
                torch.empty_like(self.position),
                neighborhood.to(device=self.position.device))

    def _calculate_fitness(self, position):
        """Let the candidate particle try against the environment and get its utility

        NOTE: PSO minimizes but we use a maximization formulation in the rewards,
        thus we need to use the negative reward.

        Arguments:
            position: Tensor
                The current particle's parameter values
        Returns:
            reward: Tensor
                The fitness value (negative utility) of the current particle
        """
        vector_to_parameters(position, self.particle_evaluation_model.parameters())
        reward = self.environment.get_strategy_reward(self.particle_evaluation_model,
                                                      **self.strat_to_player_kwargs).detach()
        assert reward.numel() == 1
        self.utility_eval_counter += 1
        return -reward

    def _evaluate_positions(self, positions):
        """Returns the fitness of every row in ``positions`` as one tensor."""
        return torch.tensor([self._calculate_fitness(p) for p in positions], device=self.position.device)

    def update_strategy(self):
        """Performs one model-update to the player's strategy.

        Evaluates all particles, updates personal and swarm bests, moves the
        particles, and copies the best known position into the model.
        """
        start_time = timer()

        ### 1. if required redraw valuations / perform random moves (determined by env)
        self.environment.prepare_iteration()

        ### 2. evaluate each particle's current position (solution)
        # fitness: 1 x swarm size
        fitness = self._evaluate_positions(self.position)

        # prevent stale memory: reevaluate the personal and overall best fitness
        if self.reevaluation_frequency and self.cur_epoch > 0 \
                and not self.cur_epoch % self.reevaluation_frequency:
            old_best = self.best_fitness.detach().clone()
            self.best_fitness = torch.squeeze(self._evaluate_positions(self.best_position), 0)
            if not torch.equal(old_best, self.best_fitness):
                self.pbest_fitness = self._evaluate_positions(self.pbest_position)

        if self.decrease_fitness:
            self.pbest_fitness = self.pbest_fitness * self.decrease_pbest
            self.best_fitness = self.best_fitness * self.decrease_best

        ### --- best solution update ---
        ### 3. update the personal best positions:
        # check if the current sample point of the particle is a better solution than the particle's previously
        # found solution, i.e. if its current fitness is better than its "personal best fitness".
        # if so, update the personal best position and fitness to the values of the current ones
        improved = fitness < self.pbest_fitness
        if improved.any():
            self.pbest_fitness[improved] = fitness[improved]
            self.pbest_position[improved, :] = self.position[improved, :]

        # 4. update the swarm's best position(s):
        if self.topology == 'global':
            # check if a particle found a better solution than the current global best
            # -> check if the best of all "personal best fitness" is better than the "global best fitness".
            # if so, update the global best position and fitness to the values of the best personal best
            if self.pbest_fitness.min() < self.best_fitness:
                self.best_fitness = self.pbest_fitness.min()
                self.best_position = torch.unsqueeze(self.pbest_position[self.pbest_fitness.argmin(), :], 0)
        else:
            # get the best particle of each neighborhood (best "personal best fitness" of the neighborhood)
            # check if it is better than the previous "best fitness" of the neighborhood.
            # if so, update the neighborhood's best position and fitness to this particle's personal best
            best_neighbor = self.pbest_fitness[self.neighborhood].min(axis=1)
            improved = best_neighbor.values < self.best_fitness
            if improved.any():
                self.best_fitness[improved] = best_neighbor.values[improved]
                rows = torch.arange(0, self.neighborhood.size()[0])
                index = self.neighborhood[rows, best_neighbor.indices][improved]
                self.best_position[improved, :] = self.pbest_position[index, :]

        ### --- move ---
        ### 5. update the velocities:
        # save current velocity and position before updating (for logging).
        # The position is updated in place below, so it has to be cloned;
        # only pay for the copy when this step is actually logged.
        log_step = self.writer is not None and self.strat_to_player_kwargs == {'player_position': 0}
        cur_velocity = self.velocity
        cur_position = self.position.clone() if log_step else None

        if self.decrease_w:
            # linear decrease from w_max to w_min, then hold at w_min
            elapsed = min(self.cur_epoch, self._INERTIA_DECAY_EPOCHS)
            self.inertia = self.inertia_max \
                - (self.inertia_max - self.inertia_min) * elapsed / self._INERTIA_DECAY_EPOCHS

        # new velocity = old velocity + cognitive component + social component
        self.velocity = self.inertia * self.velocity \
            + self.cognition * torch.rand_like(self.position) * (self.pbest_position - self.position) \
            + self.social * torch.rand_like(self.position) * (self.best_position - self.position)
        # note: some versions only draw RVs for each search dimension (and not also for all particles)

        # clamp particles velocity values to be <= the maximal allowed velocity step size
        if self.velocity_clamping:
            self.velocity.clamp_(-self.max_velocity, self.max_velocity)

        ### 6. update the positions
        self.position += self.velocity
        # clamp particles position values to lie inside the search space bounds
        if self.bound_handling:
            self.position.clamp_(-self.max_position, self.max_position)

        assert torch.isfinite(self.best_fitness.min())
        # assign the parameters of the best particle to the model parameters
        vector_to_parameters(self.best_position[self.best_fitness.argmin(), :], self.model.parameters())

        time_per_step = timer() - start_time
        if log_step:
            self._log_pso_params(cur_velocity, cur_position, time_per_step)

        self.cur_epoch += 1

    def update_strategy_and_evaluate_utility(self, closure=None):
        """Updates the model and returns the (detached) utility after the update.

        Params:
            closure: accepted for interface compatibility with ``Learner``;
                ignored by PSO.
        """
        self.update_strategy()
        true_best_fitness = self.environment.get_strategy_reward(self.model, **self.strat_to_player_kwargs).detach()
        if self.writer is not None:
            # gap between the remembered best fitness and the freshly evaluated reward
            fitness_error = torch.abs(torch.neg(self.best_fitness.min()) - true_best_fitness)
            self.writer.add_scalar('learner/fitness_error', fitness_error, self.cur_epoch)
        return true_best_fitness

    def _log_pso_params(self, velocity, position, time_per_step):
        """Writes swarm statistics for the current epoch to ``self.writer``.

        ``time_per_step`` is accepted but currently not logged.
        """
        position_L_2_norm = torch.linalg.norm(position - self.best_position) * (1. / float(position.shape[0]))**(1 / 2)
        velocity_L_2_norm = torch.linalg.norm(velocity) * (1. / float(velocity.shape[0]))**(1 / 2)

        self.writer.add_scalar('learner/util_eval_counter', self.utility_eval_counter, self.cur_epoch)
        self.writer.add_scalar('learner/velocity_L_2', velocity_L_2_norm, self.cur_epoch)
        self.writer.add_scalar('learner/position_L_2', position_L_2_norm, self.cur_epoch)
        self.writer.add_scalar('learner/best_fitness', torch.neg(self.best_fitness.min()), self.cur_epoch)
        # self.writer.add_scalar('learner/time_per_step', time_per_step, self.cur_epoch)
class DPGLearner(GradientBasedLearner):
    """Implements Deterministic Policy Gradients

    http://proceedings.mlr.press/v32/silver14.pdf

    via directly calculating `dQ/da and da/d\\theta`

    Placeholder: construction always raises ``NotImplementedError``.
    """

    def __init__(self):
        raise NotImplementedError()
class _PerturbedActionModule(Strategy, torch.nn.Module):
    """Wraps a module and shifts its output by a fixed perturbation.

    The wrapped module's output plus ``epsilon`` is passed through a ReLU.
    """

    def __init__(self, module, epsilon):
        super().__init__()
        self.module = module
        self.epsilon = epsilon

    def forward(self, x):
        """Returns ``relu(module(x) + epsilon)``."""
        shifted = self.module(x) + self.epsilon
        return shifted.relu()

    def play(self, x):
        """Returns the perturbed action for ``x``; delegates to ``forward``."""
        action = self.forward(x)
        return action
class AESPGLearner(GradientBasedLearner):
    """Implements Deterministic Policy Gradients http://proceedings.mlr.press/v32/silver14.pdf
    with ES-pseudogradients of dQ/da

    Instead of perturbing the model parameters, Gaussian noise is added to
    the model *outputs* (actions). The rewards of the perturbed actions are
    used to estimate the derivative of the utility w.r.t. the action, which
    is then backpropagated through the model.

    Args:
        model: The strategy model to be trained. Must expose
            ``output_length`` and ``parameters()``.
        environment: Environment used to evaluate rewards.
        hyperparams: dict with the required keys ``'population_size'``
            (int >= 2) and ``'sigma'`` (converted to float), and the optional
            keys ``'normalize_gradients'`` (truthy to enable) and
            ``'baseline'`` (a float, ``'current_reward'`` or
            ``'mean_reward'``).
        optimizer_type: Optimizer class forwarded to the parent constructor.
        optimizer_hyperparams: Optimizer kwargs forwarded to the parent.
        strat_to_player_kwargs: kwargs forwarded to the environment when
            evaluating the strategy.

    Raises:
        ValueError: If required hyperparameters are missing or invalid.
    """

    def __init__(self,
                 model: NeuralNetStrategy, environment: Environment, hyperparams: dict,
                 optimizer_type: Type[torch.optim.Optimizer], optimizer_hyperparams: dict,
                 strat_to_player_kwargs: dict = None):
        # Create and validate optimizer.
        # `strat_to_player_kwargs` has to be passed by keyword: the fifth
        # positional parameter of the parent constructor is `scheduler_type`.
        super().__init__(model, environment,
                         optimizer_type, optimizer_hyperparams,
                         strat_to_player_kwargs=strat_to_player_kwargs)

        # Validate ES hyperparams
        if not {'population_size', 'sigma'} <= set(hyperparams):
            raise ValueError(
                'Missing hyperparams for ES. Provide at least, population size, sigma.')
        if not isinstance(hyperparams['population_size'], int) or hyperparams['population_size'] < 2:
            # one is invalid because there will be zero variance, leading to div by 0 errors
            raise ValueError('Please provide a valid `population_size` parameter >=2')

        # set hyperparams
        self.population_size = hyperparams['population_size']
        self.sigma = float(hyperparams['sigma'])

        self.normalize_gradients = bool(hyperparams.get('normalize_gradients'))
        # default baseline depends on whether gradients are normalized
        self.baseline = 'mean_reward' if self.normalize_gradients else 'current_reward'

        # overwrite baseline method if provided
        if 'baseline' in hyperparams:
            requested = hyperparams['baseline']
            if not isinstance(requested, float) \
                    and requested not in ['current_reward', 'mean_reward']:
                raise ValueError('Invalid baseline provided. Should be float or '
                                 + 'one of "mean_reward", "current_reward"')
            # `_set_gradients` reads `self.baseline`, so the override must be
            # stored there to have any effect.
            self.baseline = requested
        # Alias kept for code that reads the previously used attribute name.
        self.baseline_method = self.baseline

    def _set_gradients(self):
        """Calculates ES-pseudogradients w.r.t. the actions and backpropagates
        them into the model parameter gradients.

        The pseudogradient of the utility w.r.t. the action is computed as

            mean_over_population((rewards - baseline) * epsilons) / denominator

        where ``denominator`` is ``sigma**2``, or ``sigma * rewards.std()``
        when gradient normalization is enabled. The baseline is the reward
        of the unperturbed model (``'current_reward'``), the population mean
        of the rewards (``'mean_reward'``), or a fixed float.

        According to the original author's notes, for small sigma the
        normalized variant yields a vector that points in the same direction
        as the gradient with length (slightly smaller than) 1, and the
        gradient samples have low variance.
        NOTE(review): the original remark about large sigma was truncated
        ("this grad becomes smaller tha..."); presumably the pseudogradient
        shrinks further for large sigma -- confirm.

        The result is applied via ``loss.backward()``, i.e. gradients are
        accumulated into the ``.grad`` of whatever the action depends on; the
        code assumes all current ``param.grad`` values are zero.
        """
        n_pop = self.population_size
        n_actions = self.model.output_length
        n_batch = self.environment.batch_size

        ### 1. if required redraw valuations / perform random moves (determined by env)
        self.environment.prepare_iteration()

        ### 2./3. Perturb the model outputs and let each candidate play against
        ### the environment. Candidates are created and evaluated one by one.
        reward_rows = []
        epsilon_rows = []
        for _ in range(n_pop):
            candidate, epsilon = self._perturb_model(self.model)
            reward = self.environment.get_strategy_reward(
                candidate, aggregate_batch=False, **self.strat_to_player_kwargs)
            reward_rows.append(reward.detach().view(1, n_batch, 1))
            epsilon_rows.append(epsilon.unsqueeze(0))
        rewards = torch.cat(reward_rows)    # n_pop x n_batch x 1
        epsilons = torch.cat(epsilon_rows)  # n_pop x n_batch x n_actions

        ### 4. calculate the ES-pseudogradients ####
        # base case: baseline is the reward of the current (unperturbed) model
        action, baseline = self.environment.get_strategy_action_and_reward(
            self.model, **self.strat_to_player_kwargs)
        if self.baseline == 'mean_reward':
            baseline = rewards.mean(dim=0)
        elif isinstance(self.baseline, float):
            baseline = self.baseline

        if torch.is_tensor(baseline):
            # The baseline is only an offset of the pseudogradient; detach it
            # so that backward() flows exclusively through `action`.
            baseline = baseline.detach().view(n_batch, 1)

        denominator = self.sigma * rewards.std() if self.normalize_gradients else self.sigma**2

        if denominator == 0:
            # all candidates returned same reward and normalize is true --> stationary
            # Shape and device must match what the einsum below expects.
            es_dudb = torch.zeros(n_batch, n_actions,
                                  device=epsilons.device, dtype=epsilons.dtype)
        else:
            # mean over population --> batch x n_actions (TODO: test for n_actions > 1)
            scaled_rewards = (rewards - baseline) / denominator
            es_dudb = (scaled_rewards * epsilons).mean(dim=0)

        ### 5. assign gradients to model gradient ####
        # should be ∇_θ π * ∇^ES_b u: the negative batch mean of
        # <action, es_dudb> is used as a surrogate loss.
        loss = -torch.einsum('ba,ba->b', action, es_dudb).mean()
        loss.backward()

    def _perturb_model(self, model: NeuralNetStrategy) -> Tuple[torch.nn.Module, torch.Tensor]:
        """
        Returns a model [torch.nn.Module] perturbed via adding random noise to
        its outputs, as well as the noise tensor used to generate the
        perturbation.

        The noise is drawn from a normal distribution with mean 0 and
        standard deviation ``self.sigma``, has one row per batch entry of the
        environment and is created on the device of the model's first
        parameter.
        """
        # for now, we'll assume model is a NeuralNetStrategy, i.e. has an attribute output_length
        noise = torch.zeros([self.environment.batch_size, model.output_length],
                            device=next(model.parameters()).device
                            ).normal_(mean=0.0, std=self.sigma)
        perturbed = _PerturbedActionModule(model, noise)
        return perturbed, noise
class DDPGLearner(GradientBasedLearner):
    """Placeholder for a Deep Deterministic Policy Gradients learner.

    Reference: Lilicrap et al 2016, http://arxiv.org/abs/1509.02971

    Not implemented: instantiating this class always raises.
    """

    def __init__(self):
        # No implementation exists yet; fail immediately on construction.
        raise NotImplementedError
class DummyNonLearner(GradientBasedLearner):
    """A learner that does nothing.

    It is constructed like the other gradient based learners, but its
    ``_set_gradients`` is a no-op.

    Args:
        model: The model handed to the parent constructor.
        environment: The environment handed to the parent constructor.
        hyperparams: Ignored; present only for signature compatibility.
        optimizer_type: Optimizer class forwarded to the parent constructor.
        optimizer_hyperparams: Optimizer kwargs forwarded to the parent.
        strat_to_player_kwargs: kwargs forwarded to the parent constructor.
    """

    def __init__(self,
                 model: torch.nn.Module, environment: Environment, hyperparams: dict,  #pylint:disable=unused-argument
                 optimizer_type: Type[torch.optim.Optimizer], optimizer_hyperparams: dict,
                 strat_to_player_kwargs: dict = None):
        # Create and validate optimizer.
        # `strat_to_player_kwargs` has to be passed by keyword: the fifth
        # positional parameter of the parent constructor is `scheduler_type`.
        super().__init__(model, environment,
                         optimizer_type, optimizer_hyperparams,
                         strat_to_player_kwargs=strat_to_player_kwargs)

    def _set_gradients(self):
        # This "Learner" doesn't learn.
        pass