# ===================================================
#
# Copyright (c) 2023-2024
# SPARKX Team
#
# GNU General Public License (GPLv3 or later)
#
# ===================================================
from sparkx.flow import FlowInterface
from sparkx.Particle import Particle
import numpy as np
import random as rd
from typing import Dict, List, Tuple, Any, Union, Optional
rd.seed(42)
[docs]
class QCumulantFlow(FlowInterface.FlowInterface):
"""
This class implements the Q-Cumulant method for anisotropic flow analysis.
References:
- [1] `PhD Thesis A. Bilandzic <https://inspirehep.net/literature/1186272>`__
- [2] `Phys.Rev.C 83 (2011) 044913 <https://inspirehep.net/literature/871528>`__
Parameters
----------
n : int, optional
The order of the harmonic flow (default is 2).
k : int, optional
The order of the cumulant (2, 4, or 6) (default is 2).
imaginary : str, optional
Specifies the treatment of imaginary roots. Options are :code:`zero`,
:code:`negative`, or :code:`nan` (default is :code:`zero`).
Attributes
----------
n_ : int
The order of the harmonic flow.
k_ : int
The order of the cumulant.
imaginary_ : str
Specifies the treatment of imaginary roots.
cumulant_factor_ : dict
Dictionary mapping cumulant order to corresponding flow.
rand_reaction_planes_ : list
List to store randomly sampled reaction planes.
Methods
-------
integrated_flow:
Computes the integrated flow.
differential_flow:
Computes the differential flow.
Examples
--------
A demonstration of how to use the QCumulantFlow class to calculate flow.
.. highlight:: python
.. code-block:: python
:linenos:
>>> flow_instance = QCumulantFlow(n=2, k=2, imaginary='zero')
>>> result = flow_instance.integrated_flow(particle_data)
"""
def __init__(self, n: int = 2, k: int = 2, imaginary: str = "zero"):
if not isinstance(n, int):
raise TypeError("n has to be int")
elif n <= 0:
raise ValueError(
"n-th harmonic with value n<=0 can not be computed"
)
else:
self.n_ = n
if not isinstance(k, int):
raise TypeError("k has to be int")
elif k not in [2, 4, 6]:
raise ValueError(
f"{k} particle cumulant is not implemented, choose from [2,4,6]"
)
else:
self.k_ = k
if not isinstance(imaginary, str):
raise TypeError("Chosen 'imaginary' is not implemented")
elif imaginary not in ["zero", "negative", "nan"]:
raise ValueError(
f"Chosen 'imaginary' = {imaginary} is not an option"
)
else:
self.imaginary_ = imaginary
self.cumulant_factor_: Dict[int, float] = {2: 1, 4: -1, 6: 1.0 / 4.0}
self.rand_reaction_planes_: List[float] = []
def __Qn(self, phi: List[List[float]], n: int) -> np.ndarray:
"""
Compute the Q_n vector for each event based on azimuthal angles.
Parameters
----------
phi : list of lists
List of particle data, where each sublist represents the azimuthal
angles (phi) for each event.
n : int
Order of the flow vector (Q_n).
Returns
-------
np.ndarray
Array of complex numbers representing the Q_n vector for each event.
"""
Q_vector = []
for event in range(len(phi)):
phi_event = np.array([i for i in phi[event]])
Q_vector_val = np.sum(np.exp(1.0j * float(n) * phi_event))
Q_vector.append(Q_vector_val)
return np.array(Q_vector)
def __calculate_corr(
self, phi: List[List[float]], k: int
) -> Tuple[float, float, float]:
"""
Calculate cumulant and its error for a given order (k).
Parameters
----------
phi : list of lists
List of particle data, where each sublist represents the azimuthal
angles (phi) for each event.
k : int
Order of the cumulant to be calculated (2, 4, or 6).
Returns
-------
tuple
A tuple containing the computed cumulant, its error,
and the event-by-event cumulant.
"""
mult = np.array([float(len(i)) for i in phi])
sum_mult = np.sum(mult)
sum_mult_squared = np.inner(mult, mult)
Qn = self.__Qn(phi, self.n_)
Qn_sq_sum = np.vdot(Qn, Qn).real # |Q_n|^2
if k == 2:
# this implements Eq. (16) from Ref. [2]
corr = (Qn_sq_sum - sum_mult) / (sum_mult_squared - sum_mult)
W2 = mult * (mult - 1)
sum_W2 = np.sum(W2)
sum_W2_sq = np.inner(W2, W2)
# ebe difference from mean: <2>_i - <<2>>
ebe_2p_corr = (np.real(Qn * Qn.conj()) - mult) / W2
difference = ebe_2p_corr - corr
# weighted variance
variance = np.sum(W2 * np.square(difference)) / sum_W2
# unbiased variance^2
variance_sq = variance / (1.0 - sum_W2_sq / (sum_W2**2.0))
# error of <<2>>, Eq. (C18) Ref. [1]
corr_err = np.sqrt(sum_W2_sq * variance_sq) / sum_W2
return corr, corr_err, ebe_2p_corr
if k == 4:
# this implements Eq. (18) from Ref. [2]
Q2n = self.__Qn(phi, 2 * self.n_)
Q2n_sq_sum = np.vdot(Q2n, Q2n).real
Qn_sq = np.square(Qn.real) + np.square(Qn.imag)
Qn_to4_sum = np.inner(Qn_sq, Qn_sq)
corr = (
Qn_to4_sum
+ Q2n_sq_sum
- 2.0 * np.inner(Q2n, np.square(Qn.conj())).real
- 2 * np.sum(2 * (mult - 2) * Qn_sq - mult * (mult - 3))
) / (np.sum(mult * (mult - 1) * (mult - 2) * (mult - 3)))
# corr_err computation here:
W4 = mult * (mult - 1) * (mult - 2) * (mult - 3)
sum_W4 = np.sum(W4)
sum_W4_sq = np.inner(W4, W4)
# ebe difference from mean: <4>_i - <<4>>
ebe_4p_corr = (
np.real(Qn * Qn * Qn.conj() * Qn.conj())
+ np.real(Q2n * Q2n.conj())
- 2.0 * np.real(Q2n * Qn.conj() * Qn.conj())
- 2.0
* (
2.0 * (mult - 2) * np.real(Qn * Qn.conj())
- mult * (mult - 3)
)
) / W4
difference = ebe_4p_corr - corr
# weighted variance
variance = np.sum(W4 * np.square(difference)) / sum_W4
# unbiased variance^2
variance_sq = variance / (1.0 - sum_W4_sq / (sum_W4**2.0))
# error of <<4>>, Eq. (C18) Ref. [1]
corr_err = np.sqrt(sum_W4_sq * variance_sq) / sum_W4
return corr, corr_err, ebe_4p_corr
if k == 6:
# this implements Eq. (A10) from Ref. [2]
Q2n = self.__Qn(phi, 2 * self.n_)
Q2n_sq_sum = np.vdot(Q2n, Q2n).real
Qn_sq = np.square(Qn.real) + np.square(Qn.imag)
Qn_to4_sum = np.inner(Qn_sq, Qn_sq)
Qn_to6 = np.power(Qn_sq, 3)
Qn_to6_sum = np.sum(Qn_to6)
Q3n = self.__Qn(phi, 3 * self.n_)
Q3n_sq_sum = np.vdot(Q3n, Q3n).real
ReQ2nQnConjSq = np.inner(Q2n, np.square(Qn.conj())).real
ReQ3nQ2nConjQnConj = np.vdot(
Q3n, np.multiply(Q2n.conj(), Qn.conj())
).real
QnConj_cub = np.power(Qn.conj(), 3)
ReQ3nQnConjCub = np.inner(Q3n, QnConj_cub).real
ReQ2nQnQnConjCub = np.inner(np.multiply(Q2n, Qn), QnConj_cub).real
norm1 = (
mult
* (mult - 1)
* (mult - 2)
* (mult - 3)
* (mult - 4)
* (mult - 5)
)
norm1_sum = np.sum(norm1)
norm2 = mult * (mult - 1) * (mult - 2) * (mult - 3) * (mult - 5)
norm2_sum = np.sum(norm2)
norm3 = mult * (mult - 1) * (mult - 3) * (mult - 4)
norm3_sum = np.sum(norm3)
norm4 = (mult - 1) * (mult - 2) * (mult - 3)
norm4_sum = np.sum(norm4)
corr1 = (
Qn_to6_sum
+ 9.0 * Q2n_sq_sum * Qn_sq_sum
- 6.0 * ReQ2nQnQnConjCub
) / norm1_sum
corr2 = (
4.0 * (ReQ3nQnConjCub - 3.0 * ReQ3nQ2nConjQnConj) / norm1_sum
)
corr3 = (
2.0
* (9.0 * np.sum((mult - 4) * ReQ2nQnConjSq) + 2.0 * Q3n_sq_sum)
/ norm1_sum
)
corr4 = -9.0 * (Qn_to4_sum + Q2n_sq_sum) / norm2_sum
corr5 = 18.0 * Qn_sq_sum / norm3_sum
corr6 = -6.0 / norm4_sum
corr = corr1 + corr2 + corr3 + corr4 + corr5 + corr6
# corr_err computation here:
W6 = (
mult
* (mult - 1)
* (mult - 2)
* (mult - 3)
* (mult - 4)
* (mult - 5)
)
sum_W6 = np.sum(W6)
sum_W6_sq = np.inner(W6, W6)
# ebe difference from mean: <6>_i - <<6>>
ebe_6p_corr1 = (
np.real(Qn * Qn * Qn * Qn.conj() * Qn.conj() * Qn.conj())
+ 9.0 * np.real(Q2n * Q2n.conj()) * np.real(Qn * Qn.conj())
- 6.0 * np.real(Q2n * Qn * QnConj_cub)
) / norm1
ebe_6p_corr2 = (
4.0
* (
np.real(Q3n * QnConj_cub)
- 3.0 * np.real(Q3n * Q2n.conj() * Qn.conj())
)
) / norm1
ebe_6p_corr3 = (
2.0
* (
9.0 * (mult - 4) * np.real(Q2n * Qn.conj() * Qn.conj())
+ 2.0 * np.real(Q3n * Q3n.conj())
)
) / norm1
ebe_6p_corr4 = (
-9.0 * np.real(Qn * Qn * Qn.conj() * Qn.conj())
+ np.real(Q2n * Q2n.conj())
) / norm2
ebe_6p_corr5 = (18.0 * np.real(Qn * Qn.conj())) / norm3
ebe_6p_corr6 = -6.0 / norm4
ebe_6p_corr = (
ebe_6p_corr1
+ ebe_6p_corr2
+ ebe_6p_corr3
+ ebe_6p_corr4
+ ebe_6p_corr5
+ ebe_6p_corr6
)
difference = ebe_6p_corr - corr
# weighted variance
variance = np.sum(W6 * np.square(difference)) / sum_W6
# unbiased variance^2
variance_sq = variance / (1.0 - sum_W6_sq / (sum_W6**2.0))
# error of <<6>>, Eq. (C18) Ref. [1]
corr_err = np.sqrt(sum_W6_sq * variance_sq) / sum_W6
return corr, corr_err, ebe_6p_corr
else:
raise ValueError(
f"{k} particle cumulant is not implemented, choose from [2,4,6]"
)
def __cov(
self, wx: np.ndarray, wy: np.ndarray, x: np.ndarray, y: np.ndarray
) -> float:
"""
Compute the covariance between two sets of variables based on event
weights.
Parameters
----------
wx : ndarray
Event weights for the first set of variables.
wy : ndarray
Event weights for the second set of variables.
x : ndarray
Values of the first set of variables.
y : ndarray
Values of the second set of variables.
Returns
-------
float
The computed covariance.
Notes
-----
This function calculates the covariance between two sets of variables
using event weights. It follows the formula provided in Eq. (C12)
from Ref. [1].
"""
sum_wx_wy_x_y = np.sum(wx * wy * x * y).real
sum_wx_wy = np.sum(wx * wy).real
sum_wx_x = np.sum(wx * x).real
sum_wx = np.sum(wx).real
sum_wy_y = np.sum(wy * y).real
sum_wy = np.sum(wy).real
# Normalize the data to avoid overflow
max_val = max(
abs(sum_wx_wy_x_y),
abs(sum_wx_wy),
abs(sum_wx_x),
abs(sum_wx),
abs(sum_wy_y),
abs(sum_wy),
)
scale_factor = 1.0
if max_val > 1e15:
scale_factor = 1.0 / max_val
sum_wx_wy_x_y *= scale_factor
sum_wx_wy *= scale_factor
sum_wx_x *= scale_factor
sum_wx *= scale_factor
sum_wy_y *= scale_factor
sum_wy *= scale_factor
cov = (
(sum_wx_wy_x_y / sum_wx_wy)
- (sum_wx_x / sum_wx) * (sum_wy_y / sum_wy)
) / (1.0 - (sum_wx_wy / (sum_wx * sum_wy)))
return cov
def __cov_term(
self,
k1: int,
k2: int,
phi: List[List[float]],
ebe_corr1: np.ndarray,
ebe_corr2: np.ndarray,
) -> float:
"""
Compute the covariance term in Eqs. (C29) and (C32) of Ref. [1].
Parameters
----------
k1 : int
The order of the first cumulant.
k2 : int
The order of the second cumulant.
phi : list of lists
List of particle data, where each sublist represents the azimuthal
angles (phi) for each event.
ebe_corr1 : ndarray
Event-by-event correlation data for the first cumulant.
ebe_corr2 : ndarray
Event-by-event correlation data for the second cumulant.
Returns
-------
float
The computed covariance term.
Notes
-----
This function implements the fraction and covariance term in the last
term of Eq. (C29) from Ref. [1]. The implementation is designed to be
more general and can be used for all higher order cumulants.
"""
mult = np.array([float(len(i)) for i in phi])
W1 = np.ones_like(mult)
for i in range(k1):
W1 *= mult - i
W2 = np.ones_like(mult)
for i in range(k2):
W2 *= mult - i
W1_sum = np.sum(W1)
W2_sum = np.sum(W2)
W1W2_sum = np.inner(W1, W2)
# Calculate the maximum value among the terms involved
max_val = max(abs(W1W2_sum), abs(W1_sum), abs(W2_sum))
# Check if scaling is necessary
scale_factor = 1.0
if max_val > 1e15:
scale_factor = 1.0 / max_val
# Apply scaling to the terms
W1W2_sum_scaled = W1W2_sum * scale_factor
denominator_scaled = W1_sum * scale_factor
denominator_scaled *= W2_sum
cov_term = (W1W2_sum_scaled / (denominator_scaled)) * self.__cov(
W1, W2, ebe_corr1, ebe_corr2
)
return cov_term
def __cov_term_differential(
self,
w_corr1: np.ndarray,
w_corr2: np.ndarray,
ebe_corr1: np.ndarray,
ebe_corr2: np.ndarray,
) -> float:
"""
Compute the covariance term in Eqs. (C42) and (C46) of Ref. [1].
Parameters
----------
w_corr1 : ndarray
Weights for correlation data of first cumulant.
w_corr2 : ndarray
Weights for correlation data of second cumulant.
ebe_corr1 : ndarray
Event-by-event correlation data for the first cumulant.
ebe_corr2 : ndarray
Event-by-event correlation data for the second cumulant.
Returns
-------
float
The computed covariance term.
Notes
-----
This function implements the fraction and covariance term in the last
term of Eq. (C42) from Ref. [1]. The implementation is designed to be
more general and can be used for all higher order cumulants.
"""
# Calculate the maximum value among the terms involved
max_val = max(
abs(np.vdot(w_corr1, w_corr2)),
abs(np.sum(w_corr1)),
abs(np.sum(w_corr2)),
)
# Check if scaling is necessary
scale_factor = 1.0
if max_val > 1e15:
scale_factor = 1.0 / max_val
# Apply scaling to the terms
w_corr1_w_corr2_scaled = np.vdot(w_corr1, w_corr2) * scale_factor
denominator_scaled = np.sum(w_corr1) * scale_factor
denominator_scaled *= np.sum(w_corr2)
cov_term = (w_corr1_w_corr2_scaled / denominator_scaled) * self.__cov(
w_corr1, w_corr2, ebe_corr1, ebe_corr2
)
return cov_term
def __flow_from_cumulant(self, cnk: float) -> float:
"""
Compute the flow magnitude from a cumulant value.
Parameters
----------
cnk : float
Cumulant value corresponding to the order of the flow.
Returns
-------
float
The computed flow magnitude.
Notes
-----
This function calculates the flow magnitude from a cumulant value using
the specified cumulant order (k).
It considers the sign of the cumulant value and the chosen behavior
for imaginary roots.
"""
vnk_to_k = self.cumulant_factor_[self.k_] * cnk
if vnk_to_k >= 0.0:
vnk = vnk_to_k ** (1 / self.k_)
elif self.imaginary_ == "negative":
vnk = -1.0 * (-vnk_to_k) ** (1 / self.k_)
elif self.imaginary_ == "zero":
vnk = 0.0
else:
vnk = float("nan")
return vnk
def __flow_from_cumulant_differential(
self, cnk: float, dnk: float
) -> float:
"""
Compute the flow magnitude from a cumulant value.
Parameters
----------
cnk : float
Cumulant value corresponding to the full events.
dnk : float
Cumulant value corresponding to the POI.
Returns
-------
float
The computed flow magnitude.
Notes
-----
This function calculates the flow magnitude from a cumulant value using
the specified cumulant order (k).
It considers the sign of the cumulant value and the chosen behavior
for imaginary roots.
"""
vnk = float("nan")
if self.k_ == 2:
if cnk > 0.0:
vnk = dnk / (self.cumulant_factor_[self.k_] * cnk) ** (
1 / self.k_
)
elif self.imaginary_ == "negative":
vnk = dnk / (-self.cumulant_factor_[self.k_] * cnk) ** (
1 / self.k_
)
elif self.imaginary_ == "zero":
vnk = 0.0
if self.k_ == 4:
if cnk < 0.0:
vnk = -dnk / (self.cumulant_factor_[self.k_] * cnk) ** (
3 / self.k_
)
elif self.imaginary_ == "negative":
vnk = -dnk / (-self.cumulant_factor_[self.k_] * cnk) ** (
3 / self.k_
)
elif self.imaginary_ == "zero":
vnk = 0.0
return vnk
def __cumulant_flow(self, phi: List[List[float]]) -> Tuple[float, float]:
"""
Compute the flow magnitude and its uncertainty from cumulant values.
Parameters
----------
phi : list of lists
List of particle data, where each sublist represents the azimuthal
angles (phi) for each event.
Returns
-------
tuple
A tuple containing the computed flow magnitude and its uncertainty.
Notes
-----
This function calculates the flow magnitude and its uncertainty based on
cumulant values. It supports cumulant of different orders, following the
equations from Ref. [1].
"""
if self.k_ == 2:
n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2)
# returns <v_n{2}> and s_{<v_n{2}>}, Eqs. (C22),(C23) Ref. [1]
avg_vn2 = self.__flow_from_cumulant(n2_corr)
avg_vn2_err_sq = (1.0 / (4.0 * n2_corr)) * n2_corr_err**2.0
return avg_vn2, np.sqrt(avg_vn2_err_sq)
elif self.k_ == 4:
n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2)
n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr(phi, k=4)
QC4 = n4_corr - 2.0 * n2_corr**2.0
avg_vn4 = self.__flow_from_cumulant(QC4)
ebe_2p_corr_array = (
np.array([ebe_2p_corr])
if isinstance(ebe_2p_corr, float)
else ebe_2p_corr
)
ebe_4p_corr_array = (
np.array([ebe_4p_corr])
if isinstance(ebe_4p_corr, float)
else ebe_4p_corr
)
# compute Eq. (C28) Ref. [1]
avg_vn4_err_sq = (
1.0 / (2.0 * n2_corr**2.0 - n4_corr) ** (3.0 / 2)
) * (
n2_corr**2.0 * n2_corr_err**2.0
+ (1.0 / 16.0) * n4_corr_err**2.0
- (1.0 / 2.0)
* n2_corr
* self.__cov_term(
2, 4, phi, ebe_2p_corr_array, ebe_4p_corr_array
)
)
# returns <v_n{4}> and s_{<v_n{4}>}, Eqs. (C27),(C28) Ref. [1]
return avg_vn4, np.sqrt(avg_vn4_err_sq)
elif self.k_ == 6:
n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2)
n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr(phi, k=4)
n6_corr, n6_corr_err, ebe_6p_corr = self.__calculate_corr(phi, k=6)
QC6 = n6_corr - 9.0 * n2_corr * n4_corr + 12.0 * n2_corr**3.0
avg_vn6 = self.__flow_from_cumulant(QC6)
ebe_2p_corr_array = (
np.array([ebe_2p_corr])
if isinstance(ebe_2p_corr, float)
else ebe_2p_corr
)
ebe_4p_corr_array = (
np.array([ebe_4p_corr])
if isinstance(ebe_4p_corr, float)
else ebe_4p_corr
)
ebe_6p_corr_array = (
np.array([ebe_6p_corr])
if isinstance(ebe_6p_corr, float)
else ebe_6p_corr
)
# compute Eq. (C32) Ref. [1]
avg_vn6_err_sq = (
(1.0 / (2.0 * 2.0 ** (2.0 / 3.0)))
* (1.0 / (QC6) ** (5.0 / 3.0))
* (
(9.0 / 2.0)
* (4.0 * n2_corr**2.0 - n4_corr) ** 2.0
* n2_corr_err**2.0
+ (9.0 / 2.0) * n2_corr**2.0 * n4_corr_err**2.0
+ (1.0 / 18.0) * n6_corr_err**2.0
- 9.0
* n2_corr
* (4.0 * n2_corr**2.0 - n4_corr)
* self.__cov_term(
2, 4, phi, ebe_2p_corr_array, ebe_4p_corr_array
)
+ (4.0 * n2_corr**2.0 - n4_corr)
* self.__cov_term(
2, 6, phi, ebe_2p_corr_array, ebe_6p_corr_array
)
- n2_corr
* self.__cov_term(
4, 6, phi, ebe_4p_corr_array, ebe_6p_corr_array
)
)
)
# returns <v_n{6}> and s_{<v_n{6}>}, Eq. (C33) Ref. [1]
return avg_vn6, np.sqrt(avg_vn6_err_sq)
else:
raise ValueError(
f"{self.k_} particle cumulant is not implemented, choose from [2,4,6]"
)
def __sample_random_reaction_planes(self, events: int) -> None:
"""
Sample random reaction planes for a specified number of events.
Parameters
----------
events : int
The number of events for which random reaction planes are sampled.
Returns
-------
None
"""
self.rand_reaction_planes_ = [
rd.uniform(0.0, 2.0 * np.pi) for _ in range(events)
]
[docs]
def integrated_flow(
self, particle_data: List[List[Particle]]
) -> Tuple[float, float]:
"""
Compute the integrated flow.
Parameters
----------
particle_data : list
List of particle data, where each sublist represents an event
with particles.
Returns
-------
tuple
A tuple containing the computed flow magnitude and its uncertainty.
"""
number_events = len(particle_data)
self.__sample_random_reaction_planes(number_events)
phi = []
for event in range(number_events):
event_phi = []
for particle in particle_data[event]:
event_phi.append(
particle.phi() + self.rand_reaction_planes_[event]
)
phi.extend([event_phi])
vnk, vnk_err = self.__cumulant_flow(phi)
return vnk, vnk_err
def __compute_differential_flow_bin(
self,
full_event_quantities: List[Any],
phi_bin: List[List[float]],
phi_bin_poi: List[List[float]],
) -> List[float]:
# full_event_quantities = Qn,M,n2_corr,n2_corr_err,ebe_2p_corr,Q2n,n4_corr,n4_corr_err,ebe_4p_corr
pn = self.__Qn(phi_bin, self.n_)
mp = np.array([len(i) for i in phi_bin_poi])
mq = np.array([len(i) for i in phi_bin])
Qn = np.array(full_event_quantities[0])
M = np.array(full_event_quantities[1])
if self.k_ == 4:
qn = self.__Qn(phi_bin, self.n_)
q2n = self.__Qn(phi_bin, 2 * self.n_)
# compute Eq. (28) Ref. [2]
corr2_ev = (pn * Qn.conj() - mq) / (mp * M - mq)
# compute Eq. (24) Ref. [2]
w2 = mp * M - mq
# compute Eq. (29) Ref. [2]
corr2 = np.vdot(w2, corr2_ev) / np.sum(w2)
vn_bin = self.__flow_from_cumulant_differential(
full_event_quantities[2], corr2
)
# ebe difference from mean: <2>_i - <<2>>
difference = corr2_ev - corr2
# weighted variance
variance = np.sum(w2 * np.square(difference)) / np.sum(w2)
# unbiased variance^2
variance_sq = variance / (1.0 - np.vdot(w2, w2) / (np.sum(w2) ** 2.0))
# error of <<2>>, Eq. (C38) Ref. [1]
corr2_err = np.sqrt(np.vdot(w2, w2) * variance_sq) / np.sum(w2)
avg_vn2_err_sq = (1.0 / (4.0 * full_event_quantities[2] ** 3.0)) * (
corr2**2.0 * full_event_quantities[3] ** 2.0
+ 4.0 * full_event_quantities[2] ** 2.0 * corr2_err**2.0
- 4.0
* full_event_quantities[2]
* corr2
* self.__cov_term_differential(
M * (M - 1), w2, np.array(full_event_quantities[4]), corr2_ev
)
)
avg_vn_err = np.sqrt(avg_vn2_err_sq)
if self.k_ == 4:
Q2n = np.array(full_event_quantities[5])
# compute Eq. (32) Ref. [2]
corr4_ev = (
pn * Qn * Qn.conj() * Qn.conj()
- q2n * Qn.conj() * Qn.conj()
- pn * Qn * Q2n.conj()
- 2.0 * M * pn * Qn.conj()
- 2.0 * mq * Qn * Qn.conj()
+ 7.0 * qn * Qn.conj()
- Qn * qn.conj()
+ q2n * Q2n.conj()
+ 2.0 * pn * Qn.conj()
+ 2.0 * mq * M
- 6.0 * mq
) / ((mp * M - 3.0 * mq) * (M - 1) * (M - 2))
# compute Eq. (25) Ref. [2]
w4 = (mp * M - 3.0 * mq) * (M - 1) * (M - 2)
# compute Eq. (33) Ref. [2]
corr4 = np.vdot(w4, corr4_ev) / np.sum(w4)
# compute Eq. (34) Ref. [2]
dn4 = corr4 - 2.0 * corr2 * full_event_quantities[2]
# compute Eq. (12) Ref. [2]
cn4 = (
full_event_quantities[6] - 2.0 * full_event_quantities[2] ** 2.0
)
vn_bin = self.__flow_from_cumulant_differential(cn4, dn4)
# ebe difference from mean: <4>_i - <<4>>
difference = corr4_ev - corr4
# weighted variance
variance = np.sum(w4 * np.square(difference)) / np.sum(w4)
# unbiased variance^2
variance_sq = variance / (
1.0 - np.vdot(w4, w4) / (np.sum(w4) ** 2.0)
)
# error of <<4>>, Eq. (C38) Ref. [1]
corr4_err = np.sqrt(np.vdot(w4, w4) * variance_sq) / np.sum(w4)
minus_cn4 = -cn4
term1 = (
2.0 * full_event_quantities[2] ** 2.0 * corr2
- 3.0 * full_event_quantities[2] * corr4
+ 2.0 * full_event_quantities[6] * corr2
)
term2 = 2.0 * full_event_quantities[2] * corr2 - corr4
multiplicity_prefac = M * (M - 1) * (M - 2) * (M - 3)
avg_vn4_err_sq = (1.0 / minus_cn4 ** (7.0 / 2.0)) * (
term1**2.0 * full_event_quantities[3] ** 2.0
+ (9.0 / 16.0) * term2**2.0 * full_event_quantities[7] ** 2.0
+ 4.0
* full_event_quantities[2] ** 2.0
* minus_cn4**2.0
* corr2_err**2.0
+ minus_cn4**2.0 * corr4_err**2.0
- (3.0 / 2.0)
* term2
* term1
* self.__cov_term_differential(
M * (M - 1),
multiplicity_prefac,
np.array(full_event_quantities[4]),
np.array(full_event_quantities[8]),
)
- 4.0
* full_event_quantities[2]
* minus_cn4
* term1
* self.__cov_term_differential(
M * (M - 1),
w2,
np.array(full_event_quantities[4]),
corr2_ev,
)
+ 2.0
* minus_cn4
* term1
* self.__cov_term_differential(
M * (M - 1),
w4,
np.array(full_event_quantities[4]),
corr4_ev,
)
+ 3.0
* full_event_quantities[2]
* minus_cn4
* term2
* self.__cov_term_differential(
multiplicity_prefac,
w2,
np.array(full_event_quantities[8]),
corr2_ev,
)
- (3.0 / 2.0)
* minus_cn4
* term2
* self.__cov_term_differential(
multiplicity_prefac,
w4,
np.array(full_event_quantities[8]),
corr4_ev,
)
- 4.0
* full_event_quantities[2]
* minus_cn4**2.0
* self.__cov_term_differential(w2, w4, corr2_ev, corr4_ev)
)
avg_vn_err = np.sqrt(avg_vn4_err_sq)
return [vn_bin.real, avg_vn_err.real]
[docs]
def differential_flow(
self,
particle_data: List[List[Particle]],
bins: Union[np.ndarray, List[float]],
flow_as_function_of: str,
poi_pdg: Optional[Union[List[int], np.ndarray]] = None,
) -> List[List[float]]:
"""
Compute the differential flow. The cumulants of second and fourth order
are implemented.
Parameters
----------
particle_data : list
List of particle data.
bins : list or np.ndarray
Bins used for the differential flow calculation.
flow_as_function_of : str
Variable on which the flow is calculated
("pT", "rapidity", or "pseudorapidity").
poi_pdg : list
List of PDG id for identified particle differential flow.
Returns
-------
list of tuples
A list of tuples containing flow values and their corresponding
uncertainty.
"""
if not isinstance(bins, (list, np.ndarray)):
raise TypeError("bins has to be list or np.ndarray")
if not isinstance(flow_as_function_of, str):
raise TypeError("flow_as_function_of is not a string")
if poi_pdg is not None:
if not isinstance(poi_pdg, (list, np.ndarray)):
raise TypeError("poi_pdg has to be list or np.ndarray")
for pdg in poi_pdg:
if not isinstance(pdg, int):
raise TypeError("poi_pdg elements must be integers")
if flow_as_function_of not in ["pt", "rapidity", "pseudorapidity"]:
raise ValueError(
"flow_as_function_of must be either 'pT', 'rapidity', 'pseudorapidity'"
)
if self.k_ == 6:
raise ValueError(
"6 particle Q-Cumulant differential flow is not implemented"
)
number_events = len(particle_data)
self.__sample_random_reaction_planes(number_events)
phi_all = []
for event in range(number_events):
event_phi = []
for particle in particle_data[event]:
event_phi.append(
particle.phi() + self.rand_reaction_planes_[event]
)
phi_all.extend([event_phi])
phi_bin = []
phi_bin_poi = []
for bin in range(len(bins) - 1):
events_bin = []
events_bin_poi = []
for event in range(len(particle_data)):
particles_event = []
particles_event_poi = []
for particle in particle_data[event]:
val = 0.0
if flow_as_function_of == "pT":
val = particle.pT_abs()
elif flow_as_function_of == "rapidity":
val = particle.rapidity()
elif flow_as_function_of == "pseudorapidity":
val = particle.pseudorapidity()
if val >= bins[bin] and val < bins[bin + 1]:
particles_event.append(
particle.phi() + self.rand_reaction_planes_[event]
)
if poi_pdg is None or particle.pdg in poi_pdg:
particles_event_poi.append(
particle.phi()
+ self.rand_reaction_planes_[event]
)
events_bin.extend([particles_event])
events_bin_poi.extend([particles_event_poi])
phi_bin.extend([events_bin])
phi_bin_poi.extend([events_bin_poi])
if poi_pdg is None:
phi_bin_poi = phi_bin
# compute full event quantities Qn, Q2n, ...
Qn = self.__Qn(phi_all, self.n_)
n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi_all, k=2)
M = [len(i) for i in phi_all]
full_event_quantities = [Qn, M, n2_corr, n2_corr_err, ebe_2p_corr]
if self.k_ == 4:
Q2n = self.__Qn(phi_all, 2 * self.n_)
n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr(
phi_all, k=4
)
full_event_quantities = [
Qn,
M,
n2_corr,
n2_corr_err,
ebe_2p_corr,
Q2n,
n4_corr,
n4_corr_err,
ebe_4p_corr,
]
flow_bins = []
for bin in range(len(phi_bin)):
total_elements_bin = sum(len(sublist) for sublist in phi_bin[bin])
total_elements_bin_poi = sum(
len(sublist) for sublist in phi_bin_poi[bin]
)
if (
len(phi_bin[bin]) > 0
and total_elements_bin > 0
and total_elements_bin_poi > 0
):
flow_bins.append(
self.__compute_differential_flow_bin(
full_event_quantities, phi_bin[bin], phi_bin_poi[bin]
)
)
else:
flow_bins.append([])
return flow_bins