"""
pyhf workspaces hold the three data items:
* the statistical model p(data|parameters)
* the observed data (optional)
* fit configurations ("measurements")
"""
from __future__ import annotations
import collections
import copy
import logging
from typing import ClassVar
import jsonpatch
from pyhf import exceptions, schema
from pyhf.mixins import _ChannelSummaryMixin
from pyhf.pdf import Model
import functools
import operator
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Names exported as the public API of this module.
__all__ = ["Workspace"]
def __dir__():
    """Limit ``dir()`` on this module to the names listed in ``__all__``."""
    return __all__
def _join_items(join, left_items, right_items, key='name', deep_merge_key=None):
"""
Join two lists of dictionaries along the given key.
This is meant to be as generic as possible for any pairs of lists of dictionaries for many join operations.
Args:
join (:obj:`str`): The join operation to apply. See ~pyhf.workspace.Workspace for valid join operations.
left_items (:obj:`list`): A list of dictionaries to join on the left
right_items (:obj:`list`): A list of dictionaries to join on the right
deep_merge_key (:obj:`str`): A key on which to deeply merge items if set.
Returns:
:obj:`list`: A joined list of dictionaries.
"""
if join == 'right outer':
primary_items, secondary_items = right_items, left_items
else:
primary_items, secondary_items = left_items, right_items
joined_items = copy.deepcopy(primary_items)
keys = [item[key] for item in joined_items]
for secondary_item in secondary_items:
# first, check for deep merging
if secondary_item[key] in keys and deep_merge_key is not None:
_deep_left_items = joined_items[keys.index(secondary_item[key])][
deep_merge_key
]
_deep_right_items = secondary_item[deep_merge_key]
joined_items[keys.index(secondary_item[key])][deep_merge_key] = _join_items(
'left outer', _deep_left_items, _deep_right_items
)
# next, move over whole items where possible:
# - if no join logic
# - if outer join and item on right is not in left
# - if left outer join and item (by name) is on right and not in left
# - if right outer join and item (by name) is on left and not in right
# NB: this will be slow for large numbers of items
elif (
join == 'none'
or (join in ['outer'] and secondary_item not in primary_items)
or (
join in ['left outer', 'right outer']
and secondary_item[key] not in keys
)
):
joined_items.append(copy.deepcopy(secondary_item))
return joined_items
def _join_versions(join, left_version, right_version):
"""
Join two workspace versions.
Raises:
~pyhf.exceptions.InvalidWorkspaceOperation: Versions are incompatible.
Args:
join (:obj:`str`): The join operation to apply. See ~pyhf.workspace.Workspace for valid join operations.
left_version (:obj:`str`): The left workspace version.
right_version (:obj:`str`): The right workspace version.
Returns:
:obj:`str`: The workspace version.
"""
if left_version != right_version:
raise exceptions.InvalidWorkspaceOperation(
f"Workspaces of different versions cannot be combined: {left_version} != {right_version}"
)
return left_version
def _join_channels(join, left_channels, right_channels, merge=False):
    """
    Join two workspace channel specifications.

    The join is performed first; the compatibility checks for the ``'none'`` and ``'outer'`` joins run afterwards.

    Raises:
        ~pyhf.exceptions.InvalidWorkspaceOperation: Channel specifications are incompatible.

    Args:
        join (:obj:`str`): The join operation to apply. See ~pyhf.workspace.Workspace for valid join operations.
        left_channels (:obj:`list`): The left channel specification.
        right_channels (:obj:`list`): The right channel specification.
        merge (:obj:`bool`): Whether to deeply merge channels or not.

    Returns:
        :obj:`list`: A joined list of channels. Each channel follows the :obj:`defs.json#/definitions/channel` `schema <https://scikit-hep.org/pyhf/likelihood.html#channel>`__
    """
    # When merging, channels sharing a name have their samples combined.
    nested_key = 'samples' if merge else None
    merged = _join_items(
        join, left_channels, right_channels, deep_merge_key=nested_key
    )

    if join == 'none':
        # No channel name may appear on both sides.
        left_names = {entry['name'] for entry in left_channels}
        shared = left_names.intersection(entry['name'] for entry in right_channels)
        if shared:
            raise exceptions.InvalidWorkspaceOperation(
                f"Workspaces cannot have any channels in common with the same name: {shared}. You can also try a different join operation: {Workspace.valid_joins}."
            )
    elif join == 'outer':
        # A name occurring more than once after the join means the two
        # definitions of that channel were not identical.
        occurrences = collections.Counter(entry['name'] for entry in merged)
        clashing = [name for name, seen in occurrences.items() if seen > 1]
        if clashing:
            raise exceptions.InvalidWorkspaceOperation(
                f"Workspaces cannot have channels in common with incompatible structure: {clashing}. You can also try a different join operation: {Workspace.valid_joins}."
            )

    return merged
