Source code for bnelearn.sampler.base

"""This class provides primitives to implement samplers that support drawing of
   valuation and observation profiles for a set of players."""

from abc import ABC, abstractmethod
from math import ceil
from typing import List, Tuple, Union
from itertools import product
from operator import add
import torch
from torch.cuda import _device_t as Device

# Message of the NotImplementedError raised by
# FlushedWrappedSampler.draw_conditional_profiles for non-IPV base samplers.
_ERR_MSG_COND_SAMPLE_FLUSHED = (
    "Conditional sampling from FlushedWrappedSampler only implemented for IPV base samplers!"
)


class ValuationObservationSampler(ABC):
    """Provides functionality to draw valuation and observation profiles."""

    def __init__(self, n_players, valuation_size, observation_size,
                 support_bounds, default_batch_size=1, default_device=None):
        """Store the problem dimensions and validate the support bounds.

        Args:
            n_players: the number of players in a profile.
            valuation_size: the length of a single valuation vector.
            observation_size: the length of a single observation vector.
            support_bounds: tensor of size (n_players, valuation_size, 2).
                The grid helpers of this class read index 0 of the last axis
                as the lower and index 1 as the upper bound.
            default_batch_size: batch size used when a caller passes none.
            default_device: device used when a caller passes none. Only
                honoured when CUDA is available (falling back to 'cuda' if
                unset); without CUDA the default device is always 'cpu'.
        """
        # The number of players in the valuation profile
        self.n_players: int = n_players
        # The dimensionality / length of a single valuation vector
        self.valuation_size: int = valuation_size
        # The dimensionality / length of a single observation vector
        self.observation_size: int = observation_size
        # a default batch size
        self.default_batch_size: int = default_batch_size
        self.default_device: Device = (
            (default_device or 'cuda') if torch.cuda.is_available() else 'cpu'
        )

        assert support_bounds.size() == torch.Size([n_players, valuation_size, 2]), \
            "invalid support bounds."
        self.support_bounds: torch.FloatTensor = support_bounds.to(self.default_device)

    def _parse_batch_sizes_arg(self,
                               batch_sizes_argument: Union[int, List[int], None]
                               ) -> List[int]:
        """Parses an integer batch_size_argument into a list.

        If none given (or any other falsy value such as 0 or an empty list),
        defaults to the list containing the default_batch_size of the instance.
        """
        batch_sizes = batch_sizes_argument or self.default_batch_size
        if isinstance(batch_sizes, int):
            batch_sizes = [batch_sizes]
        return batch_sizes

    @abstractmethod
    def draw_profiles(self, batch_sizes: Union[int, List[int]] = None,
                      device=None) -> Tuple[torch.Tensor, torch.Tensor]:
        """Draws and returns a batch of valuation and observation profiles.

        Kwargs:
            batch_sizes (optional): List[int], the batch_sizes to draw. If
                none provided, `[self.default_batch_size]` will be used.
            device (optional): torch.cuda.Device, the device to draw profiles on

        Returns:
            valuations: torch.Tensor (*batch_sizes x n_players x valuation_size):
                a valuation profile
            observations: torch.Tensor (*batch_sizes x n_players x observation_size):
                an observation profile
        """

    @abstractmethod
    def draw_conditional_profiles(self, conditioned_player: int,
                                  conditioned_observation: torch.Tensor,
                                  inner_batch_size: int,
                                  device: Device = None
                                  ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Draws and returns batches of conditional valuation and corresponding
        observation profiles.

        For each entry of `conditioned_observation`, `inner_batch_size`
        samples will be drawn!

        Note that here, we are returning full profiles (including
        `conditioned_player`'s observation and others' valuations).

        Args:
            conditioned_player: int
                Index of the player whose observation we are conditioning on.
            conditioned_observation: torch.Tensor
                (*`outer_batch_sizes` (implicit), `observation_size`)
                A (batch of) observations of player `conditioned_player`.
            inner_batch_size: int, how many conditional samples to draw for
                each provided `conditioned_observation`.
            device (optional): the device to draw profiles on.

        Returns:
            valuations: torch.Tensor
                (*outer_batch_sizes, inner_batch_size, n_players, valuation_size):
                a conditional valuation profile
            observations: torch.Tensor
                (*outer_batch_sizes, inner_batch_size, n_players, observation_size):
                a corresponding conditional observation profile.
                observations[..., conditioned_player, :] will be equal to
                `conditioned_observation` repeated `inner_batch_size` times
        """

    def generate_valuation_grid(self, player_position: int,
                                minimum_number_of_points: int,
                                dtype=torch.float, device=None,
                                support_bounds: torch.Tensor = None,
                                return_mesh: bool = False) -> torch.Tensor:
        """Generates an evenly spaced grid of (approximately and at least)
        minimum_number_of_points valuations covering the support of the
        valuation space for the given player. These are meant as rational
        actions for the player to evaluate, e.g. in the util_loss estimator.

        The default reference implementation returns a rectangular grid on
        [lower_bound, upper_bound] in each of the `valuation_size` dimensions.

        Args:
            player_position: index of the player whose bounds are used.
            minimum_number_of_points: lower limit for the total grid size.
            dtype: dtype of the generated grid.
            device (optional): defaults to `self.default_device`.
            support_bounds (optional): bounds to use instead of
                `self.support_bounds`.
            return_mesh: if True, return the tuple of per-dimension mesh
                tensors instead of the flattened grid.

        Returns:
            A tensor of shape (n_points_per_dim ** valuation_size,
            valuation_size), or the mesh tuple when `return_mesh` is set.
        """
        device = device or self.default_device
        if support_bounds is None:
            support_bounds = self.support_bounds
        bounds = support_bounds[player_position]

        # dimensionality
        dims = self.valuation_size

        # use equal density in each dimension of the valuation, such that
        # the total number of points is at least as high as the specified one
        n_points_per_dim = ceil(minimum_number_of_points ** (1 / dims))

        # create equidistant lines along the support in each dimension
        lines = [
            torch.linspace(bounds[d][0], bounds[d][1], n_points_per_dim,
                           device=device, dtype=dtype)
            for d in range(dims)
        ]

        # Pin matrix ('ij') indexing explicitly: this is what the implicit
        # default did, but that default is deprecated and announced to change.
        mesh = torch.meshgrid(*lines, indexing='ij')
        if return_mesh:
            return mesh
        return torch.stack(mesh, dim=-1).view(-1, dims)

    def generate_reduced_grid(self, player_position: int,
                              minimum_number_of_points: int,
                              dtype=torch.float, device=None) -> torch.Tensor:
        """For some samplers, the action dimension is smaller and the grid can
        be reduced to that lower dimension.

        This reference implementation performs no reduction and simply
        returns `generate_valuation_grid` for the same arguments.
        """
        return self.generate_valuation_grid(
            player_position, minimum_number_of_points, dtype, device)

    def generate_action_grid(self, player_position: int,
                             minimum_number_of_points: int,
                             dtype=torch.float, device=None) -> torch.Tensor:
        """As the grid is also used for finding best responses, some samplers
        need extensive grids that sample a broader area. (E.g. when a bidder
        with high valuations massively shades her bids.)

        This reference implementation uses the valuation grid with all lower
        bounds replaced by zero.
        """
        # Work on a copy so that `self.support_bounds` stays untouched.
        support_bounds = self.support_bounds.clone()
        # Grid bids should always start at zero if not specified otherwise
        support_bounds[:, :, 0] = 0
        return self.generate_valuation_grid(
            player_position=player_position,
            minimum_number_of_points=minimum_number_of_points,
            dtype=dtype, device=device, support_bounds=support_bounds)

    def generate_cell_partition(self, player_position: int, grid_size: int,
                                dtype=torch.float, device=None):
        """Generate a rectangular grid partition of the valuation/observation
        prior and return cells with their vertices.

        This is a generator: for every cell of the mesh it yields a list with
        the 2 ** valuation_size vertices of that cell, each a tensor of shape
        (1, valuation_size).
        """
        grid = self.generate_valuation_grid(
            player_position=player_position,
            minimum_number_of_points=grid_size,
            dtype=dtype, device=device, return_mesh=True)
        grid_shape = grid[0].shape
        # Note: sometimes this mismatches when we can reduce the prior's dim
        valuation_size = len(grid)

        def index2vertex(vertex_index):
            """Get the grid point to the corresponding index."""
            return torch.stack(
                [g[tuple(vertex_index)] for g in grid]
            ).view(1, valuation_size)

        # Offsets from a cell's lower vertex to each of its corners; these
        # are identical for every cell, so build them once.
        corner_offsets = list(product((0, 1), repeat=valuation_size))

        # loop over all lower vertices of all cells; there is one index for
        # each of the grid dimensions
        for lower_vertex_index in product(*(range(j - 1) for j in grid_shape)):
            yield [
                index2vertex(list(map(add, lower_vertex_index, offset)))
                for offset in corner_offsets
            ]
class PVSampler(ValuationObservationSampler, ABC):
    """A sampler for Private Value settings, i.e. when observations and
    valuations are identical.
    """

    def __init__(self, n_players: int, valuation_size: int, support_bounds,
                 default_batch_size: int = 1, default_device: Device = None):
        """Initialise the base sampler with `observation_size` set equal to
        `valuation_size`."""
        super().__init__(n_players, valuation_size, valuation_size,
                         support_bounds, default_batch_size, default_device)

    @abstractmethod
    def _sample(self, batch_sizes: Union[int, List[int]],
                device: Device) -> torch.Tensor:
        """Returns a batch of profiles (which are both valuations and observations)"""

    def draw_profiles(self, batch_sizes: Union[int, List[int]] = None,
                      device: Device = None
                      ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Draw one batch of profiles and return it as both valuations and
        observations.

        Kwargs:
            batch_sizes (optional): int or List[int]; if none provided,
                `[self.default_batch_size]` is used.
            device (optional): defaults to `self.default_device`.

        Returns:
            The tuple `(profile, profile)`. Both entries are the *same*
            tensor object, so in-place changes to one are visible in the other.
        """
        batch_sizes = self._parse_batch_sizes_arg(batch_sizes)
        device = device or self.default_device

        # In the PV setting, valuations and observations are identical.
        profile = self._sample(batch_sizes, device)
        return profile, profile
class IPVSampler(PVSampler, ABC):
    """Marker interface for samplers in Independent Private Value settings.

    NOTE: This class adds no behaviour of its own. It only serves as a type
    that can be checked quickly to detect IPV samplers (e.g. by
    FlushedWrappedSampler in this module). Implementation is left to
    subclasses. See the module .samplers_ipv for examples.
    """
class FlushedWrappedSampler(ValuationObservationSampler):
    """A sampler that relies on a base sampler but flushes the last valuation
    and observations dimensions with zeros.

    This is useful when some players have lower observation / valuation size
    than others.

    Note on implementation: an alternative would be using a lower-dimensional
    base sampler and then adding extra zeroes. We instead go this route of
    overwriting unnecessary values because the incurred cost of sampling too
    many values will be cheaper in most cases compared to 'growing' tensors
    after the fact.
    """

    def __init__(self, base_sampler: ValuationObservationSampler,
                 flush_val_dims: int = 1, flush_obs_dims: int = 1):
        """
        Args:
            base_sampler: A `ValuationObservationSampler` that will have some
                of its valuation/observation dimensions flushed.
                NOTE: if you want (n + f) total dimensions, where f is the
                number of flushed dims, then the base_sampler should be of
                size (n+f), not n.
            flush_val_dims (int): the number of valuation dims to be flushed
                (from the right)
            flush_obs_dims (int): the number of observation dims to be flushed
                (from the right)

        Raises:
            ValueError: if either flush count is negative.
        """
        # pylint: disable = super-init-not-called (This is by design.)
        if flush_val_dims < 0 or flush_obs_dims < 0:
            raise ValueError("flush_val_dims and flush_obs_dims must be non-negative.")

        self._base_sampler = base_sampler
        self._flush_val_dims = flush_val_dims
        self._flush_obs_dims = flush_obs_dims

        self.n_players = base_sampler.n_players
        self.valuation_size = base_sampler.valuation_size
        self.observation_size = base_sampler.observation_size
        self.default_batch_size = base_sampler.default_batch_size
        self.default_device = base_sampler.default_device

        # n_players x valuation_size x 2 (lower, upper)
        # Clone first: zeroing the flushed dims in place on the shared tensor
        # would also overwrite the base sampler's own bounds.
        self.support_bounds = base_sampler.support_bounds.clone()
        # NOTE: will bounds of (0,0) cause a bug somewhere?
        # TODO Stefan: At the very least, this breaks 3D plots, everything else
        # seems to work fine.
        if flush_val_dims > 0:
            # Guard against 0: the slice `-0:` would select *all* dims.
            self.support_bounds[:, -flush_val_dims:, :] = 0.0

    @staticmethod
    def _flush_trailing_dims(tensor: torch.Tensor, n_dims: int) -> None:
        """Zero the last `n_dims` entries along the final axis, in place."""
        # `tensor[..., -0:]` is the full axis, so 0 must be a no-op.
        if n_dims > 0:
            tensor[..., -n_dims:] = 0.0

    def draw_profiles(self, batch_sizes: Union[int, List[int]] = None,
                      device=None) -> Tuple[torch.Tensor, torch.Tensor]:
        """Draw profiles from the base sampler and zero the flushed dims.

        NOTE: if the base sampler returns the same tensor object for
        valuations and observations, both flushes act on that one tensor.
        """
        v, o = self._base_sampler.draw_profiles(
            batch_sizes=batch_sizes, device=device)

        self._flush_trailing_dims(v, self._flush_val_dims)
        self._flush_trailing_dims(o, self._flush_obs_dims)

        return v, o

    def draw_conditional_profiles(self, conditioned_player: int,
                                  conditioned_observation: torch.Tensor,
                                  inner_batch_size: int,
                                  device: Device = None
                                  ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Draw conditional profiles from the base sampler and zero the
        flushed dims.

        Raises:
            NotImplementedError: if the base sampler is not an `IPVSampler`.
            ValueError: if `conditioned_observation` has a nonzero entry in
                any of the flushed observation dimensions.
        """
        if not isinstance(self._base_sampler, IPVSampler):
            raise NotImplementedError(_ERR_MSG_COND_SAMPLE_FLUSHED)

        # Check *all* flushed dims (slice), not just a single column.
        if self._flush_obs_dims > 0 and \
                conditioned_observation[..., -self._flush_obs_dims:].any():
            raise ValueError("conditioned observation contains nonzero entry in flushed dimensions!")

        # For IPV samplers, we can simply draw from the _base_sampler and then flush
        cv, co = self._base_sampler.draw_conditional_profiles(
            conditioned_player, conditioned_observation, inner_batch_size, device)

        self._flush_trailing_dims(cv, self._flush_val_dims)
        self._flush_trailing_dims(co, self._flush_obs_dims)

        return cv, co
class CompositeValuationObservationSampler(ValuationObservationSampler):
    """A class representing composite prior distributions that are made up of
    several groups of bidders, each of which can be represented by an atomic
    ValuationObservationSampler, and which are independent between-group
    (but not necessarily within-group).

    Limitation: The current implementation requires that all players
    nevertheless have the same valuation_size.
    """

    def __init__(self, n_players: int, valuation_size: int,
                 observation_size: int,
                 subgroup_samplers: List[ValuationObservationSampler],
                 default_batch_size=1, default_device=None):
        """
        Args:
            n_players: total number of players; must equal the sum of the
                subgroup samplers' `n_players`.
            valuation_size: valuation size shared by all subgroup samplers.
            observation_size: observation size shared by all subgroup samplers.
            subgroup_samplers: one sampler per group. Players are numbered
                consecutively in the order of this list.
            default_batch_size: batch size used when a caller passes none.
            default_device: device used when a caller passes none.
        """
        self.n_groups = len(subgroup_samplers)
        self.group_sizes = [sampler.n_players for sampler in subgroup_samplers]
        assert sum(self.group_sizes) == n_players, "number of players in subgroup don't match total n_players."
        for sampler in subgroup_samplers:
            assert sampler.valuation_size == valuation_size, "incorrect valuation size in subgroup sampler."
            assert sampler.observation_size == observation_size, "incorrect observation size in subgroup sampler"

        self.group_samplers = subgroup_samplers
        # Global player indices belonging to each group.
        self.group_indices: List[torch.IntTensor] = [
            torch.tensor(range(sum(self.group_sizes[:i]),
                               sum(self.group_sizes[:i + 1])))
            for i in range(self.n_groups)
        ]

        ## concatenate bounds in player dimension
        support_bounds = torch.vstack(
            [s.support_bounds for s in self.group_samplers])

        super().__init__(n_players, valuation_size, observation_size,
                         support_bounds, default_batch_size, default_device)

    def _locate_player(self, player_position) -> Tuple[int, int]:
        """Map a global player index to (group index, index within group).

        Raises:
            ValueError: if `player_position` belongs to no group.
        """
        position = int(player_position)
        offset = 0
        for group, size in enumerate(self.group_sizes):
            if offset <= position < offset + size:
                return group, position - offset
            offset += size
        raise ValueError(
            f"player_position {position} is not part of any subgroup.")

    def _delegate_to_group(self, method_name: str, player_position,
                           *args, **kwargs):
        """Call `method_name` on the subgroup sampler that owns the player,
        passing the player's position relative to that subgroup."""
        group, sub_position = self._locate_player(player_position)
        method = getattr(self.group_samplers[group], method_name)
        return method(sub_position, *args, **kwargs)

    def draw_profiles(self, batch_sizes: Union[int, List[int]] = None,
                      device=None) -> Tuple[torch.Tensor, torch.Tensor]:
        """Draws and returns a batch of valuation and observation profiles.

        Kwargs:
            batch_sizes (optional): List[int], the batch_size to draw. If
                none provided, `self.default_batch_size` will be used.
            device (optional): torch.cuda.Device, the device to draw profiles on

        Returns:
            valuations: torch.Tensor (*batch_sizes x n_players x valuation_size):
                a valuation profile
            observations: torch.Tensor (*batch_sizes x n_players x observation_size):
                an observation profile
        """
        device = device or self.default_device
        batch_sizes: List[int] = self._parse_batch_sizes_arg(batch_sizes)

        v = torch.empty([*batch_sizes, self.n_players, self.valuation_size],
                        device=device)
        o = torch.empty([*batch_sizes, self.n_players, self.observation_size],
                        device=device)

        ## Draw independently for each group.
        for players, sampler in zip(self.group_indices, self.group_samplers):
            # `players` holds the player indices in the group
            v[..., players, :], o[..., players, :] = \
                sampler.draw_profiles(batch_sizes, device)

        return v, o

    def draw_conditional_profiles(self, conditioned_player: int,
                                  conditioned_observation: torch.Tensor,
                                  inner_batch_size: int,
                                  device: Device = None
                                  ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Draws and returns batches of conditional valuation and
        corresponding observation profiles.

        For each entry of `conditioned_observation`, `inner_batch_size`
        samples will be drawn!

        Note that here, we are returning full profiles (including
        `conditioned_player`'s observation and others' valuations).

        Args:
            conditioned_player: int
                Index of the player whose observation we are conditioning on.
            conditioned_observation: torch.Tensor
                (`*outer_batch_sizes` (implicit), `observation_size`)
                A batch of/batches of observations of player
                `conditioned_player`.
            inner_batch_size: int, how many conditional samples to draw for
                each provided `conditioned_observation`. If falsy, will use
                `self.default_batch_size` of the class.
            device (optional): defaults to `self.default_device`.

        Returns:
            valuations: torch.Tensor
                (`*outer_batch_sizes`, inner_batch_size, n_players, valuation_size):
                a conditional valuation profile
            observations: torch.Tensor
                (`*outer_batch_sizes`, inner_batch_size, n_players, observation_size):
                a corresponding conditional observation profile.

        Raises:
            ValueError: if `conditioned_player` belongs to no group.
        """
        device = device or self.default_device
        inner_batch = inner_batch_size or self.default_batch_size
        outer_batches = list(conditioned_observation.shape[:-1])

        # Fail early on an unknown player instead of silently returning
        # unconditional draws for every group.
        conditioned_group, sub_position = \
            self._locate_player(conditioned_player)

        cv = torch.empty(
            [*outer_batches, inner_batch, self.n_players, self.valuation_size],
            device=device)
        co = torch.empty(
            [*outer_batches, inner_batch, self.n_players, self.observation_size],
            device=device)

        ## Draw independently for each group.
        for g, (players, sampler) in enumerate(
                zip(self.group_indices, self.group_samplers)):
            if g == conditioned_group:
                # this is the group of the conditioned player, we need to
                # sample from the group's conditional distribution
                cv[..., players, :], co[..., players, :] = \
                    sampler.draw_conditional_profiles(
                        conditioned_player=sub_position,
                        conditioned_observation=conditioned_observation,
                        inner_batch_size=inner_batch,
                        device=device)
            else:
                # the conditioned player is not in this group, the group's
                # draw is independent of the observation
                cv[..., players, :], co[..., players, :] = \
                    sampler.draw_profiles([*outer_batches, inner_batch], device)

        return cv, co

    def generate_valuation_grid(self, player_position: int,
                                *args, **kwargs) -> torch.Tensor:
        """Delegate to `generate_valuation_grid` of the player's subgroup
        sampler; remaining arguments are forwarded unchanged."""
        return self._delegate_to_group(
            'generate_valuation_grid', player_position, *args, **kwargs)

    def generate_reduced_grid(self, player_position: int,
                              *args, **kwargs) -> torch.Tensor:
        """Delegate to `generate_reduced_grid` of the player's subgroup
        sampler; remaining arguments are forwarded unchanged."""
        return self._delegate_to_group(
            'generate_reduced_grid', player_position, *args, **kwargs)

    def generate_action_grid(self, player_position: int,
                             *args, **kwargs) -> torch.Tensor:
        """Delegate to `generate_action_grid` of the player's subgroup
        sampler; remaining arguments are forwarded unchanged."""
        return self._delegate_to_group(
            'generate_action_grid', player_position, *args, **kwargs)

    def generate_cell_partition(self, player_position: int, *args, **kwargs):
        """Delegate to `generate_cell_partition` of the player's subgroup
        sampler; remaining arguments are forwarded unchanged."""
        return self._delegate_to_group(
            'generate_cell_partition', player_position, *args, **kwargs)