# ===================================================
#
# Copyright (c) 2023-2024
# SPARKX Team
#
# GNU General Public License (GPLv3 or later)
#
# ===================================================
from sparkx.flow import FlowInterface
from sparkx.Particle import Particle
import numpy as np
import warnings
import random as rd
from typing import Optional, Union, List, Any
rd.seed(42)
[docs]
class PCAFlow(FlowInterface.FlowInterface):
"""
This class implements the flow analysis with Principal Component
Analysis (PCA).
PCAFlow is designed for analyzing anisotropic flow in high-energy nuclear
collisions using two-particle correlations.
It uses Principal Component Analysis (PCA) to extract flow information from
particle data.
This class implements the method proposed in:
- [1] `Phys.Rev.Lett. 114 (2015) 15, 152301 [nucl-th/1410.7739] <https://inspirehep.net/literature/1324816>`__
The implementation is done in a similar way as in:
- [2] `Bachelor thesis D.J.W. Verweij (2016) <https://studenttheses.uu.nl/handle/20.500.12932/26817>`__
Parameters
----------
n : int, optional
The flow harmonic to compute. Must be a positive integer. Default is 2.
alpha : int, optional
The order in sub-leading flow up to which the flow is computed.
Must be an integer greater than or equal to 1. Default is 2.
number_subcalc : int, optional
The number of sub-calculations to estimate the error of the flow.
Must be an integer greater than or equal to 2. Default is 4.
Methods
-------
differential_flow:
Computes the differential flow.
Pearson_correlation:
Returns the Pearson coefficient and its uncertainty if the flow is
already computed.
Attributes
----------
n_ : int
The flow harmonic to compute.
alpha_ : int
The order in sub-leading flow up to which the flow is computed.
number_subcalc_ : int
The number of sub-calculations to estimate the error of the flow.
number_events_ : int or None
The total number of events.
subcalc_counter_ : int
A counter to keep track of the current sub-calculation.
normalization_ : None or numpy.ndarray
Normalization factors for each bin.
bin_multiplicity_total_ : None or numpy.ndarray
Total multiplicity in each bin across all events.
sigma_multiplicity_total_ : list of numpy.ndarray
List of arrays containing the multiplicity in each bin for each
sub-calculation.
number_events_subcalc_ : None or numpy.ndarray
Number of events used in each sub-calculation.
QnRe_total_ : None or numpy.ndarray
Real part of the flow vector for each bin across all events.
QnIm_total_ : None or numpy.ndarray
Imaginary part of the flow vector for each bin across all events.
SigmaQnReSub_total_ : None or numpy.ndarray
Real part of the flow vector for each bin and sub-calculation.
SigmaQnImSub_total_ : None or numpy.ndarray
Imaginary part of the flow vector for each bin and sub-calculation.
VnDelta_total_ : None or numpy.ndarray
Covariance matrix for flow vectors across all events.
SigmaVnDelta_total_ : None or numpy.ndarray
Covariance matrix for flow vectors for each sub-calculation.
FlowSubCalc_ : None or numpy.ndarray
An array containing the flow magnitude for sub-calculations.
Flow_ : None or numpy.ndarray
An array containing the flow magnitude.
FlowUncertainty_ : None or numpy.ndarray
An array containing the uncertainty of the flow.
Pearson_r_ : None or numpy.ndarray
An array containing the Pearson correlation between two bins.
Pearson_r_uncertainty_ : None or numpy.ndarray
An array containing the Pearson correlation uncertainty between two bins.
Raises
------
TypeError
If :code:`n`, :code:`alpha`, or :code:`number_subcalc` is not an integer.
ValueError
If :code:`n` is not a positive integer, alpha is less than 1,
or :code:`number_subcalc` is less than 2.
"""
def __init__(
self, n: int = 2, alpha: int = 2, number_subcalc: int = 4
) -> None:
# flow harmonic to compute
if not isinstance(n, int):
raise TypeError("n has to be int")
elif n <= 0:
raise ValueError(
"n-th harmonic with value n<=0 can not be computed"
)
else:
self.n_ = n
# order in sub-leading flow up to which the flow is computed
if not isinstance(alpha, int):
raise TypeError("alpha has to be int")
elif alpha < 1:
raise ValueError("alpha has to be >= 1")
else:
self.alpha_ = alpha
# number of sub-calculations to estimate the error of the flow
if not isinstance(number_subcalc, int):
raise TypeError("number_subcalc has to be int")
elif number_subcalc < 2:
raise ValueError("number_subcalc has to be >= 2")
else:
self.number_subcalc_ = number_subcalc
self.number_events_: Optional[int] = None
self.subcalc_counter_: int = 0
self.normalization_: Optional[np.ndarray] = None
self.bin_multiplicity_total_: Optional[np.ndarray] = None
self.sigma_multiplicity_total_: List[np.ndarray] = []
self.number_events_subcalc_: Optional[np.ndarray] = None
self.QnRe_total_: Optional[np.ndarray] = None
self.QnIm_total_: Optional[np.ndarray] = None
self.SigmaQnReSub_total_: Optional[np.ndarray] = None
self.SigmaQnImSub_total_: Optional[np.ndarray] = None
self.VnDelta_total_: Optional[np.ndarray] = None
self.SigmaVnDelta_total_: Optional[np.ndarray] = None
self.FlowSubCalc_: Optional[np.ndarray] = None
self.Flow_: Optional[np.ndarray] = None
self.FlowUncertainty_: Optional[np.ndarray] = None
self.Pearson_r_: Optional[np.ndarray] = None
self.Pearson_r_uncertainty_: Optional[np.ndarray] = None
def integrated_flow(self, particle_data: List[List[Particle]]) -> None:
warnings.warn("Integrated flow is not supported for PCAFlow")
return None
def __compute_normalization(
self, bins: Union[List[float], np.ndarray]
) -> None:
self.normalization_ = np.zeros((len(bins) - 1))
bin_widths = np.diff(bins)
for bin in range(len(bins) - 1):
self.normalization_[bin] = (
1.0 / (2.0 * np.pi * bin_widths[bin]) ** 2.0
)
def __update_event(
self,
event_data: List[Particle],
bins: Union[List[float], np.ndarray],
flow_as_function_of: str,
event_number: int,
) -> None:
"""
Update the anisotropic flow calculations based on particle data from
a single event.
Parameters
----------
event_data : list
List of particle data for a single event.
bins : list or np.ndarray
Bins used for the flow calculation.
flow_as_function_of : str
Variable on which the flow is calculated ("pT", "rapidity"
or "pseudorapidity").
event_number : int
Index of the current event.
Returns
-------
None
"""
if self.normalization_ is None:
raise TypeError(
"'normalization_' is None. It must be initialized before calling the '__update_event' function."
)
if self.bin_multiplicity_total_ is None:
raise TypeError(
"'bin_multiplicity_total_' is None. It must be initialized before calling the '__update_event' function."
)
if self.sigma_multiplicity_total_ is None:
raise TypeError(
"'sigma_multiplicity_total_' is None. It must be initialized before calling the '__update_event' function."
)
if self.number_events_subcalc_ is None:
raise TypeError(
"'number_events_subcalc_' is None. It must be initialized before calling the '__update_event' function."
)
if self.QnRe_total_ is None:
raise TypeError(
"'QnRe_total_' is None. It must be initialized before calling the '__update_event' function."
)
if self.QnIm_total_ is None:
raise TypeError(
"'QnIm_total_' is None. It must be initialized before calling the '__update_event' function."
)
if self.SigmaQnReSub_total_ is None:
raise TypeError(
"'SigmaQnReSub_total_' is None. It must be initialized before calling the '__update_event' function."
)
if self.SigmaQnImSub_total_ is None:
raise TypeError(
"'SigmaQnImSub_total_' is None. It must be initialized before calling the '__update_event' function."
)
if self.VnDelta_total_ is None:
raise TypeError(
"'VnDelta_total_' is None. It must be initialized before calling the '__update_event' function."
)
if self.SigmaVnDelta_total_ is None:
raise TypeError(
"'SigmaVnDelta' is None. It must be initialized before calling the '__update_event' function."
)
number_bins = len(bins) - 1
bin_multiplicity_event = np.zeros(number_bins)
QnRe = np.zeros(number_bins)
QnIm = np.zeros(number_bins)
SigmaQnReSubTot = np.zeros((number_bins, self.number_subcalc_))
SigmaQnImSubTot = np.zeros((number_bins, self.number_subcalc_))
VnDelta_event = np.zeros((number_bins, number_bins))
SigmaVnDelta_event = np.zeros(
(number_bins, number_bins, self.number_subcalc_)
)
if event_number == 0:
self.bin_multiplicity_total_ = np.zeros(number_bins)
for i in range(self.number_subcalc_):
self.sigma_multiplicity_total_.append(np.zeros(number_bins))
self.number_events_subcalc_ = np.zeros(self.number_subcalc_)
self.QnRe_total_ = np.zeros(number_bins)
self.QnIm_total_ = np.zeros(number_bins)
self.SigmaQnReSub_total_ = np.zeros(
(number_bins, self.number_subcalc_)
)
self.SigmaQnImSub_total_ = np.zeros(
(number_bins, self.number_subcalc_)
)
self.VnDelta_total_ = np.zeros((number_bins, number_bins))
self.SigmaVnDelta_total_ = np.zeros(
(number_bins, number_bins, self.number_subcalc_)
)
# update the sub-calculation counter if needed
if self.number_events_ is not None:
number_events_subcalc = self.number_events_ // self.number_subcalc_
if (
(event_number % number_events_subcalc == 0)
and (event_number != 0)
and (self.subcalc_counter_ < self.number_subcalc_ - 1)
):
self.subcalc_counter_ += 1
random_reaction_plane = rd.random() * 2.0 * np.pi
# loop over all event particles and compute the flow vectors in the bins
for particle in event_data:
if flow_as_function_of == "pT":
val = particle.pT_abs()
elif flow_as_function_of == "rapidity":
val = particle.rapidity()
elif flow_as_function_of == "pseudorapidity":
val = particle.pseudorapidity()
bin_index = np.digitize(val, bins, right=False) - 1
# handle the case in which the particle is not in the binned phase
# space
if (bin_index < 0) or (bin_index > number_bins - 1):
continue
phi = particle.phi() + random_reaction_plane
bin_multiplicity_event[bin_index] += 1.0
QnRe[bin_index] += np.cos(self.n_ * phi)
QnIm[bin_index] += np.sin(self.n_ * phi)
SigmaQnReSubTot[bin_index][self.subcalc_counter_] += np.cos(
self.n_ * phi
)
SigmaQnImSubTot[bin_index][self.subcalc_counter_] += np.sin(
self.n_ * phi
)
# compute the covariance matrix
for a in range(number_bins):
for b in range(number_bins):
if a == b: # correct with multiplicity term
VnDelta_event[a][b] += (
QnRe[a] * QnRe[b]
+ QnIm[a] * QnIm[b]
- bin_multiplicity_event[a]
) / self.normalization_[a]
SigmaVnDelta_event[a][b][self.subcalc_counter_] += (
QnRe[a] * QnRe[b]
+ QnIm[a] * QnIm[b]
- bin_multiplicity_event[a]
) / self.normalization_[a]
else:
VnDelta_event[a][b] += (
QnRe[a] * QnRe[b] + QnIm[a] * QnIm[b]
) / self.normalization_[a]
SigmaVnDelta_event[a][b][self.subcalc_counter_] += (
QnRe[a] * QnRe[b] + QnIm[a] * QnIm[b]
) / self.normalization_[a]
# update the class members
self.bin_multiplicity_total_ += bin_multiplicity_event
self.sigma_multiplicity_total_[
self.subcalc_counter_
] += bin_multiplicity_event
self.number_events_subcalc_[self.subcalc_counter_] += 1
self.QnRe_total_ += QnRe
self.QnIm_total_ += QnIm
self.SigmaQnReSub_total_ += SigmaQnReSubTot
self.SigmaQnImSub_total_ += SigmaQnImSubTot
self.VnDelta_total_ += VnDelta_event
self.SigmaVnDelta_total_ += SigmaVnDelta_event
def __compute_flow_PCA(self, bins: Union[List[float], np.ndarray]) -> None:
"""
Perform Principal Component Analysis (PCA) to compute the anisotropic
flow.
Parameters
----------
bins : list or np.ndarray
Bins used for the flow calculation.
Returns
-------
None
"""
if self.VnDelta_total_ is None:
raise TypeError(
"'VnDelta_total_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.QnRe_total_ is None:
raise TypeError(
"'QnRe_total_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.QnIm_total_ is None:
raise TypeError(
"'QnIm_total_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.SigmaVnDelta_total_ is None:
raise TypeError(
"'SigmaVnDelta_total_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.number_events_subcalc_ is None:
raise TypeError(
"'number_events_subcalc_' is None. It must be initialize before calling the '__compute_flow_PCA' function."
)
if self.SigmaQnReSub_total_ is None:
raise TypeError(
"'SigmaQnReSub_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.SigmaQnImSub_total_ is None:
raise TypeError(
"'SigmaQnImSub_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.number_events_ is None:
raise TypeError(
"'number_events_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.bin_multiplicity_total_ is None:
raise TypeError(
"'bin_multiplicity_total_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
if self.normalization_ is None:
raise TypeError(
"'normalization_' is None. It must be initialized before calling the '__compute_flow_PCA' function."
)
number_bins = len(bins) - 1
for a in range(number_bins):
for b in range(number_bins):
self.VnDelta_total_[a][b] /= self.number_events_
self.VnDelta_total_[a][b] -= (
self.QnRe_total_[a] * self.QnRe_total_[b]
+ self.QnIm_total_[a] * self.QnIm_total_[b]
) / self.number_events_**2.0
for sub in range(self.number_subcalc_):
self.SigmaVnDelta_total_[a][b][
sub
] /= self.number_events_subcalc_[sub]
self.SigmaVnDelta_total_[a][b][sub] -= (
self.SigmaQnReSub_total_[a][sub]
* self.SigmaQnReSub_total_[b][sub]
+ self.SigmaQnImSub_total_[a][sub]
* self.SigmaQnImSub_total_[b][sub]
) / (
self.number_events_subcalc_[sub] ** 2.0
* self.normalization_[a]
)
# perform sub calculations for error estimation
self.FlowSubCalc_ = np.zeros(
(self.number_subcalc_, number_bins, self.alpha_)
)
for sub in range(self.number_subcalc_):
VnDelta_local = np.zeros((number_bins, number_bins))
for a in range(number_bins):
for b in range(number_bins):
VnDelta_local[a][b] = self.SigmaVnDelta_total_[a][b][sub]
eigenvalues, eigenvectors = np.linalg.eig(VnDelta_local)
sort_indices = np.argsort(eigenvalues)[::-1]
# Apply sorting to eigenvalues and eigenvectors
sorted_eigenvalues = eigenvalues[sort_indices]
sorted_eigenvectors = eigenvectors[:, sort_indices]
for bin in range(number_bins):
for alpha in range(self.alpha_):
eval = sorted_eigenvalues[alpha]
evec = sorted_eigenvectors[alpha]
if (eval >= 0.0) and (
self.sigma_multiplicity_total_[sub][bin] > 0.0
):
# compute the flow here for the subcalc
self.FlowSubCalc_[sub][bin][alpha] = (
np.sqrt(eval)
* evec[bin]
* self.number_events_subcalc_[sub]
/ self.sigma_multiplicity_total_[sub][bin]
)
else:
self.FlowSubCalc_[sub][bin][alpha] = None
# compute the anisotropic flow
VnDelta_local = np.zeros((number_bins, number_bins))
for a in range(number_bins):
for b in range(number_bins):
VnDelta_local[a][b] = self.VnDelta_total_[a][b]
eigenvalues, eigenvectors = np.linalg.eig(VnDelta_local)
sort_indices = np.argsort(eigenvalues)[::-1]
# Apply sorting to eigenvalues and eigenvectors
sorted_eigenvalues = eigenvalues[sort_indices]
sorted_eigenvectors = eigenvectors[:, sort_indices]
self.Flow_ = np.zeros((number_bins, self.alpha_))
for bin in range(number_bins):
for alpha in range(self.alpha_):
eval = sorted_eigenvalues[alpha]
evec = sorted_eigenvectors[alpha]
if (eval >= 0.0) and (self.bin_multiplicity_total_[bin] > 0.0):
self.Flow_[bin][alpha] = (
np.sqrt(eval)
* evec[bin]
* self.number_events_
/ self.bin_multiplicity_total_[bin]
)
else:
self.Flow_[bin][alpha] = None
def __compute_uncertainty(
self, bins: Union[List[float], np.ndarray]
) -> None:
"""
Compute the uncertainty of the anisotropic flow for each bin and
order alpha.
Parameters
----------
bins : list or np.ndarray
Bins used for the flow calculation.
Returns
-------
None
"""
if self.FlowSubCalc_ is None:
raise TypeError(
"'FlowSubCalc_' is None. It must be initialized before calling the '__compute_uncertainty' function."
)
if self.Flow_ is None:
raise TypeError(
"'Flow_' is None. It must be initialized before calling the '__compute_uncertainty' function."
)
number_bins = len(bins) - 1
self.FlowUncertainty_ = np.zeros((number_bins, self.alpha_))
# sum the squared deviation from the mean
for sub in range(self.number_subcalc_):
for bin in range(number_bins):
for alpha in range(self.alpha_):
self.FlowUncertainty_[bin][alpha] += (
self.FlowSubCalc_[sub][bin][alpha]
- self.Flow_[bin][alpha]
) ** 2.0 / (self.number_subcalc_ - 1)
# take the sqrt to obtain the standard deviation
for bin in range(number_bins):
for alpha in range(self.alpha_):
self.FlowUncertainty_[bin][alpha] = np.sqrt(
self.FlowUncertainty_[bin][alpha]
)
# Conservative ansatz: If the error cannot be computed
# (e.g., negative eigenvalue), set the flow magnitude to NaN
for bin in range(number_bins):
for alpha in range(self.alpha_):
if np.isnan(self.FlowUncertainty_[bin][alpha]):
self.Flow_[bin][alpha] = np.nan
def __compute_factorization_breaking(
self, bins: Union[List[float], np.ndarray]
) -> None:
"""
Compute the factorization breaking using the Pearson correlation
coefficient Eq. (12), Ref. [1].
Parameters
----------
bins : list or np.ndarray
Bins used for the flow calculation.
Returns
-------
None
"""
if self.VnDelta_total_ is None:
raise TypeError(
"'VnDelta_total_' is None. It must be initialized before calling the '__compute_factorization_breaking' function."
)
if self.SigmaVnDelta_total_ is None:
raise TypeError(
"'SigmaVnDelta_total_' is None. It must be initialized before calling the '__compute_factorization_breaking' function."
)
# compute the Pearson coefficient Eq. (12), Ref. [1]
number_bins = len(bins) - 1
r = np.zeros((number_bins, number_bins))
for a in range(number_bins):
for b in range(number_bins):
r[a][b] = self.VnDelta_total_[a][b] / np.sqrt(
self.VnDelta_total_[a][a] * self.VnDelta_total_[b][b]
)
self.Pearson_r_ = r
# do this for each sub calculation
r_sub = np.zeros((number_bins, number_bins, self.number_subcalc_))
for sub in range(self.number_subcalc_):
for a in range(number_bins):
for b in range(number_bins):
r_sub[a][b][sub] = self.SigmaVnDelta_total_[a][b][
sub
] / np.sqrt(
self.SigmaVnDelta_total_[a][a][sub]
* self.SigmaVnDelta_total_[b][b][sub]
)
r_err = np.zeros((number_bins, number_bins))
# sum the squared deviation from the mean
for sub in range(self.number_subcalc_):
for a in range(number_bins):
for b in range(number_bins):
r_err[a][b] += (r_sub[a][b][sub] - r[a][b]) ** 2.0 / (
self.number_subcalc_ - 1
)
# take the sqrt to obtain the standard deviation
for a in range(number_bins):
for b in range(number_bins):
r_err[a][b] = np.sqrt(r_err[a][b])
self.Pearson_r_uncertainty_ = r_err
[docs]
def differential_flow(
self,
particle_data: List[List[Particle]],
bins: Union[List[float], np.ndarray],
flow_as_function_of: str,
) -> List[Optional[np.ndarray]]:
"""
Compute the differential flow.
Parameters
----------
particle_data : list
List of particle data for multiple events.
bins : list or np.ndarray
Bins used for the differential flow calculation.
flow_as_function_of : str
Variable on which the flow is calculated ("pT", "rapidity"
or "pseudorapidity").
Returns
-------
list of numpy.ndarrays
A list containing the differential flow and its uncertainty.
Each array in the list corresponds:
- anisotropic flow of order alpha (float): Differential flow magnitude for the bin.
- anisotropic flow error (float): Error on the differential flow magnitude for the bin.
Notes
-----
- The flow or the uncertainty can be accessed by: :code:`function_return[bin][alpha]`
- If a bin has no events or the uncertainty could not be computed, the corresponding element in the result list is set to :code:`np.nan`.
"""
if not isinstance(bins, (list, np.ndarray)):
raise TypeError("bins has to be list or np.ndarray")
if not isinstance(flow_as_function_of, str):
raise TypeError("flow_as_function_of is not a string")
if flow_as_function_of not in ["pt", "rapidity", "pseudorapidity"]:
raise ValueError(
"flow_as_function_of must be either 'pT', 'rapidity', 'pseudorapidity'"
)
self.__compute_normalization(bins)
self.number_events_ = len(particle_data)
for event in range(self.number_events_):
self.__update_event(
particle_data[event], bins, flow_as_function_of, event
)
self.__compute_flow_PCA(bins)
self.__compute_uncertainty(bins)
self.__compute_factorization_breaking(bins)
return [self.Flow_, self.FlowUncertainty_]
[docs]
def Pearson_correlation(self) -> List[Optional[np.ndarray]]:
"""
Retrieve the Pearson correlation coefficient and its uncertainty
(Eq. (12), Ref. [1]).
Returns
-------
list
A list containing the Pearson correlation coefficient and its uncertainty.
- Pearson_r (numpy.ndarray): The Pearson correlation coefficient for factorization breaking between each pair of bins.
- Pearson_r_uncertainty (numpy.ndarray): The uncertainty of the Pearson correlation coefficient for factorization breaking between each pair of bins.
Notes
-----
The values in the list can be accessed by name[bin1][bin2].
"""
return [self.Pearson_r_, self.Pearson_r_uncertainty_]