Source code for QCumulantFlow

# ===================================================
#
#    Copyright (c) 2023-2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

from sparkx.flow import FlowInterface
from sparkx.Particle import Particle
import numpy as np
import random as rd
from typing import Dict, List, Tuple, Any, Union, Optional

rd.seed(42)


[docs] class QCumulantFlow(FlowInterface.FlowInterface): """ This class implements the Q-Cumulant method for anisotropic flow analysis. References: - [1] `PhD Thesis A. Bilandzic <https://inspirehep.net/literature/1186272>`__ - [2] `Phys.Rev.C 83 (2011) 044913 <https://inspirehep.net/literature/871528>`__ Parameters ---------- n : int, optional The order of the harmonic flow (default is 2). k : int, optional The order of the cumulant (2, 4, or 6) (default is 2). imaginary : str, optional Specifies the treatment of imaginary roots. Options are :code:`zero`, :code:`negative`, or :code:`nan` (default is :code:`zero`). Attributes ---------- n_ : int The order of the harmonic flow. k_ : int The order of the cumulant. imaginary_ : str Specifies the treatment of imaginary roots. cumulant_factor_ : dict Dictionary mapping cumulant order to corresponding flow. rand_reaction_planes_ : list List to store randomly sampled reaction planes. Methods ------- integrated_flow: Computes the integrated flow. differential_flow: Computes the differential flow. Examples -------- A demonstration of how to use the QCumulantFlow class to calculate flow. .. highlight:: python .. code-block:: python :linenos: >>> flow_instance = QCumulantFlow(n=2, k=2, imaginary='zero') >>> result = flow_instance.integrated_flow(particle_data) """ def __init__(self, n: int = 2, k: int = 2, imaginary: str = "zero"): if not isinstance(n, int): raise TypeError("n has to be int") elif n <= 0: raise ValueError( "n-th harmonic with value n<=0 can not be computed" ) else: self.n_ = n if not isinstance(k, int): raise TypeError("k has to be int") elif k not in [2, 4, 6]: raise ValueError( f"{k} particle cumulant is not implemented, choose from [2,4,6]" ) else: self.k_ = k if not isinstance(imaginary, str): raise TypeError("Chosen 'imaginary' is not implemented") elif imaginary not in ["zero", "negative", "nan"]: raise ValueError( f"Chosen 'imaginary' = {imaginary} is not an option" ) else: self.imaginary_ = imaginary self.cumulant_factor_: Dict[int, float] = {2: 1, 4: -1, 6: 1.0 / 4.0} self.rand_reaction_planes_: List[float] = [] def __Qn(self, phi: List[List[float]], n: int) -> np.ndarray: """ Compute the Q_n vector for each event based on azimuthal angles. Parameters ---------- phi : list of lists List of particle data, where each sublist represents the azimuthal angles (phi) for each event. n : int Order of the flow vector (Q_n). Returns ------- np.ndarray Array of complex numbers representing the Q_n vector for each event. """ Q_vector = [] for event in range(len(phi)): phi_event = np.array([i for i in phi[event]]) Q_vector_val = np.sum(np.exp(1.0j * float(n) * phi_event)) Q_vector.append(Q_vector_val) return np.array(Q_vector) def __calculate_corr( self, phi: List[List[float]], k: int ) -> Tuple[float, float, float]: """ Calculate cumulant and its error for a given order (k). Parameters ---------- phi : list of lists List of particle data, where each sublist represents the azimuthal angles (phi) for each event. k : int Order of the cumulant to be calculated (2, 4, or 6). Returns ------- tuple A tuple containing the computed cumulant, its error, and the event-by-event cumulant. """ mult = np.array([float(len(i)) for i in phi]) sum_mult = np.sum(mult) sum_mult_squared = np.inner(mult, mult) Qn = self.__Qn(phi, self.n_) Qn_sq_sum = np.vdot(Qn, Qn).real # |Q_n|^2 if k == 2: # this implements Eq. (16) from Ref. [2] corr = (Qn_sq_sum - sum_mult) / (sum_mult_squared - sum_mult) W2 = mult * (mult - 1) sum_W2 = np.sum(W2) sum_W2_sq = np.inner(W2, W2) # ebe difference from mean: <2>_i - <<2>> ebe_2p_corr = (np.real(Qn * Qn.conj()) - mult) / W2 difference = ebe_2p_corr - corr # weighted variance variance = np.sum(W2 * np.square(difference)) / sum_W2 # unbiased variance^2 variance_sq = variance / (1.0 - sum_W2_sq / (sum_W2**2.0)) # error of <<2>>, Eq. (C18) Ref. [1] corr_err = np.sqrt(sum_W2_sq * variance_sq) / sum_W2 return corr, corr_err, ebe_2p_corr if k == 4: # this implements Eq. (18) from Ref. [2] Q2n = self.__Qn(phi, 2 * self.n_) Q2n_sq_sum = np.vdot(Q2n, Q2n).real Qn_sq = np.square(Qn.real) + np.square(Qn.imag) Qn_to4_sum = np.inner(Qn_sq, Qn_sq) corr = ( Qn_to4_sum + Q2n_sq_sum - 2.0 * np.inner(Q2n, np.square(Qn.conj())).real - 2 * np.sum(2 * (mult - 2) * Qn_sq - mult * (mult - 3)) ) / (np.sum(mult * (mult - 1) * (mult - 2) * (mult - 3))) # corr_err computation here: W4 = mult * (mult - 1) * (mult - 2) * (mult - 3) sum_W4 = np.sum(W4) sum_W4_sq = np.inner(W4, W4) # ebe difference from mean: <4>_i - <<4>> ebe_4p_corr = ( np.real(Qn * Qn * Qn.conj() * Qn.conj()) + np.real(Q2n * Q2n.conj()) - 2.0 * np.real(Q2n * Qn.conj() * Qn.conj()) - 2.0 * ( 2.0 * (mult - 2) * np.real(Qn * Qn.conj()) - mult * (mult - 3) ) ) / W4 difference = ebe_4p_corr - corr # weighted variance variance = np.sum(W4 * np.square(difference)) / sum_W4 # unbiased variance^2 variance_sq = variance / (1.0 - sum_W4_sq / (sum_W4**2.0)) # error of <<4>>, Eq. (C18) Ref. [1] corr_err = np.sqrt(sum_W4_sq * variance_sq) / sum_W4 return corr, corr_err, ebe_4p_corr if k == 6: # this implements Eq. (A10) from Ref. [2] Q2n = self.__Qn(phi, 2 * self.n_) Q2n_sq_sum = np.vdot(Q2n, Q2n).real Qn_sq = np.square(Qn.real) + np.square(Qn.imag) Qn_to4_sum = np.inner(Qn_sq, Qn_sq) Qn_to6 = np.power(Qn_sq, 3) Qn_to6_sum = np.sum(Qn_to6) Q3n = self.__Qn(phi, 3 * self.n_) Q3n_sq_sum = np.vdot(Q3n, Q3n).real ReQ2nQnConjSq = np.inner(Q2n, np.square(Qn.conj())).real ReQ3nQ2nConjQnConj = np.vdot( Q3n, np.multiply(Q2n.conj(), Qn.conj()) ).real QnConj_cub = np.power(Qn.conj(), 3) ReQ3nQnConjCub = np.inner(Q3n, QnConj_cub).real ReQ2nQnQnConjCub = np.inner(np.multiply(Q2n, Qn), QnConj_cub).real norm1 = ( mult * (mult - 1) * (mult - 2) * (mult - 3) * (mult - 4) * (mult - 5) ) norm1_sum = np.sum(norm1) norm2 = mult * (mult - 1) * (mult - 2) * (mult - 3) * (mult - 5) norm2_sum = np.sum(norm2) norm3 = mult * (mult - 1) * (mult - 3) * (mult - 4) norm3_sum = np.sum(norm3) norm4 = (mult - 1) * (mult - 2) * (mult - 3) norm4_sum = np.sum(norm4) corr1 = ( Qn_to6_sum + 9.0 * Q2n_sq_sum * Qn_sq_sum - 6.0 * ReQ2nQnQnConjCub ) / norm1_sum corr2 = ( 4.0 * (ReQ3nQnConjCub - 3.0 * ReQ3nQ2nConjQnConj) / norm1_sum ) corr3 = ( 2.0 * (9.0 * np.sum((mult - 4) * ReQ2nQnConjSq) + 2.0 * Q3n_sq_sum) / norm1_sum ) corr4 = -9.0 * (Qn_to4_sum + Q2n_sq_sum) / norm2_sum corr5 = 18.0 * Qn_sq_sum / norm3_sum corr6 = -6.0 / norm4_sum corr = corr1 + corr2 + corr3 + corr4 + corr5 + corr6 # corr_err computation here: W6 = ( mult * (mult - 1) * (mult - 2) * (mult - 3) * (mult - 4) * (mult - 5) ) sum_W6 = np.sum(W6) sum_W6_sq = np.inner(W6, W6) # ebe difference from mean: <6>_i - <<6>> ebe_6p_corr1 = ( np.real(Qn * Qn * Qn * Qn.conj() * Qn.conj() * Qn.conj()) + 9.0 * np.real(Q2n * Q2n.conj()) * np.real(Qn * Qn.conj()) - 6.0 * np.real(Q2n * Qn * QnConj_cub) ) / norm1 ebe_6p_corr2 = ( 4.0 * ( np.real(Q3n * QnConj_cub) - 3.0 * np.real(Q3n * Q2n.conj() * Qn.conj()) ) ) / norm1 ebe_6p_corr3 = ( 2.0 * ( 9.0 * (mult - 4) * np.real(Q2n * Qn.conj() * Qn.conj()) + 2.0 * np.real(Q3n * Q3n.conj()) ) ) / norm1 ebe_6p_corr4 = ( -9.0 * np.real(Qn * Qn * Qn.conj() * Qn.conj()) + np.real(Q2n * Q2n.conj()) ) / norm2 ebe_6p_corr5 = (18.0 * np.real(Qn * Qn.conj())) / norm3 ebe_6p_corr6 = -6.0 / norm4 ebe_6p_corr = ( ebe_6p_corr1 + ebe_6p_corr2 + ebe_6p_corr3 + ebe_6p_corr4 + ebe_6p_corr5 + ebe_6p_corr6 ) difference = ebe_6p_corr - corr # weighted variance variance = np.sum(W6 * np.square(difference)) / sum_W6 # unbiased variance^2 variance_sq = variance / (1.0 - sum_W6_sq / (sum_W6**2.0)) # error of <<6>>, Eq. (C18) Ref. [1] corr_err = np.sqrt(sum_W6_sq * variance_sq) / sum_W6 return corr, corr_err, ebe_6p_corr else: raise ValueError( f"{k} particle cumulant is not implemented, choose from [2,4,6]" ) def __cov( self, wx: np.ndarray, wy: np.ndarray, x: np.ndarray, y: np.ndarray ) -> float: """ Compute the covariance between two sets of variables based on event weights. Parameters ---------- wx : ndarray Event weights for the first set of variables. wy : ndarray Event weights for the second set of variables. x : ndarray Values of the first set of variables. y : ndarray Values of the second set of variables. Returns ------- float The computed covariance. Notes ----- This function calculates the covariance between two sets of variables using event weights. It follows the formula provided in Eq. (C12) from Ref. [1]. """ sum_wx_wy_x_y = np.sum(wx * wy * x * y).real sum_wx_wy = np.sum(wx * wy).real sum_wx_x = np.sum(wx * x).real sum_wx = np.sum(wx).real sum_wy_y = np.sum(wy * y).real sum_wy = np.sum(wy).real # Normalize the data to avoid overflow max_val = max( abs(sum_wx_wy_x_y), abs(sum_wx_wy), abs(sum_wx_x), abs(sum_wx), abs(sum_wy_y), abs(sum_wy), ) scale_factor = 1.0 if max_val > 1e15: scale_factor = 1.0 / max_val sum_wx_wy_x_y *= scale_factor sum_wx_wy *= scale_factor sum_wx_x *= scale_factor sum_wx *= scale_factor sum_wy_y *= scale_factor sum_wy *= scale_factor cov = ( (sum_wx_wy_x_y / sum_wx_wy) - (sum_wx_x / sum_wx) * (sum_wy_y / sum_wy) ) / (1.0 - (sum_wx_wy / (sum_wx * sum_wy))) return cov def __cov_term( self, k1: int, k2: int, phi: List[List[float]], ebe_corr1: np.ndarray, ebe_corr2: np.ndarray, ) -> float: """ Compute the covariance term in Eqs. (C29) and (C32) of Ref. [1]. Parameters ---------- k1 : int The order of the first cumulant. k2 : int The order of the second cumulant. phi : list of lists List of particle data, where each sublist represents the azimuthal angles (phi) for each event. ebe_corr1 : ndarray Event-by-event correlation data for the first cumulant. ebe_corr2 : ndarray Event-by-event correlation data for the second cumulant. Returns ------- float The computed covariance term. Notes ----- This function implements the fraction and covariance term in the last term of Eq. (C29) from Ref. [1]. The implementation is designed to be more general and can be used for all higher order cumulants. """ mult = np.array([float(len(i)) for i in phi]) W1 = np.ones_like(mult) for i in range(k1): W1 *= mult - i W2 = np.ones_like(mult) for i in range(k2): W2 *= mult - i W1_sum = np.sum(W1) W2_sum = np.sum(W2) W1W2_sum = np.inner(W1, W2) # Calculate the maximum value among the terms involved max_val = max(abs(W1W2_sum), abs(W1_sum), abs(W2_sum)) # Check if scaling is necessary scale_factor = 1.0 if max_val > 1e15: scale_factor = 1.0 / max_val # Apply scaling to the terms W1W2_sum_scaled = W1W2_sum * scale_factor denominator_scaled = W1_sum * scale_factor denominator_scaled *= W2_sum cov_term = (W1W2_sum_scaled / (denominator_scaled)) * self.__cov( W1, W2, ebe_corr1, ebe_corr2 ) return cov_term def __cov_term_differential( self, w_corr1: np.ndarray, w_corr2: np.ndarray, ebe_corr1: np.ndarray, ebe_corr2: np.ndarray, ) -> float: """ Compute the covariance term in Eqs. (C42) and (C46) of Ref. [1]. Parameters ---------- w_corr1 : ndarray Weights for correlation data of first cumulant. w_corr2 : ndarray Weights for correlation data of second cumulant. ebe_corr1 : ndarray Event-by-event correlation data for the first cumulant. ebe_corr2 : ndarray Event-by-event correlation data for the second cumulant. Returns ------- float The computed covariance term. Notes ----- This function implements the fraction and covariance term in the last term of Eq. (C42) from Ref. [1]. The implementation is designed to be more general and can be used for all higher order cumulants. """ # Calculate the maximum value among the terms involved max_val = max( abs(np.vdot(w_corr1, w_corr2)), abs(np.sum(w_corr1)), abs(np.sum(w_corr2)), ) # Check if scaling is necessary scale_factor = 1.0 if max_val > 1e15: scale_factor = 1.0 / max_val # Apply scaling to the terms w_corr1_w_corr2_scaled = np.vdot(w_corr1, w_corr2) * scale_factor denominator_scaled = np.sum(w_corr1) * scale_factor denominator_scaled *= np.sum(w_corr2) cov_term = (w_corr1_w_corr2_scaled / denominator_scaled) * self.__cov( w_corr1, w_corr2, ebe_corr1, ebe_corr2 ) return cov_term def __flow_from_cumulant(self, cnk: float) -> float: """ Compute the flow magnitude from a cumulant value. Parameters ---------- cnk : float Cumulant value corresponding to the order of the flow. Returns ------- float The computed flow magnitude. Notes ----- This function calculates the flow magnitude from a cumulant value using the specified cumulant order (k). It considers the sign of the cumulant value and the chosen behavior for imaginary roots. """ vnk_to_k = self.cumulant_factor_[self.k_] * cnk if vnk_to_k >= 0.0: vnk = vnk_to_k ** (1 / self.k_) elif self.imaginary_ == "negative": vnk = -1.0 * (-vnk_to_k) ** (1 / self.k_) elif self.imaginary_ == "zero": vnk = 0.0 else: vnk = float("nan") return vnk def __flow_from_cumulant_differential( self, cnk: float, dnk: float ) -> float: """ Compute the flow magnitude from a cumulant value. Parameters ---------- cnk : float Cumulant value corresponding to the full events. dnk : float Cumulant value corresponding to the POI. Returns ------- float The computed flow magnitude. Notes ----- This function calculates the flow magnitude from a cumulant value using the specified cumulant order (k). It considers the sign of the cumulant value and the chosen behavior for imaginary roots. """ vnk = float("nan") if self.k_ == 2: if cnk > 0.0: vnk = dnk / (self.cumulant_factor_[self.k_] * cnk) ** ( 1 / self.k_ ) elif self.imaginary_ == "negative": vnk = dnk / (-self.cumulant_factor_[self.k_] * cnk) ** ( 1 / self.k_ ) elif self.imaginary_ == "zero": vnk = 0.0 if self.k_ == 4: if cnk < 0.0: vnk = -dnk / (self.cumulant_factor_[self.k_] * cnk) ** ( 3 / self.k_ ) elif self.imaginary_ == "negative": vnk = -dnk / (-self.cumulant_factor_[self.k_] * cnk) ** ( 3 / self.k_ ) elif self.imaginary_ == "zero": vnk = 0.0 return vnk def __cumulant_flow(self, phi: List[List[float]]) -> Tuple[float, float]: """ Compute the flow magnitude and its uncertainty from cumulant values. Parameters ---------- phi : list of lists List of particle data, where each sublist represents the azimuthal angles (phi) for each event. Returns ------- tuple A tuple containing the computed flow magnitude and its uncertainty. Notes ----- This function calculates the flow magnitude and its uncertainty based on cumulant values. It supports cumulant of different orders, following the equations from Ref. [1]. """ if self.k_ == 2: n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2) # returns <v_n{2}> and s_{<v_n{2}>}, Eqs. (C22),(C23) Ref. [1] avg_vn2 = self.__flow_from_cumulant(n2_corr) avg_vn2_err_sq = (1.0 / (4.0 * n2_corr)) * n2_corr_err**2.0 return avg_vn2, np.sqrt(avg_vn2_err_sq) elif self.k_ == 4: n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2) n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr(phi, k=4) QC4 = n4_corr - 2.0 * n2_corr**2.0 avg_vn4 = self.__flow_from_cumulant(QC4) ebe_2p_corr_array = ( np.array([ebe_2p_corr]) if isinstance(ebe_2p_corr, float) else ebe_2p_corr ) ebe_4p_corr_array = ( np.array([ebe_4p_corr]) if isinstance(ebe_4p_corr, float) else ebe_4p_corr ) # compute Eq. (C28) Ref. [1] avg_vn4_err_sq = ( 1.0 / (2.0 * n2_corr**2.0 - n4_corr) ** (3.0 / 2) ) * ( n2_corr**2.0 * n2_corr_err**2.0 + (1.0 / 16.0) * n4_corr_err**2.0 - (1.0 / 2.0) * n2_corr * self.__cov_term( 2, 4, phi, ebe_2p_corr_array, ebe_4p_corr_array ) ) # returns <v_n{4}> and s_{<v_n{4}>}, Eqs. (C27),(C28) Ref. [1] return avg_vn4, np.sqrt(avg_vn4_err_sq) elif self.k_ == 6: n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2) n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr(phi, k=4) n6_corr, n6_corr_err, ebe_6p_corr = self.__calculate_corr(phi, k=6) QC6 = n6_corr - 9.0 * n2_corr * n4_corr + 12.0 * n2_corr**3.0 avg_vn6 = self.__flow_from_cumulant(QC6) ebe_2p_corr_array = ( np.array([ebe_2p_corr]) if isinstance(ebe_2p_corr, float) else ebe_2p_corr ) ebe_4p_corr_array = ( np.array([ebe_4p_corr]) if isinstance(ebe_4p_corr, float) else ebe_4p_corr ) ebe_6p_corr_array = ( np.array([ebe_6p_corr]) if isinstance(ebe_6p_corr, float) else ebe_6p_corr ) # compute Eq. (C32) Ref. [1] avg_vn6_err_sq = ( (1.0 / (2.0 * 2.0 ** (2.0 / 3.0))) * (1.0 / (QC6) ** (5.0 / 3.0)) * ( (9.0 / 2.0) * (4.0 * n2_corr**2.0 - n4_corr) ** 2.0 * n2_corr_err**2.0 + (9.0 / 2.0) * n2_corr**2.0 * n4_corr_err**2.0 + (1.0 / 18.0) * n6_corr_err**2.0 - 9.0 * n2_corr * (4.0 * n2_corr**2.0 - n4_corr) * self.__cov_term( 2, 4, phi, ebe_2p_corr_array, ebe_4p_corr_array ) + (4.0 * n2_corr**2.0 - n4_corr) * self.__cov_term( 2, 6, phi, ebe_2p_corr_array, ebe_6p_corr_array ) - n2_corr * self.__cov_term( 4, 6, phi, ebe_4p_corr_array, ebe_6p_corr_array ) ) ) # returns <v_n{6}> and s_{<v_n{6}>}, Eq. (C33) Ref. [1] return avg_vn6, np.sqrt(avg_vn6_err_sq) else: raise ValueError( f"{self.k_} particle cumulant is not implemented, choose from [2,4,6]" ) def __sample_random_reaction_planes(self, events: int) -> None: """ Sample random reaction planes for a specified number of events. Parameters ---------- events : int The number of events for which random reaction planes are sampled. Returns ------- None """ self.rand_reaction_planes_ = [ rd.uniform(0.0, 2.0 * np.pi) for _ in range(events) ]
[docs] def integrated_flow( self, particle_data: List[List[Particle]] ) -> Tuple[float, float]: """ Compute the integrated flow. Parameters ---------- particle_data : list List of particle data, where each sublist represents an event with particles. Returns ------- tuple A tuple containing the computed flow magnitude and its uncertainty. """ number_events = len(particle_data) self.__sample_random_reaction_planes(number_events) phi = [] for event in range(number_events): event_phi = [] for particle in particle_data[event]: event_phi.append( particle.phi() + self.rand_reaction_planes_[event] ) phi.extend([event_phi]) vnk, vnk_err = self.__cumulant_flow(phi) return vnk, vnk_err
def __compute_differential_flow_bin( self, full_event_quantities: List[Any], phi_bin: List[List[float]], phi_bin_poi: List[List[float]], ) -> List[float]: # full_event_quantities = Qn,M,n2_corr,n2_corr_err,ebe_2p_corr,Q2n,n4_corr,n4_corr_err,ebe_4p_corr pn = self.__Qn(phi_bin, self.n_) mp = np.array([len(i) for i in phi_bin_poi]) mq = np.array([len(i) for i in phi_bin]) Qn = np.array(full_event_quantities[0]) M = np.array(full_event_quantities[1]) if self.k_ == 4: qn = self.__Qn(phi_bin, self.n_) q2n = self.__Qn(phi_bin, 2 * self.n_) # compute Eq. (28) Ref. [2] corr2_ev = (pn * Qn.conj() - mq) / (mp * M - mq) # compute Eq. (24) Ref. [2] w2 = mp * M - mq # compute Eq. (29) Ref. [2] corr2 = np.vdot(w2, corr2_ev) / np.sum(w2) vn_bin = self.__flow_from_cumulant_differential( full_event_quantities[2], corr2 ) # ebe difference from mean: <2>_i - <<2>> difference = corr2_ev - corr2 # weighted variance variance = np.sum(w2 * np.square(difference)) / np.sum(w2) # unbiased variance^2 variance_sq = variance / (1.0 - np.vdot(w2, w2) / (np.sum(w2) ** 2.0)) # error of <<2>>, Eq. (C38) Ref. [1] corr2_err = np.sqrt(np.vdot(w2, w2) * variance_sq) / np.sum(w2) avg_vn2_err_sq = (1.0 / (4.0 * full_event_quantities[2] ** 3.0)) * ( corr2**2.0 * full_event_quantities[3] ** 2.0 + 4.0 * full_event_quantities[2] ** 2.0 * corr2_err**2.0 - 4.0 * full_event_quantities[2] * corr2 * self.__cov_term_differential( M * (M - 1), w2, np.array(full_event_quantities[4]), corr2_ev ) ) avg_vn_err = np.sqrt(avg_vn2_err_sq) if self.k_ == 4: Q2n = np.array(full_event_quantities[5]) # compute Eq. (32) Ref. [2] corr4_ev = ( pn * Qn * Qn.conj() * Qn.conj() - q2n * Qn.conj() * Qn.conj() - pn * Qn * Q2n.conj() - 2.0 * M * pn * Qn.conj() - 2.0 * mq * Qn * Qn.conj() + 7.0 * qn * Qn.conj() - Qn * qn.conj() + q2n * Q2n.conj() + 2.0 * pn * Qn.conj() + 2.0 * mq * M - 6.0 * mq ) / ((mp * M - 3.0 * mq) * (M - 1) * (M - 2)) # compute Eq. (25) Ref. [2] w4 = (mp * M - 3.0 * mq) * (M - 1) * (M - 2) # compute Eq. (33) Ref. [2] corr4 = np.vdot(w4, corr4_ev) / np.sum(w4) # compute Eq. (34) Ref. [2] dn4 = corr4 - 2.0 * corr2 * full_event_quantities[2] # compute Eq. (12) Ref. [2] cn4 = ( full_event_quantities[6] - 2.0 * full_event_quantities[2] ** 2.0 ) vn_bin = self.__flow_from_cumulant_differential(cn4, dn4) # ebe difference from mean: <4>_i - <<4>> difference = corr4_ev - corr4 # weighted variance variance = np.sum(w4 * np.square(difference)) / np.sum(w4) # unbiased variance^2 variance_sq = variance / ( 1.0 - np.vdot(w4, w4) / (np.sum(w4) ** 2.0) ) # error of <<4>>, Eq. (C38) Ref. [1] corr4_err = np.sqrt(np.vdot(w4, w4) * variance_sq) / np.sum(w4) minus_cn4 = -cn4 term1 = ( 2.0 * full_event_quantities[2] ** 2.0 * corr2 - 3.0 * full_event_quantities[2] * corr4 + 2.0 * full_event_quantities[6] * corr2 ) term2 = 2.0 * full_event_quantities[2] * corr2 - corr4 multiplicity_prefac = M * (M - 1) * (M - 2) * (M - 3) avg_vn4_err_sq = (1.0 / minus_cn4 ** (7.0 / 2.0)) * ( term1**2.0 * full_event_quantities[3] ** 2.0 + (9.0 / 16.0) * term2**2.0 * full_event_quantities[7] ** 2.0 + 4.0 * full_event_quantities[2] ** 2.0 * minus_cn4**2.0 * corr2_err**2.0 + minus_cn4**2.0 * corr4_err**2.0 - (3.0 / 2.0) * term2 * term1 * self.__cov_term_differential( M * (M - 1), multiplicity_prefac, np.array(full_event_quantities[4]), np.array(full_event_quantities[8]), ) - 4.0 * full_event_quantities[2] * minus_cn4 * term1 * self.__cov_term_differential( M * (M - 1), w2, np.array(full_event_quantities[4]), corr2_ev, ) + 2.0 * minus_cn4 * term1 * self.__cov_term_differential( M * (M - 1), w4, np.array(full_event_quantities[4]), corr4_ev, ) + 3.0 * full_event_quantities[2] * minus_cn4 * term2 * self.__cov_term_differential( multiplicity_prefac, w2, np.array(full_event_quantities[8]), corr2_ev, ) - (3.0 / 2.0) * minus_cn4 * term2 * self.__cov_term_differential( multiplicity_prefac, w4, np.array(full_event_quantities[8]), corr4_ev, ) - 4.0 * full_event_quantities[2] * minus_cn4**2.0 * self.__cov_term_differential(w2, w4, corr2_ev, corr4_ev) ) avg_vn_err = np.sqrt(avg_vn4_err_sq) return [vn_bin.real, avg_vn_err.real]
[docs] def differential_flow( self, particle_data: List[List[Particle]], bins: Union[np.ndarray, List[float]], flow_as_function_of: str, poi_pdg: Optional[Union[List[int], np.ndarray]] = None, ) -> List[List[float]]: """ Compute the differential flow. The cumulants of second and fourth order are implemented. Parameters ---------- particle_data : list List of particle data. bins : list or np.ndarray Bins used for the differential flow calculation. flow_as_function_of : str Variable on which the flow is calculated ("pT", "rapidity", or "pseudorapidity"). poi_pdg : list List of PDG id for identified particle differential flow. Returns ------- list of tuples A list of tuples containing flow values and their corresponding uncertainty. """ if not isinstance(bins, (list, np.ndarray)): raise TypeError("bins has to be list or np.ndarray") if not isinstance(flow_as_function_of, str): raise TypeError("flow_as_function_of is not a string") if poi_pdg is not None: if not isinstance(poi_pdg, (list, np.ndarray)): raise TypeError("poi_pdg has to be list or np.ndarray") for pdg in poi_pdg: if not isinstance(pdg, int): raise TypeError("poi_pdg elements must be integers") if flow_as_function_of not in ["pt", "rapidity", "pseudorapidity"]: raise ValueError( "flow_as_function_of must be either 'pT', 'rapidity', 'pseudorapidity'" ) if self.k_ == 6: raise ValueError( "6 particle Q-Cumulant differential flow is not implemented" ) number_events = len(particle_data) self.__sample_random_reaction_planes(number_events) phi_all = [] for event in range(number_events): event_phi = [] for particle in particle_data[event]: event_phi.append( particle.phi() + self.rand_reaction_planes_[event] ) phi_all.extend([event_phi]) phi_bin = [] phi_bin_poi = [] for bin in range(len(bins) - 1): events_bin = [] events_bin_poi = [] for event in range(len(particle_data)): particles_event = [] particles_event_poi = [] for particle in particle_data[event]: val = 0.0 if flow_as_function_of == "pT": val = particle.pT_abs() elif flow_as_function_of == "rapidity": val = particle.rapidity() elif flow_as_function_of == "pseudorapidity": val = particle.pseudorapidity() if val >= bins[bin] and val < bins[bin + 1]: particles_event.append( particle.phi() + self.rand_reaction_planes_[event] ) if poi_pdg is None or particle.pdg in poi_pdg: particles_event_poi.append( particle.phi() + self.rand_reaction_planes_[event] ) events_bin.extend([particles_event]) events_bin_poi.extend([particles_event_poi]) phi_bin.extend([events_bin]) phi_bin_poi.extend([events_bin_poi]) if poi_pdg is None: phi_bin_poi = phi_bin # compute full event quantities Qn, Q2n, ... Qn = self.__Qn(phi_all, self.n_) n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi_all, k=2) M = [len(i) for i in phi_all] full_event_quantities = [Qn, M, n2_corr, n2_corr_err, ebe_2p_corr] if self.k_ == 4: Q2n = self.__Qn(phi_all, 2 * self.n_) n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr( phi_all, k=4 ) full_event_quantities = [ Qn, M, n2_corr, n2_corr_err, ebe_2p_corr, Q2n, n4_corr, n4_corr_err, ebe_4p_corr, ] flow_bins = [] for bin in range(len(phi_bin)): total_elements_bin = sum(len(sublist) for sublist in phi_bin[bin]) total_elements_bin_poi = sum( len(sublist) for sublist in phi_bin_poi[bin] ) if ( len(phi_bin[bin]) > 0 and total_elements_bin > 0 and total_elements_bin_poi > 0 ): flow_bins.append( self.__compute_differential_flow_bin( full_event_quantities, phi_bin[bin], phi_bin_poi[bin] ) ) else: flow_bins.append([]) return flow_bins