# Licensed under a 3-clause BSD style license, see LICENSE and LICENSE_ASTROML
"""
Bayesian Block implementation
=============================
Dynamic programming algorithm for finding the optimal adaptive-width histogram. Modified from the
bayesian blocks python implementation found in astroML :cite:`VanderPlas_2012`.
* Based on Scargle et al 2012 :cite:`Scargle_2013`
* Initial Python Implementation :cite:`BB_jakevdp`
* Initial Examination in HEP context :cite:`Pollack:2017srh`
"""
from __future__ import annotations
from collections.abc import Iterable
import numpy as np
import pandas as pd


class Prior:
    """Helper class for calculating the prior on the fitness function."""

    def __init__(self, p0: float = 0.05, gamma: float | None = None):
        """
        Args:
            p0: False-positive rate, between 0 and 1. A lower value places a stricter
                penalty on creating more bin edges, thus reducing the potential for
                false-positive bin edges. In general, the larger the number of bins, the
                smaller p0 should be to prevent the creation of spurious, jagged bins.
                Defaults to 0.05.
            gamma: If specified, use this gamma to compute the general prior form,
                :math:`p \\sim \\gamma^N`. If gamma is specified, p0 is ignored.
                Defaults to None.
        """
        self.p0 = p0
        self.gamma = gamma

    def calc(self, N: int) -> float:
        """
        Computes the prior.

        Args:
            N: N-th change point.

        Returns:
            The prior.
        """
        if self.gamma is not None:
            return -np.log(self.gamma)
        else:
            # eq. 21 from Scargle 2012
            return 4 - np.log(73.53 * self.p0 * (N**-0.478))
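
        # Quick numeric check (illustrative, using the default p0 = 0.05):
        # eq. 21 gives calc(1) = 4 - log(73.53 * 0.05) ~= 2.70 and
        # calc(100) ~= 4.90, so the penalty grows only slowly with N.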


def bayesian_blocks(
    data: Iterable | np.ndarray,
    weights: Iterable | np.ndarray | None = None,
    p0: float = 0.05,
    gamma: float | None = None,
) -> np.ndarray:
"""Bayesian Blocks Implementation.
This is a flexible implementation of the Bayesian Blocks algorithm described in :cite:`Scargle_2013`.
It has been modified to natively accept weighted events, for ease of use in HEP applications.
Args:
data: Input data values (one dimensional, length N). Repeat values are allowed.
weights: Weights for data (otherwise assume all data points have a weight of 1).
Must be same length as data. Defaults to None.
p0: False-positive rate, between 0 and 1. A lower number places a stricter penalty
against creating more bin edges, thus reducing the potential for false-positive bin edges. In general,
the larger the number of bins, the small the p0 should be to prevent the creation of spurious, jagged
bins. Defaults to 0.05.
gamma: If specified, then use this gamma to compute the general prior form,
:math:`p \\sim \\gamma^N`. If gamma is specified, p0 is ignored. Defaults to None.
Returns:
Array containing the (N+1) bin edges
Examples:
Unweighted data:
>>> d = np.random.normal(size=100)
>>> bins = bayesian_blocks(d, p0=0.01)
Unweighted data with repeats:
>>> d = np.random.normal(size=100)
>>> d[80:] = d[:20]
>>> bins = bayesian_blocks(d, p0=0.01)
Weighted data:
>>> d = np.random.normal(size=100)
>>> w = np.random.uniform(1,2, size=100)
>>> bins = bayesian_blocks(d, w, p0=0.01)
"""
    # validate input data
    data = np.asarray(data, dtype=float)
    assert data.ndim == 1

    # validate input weights; set them to 1 if not given
    weights = np.asarray(weights) if weights is not None else np.ones_like(data)

    # initialize the prior
    prior = Prior(p0, gamma)

    # Place data and weights into a DataFrame.
    # We want to sort the data array (without losing the associated weights), and
    # combine duplicate data points by summing their weights together. We can
    # accomplish all of this with `groupby`.
    df = pd.DataFrame({"data": data, "weights": weights})
    gb = df.groupby("data").sum()
    data = gb.index.values
    weights = gb.weights.values
    N = weights.size
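    # For illustration: data = [2.0, 1.0, 2.0] with weights = [1, 1, 3]
    # becomes data = [1.0, 2.0] and weights = [1, 4] after the groupby.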

    # create length-(N + 1) array of cell edges: the first and last data points,
    # plus the midpoints between adjacent (sorted, unique) data values
    edges = np.concatenate([data[:1], 0.5 * (data[1:] + data[:-1]), data[-1:]])
    # block_length[k]: distance from edge k to the final data point, so that
    # differences of block_length entries give block widths
    block_length = data[-1] - edges
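    # For illustration: data = [1.0, 2.0, 4.0] gives
    # edges = [1.0, 1.5, 3.0, 4.0] and block_length = [3.0, 2.5, 1.0, 0.0].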

    # arrays to store the best configuration
    best = np.zeros(N, dtype=float)
    last = np.zeros(N, dtype=int)

    # -----------------------------------------------------------------
    # Start with the first data cell; add one cell at each iteration
    # -----------------------------------------------------------------
    for R in range(N):
        # Compute fit_vec: fitness of the putative last block (ending at cell R)

        # T_k: width/duration of each candidate block k..R
        T_k = block_length[: R + 1] - block_length[R + 1]

        # N_k: number of elements (total weight) in each candidate block, i.e.
        # the reversed cumulative sum of the weights of cells k..R
        N_k = np.cumsum(weights[: R + 1][::-1])[::-1]

        # evaluate the fitness function (eq. 19 from Scargle 2012)
        fit_vec = N_k * (np.log(N_k / T_k))

        # penalize with the prior; A_R[k] is the fitness of the best partition
        # of cells 0..R whose final block starts at cell k
        A_R = fit_vec - prior.calc(R + 1)
        A_R[1:] += best[:R]

        i_max = np.argmax(A_R)
        last[R] = i_max
        best[R] = A_R[i_max]
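
    # At this point best[R] is the fitness of the optimal partition of the first
    # R + 1 cells, and last[R] is the starting cell of that partition's final
    # block; backtracking through `last` recovers all changepoints.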

    # -----------------------------------------------------------------
    # Now find changepoints by iteratively peeling off the last block
    # -----------------------------------------------------------------
    change_points = np.zeros(N, dtype=int)
    i_cp = N
    ind = N
    while True:
        i_cp -= 1
        change_points[i_cp] = ind
        if ind == 0:
            break
        ind = last[ind - 1]
    change_points = change_points[i_cp:]

    return edges[change_points]
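

# A minimal, self-contained usage sketch (illustrative only; the seed, sample
# sizes, and mixture parameters below are arbitrary choices for this demo):
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    # two overlapping populations, so the optimal bin widths vary across the range
    d = np.concatenate([rng.normal(0.0, 1.0, 500), rng.normal(3.0, 0.3, 200)])
    w = rng.uniform(1.0, 2.0, size=d.size)
    print("unweighted edges:", bayesian_blocks(d, p0=0.01))
    print("weighted edges:  ", bayesian_blocks(d, w, p0=0.01))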