def _join_observations(join, left_observations, right_observations):
    """
    Join two workspace observation specifications.

    The join is performed first; the compatibility checks for the ``'none'`` and ``'outer'`` joins run afterwards.

    Raises:
        ~pyhf.exceptions.InvalidWorkspaceOperation: Observation specifications are incompatible.

    Args:
        join (:obj:`str`): The join operation to apply. See ~pyhf.workspace.Workspace for valid join operations.
        left_observations (:obj:`list`): The left observation specification.
        right_observations (:obj:`list`): The right observation specification.

    Returns:
        :obj:`list`: A joined list of observations. Each observation follows the :obj:`defs.json#/definitions/observation` `schema <https://scikit-hep.org/pyhf/likelihood.html#observations>`__
    """
    merged = _join_items(join, left_observations, right_observations)

    if join == 'none':
        # No observation name may appear on both sides.
        left_names = {entry['name'] for entry in left_observations}
        shared = left_names.intersection(
            entry['name'] for entry in right_observations
        )
        if shared:
            raise exceptions.InvalidWorkspaceOperation(
                f"Workspaces cannot have any observations in common with the same name: {shared}. You can also try a different join operation: {Workspace.valid_joins}."
            )
    elif join == 'outer':
        # A name occurring more than once after the join means the two
        # observations with that name were not identical.
        occurrences = collections.Counter(entry['name'] for entry in merged)
        clashing = [name for name, seen in occurrences.items() if seen > 1]
        if clashing:
            raise exceptions.InvalidWorkspaceOperation(
                f"Workspaces cannot have observations in common with incompatible structure: {clashing}. You can also try a different join operation: {Workspace.valid_joins}."
            )

    return merged
def _join_parameter_configs(measurement_name, left_parameters, right_parameters):
    """
    Join two measurement parameter config specifications.

    Only used by :func:`_join_measurements` when join='outer'.

    Raises:
        ~pyhf.exceptions.InvalidWorkspaceOperation: Parameter configuration specifications are incompatible.

    Args:
        measurement_name (:obj:`str`): The name of the measurement being joined (a detail for raising exceptions correctly)
        left_parameters (:obj:`list`): The left parameter configuration specification.
        right_parameters (:obj:`list`): The right parameter configuration specification.

    Returns:
        :obj:`list`: A joined list of parameter configurations. Each parameter configuration follows the :obj:`defs.json#/definitions/config` schema
    """
    merged = _join_items('outer', left_parameters, right_parameters)

    # After an outer join, a parameter name that shows up more than once
    # had differing configurations on the two sides.
    occurrences = collections.Counter(config['name'] for config in merged)
    clashing = [name for name, seen in occurrences.items() if seen > 1]
    if not clashing:
        return merged

    raise exceptions.InvalidWorkspaceOperation(
        f"Workspaces cannot have a measurement ({measurement_name}) with incompatible parameter configs: {clashing}. You can also try a different join operation: {Workspace.valid_joins}."
    )
