import logging
import pyhf
from pyhf import events, interpolators
from pyhf.exceptions import InvalidModifier
from pyhf.parameters import ParamViewer
from pyhf.tensor.manager import get_backend
log = logging.getLogger(__name__)
[docs]
def required_parset(sample_data, modifier_data):
return {
'paramset_type': 'constrained_by_normal',
'n_parameters': 1,
'is_scalar': True,
'inits': (0.0,),
'bounds': ((-5.0, 5.0),),
'fixed': False,
'auxdata': (0.0,),
}
[docs]
class histosys_builder:
"""Builder class for collecting histoys modifier data"""
is_shared = True
[docs]
def __init__(self, config):
self.builder_data = {}
self.config = config
self.required_parsets = {}
[docs]
def collect(self, thismod, nom):
lo_data = thismod['data']['lo_data'] if thismod else nom
hi_data = thismod['data']['hi_data'] if thismod else nom
maskval = bool(thismod)
mask = [maskval] * len(nom)
return {'lo_data': lo_data, 'hi_data': hi_data, 'mask': mask, 'nom_data': nom}
[docs]
def append(self, key, channel, sample, thismod, defined_samp):
self.builder_data.setdefault(key, {}).setdefault(sample, {}).setdefault(
'data', {'hi_data': [], 'lo_data': [], 'nom_data': [], 'mask': []}
)
nom = (
defined_samp['data']
if defined_samp
else [0.0] * self.config.channel_nbins[channel]
)
moddata = self.collect(thismod, nom)
self.builder_data[key][sample]['data']['lo_data'].append(moddata['lo_data'])
self.builder_data[key][sample]['data']['hi_data'].append(moddata['hi_data'])
self.builder_data[key][sample]['data']['nom_data'].append(moddata['nom_data'])
self.builder_data[key][sample]['data']['mask'].append(moddata['mask'])
if thismod:
self.required_parsets.setdefault(
thismod['name'],
[required_parset(defined_samp['data'], thismod['data'])],
)
[docs]
def finalize(self):
default_backend = pyhf.default_backend
for modifier_name, modifier in self.builder_data.items():
for sample_name, sample in modifier.items():
sample["data"]["mask"] = default_backend.concatenate(
sample["data"]["mask"]
)
sample["data"]["lo_data"] = default_backend.concatenate(
sample["data"]["lo_data"]
)
sample["data"]["hi_data"] = default_backend.concatenate(
sample["data"]["hi_data"]
)
sample["data"]["nom_data"] = default_backend.concatenate(
sample["data"]["nom_data"]
)
if (
not len(sample["data"]["nom_data"])
== len(sample["data"]["lo_data"])
== len(sample["data"]["hi_data"])
):
_modifier_type, _modifier_name = modifier_name.split("/")
_sample_data_len = len(sample["data"]["nom_data"])
_lo_data_len = len(sample["data"]["lo_data"])
_hi_data_len = len(sample["data"]["hi_data"])
raise InvalidModifier(
f"The '{sample_name}' sample {_modifier_type} modifier"
+ f" '{_modifier_name}' has data shape inconsistent with the sample.\n"
+ f"{sample_name} has 'data' of length {_sample_data_len} but {_modifier_name}"
+ f" has 'lo_data' of length {_lo_data_len} and 'hi_data' of length {_hi_data_len}."
)
return self.builder_data
[docs]
class histosys_combined:
name = 'histosys'
op_code = 'addition'
[docs]
def __init__(
self, modifiers, pdfconfig, builder_data, interpcode='code0', batch_size=None
):
self.batch_size = batch_size
self.interpcode = interpcode
assert self.interpcode in ['code0', 'code2', 'code4p']
keys = [f'{mtype}/{m}' for m, mtype in modifiers]
histosys_mods = [m for m, _ in modifiers]
parfield_shape = (
(self.batch_size, pdfconfig.npars)
if self.batch_size
else (pdfconfig.npars,)
)
self.param_viewer = ParamViewer(
parfield_shape, pdfconfig.par_map, histosys_mods
)
self._histosys_histoset = [
[
[
builder_data[m][s]['data']['lo_data'],
builder_data[m][s]['data']['nom_data'],
builder_data[m][s]['data']['hi_data'],
]
for s in pdfconfig.samples
]
for m in keys
]
self._histosys_mask = [
[[builder_data[m][s]['data']['mask']] for s in pdfconfig.samples]
for m in keys
]
if histosys_mods:
self.interpolator = getattr(interpolators, self.interpcode)(
self._histosys_histoset
)
self._precompute()
events.subscribe('tensorlib_changed')(self._precompute)
[docs]
def _precompute(self):
if not self.param_viewer.index_selection:
return
tensorlib, _ = get_backend()
self.histosys_mask = tensorlib.astensor(self._histosys_mask, dtype="bool")
self.histosys_default = tensorlib.zeros(self.histosys_mask.shape)
if self.batch_size is None:
self.indices = tensorlib.reshape(
self.param_viewer.indices_concatenated, (-1, 1)
)
[docs]
def apply(self, pars):
"""
Returns:
modification tensor: Shape (n_modifiers, n_global_samples, n_alphas, n_global_bin)
"""
if not self.param_viewer.index_selection:
return
tensorlib, _ = get_backend()
if self.batch_size is None:
histosys_alphaset = self.param_viewer.get(pars, self.indices)
else:
histosys_alphaset = self.param_viewer.get(pars)
results_histo = self.interpolator(histosys_alphaset)
# either rely on numerical no-op or force with line below
results_histo = tensorlib.where(
self.histosys_mask, results_histo, self.histosys_default
)
return results_histo