Source code for QCumulantFlow

# ===================================================
#
#    Copyright (c) 2023-2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

from sparkx.flow import FlowInterface
import numpy as np
import random as rd

rd.seed(42)


[docs] class QCumulantFlow(FlowInterface.FlowInterface): """ This class implements the Q-Cumulant method for anisotropic flow analysis. References: - [1] `PhD Thesis A. Bilandzic <https://inspirehep.net/literature/1186272>`__ - [2] `Phys.Rev.C 83 (2011) 044913 <https://inspirehep.net/literature/871528>`__ Parameters ---------- n : int, optional The order of the harmonic flow (default is 2). k : int, optional The order of the cumulant (2, 4, or 6) (default is 2). imaginary : str, optional Specifies the treatment of imaginary roots. Options are 'zero', 'negative', or 'nan' (default is 'zero'). Attributes ---------- n_ : int The order of the harmonic flow. k_ : int The order of the cumulant. imaginary_ : str Specifies the treatment of imaginary roots. cumulant_factor_ : dict Dictionary mapping cumulant order to corresponding flow. rand_reaction_planes_ : list List to store randomly sampled reaction planes. Methods ------- integrated_flow: Computes the integrated flow. differential_flow: Computes the differential flow. Examples -------- A demonstration of how to use the QCumulantFlow class to calculate flow. .. highlight:: python .. code-block:: python :linenos: >>> flow_instance = QCumulantFlow(n=2, k=2, imaginary='zero') >>> result = flow_instance.integrated_flow(particle_data) """ def __init__(self, n=2, k=2, imaginary='zero'): if not isinstance(n, int): raise TypeError('n has to be int') elif n <= 0: raise ValueError( 'n-th harmonic with value n<=0 can not be computed') else: self.n_ = n if not isinstance(k, int): raise TypeError('k has to be int') elif k not in [2, 4, 6]: raise ValueError( f"{k} particle cumulant is not implemented, choose from [2,4,6]") else: self.k_ = k if not isinstance(imaginary, str): raise TypeError("Chosen 'imaginary' is not implemented") elif imaginary not in ['zero', 'negative', 'nan']: raise ValueError( f"Chosen 'imaginary' = {imaginary} is not an option") else: self.imaginary_ = imaginary self.cumulant_factor_ = { 2: 1, 4: -1, 6: 1. / 4. } self.rand_reaction_planes_ = [] def __Qn(self, phi, n): """ Compute the Q_n vector for each event based on azimuthal angles. Parameters ---------- phi : list List of azimuthal angles for each particle in an event. n : int Order of the flow vector (Q_n). Returns ------- np.ndarray Array of complex numbers representing the Q_n vector for each event. """ Q_vector = [] for event in range(len(phi)): phi_event = np.array([i for i in phi[event]]) Q_vector_val = np.sum(np.exp(1.0j * float(n) * phi_event)) Q_vector.append(Q_vector_val) return np.array(Q_vector) def __calculate_corr(self, phi, k): """ Calculate cumulant and its error for a given order (k). Parameters ---------- phi : list List of azimuthal angles for each particle in an event. k : int Order of the cumulant to be calculated (2, 4, or 6). Returns ------- tuple A tuple containing the computed cumulant, its error, and the event-by-event cumulant. """ mult = np.array([float(len(i)) for i in phi]) sum_mult = np.sum(mult) sum_mult_squared = np.inner(mult, mult) Qn = self.__Qn(phi, self.n_) Qn_sq_sum = np.vdot(Qn, Qn).real # |Q_n|^2 if k == 2: # this implements Eq. (16) from Ref. [2] corr = (Qn_sq_sum - sum_mult) / (sum_mult_squared - sum_mult) W2 = mult * (mult - 1) sum_W2 = np.sum(W2) sum_W2_sq = np.inner(W2, W2) # ebe difference from mean: <2>_i - <<2>> ebe_2p_corr = ((np.real(Qn * Qn.conj()) - mult) / W2) difference = ebe_2p_corr - corr # weighted variance variance = np.sum(W2 * np.square(difference)) / sum_W2 # unbiased variance^2 variance_sq = variance / (1. - sum_W2_sq / (sum_W2**2.)) # error of <<2>>, Eq. (C18) Ref. [1] corr_err = np.sqrt(sum_W2_sq * variance_sq) / sum_W2 return corr, corr_err, ebe_2p_corr if k == 4: # this implements Eq. (18) from Ref. [2] Q2n = self.__Qn(phi, 2. * self.n_) Q2n_sq_sum = np.vdot(Q2n, Q2n).real Qn_sq = np.square(Qn.real) + np.square(Qn.imag) Qn_to4_sum = np.inner(Qn_sq, Qn_sq) corr = (Qn_to4_sum + Q2n_sq_sum - 2. * np.inner(Q2n, np.square(Qn.conj())).real - 2 * np.sum(2 * (mult - 2) * Qn_sq - mult * (mult - 3)) ) / (np.sum(mult * (mult - 1) * (mult - 2) * (mult - 3))) # corr_err computation here: W4 = mult * (mult - 1) * (mult - 2) * (mult - 3) sum_W4 = np.sum(W4) sum_W4_sq = np.inner(W4, W4) # ebe difference from mean: <4>_i - <<4>> ebe_4p_corr = (np.real(Qn * Qn * Qn.conj() * Qn.conj()) + np.real(Q2n * Q2n.conj()) - 2. * np.real(Q2n * Qn.conj() * Qn.conj()) - 2. * (2. * (mult - 2) * np.real(Qn * Qn.conj()) - mult * (mult - 3))) / W4 difference = ebe_4p_corr - corr # weighted variance variance = np.sum(W4 * np.square(difference)) / sum_W4 # unbiased variance^2 variance_sq = variance / (1. - sum_W4_sq / (sum_W4**2.)) # error of <<4>>, Eq. (C18) Ref. [1] corr_err = np.sqrt(sum_W4_sq * variance_sq) / sum_W4 return corr, corr_err, ebe_4p_corr if k == 6: # this implements Eq. (A10) from Ref. [2] Q2n = self.__Qn(phi, 2. * self.n_) Q2n_sq_sum = np.vdot(Q2n, Q2n).real Qn_sq = np.square(Qn.real) + np.square(Qn.imag) Qn_to4_sum = np.inner(Qn_sq, Qn_sq) Qn_to6 = np.power(Qn_sq, 3) Qn_to6_sum = np.sum(Qn_to6) Q3n = self.__Qn(phi, 3. * self.n_) Q3n_sq_sum = np.vdot(Q3n, Q3n).real ReQ2nQnConjSq = np.inner(Q2n, np.square(Qn.conj())).real ReQ3nQ2nConjQnConj = np.vdot( Q3n, np.multiply( Q2n.conj(), Qn.conj())).real QnConj_cub = np.power(Qn.conj(), 3) ReQ3nQnConjCub = np.inner(Q3n, QnConj_cub).real ReQ2nQnQnConjCub = np.inner(np.multiply(Q2n, Qn), QnConj_cub).real norm1 = mult * (mult - 1) * (mult - 2) * \ (mult - 3) * (mult - 4) * (mult - 5) norm1_sum = np.sum(norm1) norm2 = mult * (mult - 1) * (mult - 2) * (mult - 3) * (mult - 5) norm2_sum = np.sum(norm2) norm3 = mult * (mult - 1) * (mult - 3) * (mult - 4) norm3_sum = np.sum(norm3) norm4 = (mult - 1) * (mult - 2) * (mult - 3) norm4_sum = np.sum(norm4) corr1 = (Qn_to6_sum + 9. * Q2n_sq_sum * Qn_sq_sum - 6. * ReQ2nQnQnConjCub ) / norm1_sum corr2 = 4. * (ReQ3nQnConjCub - 3. * ReQ3nQ2nConjQnConj ) / norm1_sum corr3 = 2. * (9. * np.sum((mult - 4) * ReQ2nQnConjSq) + 2. * Q3n_sq_sum ) / norm1_sum corr4 = -9. * (Qn_to4_sum + Q2n_sq_sum ) / norm2_sum corr5 = 18. * Qn_sq_sum / norm3_sum corr6 = -6. / norm4_sum corr = corr1 + corr2 + corr3 + corr4 + corr5 + corr6 # corr_err computation here: W6 = mult * (mult - 1) * (mult - 2) * \ (mult - 3) * (mult - 4) * (mult - 5) sum_W6 = np.sum(W6) sum_W6_sq = np.inner(W6, W6) # ebe difference from mean: <6>_i - <<6>> ebe_6p_corr1 = (np.real(Qn * Qn * Qn * Qn.conj() * Qn.conj() * Qn.conj()) + 9. * np.real(Q2n * Q2n.conj()) * np.real(Qn * Qn.conj()) - 6. * np.real(Q2n * Qn * QnConj_cub)) / norm1 ebe_6p_corr2 = (4. * (np.real(Q3n * QnConj_cub) - 3. * np.real(Q3n * Q2n.conj() * Qn.conj()))) / norm1 ebe_6p_corr3 = (2. * (9. * (mult - 4) * np.real(Q2n * Qn.conj() * Qn.conj()) + 2. * np.real(Q3n * Q3n.conj()))) / norm1 ebe_6p_corr4 = (-9. * np.real(Qn * Qn * Qn.conj() * Qn.conj()) + np.real(Q2n * Q2n.conj())) / norm2 ebe_6p_corr5 = (18. * np.real(Qn * Qn.conj())) / norm3 ebe_6p_corr6 = -6. / norm4 ebe_6p_corr = (ebe_6p_corr1 + ebe_6p_corr2 + ebe_6p_corr3 + ebe_6p_corr4 + ebe_6p_corr5 + ebe_6p_corr6) difference = ebe_6p_corr - corr # weighted variance variance = np.sum(W6 * np.square(difference)) / sum_W6 # unbiased variance^2 variance_sq = variance / (1. - sum_W6_sq / (sum_W6**2.)) # error of <<6>>, Eq. (C18) Ref. [1] corr_err = np.sqrt(sum_W6_sq * variance_sq) / sum_W6 return corr, corr_err, ebe_6p_corr def __cov(self, wx, wy, x, y): """ Compute the covariance between two sets of variables based on event weights. Parameters ---------- wx : ndarray Event weights for the first set of variables. wy : ndarray Event weights for the second set of variables. x : ndarray Values of the first set of variables. y : ndarray Values of the second set of variables. Returns ------- float The computed covariance. Notes ----- This function calculates the covariance between two sets of variables using event weights. It follows the formula provided in Eq. (C12) from Ref. [1]. """ sum_wx_wy_x_y = np.sum(wx * wy * x * y).real sum_wx_wy = np.sum(wx * wy).real sum_wx_x = np.sum(wx * x).real sum_wx = np.sum(wx).real sum_wy_y = np.sum(wy * y).real sum_wy = np.sum(wy).real # Normalize the data to avoid overflow max_val = max(abs(sum_wx_wy_x_y), abs(sum_wx_wy), abs(sum_wx_x), abs(sum_wx), abs(sum_wy_y), abs(sum_wy)) scale_factor = 1.0 if max_val > 1e15: scale_factor = 1.0 / max_val sum_wx_wy_x_y *= scale_factor sum_wx_wy *= scale_factor sum_wx_x *= scale_factor sum_wx *= scale_factor sum_wy_y *= scale_factor sum_wy *= scale_factor cov = (((sum_wx_wy_x_y / sum_wx_wy) - (sum_wx_x / sum_wx) * (sum_wy_y / sum_wy)) / (1. - (sum_wx_wy / (sum_wx * sum_wy)))) return cov def __cov_term(self, k1, k2, phi, ebe_corr1, ebe_corr2): """ Compute the covariance term in Eqs. (C29) and (C32) of Ref. [1]. Parameters ---------- k1 : int The order of the first cumulant. k2 : int The order of the second cumulant. phi : list of lists List of particle data, where each sublist represents the azimuthal angles (phi) for each event. ebe_corr1 : ndarray Event-by-event correlation data for the first cumulant. ebe_corr2 : ndarray Event-by-event correlation data for the second cumulant. Returns ------- float The computed covariance term. Notes ----- This function implements the fraction and covariance term in the last term of Eq. (C29) from Ref. [1]. The implementation is designed to be more general and can be used for all higher order cumulants. """ mult = np.array([float(len(i)) for i in phi]) W1 = 1. for i in range(k1): W1 *= (mult - i) W2 = 1. for i in range(k2): W2 *= (mult - i) W1_sum = np.sum(W1) W2_sum = np.sum(W2) W1W2_sum = np.inner(W1, W2) # Calculate the maximum value among the terms involved max_val = max(abs(W1W2_sum), abs(W1_sum), abs(W2_sum)) # Check if scaling is necessary scale_factor = 1.0 if max_val > 1e15: scale_factor = 1.0 / max_val # Apply scaling to the terms W1W2_sum_scaled = W1W2_sum * scale_factor denominator_scaled = W1_sum * scale_factor denominator_scaled *= W2_sum cov_term = (W1W2_sum_scaled / (denominator_scaled)) * \ self.__cov(W1, W2, ebe_corr1, ebe_corr2) return cov_term def __cov_term_differential(self, w_corr1, w_corr2, ebe_corr1, ebe_corr2): """ Compute the covariance term in Eqs. (C42) and (C46) of Ref. [1]. Parameters ---------- w_corr1 : ndarray Weights for correlation data of first cumulant. w_corr2 : ndarray Weights for correlation data of second cumulant. ebe_corr1 : ndarray Event-by-event correlation data for the first cumulant. ebe_corr2 : ndarray Event-by-event correlation data for the second cumulant. Returns ------- float The computed covariance term. Notes ----- This function implements the fraction and covariance term in the last term of Eq. (C42) from Ref. [1]. The implementation is designed to be more general and can be used for all higher order cumulants. """ # Calculate the maximum value among the terms involved max_val = max(abs(np.vdot(w_corr1, w_corr2)), abs( np.sum(w_corr1)), abs(np.sum(w_corr2))) # Check if scaling is necessary scale_factor = 1.0 if max_val > 1e15: scale_factor = 1.0 / max_val # Apply scaling to the terms w_corr1_w_corr2_scaled = np.vdot(w_corr1, w_corr2) * scale_factor denominator_scaled = np.sum(w_corr1) * scale_factor denominator_scaled *= np.sum(w_corr2) cov_term = (w_corr1_w_corr2_scaled / denominator_scaled ) * self.__cov(w_corr1, w_corr2, ebe_corr1, ebe_corr2) return cov_term def __flow_from_cumulant(self, cnk): """ Compute the flow magnitude from a cumulant value. Parameters ---------- cnk : float Cumulant value corresponding to the order of the flow. Returns ------- float The computed flow magnitude. Notes ----- This function calculates the flow magnitude from a cumulant value using the specified cumulant order (k). It considers the sign of the cumulant value and the chosen behavior for imaginary roots. """ vnk_to_k = self.cumulant_factor_[self.k_] * cnk if vnk_to_k >= 0.: vnk = vnk_to_k**(1 / self.k_) elif self.imaginary_ == 'negative': vnk = -1. * (-vnk_to_k)**(1 / self.k_) elif self.imaginary_ == 'zero': vnk = 0. else: vnk = float('nan') return vnk def __flow_from_cumulant_differential(self, cnk, dnk): """ Compute the flow magnitude from a cumulant value. Parameters ---------- cnk : float Cumulant value corresponding to the full events. dnk : float Cumulant value corresponding to the POI. Returns ------- float The computed flow magnitude. Notes ----- This function calculates the flow magnitude from a cumulant value using the specified cumulant order (k). It considers the sign of the cumulant value and the chosen behavior for imaginary roots. """ vnk = float('nan') if self.k_ == 2: if cnk > 0.: vnk = dnk / \ (self.cumulant_factor_[self.k_] * cnk)**(1 / self.k_) elif self.imaginary_ == 'negative': vnk = dnk / \ (-self.cumulant_factor_[self.k_] * cnk)**(1 / self.k_) elif self.imaginary_ == 'zero': vnk = 0. if self.k_ == 4: if cnk < 0.: vnk = -dnk / \ (self.cumulant_factor_[self.k_] * cnk)**(3 / self.k_) elif self.imaginary_ == 'negative': vnk = -dnk / \ (-self.cumulant_factor_[self.k_] * cnk)**(3 / self.k_) elif self.imaginary_ == 'zero': vnk = 0. return vnk def __cumulant_flow(self, phi): """ Compute the flow magnitude and its uncertainty from cumulant values. Parameters ---------- phi : list of lists List of particle data, where each sublist represents the azimuthal angles (phi) for each event. Returns ------- tuple A tuple containing the computed flow magnitude and its uncertainty. Notes ----- This function calculates the flow magnitude and its uncertainty based on cumulant values. It supports cumulant of different orders, following the equations from Ref. [1]. """ if self.k_ == 2: n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2) # returns <v_n{2}> and s_{<v_n{2}>}, Eqs. (C22),(C23) Ref. [1] avg_vn2 = self.__flow_from_cumulant(n2_corr) avg_vn2_err_sq = (1. / (4. * n2_corr)) * n2_corr_err**2. return avg_vn2, np.sqrt(avg_vn2_err_sq) elif self.k_ == 4: n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2) n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr(phi, k=4) QC4 = n4_corr - 2. * n2_corr**2. avg_vn4 = self.__flow_from_cumulant(QC4) # compute Eq. (C28) Ref. [1] avg_vn4_err_sq = (1. / (2. * n2_corr**2. - n4_corr)**(3. / 2)) * ( n2_corr**2. * n2_corr_err**2. + (1. / 16.) * n4_corr_err**2. - (1. / 2.) * n2_corr * self.__cov_term(2, 4, phi, ebe_2p_corr, ebe_4p_corr) ) # returns <v_n{4}> and s_{<v_n{4}>}, Eqs. (C27),(C28) Ref. [1] return avg_vn4, np.sqrt(avg_vn4_err_sq) elif self.k_ == 6: n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi, k=2) n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr(phi, k=4) n6_corr, n6_corr_err, ebe_6p_corr = self.__calculate_corr(phi, k=6) QC6 = n6_corr - 9. * n2_corr * n4_corr + 12. * n2_corr**3. avg_vn6 = self.__flow_from_cumulant(QC6) # compute Eq. (C32) Ref. [1] avg_vn6_err_sq = ((1. / (2. * 2.**(2. / 3.))) * (1. / (QC6)**(5. / 3.)) * ((9. / 2.) * (4. * n2_corr**2. - n4_corr)**2. * n2_corr_err**2. + (9. / 2.) * n2_corr**2. * n4_corr_err**2. + (1. / 18.) * n6_corr_err**2. - 9. * n2_corr * (4. * n2_corr**2. - n4_corr) * self.__cov_term(2, 4, phi, ebe_2p_corr, ebe_4p_corr) + (4. * n2_corr**2. - n4_corr) * self.__cov_term(2, 6, phi, ebe_2p_corr, ebe_6p_corr) - n2_corr * self.__cov_term(4, 6, phi, ebe_4p_corr, ebe_6p_corr))) # returns <v_n{6}> and s_{<v_n{6}>}, Eq. (C33) Ref. [1] return avg_vn6, np.sqrt(avg_vn6_err_sq) def __sample_random_reaction_planes(self, events): """ Sample random reaction planes for a specified number of events. Parameters ---------- events : int The number of events for which random reaction planes are sampled. Returns ------- None """ self.rand_reaction_planes_ = [rd.uniform( 0., 2. * np.pi) for _ in range(events)]
[docs] def integrated_flow(self, particle_data): """ Compute the integrated flow. Parameters ---------- particle_data : list List of particle data, where each sublist represents an event with particles. Returns ------- tuple A tuple containing the computed flow magnitude and its uncertainty. """ number_events = len(particle_data) self.__sample_random_reaction_planes(number_events) phi = [] for event in range(number_events): event_phi = [] for particle in particle_data[event]: event_phi.append( particle.phi() + self.rand_reaction_planes_[event]) phi.extend([event_phi]) vnk, vnk_err = self.__cumulant_flow(phi) return vnk, vnk_err
def __compute_differential_flow_bin( self, full_event_quantities, phi_bin, phi_bin_poi): # full_event_quantities = Qn,M,n2_corr,n2_corr_err,ebe_2p_corr,Q2n,n4_corr,n4_corr_err,ebe_4p_corr pn = self.__Qn(phi_bin, self.n_) mp = np.array([len(i) for i in phi_bin_poi]) mq = np.array([len(i) for i in phi_bin]) Qn = np.array(full_event_quantities[0]) M = np.array(full_event_quantities[1]) if self.k_ == 4: qn = self.__Qn(phi_bin, self.n_) q2n = self.__Qn(phi_bin, 2 * self.n_) # compute Eq. (28) Ref. [2] corr2_ev = (pn * Qn.conj() - mq) / (mp * M - mq) # compute Eq. (24) Ref. [2] w2 = mp * M - mq # compute Eq. (29) Ref. [2] corr2 = np.vdot(w2, corr2_ev) / np.sum(w2) vn_bin = self.__flow_from_cumulant_differential( full_event_quantities[2], corr2) # ebe difference from mean: <2>_i - <<2>> difference = corr2_ev - corr2 # weighted variance variance = np.sum(w2 * np.square(difference)) / np.sum(w2) # unbiased variance^2 variance_sq = variance / (1. - np.vdot(w2, w2) / (np.sum(w2)**2.)) # error of <<2>>, Eq. (C38) Ref. [1] corr2_err = np.sqrt(np.vdot(w2, w2) * variance_sq) / np.sum(w2) avg_vn2_err_sq = (1. / (4. * full_event_quantities[2]**3.)) * ( corr2**2. * full_event_quantities[3]**2. + 4. * full_event_quantities[2]**2. * corr2_err**2. - 4. * full_event_quantities[2] * corr2 * self.__cov_term_differential(M * (M - 1), w2, full_event_quantities[4], corr2_ev) ) avg_vn_err = np.sqrt(avg_vn2_err_sq) if self.k_ == 4: Q2n = np.array(full_event_quantities[5]) # compute Eq. (32) Ref. [2] corr4_ev = (pn * Qn * Qn.conj() * Qn.conj() - q2n * Qn.conj() * Qn.conj() - pn * Qn * Q2n.conj() - 2. * M * pn * Qn.conj() - 2. * mq * Qn * Qn.conj() + 7. * qn * Qn.conj() - Qn * qn.conj() + q2n * Q2n.conj() + 2. * pn * Qn.conj() + 2. * mq * M - 6. * mq) / ((mp * M - 3. * mq) * (M - 1) * (M - 2)) # compute Eq. (25) Ref. [2] w4 = (mp * M - 3. * mq) * (M - 1) * (M - 2) # compute Eq. (33) Ref. [2] corr4 = np.vdot(w4, corr4_ev) / np.sum(w4) # compute Eq. (34) Ref. [2] dn4 = corr4 - 2. * corr2 * full_event_quantities[2] # compute Eq. (12) Ref. [2] cn4 = full_event_quantities[6] - 2. * full_event_quantities[2]**2. vn_bin = self.__flow_from_cumulant_differential(cn4, dn4) # ebe difference from mean: <4>_i - <<4>> difference = corr4_ev - corr4 # weighted variance variance = np.sum(w4 * np.square(difference)) / np.sum(w4) # unbiased variance^2 variance_sq = variance / (1. - np.vdot(w4, w4) / (np.sum(w4)**2.)) # error of <<4>>, Eq. (C38) Ref. [1] corr4_err = np.sqrt(np.vdot(w4, w4) * variance_sq) / np.sum(w4) minus_cn4 = -cn4 term1 = (2. * full_event_quantities[2]**2. * corr2 - 3. * full_event_quantities[2] * corr4 + 2. * full_event_quantities[6] * corr2 ) term2 = 2. * full_event_quantities[2] * corr2 - corr4 multiplicity_prefac = M * (M - 1) * (M - 2) * (M - 3) avg_vn4_err_sq = (1. / minus_cn4**(7. / 2.)) * ( term1**2. * full_event_quantities[3]**2. + (9. / 16.) * term2**2. * full_event_quantities[7]**2. + 4. * full_event_quantities[2]**2. * minus_cn4**2. * corr2_err**2. + minus_cn4**2. * corr4_err**2. - (3. / 2.) * term2 * term1 * self.__cov_term_differential(M * (M - 1), multiplicity_prefac, full_event_quantities[4], full_event_quantities[8]) - 4. * full_event_quantities[2] * minus_cn4 * term1 * self.__cov_term_differential(M * (M - 1), w2, full_event_quantities[4], corr2_ev) + 2. * minus_cn4 * term1 * self.__cov_term_differential(M * (M - 1), w4, full_event_quantities[4], corr4_ev) + 3. * full_event_quantities[2] * minus_cn4 * term2 * self.__cov_term_differential(multiplicity_prefac, w2, full_event_quantities[8], corr2_ev) - (3. / 2.) * minus_cn4 * term2 * self.__cov_term_differential(multiplicity_prefac, w4, full_event_quantities[8], corr4_ev) - 4. * full_event_quantities[2] * minus_cn4**2. * self.__cov_term_differential(w2, w4, corr2_ev, corr4_ev) ) avg_vn_err = np.sqrt(avg_vn4_err_sq) return [vn_bin.real, avg_vn_err.real]
[docs] def differential_flow( self, particle_data, bins, flow_as_function_of, poi_pdg=None): """ Compute the differential flow. The cumulants of second and fourth order are implemented. Parameters ---------- particle_data : list List of particle data. bins : list or np.ndarray Bins used for the differential flow calculation. flow_as_function_of : str Variable on which the flow is calculated ("pt", "rapidity", or "pseudorapidity"). poi_pdg : list List of PDG id for identified particle differential flow. Returns ------- list of tuples A list of tuples containing flow values and their corresponding uncertainty. """ if not isinstance(bins, (list, np.ndarray)): raise TypeError('bins has to be list or np.ndarray') if not isinstance(flow_as_function_of, str): raise TypeError('flow_as_function_of is not a string') if poi_pdg is not None: if not isinstance(poi_pdg, (list, np.ndarray)): raise TypeError('poi_pdg has to be list or np.ndarray') for pdg in poi_pdg: if not isinstance(pdg, int): raise TypeError('poi_pdg elements must be integers') if flow_as_function_of not in ["pt", "rapidity", "pseudorapidity"]: raise ValueError( "flow_as_function_of must be either 'pt', 'rapidity', 'pseudorapidity'") if self.k_ == 6: raise ValueError( "6 particle Q-Cumulant differential flow is not implemented") number_events = len(particle_data) self.__sample_random_reaction_planes(number_events) phi_all = [] for event in range(number_events): event_phi = [] for particle in particle_data[event]: event_phi.append( particle.phi() + self.rand_reaction_planes_[event]) phi_all.extend([event_phi]) phi_bin = [] phi_bin_poi = [] for bin in range(len(bins) - 1): events_bin = [] events_bin_poi = [] for event in range(len(particle_data)): particles_event = [] particles_event_poi = [] for particle in particle_data[event]: val = 0. if flow_as_function_of == "pt": val = particle.pt_abs() elif flow_as_function_of == "rapidity": val = particle.momentum_rapidity_Y() elif flow_as_function_of == "pseudorapidity": val = particle.pseudorapidity() if val >= bins[bin] and val < bins[bin + 1]: particles_event.append( particle.phi() + self.rand_reaction_planes_[event]) if poi_pdg is None or particle.pdg in poi_pdg: particles_event_poi.append( particle.phi() + self.rand_reaction_planes_[event]) events_bin.extend([particles_event]) events_bin_poi.extend([particles_event_poi]) phi_bin.extend([events_bin]) phi_bin_poi.extend([events_bin_poi]) if poi_pdg is None: phi_bin_poi = phi_bin # compute full event quantities Qn, Q2n, ... Qn = self.__Qn(phi_all, self.n_) n2_corr, n2_corr_err, ebe_2p_corr = self.__calculate_corr(phi_all, k=2) M = [len(i) for i in phi_all] full_event_quantities = [Qn, M, n2_corr, n2_corr_err, ebe_2p_corr] if self.k_ == 4: Q2n = self.__Qn(phi_all, 2 * self.n_) n4_corr, n4_corr_err, ebe_4p_corr = self.__calculate_corr( phi_all, k=4) full_event_quantities = [ Qn, M, n2_corr, n2_corr_err, ebe_2p_corr, Q2n, n4_corr, n4_corr_err, ebe_4p_corr] flow_bins = [] for bin in range(len(phi_bin)): total_elements_bin = sum(len(sublist) for sublist in phi_bin[bin]) total_elements_bin_poi = sum(len(sublist) for sublist in phi_bin_poi[bin]) if len( phi_bin[bin]) > 0 and total_elements_bin > 0 and total_elements_bin_poi > 0: flow_bins.append( self.__compute_differential_flow_bin( full_event_quantities, phi_bin[bin], phi_bin_poi[bin])) else: flow_bins.append(None) return flow_bins