def _join_measurements(join, left_measurements, right_measurements):
    """
    Join two workspace measurement specifications.

    For the ``'outer'`` join, measurements sharing a name must agree on the POI; their parameter configurations are then merged into a single measurement.

    Raises:
        ~pyhf.exceptions.InvalidWorkspaceOperation: Measurement specifications are incompatible.

    Args:
        join (:obj:`str`): The join operation to apply. See ~pyhf.workspace.Workspace for valid join operations.
        left_measurements (:obj:`list`): The left measurement specification.
        right_measurements (:obj:`list`): The right measurement specification.

    Returns:
        :obj:`list`: A joined list of measurements. Each measurement follows the :obj:`defs.json#/definitions/measurement` `schema <https://scikit-hep.org/pyhf/likelihood.html#measurements>`__
    """
    merged = _join_items(join, left_measurements, right_measurements)

    if join == 'none':
        # No measurement name may appear on both sides.
        left_names = {entry['name'] for entry in left_measurements}
        shared = left_names.intersection(
            entry['name'] for entry in right_measurements
        )
        if shared:
            raise exceptions.InvalidWorkspaceOperation(
                f"Workspaces cannot have any measurements in common with the same name: {shared}. You can also try a different join operation: {Workspace.valid_joins}."
            )
        return merged

    if join != 'outer':
        return merged

    # Group the joined measurements by name, keeping first-seen order.
    grouped = collections.defaultdict(list)
    for entry in merged:
        grouped[entry['name']].append(entry)

    # Measurements sharing a name must all declare the same POI.
    poi_conflicts = [
        name
        for name, group in grouped.items()
        if len({entry['config']['poi'] for entry in group}) > 1
    ]
    if poi_conflicts:
        raise exceptions.InvalidWorkspaceOperation(
            f"Workspaces cannot have the same measurements with incompatible POI: {poi_conflicts}."
        )

    # Collapse each group into one measurement, merging parameter configs
    # whenever more than one measurement carries the name.
    combined = []
    for name, group in grouped.items():
        if len(group) == 1:
            combined.append(group[0])
            continue
        parameter_lists = [entry['config']['parameters'] for entry in group]
        combined.append(
            {
                'name': name,
                'config': {
                    'poi': group[0]['config']['poi'],
                    'parameters': _join_parameter_configs(name, *parameter_lists),
                },
            }
        )
    return combined
