# Source code for hepstats.modeling.bayesian_blocks

# Licensed under a 3-clause BSD style license, see LICENSE and LICENSE_ASTROML
"""
Bayesian Block implementation
=============================

Dynamic programming algorithm for finding the optimal adaptive-width histogram.  Modified from the
bayesian blocks python implementation found in astroML :cite:VanderPlas_2012.

* Based on Scargle et al 2012 :cite:Scargle_2013
* Initial Python Implementation :cite:BB_jakevdp
* Initial Examination in HEP context :cite:Pollack:2017srh

"""

from __future__ import annotations

from collections.abc import Iterable

import numpy as np
import pandas as pd

[docs]
class Prior:
"""Helper class for calculating the prior on the fitness function."""

def __init__(self, p0: float = 0.05, gamma: float | None = None):
"""
Args:
p0: False-positive rate, between 0 and 1.  A lower number places a stricter penalty
against creating more bin edges, thus reducing the potential for false-positive bin edges. In general,
the larger the number of bins, the small the p0 should be to prevent the creation of spurious, jagged
bins. Defaults to 0.05.

gamma: If specified, then use this gamma to compute the general prior form,
:math:p \\sim \\gamma^N. If gamma is specified, p0 is ignored. Defaults to None.
"""

self.p0 = p0
self.gamma = gamma

[docs]
def calc(self, N: int) -> float:
"""
Computes the prior.

Args:
N: N-th change point.

Returns:
the prior.
"""
if self.gamma is not None:
return -np.log(self.gamma)
else:
# eq. 21 from Scargle 2012
return 4 - np.log(73.53 * self.p0 * (N**-0.478))

[docs]
def bayesian_blocks(
data: Iterable | np.ndarray,
weights: Iterable | np.ndarray | None = None,
p0: float = 0.05,
gamma: float | None = None,
) -> np.ndarray:
"""Bayesian Blocks Implementation.

This is a flexible implementation of the Bayesian Blocks algorithm described in :cite:Scargle_2013.
It has been modified to natively accept weighted events, for ease of use in HEP applications.

Args:
data: Input data values (one dimensional, length N). Repeat values are allowed.

weights: Weights for data (otherwise assume all data points have a weight of 1).
Must be same length as data. Defaults to None.

p0: False-positive rate, between 0 and 1.  A lower number places a stricter penalty
against creating more bin edges, thus reducing the potential for false-positive bin edges. In general,
the larger the number of bins, the small the p0 should be to prevent the creation of spurious, jagged
bins. Defaults to 0.05.

gamma: If specified, then use this gamma to compute the general prior form,
:math:p \\sim \\gamma^N. If gamma is specified, p0 is ignored. Defaults to None.

Returns:
Array containing the (N+1) bin edges

Examples:
Unweighted data:

>>> d = np.random.normal(size=100)
>>> bins = bayesian_blocks(d, p0=0.01)

Unweighted data with repeats:

>>> d = np.random.normal(size=100)
>>> d[80:] = d[:20]
>>> bins = bayesian_blocks(d, p0=0.01)

Weighted data:

>>> d = np.random.normal(size=100)
>>> w = np.random.uniform(1,2, size=100)
>>> bins = bayesian_blocks(d, w, p0=0.01)

"""
# validate input data
data = np.asarray(data, dtype=float)
assert data.ndim == 1

# validate input weights
# set them to 1 if not given
weights = np.asarray(weights) if weights is not None else np.ones_like(data)

# initialize the prior
prior = Prior(p0, gamma)

# Place data and weights into a DataFrame.
# We want to sort the data array (without losing the associated weights), and combine duplicate
# data points by summing their weights together.  We can accomplish all this with groupby

df = pd.DataFrame({"data": data, "weights": weights})
gb = df.groupby("data").sum()
data = gb.index.values
weights = gb.weights.values

N = weights.size

# create length-(N + 1) array of cell edges
edges = np.concatenate([data[:1], 0.5 * (data[1:] + data[:-1]), data[-1:]])
block_length = data[-1] - edges

# arrays to store the best configuration
best = np.zeros(N, dtype=float)
last = np.zeros(N, dtype=int)

# -----------------------------------------------------------------
# -----------------------------------------------------------------
# last = core_loop(N, block_length, weights, fitfunc, best, last)
for R in range(N):
# Compute fit_vec : fitness of putative last block (end at R)

# T_k: width/duration of each block
T_k = block_length[: R + 1] - block_length[R + 1]

# N_k: number of elements in each block
N_k = np.cumsum(weights[: R + 1][::-1])[::-1]

# evaluate fitness function
fit_vec = N_k * (np.log(N_k / T_k))

# penalize function with prior
A_R = fit_vec - prior.calc(R + 1)
A_R[1:] += best[:R]

i_max = np.argmax(A_R)
last[R] = i_max
best[R] = A_R[i_max]

# -----------------------------------------------------------------
# Now find changepoints by iteratively peeling off the last block
# -----------------------------------------------------------------
change_points = np.zeros(N, dtype=int)
i_cp = N
ind = N
while True:
i_cp -= 1
change_points[i_cp] = ind
if ind == 0:
break
ind = last[ind - 1]
change_points = change_points[i_cp:]

return edges[change_points]