Source code for pyhf.writexml

import logging

from pathlib import Path
import shutil
import xml.etree.ElementTree as ET
import numpy as np

import uproot

from pyhf.mixins import _ChannelSummaryMixin
from pyhf.schema import path as schema_path

_ROOT_DATA_FILE = None

log = logging.getLogger(__name__)

__all__ = [
    "build_channel",
    "build_data",
    "build_measurement",
    "build_modifier",
    "build_sample",
    "indent",
]


def __dir__():
    return __all__


# 'spec' gets passed through all functions as NormFactor is a unique case of having
# parameter configurations stored at the modifier-definition-spec level. This means
# that build_modifier() needs access to the measurements. The call stack is:
#
#      writexml
#          ->build_channel
#              ->build_sample
#                  ->build_modifier
#
#  Therefore, 'spec' needs to be threaded through all these calls.


def _make_hist_name(channel, sample, modifier='', prefix='hist', suffix=''):
    middle = '_'.join(filter(lambda x: x, [channel, sample, modifier]))
    return f"{prefix}{middle}{suffix}"


def _export_root_histogram(hist_name, data):
    if hist_name in _ROOT_DATA_FILE:
        raise KeyError(f"Duplicate key {hist_name} being written.")
    _ROOT_DATA_FILE[hist_name] = uproot.to_writable(
        (np.asarray(data), np.arange(len(data) + 1))
    )