[docs]
class Workspace(_ChannelSummaryMixin, dict):
    """
    A JSON-serializable object that is built from an object that follows the :obj:`workspace.json` `schema <https://scikit-hep.org/pyhf/likelihood.html#workspace>`__.
    """

    # Join operations accepted by combine().
    valid_joins: ClassVar[list[str]] = ['none', 'outer', 'left outer', 'right outer']

    def __init__(self, spec, validate: bool = True, **config_kwargs):
        """
        Workspaces hold the model, data and measurements.

        The specification is deep-copied, so later changes to ``spec`` do not affect the workspace.

        Args:
            spec (:obj:`jsonable`): The HistFactory JSON specification
            validate (:obj:`bool`): Whether to validate against a JSON schema
            config_kwargs: Possible keyword arguments for the workspace configuration.
              ``schema`` (default ``'workspace.json'``) and ``version`` (default: the
              ``version`` entry of ``spec``) are recognised.

        Raises:
            ~pyhf.exceptions.Unsupported: Keyword arguments other than ``schema`` and ``version`` were passed.
        """
        spec = copy.deepcopy(spec)

        self.schema = config_kwargs.pop('schema', 'workspace.json')
        self.version = config_kwargs.pop('version', spec.get('version', None))

        # run jsonschema validation of input specification against the (provided) schema
        if validate:
            log.info(f"Validating spec against schema: {self.schema}")
            schema.validate(spec, self.schema, version=self.version)

        super().__init__(spec, channels=spec['channels'])

        # Measurement names in the order they appear in the specification.
        self.measurement_names = [
            measurement['name'] for measurement in self.get('measurements', [])
        ]

        # Observed data keyed by observation name.
        self.observations = {obs['name']: obs['data'] for obs in self['observations']}

        # Anything left over was not a recognised option.
        if config_kwargs:
            raise exceptions.Unsupported(
                f"Unsupported options were passed in: {list(config_kwargs)}."
            )

    def __eq__(self, other):
        """Equality is defined as equal dict representations."""
        if not isinstance(other, Workspace):
            return False
        return dict(self) == dict(other)

    def __ne__(self, other):
        """Negation of equality."""
        return not self == other

    def __repr__(self):
        """Representation of the Workspace."""
        # Use the default object representation rather than dumping the dict contents.
        return object.__repr__(self)

    def get_measurement(self, measurement_name=None, measurement_index=None):
        """
        Get a measurement object.

        The following logic is used:

          1. if the measurement name is given, find the measurement for the given name
          2. if the measurement index is given, return the measurement at that index
          3. if there are measurements but none of the above have been specified, return the 0th measurement

        The selected measurement is validated against ``measurement.json`` before it is returned.

        Raises:
            ~pyhf.exceptions.InvalidMeasurement: If the measurement was not found

        Args:
            measurement_name (:obj:`str`): The name of the measurement to use
            measurement_index (:obj:`int`): The index of the measurement to use

        Returns:
            :obj:`dict`: A measurement object adhering to the schema defs.json#/definitions/measurement
        """
        if not self.measurement_names:
            raise exceptions.InvalidMeasurement("No measurements have been defined.")

        if measurement_name is not None:
            if measurement_name not in self.measurement_names:
                log.debug(f"measurements defined: {self.measurement_names}")
                raise exceptions.InvalidMeasurement(
                    f'no measurement by name \'{measurement_name:s}\' was found in the workspace, pick from one of the valid ones above'
                )
            measurement = self['measurements'][
                self.measurement_names.index(measurement_name)
            ]
        else:
            if measurement_index is None and len(self.measurement_names) > 1:
                log.warning(
                    'multiple measurements defined. Taking the first measurement.'
                )

            measurement_index = (
                measurement_index if measurement_index is not None else 0
            )
            try:
                measurement = self['measurements'][measurement_index]
            except IndexError as err:
                # Chain explicitly so the traceback shows the original lookup failure as the cause.
                raise exceptions.InvalidMeasurement(
                    f"The measurement index {measurement_index} is out of bounds as only {len(self.measurement_names)} measurement(s) have been defined."
                ) from err

        schema.validate(measurement, 'measurement.json', version=self.version)
        return measurement

    def model(
        self,
        measurement_name=None,
        measurement_index=None,
        patches=None,
        **config_kwargs,
    ):
        """
        Create a model object with/without patches applied.

        See :func:`pyhf.workspace.Workspace.get_measurement` and :class:`pyhf.pdf.Model` for possible keyword arguments.

        Args:
            measurement_name (:obj:`str`): The name of the measurement to use
              in :func:`~pyhf.workspace.Workspace.get_measurement`.
            measurement_index (:obj:`int`): The index of the measurement to use
              in :func:`~pyhf.workspace.Workspace.get_measurement`.
            patches (:obj:`list` of :class:`jsonpatch.JsonPatch` or :class:`pyhf.patchset.Patch`):
              A list of patches to apply to the model specification.
            config_kwargs: Possible keyword arguments for the model
              configuration.
              See :class:`~pyhf.pdf.Model` for more details.
            poi_name (:obj:`str` or :obj:`None`): Specify this keyword argument
              to override the default parameter of interest specified in the
              measurement.
              Set to :obj:`None` for a POI-less model.

        Returns:
            ~pyhf.pdf.Model: A model object adhering to the schema model.json
        """
        measurement = self.get_measurement(
            measurement_name=measurement_name,
            measurement_index=measurement_index,
        )

        # set poi_name if the user does not provide it
        config_kwargs.setdefault('poi_name', measurement['config']['poi'])

        log.debug(f"model being created for measurement {measurement['name']:s}")

        modelspec = {
            'channels': self['channels'],
            'parameters': measurement['config']['parameters'],
        }

        # Patches are applied one after another, each on the result of the previous one.
        patches = patches or []
        for patch in patches:
            modelspec = jsonpatch.JsonPatch(patch).apply(modelspec)

        return Model(modelspec, **config_kwargs)

    def data(self, model, include_auxdata=True):
        """
        Return the data for the supplied model with or without auxiliary data from the model.

        The model is needed as the order of the data depends on the order of the channels in the model.

        Raises:
            KeyError: Invalid or missing channel

        Args:
            model (~pyhf.pdf.Model): A model object adhering to the schema model.json
            include_auxdata (:obj:`bool`): Whether to include auxiliary data from the model or not

        Returns:
            :obj:`list`: data
        """
        try:
            # Concatenate the per-channel observations in model channel order.
            # The fresh [] initial value is the list extended in place, so the
            # stored observations themselves are never mutated.
            observed_data = functools.reduce(
                operator.iadd, (self.observations[c] for c in model.config.channels), []
            )
        except KeyError:
            log.error(
                "Invalid channel: the workspace does not have observation data for one of the channels in the model.",
                exc_info=True,
            )
            raise

        if include_auxdata:
            observed_data += model.config.auxdata
        return observed_data

    def _prune_and_rename(
        self,
        prune_modifiers=None,
        prune_modifier_types=None,
        prune_samples=None,
        prune_channels=None,
        prune_measurements=None,
        rename_modifiers=None,
        rename_samples=None,
        rename_channels=None,
        rename_measurements=None,
    ):
        """
        Return a new, pruned, renamed workspace specification. This will not modify the original workspace.

        Pruning removes pieces of the workspace whose name or type matches the
        user-provided lists. The pruned, renamed workspace must also be a valid
        workspace.

        A workspace is composed of many named components, such as channels and
        samples, as well as types of systematics (e.g. `histosys`). Components
        can be removed (pruned away) filtering on name or be renamed according
        to the provided :obj:`dict` mapping. Additionally, modifiers of
        specific types can be removed (pruned away).

        This function also handles specific peculiarities, such as
        renaming/removing a channel which needs to rename/remove the
        corresponding `observation`.

        Args:
            prune_modifiers: A :obj:`list` of modifiers to prune.
            prune_modifier_types: A :obj:`list` of modifier types to prune.
            prune_samples: A :obj:`list` of samples to prune.
            prune_channels: A :obj:`list` of channels to prune.
            prune_measurements: A :obj:`list` of measurements to prune.
            rename_modifiers: A :obj:`dict` mapping old modifier name to new modifier name.
            rename_samples: A :obj:`dict` mapping old sample name to new sample name.
            rename_channels: A :obj:`dict` mapping old channel name to new channel name.
            rename_measurements: A :obj:`dict` mapping old measurement name to new measurement name.

        Returns:
            ~pyhf.workspace.Workspace: A new workspace object with the specified components removed or renamed

        Raises:
            ~pyhf.exceptions.InvalidWorkspaceOperation: An item name to prune or rename does not exist in the workspace.
        """
        # avoid mutable defaults
        prune_modifiers = [] if prune_modifiers is None else prune_modifiers
        prune_modifier_types = (
            [] if prune_modifier_types is None else prune_modifier_types
        )
        prune_samples = [] if prune_samples is None else prune_samples
        prune_channels = [] if prune_channels is None else prune_channels
        prune_measurements = [] if prune_measurements is None else prune_measurements
        rename_modifiers = {} if rename_modifiers is None else rename_modifiers
        rename_samples = {} if rename_samples is None else rename_samples
        rename_channels = {} if rename_channels is None else rename_channels
        rename_measurements = {} if rename_measurements is None else rename_measurements

        # self.modifiers is consumed as (name, type) pairs. Collect names and
        # types separately: going through dict(self.modifiers) would keep only
        # one type per modifier name and hide the other types of a modifier
        # that is declared with several of them.
        known_modifier_names = {name for name, _ in self.modifiers}
        known_modifier_types = {modifier_type for _, modifier_type in self.modifiers}

        for modifier_type in prune_modifier_types:
            if modifier_type not in known_modifier_types:
                raise exceptions.InvalidWorkspaceOperation(
                    f"{modifier_type} is not one of the modifier types in this workspace."
                )

        for modifier_name in (*prune_modifiers, *rename_modifiers):
            if modifier_name not in known_modifier_names:
                raise exceptions.InvalidWorkspaceOperation(
                    f"{modifier_name} is not one of the modifiers in this workspace."
                )

        for sample_name in (*prune_samples, *rename_samples):
            if sample_name not in self.samples:
                raise exceptions.InvalidWorkspaceOperation(
                    f"{sample_name} is not one of the samples in this workspace."
                )

        for channel_name in (*prune_channels, *rename_channels):
            if channel_name not in self.channels:
                raise exceptions.InvalidWorkspaceOperation(
                    f"{channel_name} is not one of the channels in this workspace."
                )

        for measurement_name in (*prune_measurements, *rename_measurements):
            if measurement_name not in self.measurement_names:
                raise exceptions.InvalidWorkspaceOperation(
                    f"{measurement_name} is not one of the measurements in this workspace."
                )

        newspec = {
            'channels': [
                {
                    'name': rename_channels.get(channel['name'], channel['name']),
                    'samples': [
                        {
                            'name': rename_samples.get(sample['name'], sample['name']),
                            'data': sample['data'],
                            'modifiers': [
                                dict(
                                    modifier,
                                    name=rename_modifiers.get(
                                        modifier['name'], modifier['name']
                                    ),
                                )
                                for modifier in sample['modifiers']
                                if modifier['name'] not in prune_modifiers
                                and modifier['type'] not in prune_modifier_types
                            ],
                        }
                        for sample in channel['samples']
                        if sample['name'] not in prune_samples
                    ],
                }
                for channel in self['channels']
                if channel['name'] not in prune_channels
            ],
            'measurements': [
                {
                    'name': rename_measurements.get(
                        measurement['name'], measurement['name']
                    ),
                    'config': {
                        # parameter configs share their names with modifiers,
                        # so the modifier prune/rename settings apply here too
                        'parameters': [
                            dict(
                                parameter,
                                name=rename_modifiers.get(
                                    parameter['name'], parameter['name']
                                ),
                            )
                            for parameter in measurement['config']['parameters']
                            if parameter['name'] not in prune_modifiers
                        ],
                        'poi': rename_modifiers.get(
                            measurement['config']['poi'], measurement['config']['poi']
                        ),
                    },
                }
                for measurement in self['measurements']
                if measurement['name'] not in prune_measurements
            ],
            # observations are keyed by channel name, so they follow the
            # channel prune/rename settings
            'observations': [
                dict(
                    copy.deepcopy(observation),
                    name=rename_channels.get(observation['name'], observation['name']),
                )
                for observation in self['observations']
                if observation['name'] not in prune_channels
            ],
            'version': self['version'],
        }
        return Workspace(newspec)

    def prune(
        self,
        modifiers=None,
        modifier_types=None,
        samples=None,
        channels=None,
        measurements=None,
    ):
        """
        Return a new, pruned workspace specification. This will not modify the original workspace.

        The pruned workspace must also be a valid workspace.

        Args:
            modifiers: A :obj:`list` of modifiers to prune.
            modifier_types: A :obj:`list` of modifier types to prune.
            samples: A :obj:`list` of samples to prune.
            channels: A :obj:`list` of channels to prune.
            measurements: A :obj:`list` of measurements to prune.

        Returns:
            ~pyhf.workspace.Workspace: A new workspace object with the specified components removed

        Raises:
            ~pyhf.exceptions.InvalidWorkspaceOperation: An item name to prune does not exist in the workspace.
        """
        # None values are turned into empty lists by _prune_and_rename.
        return self._prune_and_rename(
            prune_modifiers=modifiers,
            prune_modifier_types=modifier_types,
            prune_samples=samples,
            prune_channels=channels,
            prune_measurements=measurements,
        )

    def rename(self, modifiers=None, samples=None, channels=None, measurements=None):
        """
        Return a new workspace specification with certain elements renamed.

        This will not modify the original workspace.
        The renamed workspace must also be a valid workspace.

        Args:
            modifiers: A :obj:`dict` mapping old modifier name to new modifier name.
            samples: A :obj:`dict` mapping old sample name to new sample name.
            channels: A :obj:`dict` mapping old channel name to new channel name.
            measurements: A :obj:`dict` mapping old measurement name to new measurement name.

        Returns:
            ~pyhf.workspace.Workspace: A new workspace object with the specified components renamed

        Raises:
            ~pyhf.exceptions.InvalidWorkspaceOperation: An item name to rename does not exist in the workspace.
        """
        # None values are turned into empty mappings by _prune_and_rename.
        return self._prune_and_rename(
            rename_modifiers=modifiers,
            rename_samples=samples,
            rename_channels=channels,
            rename_measurements=measurements,
        )

    @classmethod
    def combine(
        cls, left, right, join='none', merge_channels=False, validate: bool = True
    ):
        """
        Return a new workspace specification that is the combination of the two workspaces.

        The new workspace must also be a valid workspace. A combination of
        workspaces is done by combining the set of:

          - channels,
          - observations, and
          - measurements

        between the two workspaces. If the two workspaces have modifiers that
        follow the same naming convention, then correlations across the two
        workspaces may be possible. In particular, the `lumi` modifier will be
        fully-correlated.

        If the two workspaces have the same measurement (with the same POI),
        those measurements will get merged.

        Raises:
            ~pyhf.exceptions.InvalidWorkspaceOperation: The workspaces have common channel names, incompatible measurements, or incompatible schema versions.
            ValueError: The join operation is not a valid one, or channel merging was requested together with the ``"none"`` join.

        Args:
            left (~pyhf.workspace.Workspace): A workspace
            right (~pyhf.workspace.Workspace): Another workspace
            join (:obj:`str`): How to join the two workspaces. Pick from "none", "outer", "left outer", or "right outer".
            merge_channels (:obj:`bool`): Whether or not to merge channels when performing the combine. This is only done with "outer", "left outer", and "right outer" options.
            validate (:obj:`bool`): Whether to validate against a JSON schema.

        Returns:
            ~pyhf.workspace.Workspace: A new combined workspace object
        """
        if join not in Workspace.valid_joins:
            raise ValueError(
                f"Workspaces must be joined using one of the valid join operations ({Workspace.valid_joins}); not {join}"
            )

        if merge_channels and join not in ['outer', 'left outer', 'right outer']:
            raise ValueError(
                f"You can only merge channels using the 'outer', 'left outer', or 'right outer' join operations; not {join}"
            )

        if join in ['left outer', 'right outer']:
            log.warning(
                "You are using an unsafe join operation. This will silence exceptions that might be raised during a normal 'outer' operation."
            )

        new_version = _join_versions(join, left['version'], right['version'])
        new_channels = _join_channels(
            join, left['channels'], right['channels'], merge=merge_channels
        )
        new_observations = _join_observations(
            join, left['observations'], right['observations']
        )
        new_measurements = _join_measurements(
            join, left['measurements'], right['measurements']
        )

        newspec = {
            'channels': new_channels,
            'measurements': new_measurements,
            'observations': new_observations,
            'version': new_version,
        }
        return cls(newspec, validate=validate)

    @classmethod
    def sorted(cls, workspace):
        """
        Return a new workspace specification that is sorted.

        Channels, samples, measurements, parameter configurations and observations
        are sorted by name; modifiers are sorted by name and then by type.
        The input workspace is not modified.

        Args:
            workspace (~pyhf.workspace.Workspace): A workspace to sort

        Returns:
            ~pyhf.workspace.Workspace: A new sorted workspace object
        """
        by_name = operator.itemgetter('name')
        by_name_and_type = operator.itemgetter('name', 'type')

        # Sort a deep copy so the caller's workspace stays untouched.
        newspec = copy.deepcopy(dict(workspace))

        newspec['channels'].sort(key=by_name)
        for channel in newspec['channels']:
            channel['samples'].sort(key=by_name)
            for sample in channel['samples']:
                sample['modifiers'].sort(key=by_name_and_type)

        newspec['measurements'].sort(key=by_name)
        for measurement in newspec['measurements']:
            measurement['config']['parameters'].sort(key=by_name)

        newspec['observations'].sort(key=by_name)

        return cls(newspec)

    @classmethod
    def build(cls, model, data, name='measurement', validate: bool = True):
        """
        Build a workspace from model and data.

        Args:
            model (~pyhf.pdf.Model): A model to store into a workspace
            data (:obj:`tensor`): A array holding observations to store into a workspace
            name (:obj:`str`): The name of the workspace measurement
            validate (:obj:`bool`): Whether to validate against a JSON schema

        Returns:
            ~pyhf.workspace.Workspace: A new workspace object
        """
        # Deep copy so the workspace does not share channel definitions with the model.
        workspace = copy.deepcopy(dict(channels=model.spec['channels']))
        workspace['version'] = schema.version

        # A single measurement is created, with one parameter config per
        # parameter set of the model, filled from the suggested values.
        workspace['measurements'] = [
            {
                'name': name,
                'config': {
                    'poi': model.config.poi_name,
                    'parameters': [
                        {
                            "bounds": [
                                list(x)
                                for x in parset_spec['paramset'].suggested_bounds
                            ],
                            "inits": parset_spec['paramset'].suggested_init,
                            "fixed": parset_spec['paramset'].suggested_fixed_as_bool,
                            "name": parset_name,
                        }
                        for parset_name, parset_spec in model.config.par_map.items()
                    ],
                },
            }
        ]

        # Slice the flat data array into one observation per model channel.
        workspace['observations'] = [
            {'name': k, 'data': list(data[model.config.channel_slices[k]])}
            for k in model.config.channels
        ]
        return cls(workspace, validate=validate)