""" This pytest test file checks whether valuation and observation samplers have the
expected behavior.
"""
from math import sqrt
import numpy as np
import pytest
import torch
import bnelearn.sampler as vs
device = 'cuda' if torch.cuda.is_available() else 'cpu'
batch_size = 2**20
alternative_batch_size = 2**15
conditioned_inner_batch_size = 2**17
# helper functions
[docs]def correlation(valuation_profile):
"""Pearson correlation between valuations of bidders.
valuation_profile should be (batch x n_players x 1)
"""
assert valuation_profile.dim() >= 3, "invalid valuation profile."
if valuation_profile.dim() > 3:
# collapse batch dimensions
*batch_sizes, n_players, valuation_size = valuation_profile.shape
valuation_profile = valuation_profile.view(-1, n_players, valuation_size)
assert valuation_profile.shape[2] == 1, "correlation matrix only valid for 1-d valuations"
return torch.from_numpy(
np.corrcoef(valuation_profile.squeeze(-1).t().cpu())
).float() #numpy is float 64
[docs]def check_validity(valuation_profile, expected_shape,
expected_mean, expected_std, expected_correlation=None):
"""Checks whether a given batch of profiles has expected shape, mean, std
and correlation matrix.
"""
assert valuation_profile.dim() == 3, "invalid number of dimensions!"
assert valuation_profile.shape == expected_shape, "invalid shape!"
assert torch.allclose(valuation_profile.mean(dim=0), expected_mean.to(device), atol=0.02), \
"unexpected sample mean!"
if expected_std is not None:
assert torch.allclose(valuation_profile.std(dim=0), expected_std.to(device), atol=0.02), \
"unexpected sample variance!"
if expected_correlation is not None:
for k in range(valuation_profile.shape[2]):
assert torch.allclose(correlation(valuation_profile[:,:,[k]]),
expected_correlation.cpu(), atol=0.01), \
"unexpected correlation between bidders!"
[docs]def check_validity_of_conditional_sampler(sampler: vs.CorrelatedSymmetricUniformPVSampler,
n_players, valuation_size, u_lo, u_hi):
"""Check runtime at minimum, maximum, midpoint inputs, as well as shapes,
devices and known entries.
"""
conditioned_observation = \
torch.tensor([[u_lo],
[u_hi],
[u_lo + 0.5*(u_hi-u_lo)],
[u_lo + 0.25*(u_hi-u_lo)]]) \
.repeat(1, valuation_size)
outer_batch, observation_size = conditioned_observation.shape
#### test conditional sampling for different players (first and last)
for i in [0, n_players-1]:
co, cv = sampler.draw_conditional_profiles(
i, conditioned_observation,
conditioned_inner_batch_size)
assert co.device == cv.device, "Private values, obs and valuations should be identical."
assert co.device.type == device, "Output is not on excpected standard device!"
assert torch.equal(
co[...,i,:],
conditioned_observation \
.to(sampler.default_device) \
.view(outer_batch, 1, observation_size)
.repeat(1, conditioned_inner_batch_size, 1)
), "conditioned sample must respect conditioned_observation input!"
# When outer batch is drawn from real distribution and inner_batch size is
# 1, then the conditional samples should have the same distribution as 'regular'
# samples.
# get outer obs
_, o = sampler.draw_profiles(batch_size)
for i in range(o.shape[1]): # draw conditionals for each player
_, co = sampler.draw_conditional_profiles(
conditioned_player=i,
conditioned_observation=o[:,i,:],
inner_batch_size=1
)
# with batch_size = 1 (no repitition of inputs),
# co should follow same distribution as o
assert torch.allclose(o.mean(dim=0), co.mean(dim=0), atol=.01)
assert torch.allclose(o.std(dim=0), co.std(dim=0), atol=.01)
for k in range(valuation_size):
assert torch.allclose(
correlation( o[:,:,[k]]),
correlation(co[...,:,[k]]),
atol=0.01), "Unexpected correlation matrix encountered!"
# test cases
# base case is 2p, U[0,1], correlation of 0.5
ids, test_cases = zip(*[
# nplayers, valuation_size, gamma, u_lo, u_hi
['base_case', (2, 1, 0.5, 0, 1 )],
['perfect_corr', (2, 1, 1.0, 0, 1 )],
['independent', (2, 1, 0.0, 0, 1 )],
['other_corr', (2, 1, 0.3, 0, 1 )],
['three_players', (3, 1, 0.5, 0, 1 )],
['two_valuations', (2, 2, 0.5, 0, 1 )],
['scaled_bounds', (2, 1, 0.5, 0, 2 )],
['shifted_bounds', (2, 1, 0.5, 1, 2 )],
['affine_bounds', (2, 1, 0.5, 1, 3 )],
])