Source code for pyhf.modifiers.shapefactor

import logging

import pyhf
from pyhf import events
from pyhf.tensor.manager import get_backend
from pyhf.parameters import ParamViewer

log = logging.getLogger(__name__)


[docs] def required_parset(sample_data, modifier_data): return { 'paramset_type': 'unconstrained', 'n_parameters': len(sample_data), 'is_scalar': False, 'inits': (1.0,) * len(sample_data), 'bounds': ((0.0, 10.0),) * len(sample_data), 'fixed': False, }
[docs] class shapefactor_builder: """Builder class for collecting shapefactor modifier data""" is_shared = True
[docs] def __init__(self, config): self.builder_data = {} self.config = config self.required_parsets = {}
[docs] def collect(self, thismod, nom): maskval = True if thismod else False mask = [maskval] * len(nom) return {'mask': mask}
[docs] def append(self, key, channel, sample, thismod, defined_samp): self.builder_data.setdefault(key, {}).setdefault(sample, {}).setdefault( 'data', {'mask': []} ) nom = ( defined_samp['data'] if defined_samp else [0.0] * self.config.channel_nbins[channel] ) moddata = self.collect(thismod, nom) self.builder_data[key][sample]['data']['mask'] += moddata['mask'] if thismod: self.required_parsets.setdefault( thismod['name'], [required_parset(defined_samp['data'], thismod['data'])], )
[docs] def finalize(self): return self.builder_data
[docs] class shapefactor_combined: name = 'shapefactor' op_code = 'multiplication'
[docs] def __init__(self, modifiers, pdfconfig, builder_data, batch_size=None): """ Args: modifiers (:obj:`list` of :obj:`tuple`): List of tuples of form ``(modifier, modifier_type)``. pdfconfig (:class:`~pyhf.pdf._ModelConfig`): Configuration for the model. builder_data (:obj:`dict`): Map of keys ``'modifier_type/modifier'`` to the channels and bins they are applied to. batch_size (:obj:`int`): The number of rows in the resulting tensor. If :obj:`None` defaults to ``1``. Imagine a situation where we have 2 channels (``SR``, ``CR``), 3 samples (``sig1``, ``bkg1``, ``bkg2``), and 2 :class:`~pyhf.modifiers.shapefactor` modifiers (``coupled_shapefactor``, ``uncoupled_shapefactor``). Let's say this is the set-up: .. code-block:: SR(nbins=2) sig1 -> subscribes to normfactor bkg1 -> subscribes to coupled_shapefactor CR(nbins=3) bkg2 -> subscribes to coupled_shapefactor, uncoupled_shapefactor The ``coupled_shapefactor`` needs to have 3 nuisance parameters to account for the ``CR``, with 2 of them shared in the ``SR``. The ``uncoupled_shapefactor`` just has 3 nuisance parameters. ``self._parindices`` will look like .. code-block:: [0, 1, 2, 3, 4, 5, 6] ``self._shapefactor_indices`` will look like .. code-block:: [0, 1, 2, 3, 4, 5, 6] [[1,2,3],[4,5,6]] ^^^^^^^ = coupled_shapefactor ^^^^^^^ = uncoupled_shapefactor with the 0th par-index corresponding to the :class:`~pyhf.modifiers.normfactor`. Because the ``SR`` channel has 2 bins, and the ``CR`` channel has 3 bins (with ``SR`` before ``CR``), ``global_concatenated_bin_indices`` looks like .. code-block:: [0, 1, 0, 1, 2] ^^^^^ = SR channel ^^^^^^^^^ = CR channel So now we need to gather the corresponding :class:`~pyhf.modifiers.shapefactor` indices according to ``global_concatenated_bin_indices``. Therefore ``self._shapefactor_indices`` now looks like .. code-block:: [[1, 2, 1, 2, 3], [4, 5, 4, 5, 6]] and at that point can be used to compute the effect of :class:`~pyhf.modifiers.shapefactor`. """ default_backend = pyhf.default_backend self.batch_size = batch_size keys = [f'{mtype}/{m}' for m, mtype in modifiers] shapefactor_mods = [m for m, _ in modifiers] parfield_shape = (self.batch_size or 1, pdfconfig.npars) self.param_viewer = ParamViewer( parfield_shape, pdfconfig.par_map, shapefactor_mods ) self._shapefactor_mask = [ [[builder_data[m][s]['data']['mask']] for s in pdfconfig.samples] for m in keys ] global_concatenated_bin_indices = [ [[j for c in pdfconfig.channels for j in range(pdfconfig.channel_nbins[c])]] ] self._access_field = default_backend.tile( global_concatenated_bin_indices, (len(shapefactor_mods), self.batch_size or 1, 1), ) # access field is now # e.g. for a 3 channel (3 bins, 2 bins, 5 bins) model # [ # [0 1 2 0 1 0 1 2 3 4] (number of rows according to batch_size but at least 1) # [0 1 2 0 1 0 1 2 3 4] # [0 1 2 0 1 0 1 2 3 4] # ] # the index selection of param_viewer is a # list of (batch_size, par_slice) tensors # so self.param_viewer.index_selection[s][t] # points to the indices for a given systematic # at a given position in the batch # we thus populate the access field with these indices # up to the point where we run out of bins (in case) # the paramset slice is larger than the number of bins # in which case we use a dummy index that will be masked # anyways in apply (here: 0) # access field is shape (sys, batch, globalbin) for s, syst_access in enumerate(self._access_field): for t, batch_access in enumerate(syst_access): selection = self.param_viewer.index_selection[s][t] for b, bin_access in enumerate(batch_access): self._access_field[s, t, b] = ( selection[bin_access] if bin_access < len(selection) else 0 ) self._precompute() events.subscribe('tensorlib_changed')(self._precompute)
[docs] def _precompute(self): if not self.param_viewer.index_selection: return tensorlib, _ = get_backend() self.shapefactor_mask = tensorlib.tile( tensorlib.astensor(self._shapefactor_mask, dtype="bool"), (1, 1, self.batch_size or 1, 1), ) self.access_field = tensorlib.astensor(self._access_field, dtype='int') self.shapefactor_default = tensorlib.ones( tensorlib.shape(self.shapefactor_mask) ) self.sample_ones = tensorlib.ones(tensorlib.shape(self.shapefactor_mask)[1])
[docs] def apply(self, pars): """ Returns: modification tensor: Shape (n_modifiers, n_global_samples, n_alphas, n_global_bin) """ if not self.param_viewer.index_selection: return tensorlib, _ = get_backend() if self.batch_size is None: flat_pars = pars else: flat_pars = tensorlib.reshape(pars, (-1,)) shapefactors = tensorlib.gather(flat_pars, self.access_field) results_shapefactor = tensorlib.einsum( 'mab,s->msab', shapefactors, self.sample_ones ) results_shapefactor = tensorlib.where( self.shapefactor_mask, results_shapefactor, self.shapefactor_default ) return results_shapefactor