# Source code for MultiParticlePtCorrelations

# ===================================================
#
#    Copyright (c) 2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

import numpy as np
from sparkx.Jackknife import Jackknife
from typing import List, Tuple, Union, Any
from sparkx.Particle import Particle


class MultiParticlePtCorrelations:
    """
    Compute multi-particle transverse momentum correlations and cumulants up
    to the 8th order.

    This class is based on the following paper:

    - [1] `Eur. Phys. J. A 60 (2024) 2, 38 [2312.00492 [nucl-th]]
      <https://inspirehep.net/literature/2729183>`__

    For the computation of transverse momentum correlations and cumulants,
    the implementation closely follows the equations and methods described
    in Ref. [1].

    Parameters
    ----------
    max_order : int
        Maximum order of correlations and cumulants to compute
        (must be between 1 and 8).

    Attributes
    ----------
    mean_pt_correlation : np.ndarray, optional
        Mean transverse momentum correlations for each order up to
        `max_order`.
    mean_pt_correlation_error : np.ndarray, optional
        Error estimates (if computed) for mean transverse momentum
        correlations.
    kappa : np.ndarray, optional
        Mean transverse momentum cumulants for each order up to `max_order`.
    kappa_error : np.ndarray, optional
        Error estimates (if computed) for mean transverse momentum cumulants.
    N_events : list
        List to store numerators (N) of correlations for each event.
    D_events : list
        List to store denominators (D) of correlations for each event.

    Methods
    -------
    mean_pt_correlations:
        Computes the mean transverse momentum correlations for each order k
        across all events.
    mean_pt_cumulants:
        Computes the mean transverse momentum cumulants for each order k
        from the correlations.

    Examples
    --------
    A demonstration of how to use the MultiParticlePtCorrelations class to
    calculate transverse momentum correlations and cumulants.

    .. highlight:: python
    .. code-block:: python
        :linenos:

        >>> from sparkx import *
        >>>
        >>> # Maximum order for correlations and cumulants
        >>> max_order = 8
        >>> # Create a MultiParticlePtCorrelations object
        >>> corr_obj = MultiParticlePtCorrelations(max_order=max_order)
        >>>
        >>> # List of events, where each event is a list of particle objects
        >>> particle_list_all_events = Jetscape("./particles.dat").particle_object_list()
        >>>
        >>> # Compute mean transverse momentum correlations
        >>> mean_pt_correlations = corr_obj.mean_pt_correlations(particle_list_all_events)
        >>> print(mean_pt_correlations)
        >>>
        >>> # Compute mean transverse momentum cumulants
        >>> mean_pt_cumulants = corr_obj.mean_pt_cumulants(particle_list_all_events)
        >>> print(mean_pt_cumulants)
    """

    def __init__(self, max_order: int) -> None:
        self.max_order = max_order
        # Check if max_order is an integer
        if not isinstance(self.max_order, int):
            raise TypeError("max_order must be an integer")
        # Check that max_order is greater than 0 and less than 9
        if self.max_order < 1 or self.max_order > 8:
            raise ValueError("max_order must be greater than 0 and less than 9")

        # Filled in by mean_pt_correlations / mean_pt_cumulants.
        self.mean_pt_correlation: Union[np.ndarray, None] = None
        self.mean_pt_correlation_error: Union[np.ndarray, None] = None
        self.kappa: Union[np.ndarray, None] = None
        self.kappa_error: Union[np.ndarray, None] = None
        self.N_events: Any = None
        self.D_events: Any = None

    def _P_W_k(
        self, particle_list_event: List["Particle"]
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute the power sums of weighted transverse momenta and of weights.

        This implements Eq. 7 in [1]:
        ``Pk[m-1] = sum_i (w_i * pT_i)**m`` and ``Wk[m-1] = sum_i w_i**m``
        for m = 1, ..., max_order.

        Parameters
        ----------
        particle_list_event : list
            List of particle objects in a single event.

        Returns
        -------
        tuple of np.ndarray
            Pk : ndarray
                Transverse momentum power sums for each order.
            Wk : ndarray
                Weight power sums for each order.
        """
        Pk = np.zeros(self.max_order)
        Wk = np.zeros(self.max_order)
        for particle in particle_list_event:
            # if particle.weight is np.nan, then set it to 1
            if np.isnan(particle.weight):
                particle.weight = 1.0
            for k in range(self.max_order):
                Pk[k] += (particle.weight * particle.pt_abs()) ** (k + 1)
                Wk[k] += particle.weight ** (k + 1)
        return (Pk, Wk)

    @staticmethod
    def _distinct_tuple_sum(q: np.ndarray, order: int) -> float:
        """
        Expand a sum over ``order + 1`` pairwise-distinct particles in terms
        of power sums ``q`` (Eqs. A1-A7 in Ref. [1]).

        The coefficients follow from the set-partition expansion
        ``sum_{i1 != ... != ik} x_{i1}...x_{ik}
        = sum_{partitions} prod_blocks (-1)^(|B|-1) (|B|-1)! q_{|B|}``.
        The same expansion applies to the numerators (with ``q = Pk``) and
        the denominators (with ``q = Wk``).

        Parameters
        ----------
        q : np.ndarray
            Power sums; ``q[m-1]`` is the m-th power sum.
        order : int
            Zero-based order; ``order == k - 1`` for the k-particle sum.

        Returns
        -------
        float
            The k-particle distinct-index sum.
        """
        if order == 0:  # k = 1
            return q[0]
        if order == 1:  # k = 2
            return q[0]**2. - q[1]
        if order == 2:  # k = 3
            return q[0]**3. - 3. * q[1] * q[0] + 2. * q[2]
        if order == 3:  # k = 4
            # BUGFIX: the P2 term is -6*q2*q1**2 (degree 4), not
            # -6*q2**2*q1 (degree 5).
            return (q[0]**4. - 6. * q[1] * q[0]**2. + 3. * q[1]**2.
                    + 8. * q[2] * q[0] - 6. * q[3])
        if order == 4:  # k = 5
            return (q[0]**5. - 10. * q[1] * q[0]**3.
                    + 15. * q[1]**2. * q[0] + 20. * q[2] * q[0]**2.
                    - 20. * q[2] * q[1] - 30. * q[3] * q[0] + 24. * q[4])
        if order == 5:  # k = 6
            # BUGFIX: the q3*q1**3 term enters with +40 (block of size 3
            # carries coefficient +2, with 20 such partitions).
            return (q[0]**6. - 15. * q[1] * q[0]**4.
                    + 45. * q[1]**2. * q[0]**2. - 15. * q[1]**3.
                    + 40. * q[2] * q[0]**3. - 120. * q[2] * q[1] * q[0]
                    + 40. * q[2]**2. - 90. * q[3] * q[0]**2.
                    + 90. * q[3] * q[1] + 144. * q[4] * q[0] - 120. * q[5])
        if order == 6:  # k = 7
            # BUGFIX: the q4*q2*q1 term enters with +630
            # (105 partitions of type (4,2,1), coefficient (-6)*(-1) = +6).
            return (q[0]**7. - 21. * q[1] * q[0]**5.
                    + 105. * q[1]**2. * q[0]**3. - 105. * q[1]**3. * q[0]
                    + 70. * q[2] * q[0]**4. - 420. * q[2] * q[1] * q[0]**2.
                    + 210. * q[2] * q[1]**2. + 280. * q[2]**2. * q[0]
                    - 210. * q[3] * q[0]**3. + 630. * q[3] * q[1] * q[0]
                    - 420. * q[3] * q[2] + 504. * q[4] * q[0]**2.
                    - 504. * q[4] * q[1] - 840. * q[5] * q[0] + 720. * q[6])
        if order == 7:  # k = 8
            # BUGFIX: signs of the q2**2*q1**4 (+210), q3*q2*q1**3 (-1120)
            # and q3**2*q2 (-1120) terms, and the pair term is q4**2 (not
            # q5**2). With all q_m = n this reproduces the falling
            # factorial n(n-1)...(n-7).
            return (q[0]**8. - 28. * q[1] * q[0]**6.
                    + 210. * q[1]**2. * q[0]**4. - 420. * q[1]**3. * q[0]**2.
                    + 105. * q[1]**4. + 112. * q[2] * q[0]**5.
                    - 1120. * q[2] * q[1] * q[0]**3.
                    + 1680. * q[2] * q[1]**2. * q[0]
                    + 1120. * q[2]**2. * q[0]**2. - 1120. * q[2]**2. * q[1]
                    - 420. * q[3] * q[0]**4. + 2520. * q[3] * q[1] * q[0]**2.
                    - 1260. * q[3] * q[1]**2. - 3360. * q[3] * q[2] * q[0]
                    + 1260. * q[3]**2. + 1344. * q[4] * q[0]**3.
                    - 4032. * q[4] * q[1] * q[0] + 2688. * q[4] * q[2]
                    - 3360. * q[5] * q[0]**2. + 3360. * q[5] * q[1]
                    + 5760. * q[6] * q[0] - 5040. * q[7])
        raise ValueError("order must be between 0 and 7")

    def _transverse_momentum_correlations_event_num_denom(
        self, particle_list_event: List["Particle"]
    ) -> None:
        """
        Compute the transverse momentum correlations for a single event.

        Computes the numerators and denominators of Eqs. A1-A7 in Ref. [1]
        separately and appends them to `N_events` / `D_events`.

        Parameters
        ----------
        particle_list_event : list
            List of particle objects in a single event.

        Returns
        -------
        None
        """
        Pk, Wk = self._P_W_k(particle_list_event)
        # The same combinatorial expansion applies to the numerator
        # (momentum power sums) and the denominator (weight power sums).
        N = np.array([self._distinct_tuple_sum(Pk, order)
                      for order in range(self.max_order)])
        D = np.array([self._distinct_tuple_sum(Wk, order)
                      for order in range(self.max_order)])
        self.N_events.append(N)
        self.D_events.append(D)

    def _compute_numerator_denominator_all_events(
        self, particle_list_all_events: List[List["Particle"]]
    ) -> None:
        """
        Compute numerators and denominators of correlations for all events.

        Parameters
        ----------
        particle_list_all_events : list
            List of events, where each event is a list of particle objects.
        """
        for event in particle_list_all_events:
            self._transverse_momentum_correlations_event_num_denom(event)

    def _compute_mean_pt_correlations(
        self, numerator_denominator_array: np.ndarray
    ) -> float:
        """
        Compute mean transverse momentum correlations from the array
        containing the numerator and denominator.

        Parameters
        ----------
        numerator_denominator_array : np.ndarray
            Array of shape (num_events, 2) containing numerators and
            denominators.

        Returns
        -------
        float
            Mean transverse momentum correlations.
        """
        # Event average weighted by the denominator:
        # sum_events N / sum_events D  (Eq. 14 in [1]).
        sum_numerator = np.sum(numerator_denominator_array[:, 0])
        sum_denominator = np.sum(numerator_denominator_array[:, 1])
        return sum_numerator / sum_denominator

    def mean_pt_correlations(
        self,
        particle_list_all_events: List[List["Particle"]],
        compute_error: bool = True,
        delete_fraction: float = 0.4,
        number_samples: int = 100,
        seed: int = 42,
    ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
        """
        Computes mean transverse momentum correlations for each order up to
        max_order using Eq. [14] in Ref. [1].

        The weight is chosen to be
        :math:`W^{\\prime}_{m} = D\\langle m\\rangle_{p_{\\mathrm{T}}}`.

        Parameters
        ----------
        particle_list_all_events : list
            List of events, where each event is a list of particle objects.
        compute_error : bool, optional
            Whether to compute error estimates (default is True).
        delete_fraction : float, optional
            Fraction of data to delete for jackknife method (default is 0.4).
        number_samples : int, optional
            Number of jackknife samples (default is 100).
        seed : int, optional
            Random seed for reproducibility (default is 42).

        Returns
        -------
        np.ndarray or tuple
            Mean transverse momentum correlations for each order.
            If compute_error is True, returns a tuple
            (mean_pt_correlation, mean_pt_correlation_error).

        Raises
        ------
        TypeError
            If delete_fraction is not a float.
            If number_samples is not an integer.
            If seed is not an integer.
            If compute_error is not a boolean.
        ValueError
            If delete_fraction is not between 0 and 1.
            If number_samples is not greater than 0.
        """
        if not isinstance(delete_fraction, float):
            raise TypeError("delete_fraction must be a float")
        if not 0.0 < delete_fraction < 1.0:
            raise ValueError("delete_fraction must be between 0 and 1")
        if not isinstance(number_samples, int):
            raise TypeError("number_samples must be an integer")
        if not number_samples > 0:
            raise ValueError("number_samples must be greater than 0")
        if not isinstance(seed, int):
            raise TypeError("seed must be an integer")
        if not isinstance(compute_error, bool):
            raise TypeError("compute_error must be a boolean")

        self.N_events = []
        self.D_events = []
        self._compute_numerator_denominator_all_events(particle_list_all_events)
        self.N_events = np.array(self.N_events)
        self.D_events = np.array(self.D_events)

        mean_pt_correlations_store = np.zeros(self.max_order)
        mean_pt_correlations_error_store = np.zeros(self.max_order)
        for order in range(self.max_order):
            # combine the numerator and denominator for each order in an array
            numerator_denominator_array = np.array(
                [self.N_events[:, order], self.D_events[:, order]]
            ).T
            mean_pt_correlations_store[order] = self._compute_mean_pt_correlations(
                numerator_denominator_array
            )
            if compute_error:
                # Delete-d jackknife resampling over events.
                jackknife = Jackknife(delete_fraction, number_samples, seed)
                mean_pt_correlations_error_store[order] = (
                    jackknife.compute_jackknife_estimates(
                        numerator_denominator_array,
                        function=self._compute_mean_pt_correlations,
                    )
                )

        self.mean_pt_correlation = mean_pt_correlations_store
        if compute_error:
            self.mean_pt_correlation_error = mean_pt_correlations_error_store
            return (mean_pt_correlations_store, mean_pt_correlations_error_store)
        else:
            return mean_pt_correlations_store

    def _kappa_cumulant(self, C: np.ndarray, k: int) -> float:
        """
        Compute cumulant kappa from correlations C up to order k
        (Eqns. B9-B16 in [1]).

        These are the standard moments-to-cumulants relations with
        C[i] being the (i+1)-particle correlator.

        Parameters
        ----------
        C : np.ndarray
            Array of correlations up to order k.
        k : int
            Order of cumulant to compute.

        Returns
        -------
        float
            Cumulant kappa for the given order k.
        """
        if k == 1:
            kappa = C[0]
        elif k == 2:
            kappa = C[1] - C[0]**2
        elif k == 3:
            kappa = C[2] - 3. * C[1] * C[0] + 2. * C[0]**3
        elif k == 4:
            kappa = (C[3] - 4. * C[2] * C[0] - 3. * C[1]**2
                     + 12. * C[1] * C[0]**2 - 6. * C[0]**4)
        elif k == 5:
            kappa = (C[4] - 5. * C[3] * C[0] - 10. * C[2] * C[1]
                     + 30. * C[1]**2 * C[0] + 20. * C[2] * C[0]**2
                     - 60. * C[1] * C[0]**3 + 24. * C[0]**5)
        elif k == 6:
            kappa = (C[5] - 6. * C[4] * C[0] - 15. * C[3] * C[1]
                     - 10. * C[2]**2 + 30. * C[1]**3 + 30. * C[3] * C[0]**2
                     + 120. * C[2] * C[1] * C[0] - 270. * C[1]**2 * C[0]**2
                     - 120. * C[2] * C[0]**3 + 360. * C[1] * C[0]**4
                     - 120. * C[0]**6)
        elif k == 7:
            kappa = (C[6] - 7. * C[5] * C[0] - 21. * C[4] * C[1]
                     + 42. * C[4] * C[0]**2 - 35. * C[3] * C[2]
                     + 210. * C[3] * C[1] * C[0] - 210. * C[3] * C[0]**3
                     + 140. * C[2]**2 * C[0] + 210. * C[2] * C[1]**2
                     - 1260. * C[2] * C[1] * C[0]**2 + 840. * C[2] * C[0]**4
                     - 630. * C[1]**3 * C[0] + 2520. * C[1]**2 * C[0]**3
                     - 2520. * C[1] * C[0]**5 + 720. * C[0]**7)
        elif k == 8:
            kappa = (C[7] - 8. * C[6] * C[0] - 28. * C[5] * C[1]
                     + 56. * C[5] * C[0]**2 - 56. * C[4] * C[2]
                     + 336. * C[4] * C[1] * C[0] - 336. * C[4] * C[0]**3
                     - 35. * C[3]**2 + 560. * C[3] * C[2] * C[0]
                     + 420. * C[1]**2 * C[3] - 2520. * C[3] * C[1] * C[0]**2
                     + 1680. * C[3] * C[0]**4 + 560. * C[2]**2 * C[1]
                     - 1680. * C[2]**2 * C[0]**2
                     - 5040. * C[2] * C[1]**2 * C[0]
                     + 13440. * C[2] * C[1] * C[0]**3 - 6720. * C[2] * C[0]**5
                     - 630. * C[1]**4 + 10080. * C[1]**3 * C[0]**2
                     - 25200. * C[1]**2 * C[0]**4 + 20160. * C[1] * C[0]**6
                     - 5040. * C[0]**8)
        else:
            raise ValueError("Invalid order k for cumulant calculation")
        return kappa

    def _compute_mean_pt_cumulants(self, data: np.ndarray, k: int) -> float:
        """
        Compute mean transverse momentum cumulants from data at order k.

        Parameters
        ----------
        data : np.ndarray
            Array containing interleaved numerator/denominator columns:
            column 2*m holds N of order m, column 2*m+1 holds D of order m,
            for m = 0, ..., k.
        k : int
            Order of cumulant to compute (zero-based).

        Returns
        -------
        float
            Mean transverse momentum cumulant for order k.
        """
        C = np.zeros(k + 1)
        for order in range(k + 1):
            # Event-averaged correlator of each order entering the cumulant.
            sum_numerator = np.sum(data[:, 2 * order])
            sum_denominator = np.sum(data[:, 2 * order + 1])
            C[order] = sum_numerator / sum_denominator
        return self._kappa_cumulant(C, k + 1)

    def mean_pt_cumulants(
        self,
        particle_list_all_events: List[List["Particle"]],
        compute_error: bool = True,
        delete_fraction: float = 0.4,
        number_samples: int = 100,
        seed: int = 42,
    ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
        """
        Computes the mean transverse momentum cumulants for each order k
        from Eqs. B9-B16 in Ref. [1].

        Parameters
        ----------
        particle_list_all_events : list
            List of events, where each event is a list of particle objects.
        compute_error : bool, optional
            Whether to compute error estimates (default is True).
        delete_fraction : float, optional
            Fraction of data to delete for jackknife method (default is 0.4).
        number_samples : int, optional
            Number of jackknife samples (default is 100).
        seed : int, optional
            Random seed for reproducibility (default is 42).

        Returns
        -------
        np.ndarray or tuple
            Mean transverse momentum cumulants for each order.
            If compute_error is True, returns a tuple (kappa, kappa_error).

        Raises
        ------
        TypeError
            If delete_fraction is not a float.
            If number_samples is not an integer.
            If seed is not an integer.
            If compute_error is not a boolean.
        ValueError
            If delete_fraction is not between 0 and 1.
            If number_samples is not greater than 0.
        """
        if not isinstance(delete_fraction, float):
            raise TypeError("delete_fraction must be a float")
        if not 0.0 < delete_fraction < 1.0:
            raise ValueError("delete_fraction must be between 0 and 1")
        if not isinstance(number_samples, int):
            raise TypeError("number_samples must be an integer")
        if not number_samples > 0:
            raise ValueError("number_samples must be greater than 0")
        if not isinstance(seed, int):
            raise TypeError("seed must be an integer")
        if not isinstance(compute_error, bool):
            raise TypeError("compute_error must be a boolean")

        self.N_events = []
        self.D_events = []
        self._compute_numerator_denominator_all_events(particle_list_all_events)
        self.N_events = np.array(self.N_events)
        self.D_events = np.array(self.D_events)

        kappa_store = np.zeros(self.max_order)
        kappa_store_error = np.zeros(self.max_order)
        for order in range(self.max_order):
            # NOTE(review): this takes up to 2*(order+1) columns although only
            # the first (order+1) are read downstream; the slice is clipped to
            # max_order columns, which always suffices. Kept as-is.
            dataN = self.N_events[:, :2 * (order + 1)]
            dataD = self.D_events[:, :2 * (order + 1)]
            # combine the numerator and denominator for each order in data
            # always alternate the columns of the numerator and denominator
            combined_data = np.empty(
                (dataN.shape[0], dataN.shape[1] + dataD.shape[1]),
                dtype=dataN.dtype,
            )
            combined_data[:, 0::2] = dataN
            combined_data[:, 1::2] = dataD

            kappa_store[order] = self._compute_mean_pt_cumulants(
                combined_data, order
            )
            if compute_error:
                # Delete-d jackknife resampling over events.
                jackknife = Jackknife(delete_fraction, number_samples, seed)
                kappa_store_error[order] = jackknife.compute_jackknife_estimates(
                    combined_data,
                    function=self._compute_mean_pt_cumulants,
                    k=order,
                )

        self.kappa = kappa_store
        if compute_error:
            self.kappa_error = kappa_store_error
            return (kappa_store, kappa_store_error)
        else:
            return kappa_store