# https://stackoverflow.com/a/4590052
[docs] def indent(elem, level=0): i = "\n" + level * " " if elem is not None: if not elem.text or not elem.text.strip(): elem.text = i + " " if not elem.tail or not elem.tail.strip(): elem.tail = i for subelem in elem: indent(subelem, level + 1) if not elem.tail or not elem.tail.strip(): elem.tail = i else: if level and (not elem.tail or not elem.tail.strip()): elem.tail = i
[docs] def build_measurement(measurementspec, modifiertypes): """ Build the XML measurement specification for a given measurement adhering to defs.json/#definitions/measurement. Args: measurementspec (:obj:`dict`): The measurements specification from a :class:`~pyhf.workspace.Workspace`. modifiertypes (:obj:`dict`): A mapping from modifier name (:obj:`str`) to modifier type (:obj:`str`). Returns: :class:`xml.etree.cElementTree.Element`: The XML measurement specification. """ # need to determine prefixes prefixes = { 'normsys': 'alpha_', 'histosys': 'alpha_', 'shapesys': 'gamma_', 'staterror': 'gamma_', } config = measurementspec['config'] name = measurementspec['name'] poi = config['poi'] # we want to know which parameters are fixed (constant) # and to additionally extract the luminosity information fixed_params = [] lumi = 1.0 lumierr = 0.0 for parameter in config['parameters']: if parameter.get('fixed', False): pname = parameter['name'] if pname == 'lumi': fixed_params.append('Lumi') else: prefix = prefixes.get(modifiertypes[pname], '') fixed_params.append(f'{prefix}{pname}') # we found luminosity, so handle it if parameter['name'] == 'lumi': lumi = parameter['auxdata'][0] lumierr = parameter['sigmas'][0] # define measurement meas = ET.Element( "Measurement", Name=name, Lumi=str(lumi), LumiRelErr=str(lumierr), ExportOnly=str(True), ) poiel = ET.Element('POI') poiel.text = poi meas.append(poiel) # add fixed parameters (constant) if fixed_params: se = ET.Element('ParamSetting', Const='True') se.text = ' '.join(fixed_params) meas.append(se) return meas
[docs] def build_modifier(spec, modifierspec, channelname, samplename, sampledata): if modifierspec['name'] == 'lumi': return None mod_map = { 'histosys': 'HistoSys', 'staterror': 'StatError', 'normsys': 'OverallSys', 'shapesys': 'ShapeSys', 'normfactor': 'NormFactor', 'shapefactor': 'ShapeFactor', } attrs = {'Name': modifierspec['name']} if modifierspec['type'] == 'histosys': attrs['HistoNameLow'] = _make_hist_name( channelname, samplename, modifierspec['name'], suffix='Low' ) attrs['HistoNameHigh'] = _make_hist_name( channelname, samplename, modifierspec['name'], suffix='High' ) _export_root_histogram(attrs['HistoNameLow'], modifierspec['data']['lo_data']) _export_root_histogram(attrs['HistoNameHigh'], modifierspec['data']['hi_data']) elif modifierspec['type'] == 'normsys': attrs['High'] = str(modifierspec['data']['hi']) attrs['Low'] = str(modifierspec['data']['lo']) elif modifierspec['type'] == 'normfactor': # NB: only look at first measurement for normfactor configs. In order # to dump as HistFactory XML, this has to be the same for all # measurements or it will not work correctly. Why? # # Unlike other modifiers, NormFactor has the unique circumstance of # defining its parameter configurations at the modifier level inside # the channel specification, instead of at the measurement level, like # all of the other modifiers. # # However, since I strive for perfection, the "Const" attribute will # never be set here, but at the per-measurement configuration instead # like all other parameters. This is an acceptable compromise. # # Lastly, if a normfactor parameter configuration doesn't exist in the # first measurement parameter configuration, then set defaults. val = 1 low = 0 high = 10 for p in spec['measurements'][0]['config']['parameters']: if p['name'] == modifierspec['name']: val = p.get('inits', [val])[0] low, high = p.get('bounds', [[low, high]])[0] attrs['Val'] = str(val) attrs['Low'] = str(low) attrs['High'] = str(high) elif modifierspec['type'] == 'staterror': attrs['Activate'] = 'True' attrs['HistoName'] = _make_hist_name( channelname, samplename, modifierspec['name'] ) # must be deleted, HiFa XML specification does not support 'Name' del attrs['Name'] # need to make this a relative uncertainty stored in ROOT file _export_root_histogram( attrs['HistoName'], np.divide( modifierspec['data'], sampledata, out=np.zeros_like(sampledata), where=np.asarray(sampledata) != 0, dtype='float', ).tolist(), ) elif modifierspec['type'] == 'shapesys': attrs['ConstraintType'] = 'Poisson' attrs['HistoName'] = _make_hist_name( channelname, samplename, modifierspec['name'] ) # need to make this a relative uncertainty stored in ROOT file _export_root_histogram( attrs['HistoName'], [ np.divide( a, b, out=np.zeros_like(a), where=np.asarray(b) != 0, dtype='float' ) for a, b in np.array( (modifierspec['data'], sampledata), dtype="float" ).T ], ) elif modifierspec['type'] == 'shapefactor': pass else: log.warning( f"Skipping modifier {modifierspec['name']}({modifierspec['type']}) for now" ) return None modifier = ET.Element(mod_map[modifierspec['type']], **attrs) return modifier
[docs] def build_sample(spec, samplespec, channelname): histname = _make_hist_name(channelname, samplespec['name']) attrs = { 'Name': samplespec['name'], 'HistoName': histname, 'InputFile': _ROOT_DATA_FILE.file_path, 'NormalizeByTheory': 'False', } sample = ET.Element('Sample', **attrs) for modspec in samplespec['modifiers']: # if lumi modifier added for this sample, need to set NormalizeByTheory if modspec['type'] == 'lumi': sample.attrib.update({'NormalizeByTheory': 'True'}) modifier = build_modifier( spec, modspec, channelname, samplespec['name'], samplespec['data'] ) if modifier is not None: sample.append(modifier) _export_root_histogram(histname, samplespec['data']) return sample
[docs] def build_data(obsspec, channelname): histname = _make_hist_name(channelname, 'data') data = ET.Element('Data', HistoName=histname, InputFile=_ROOT_DATA_FILE.file_path) observation = next((obs for obs in obsspec if obs['name'] == channelname), None) _export_root_histogram(histname, observation['data']) return data
[docs] def build_channel(spec, channelspec, obsspec): channel = ET.Element( 'Channel', Name=channelspec['name'], InputFile=_ROOT_DATA_FILE.file_path ) if obsspec: data = build_data(obsspec, channelspec['name']) channel.append(data) for samplespec in channelspec['samples']: channel.append(build_sample(spec, samplespec, channelspec['name'])) return channel
def writexml(spec, specdir, data_rootdir, resultprefix): global _ROOT_DATA_FILE shutil.copyfile( schema_path.joinpath('HistFactorySchema.dtd'), Path(specdir).parent.joinpath('HistFactorySchema.dtd'), ) combination = ET.Element( "Combination", OutputFilePrefix=str(Path(specdir).joinpath(resultprefix)) ) with uproot.recreate(Path(data_rootdir).joinpath('data.root')) as _ROOT_DATA_FILE: for channelspec in spec['channels']: channelfilename = str( Path(specdir).joinpath(f'{resultprefix}_{channelspec["name"]}.xml') ) with open(channelfilename, "w", encoding="utf-8") as channelfile: channel = build_channel(spec, channelspec, spec.get('observations')) indent(channel) channelfile.write( "<!DOCTYPE Channel SYSTEM '../HistFactorySchema.dtd'>\n\n" ) channelfile.write( ET.tostring(channel, encoding='utf-8').decode('utf-8') ) inp = ET.Element("Input") inp.text = channelfilename combination.append(inp) # need information about modifier types to get the right prefix in measurement mixin = _ChannelSummaryMixin(channels=spec['channels']) for measurement in spec['measurements']: combination.append(build_measurement(measurement, dict(mixin.modifiers))) indent(combination) return b"<!DOCTYPE Combination SYSTEM 'HistFactorySchema.dtd'>\n\n" + ET.tostring( combination, encoding='utf-8' )