Source code for PCAFlow

# ===================================================
#
#    Copyright (c) 2023-2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

from sparkx.flow import FlowInterface
import numpy as np
import warnings
import random as rd

rd.seed(42)


class PCAFlow(FlowInterface.FlowInterface):
    """
    This class implements the flow analysis with Principal Component
    Analysis (PCA).

    PCAFlow is designed for analyzing anisotropic flow in high-energy nuclear
    collisions using two-particle correlations. It uses Principal Component
    Analysis (PCA) to extract flow information from particle data.

    This class implements the method proposed in:

    - [1] `Phys.Rev.Lett. 114 (2015) 15, 152301 [nucl-th/1410.7739] <https://inspirehep.net/literature/1324816>`__

    The implementation is done in a similar way as in:

    - [2] `Bachelor thesis D.J.W. Verweij (2016) <https://studenttheses.uu.nl/handle/20.500.12932/26817>`__

    Parameters
    ----------
    n : int, optional
        The flow harmonic to compute. Must be a positive integer.
        Default is 2.
    alpha : int, optional
        The order in sub-leading flow up to which the flow is computed.
        Must be an integer greater than or equal to 1. Default is 2.
    number_subcalc : int, optional
        The number of sub-calculations to estimate the error of the flow.
        Must be an integer greater than or equal to 2. Default is 4.

    Methods
    -------
    differential_flow:
        Computes the differential flow.
    Pearson_correlation:
        Returns the Pearson coefficient and its uncertainty if the flow is
        already computed.

    Attributes
    ----------
    n_ : int
        The flow harmonic to compute.
    alpha_ : int
        The order in sub-leading flow up to which the flow is computed.
    number_subcalc_ : int
        The number of sub-calculations to estimate the error of the flow.
    number_events_ : int or None
        The total number of events.
    subcalc_counter_ : int
        A counter to keep track of the current sub-calculation.
    normalization_ : None or numpy.ndarray
        Normalization factors for each bin.
    bin_multiplicity_total_ : None or numpy.ndarray
        Total multiplicity in each bin across all events.
    sigma_multiplicity_total_ : list of numpy.ndarray
        List of arrays containing the multiplicity in each bin for each
        sub-calculation.
    number_events_subcalc_ : None or numpy.ndarray
        Number of events used in each sub-calculation.
    QnRe_total_ : None or numpy.ndarray
        Real part of the flow vector for each bin across all events.
    QnIm_total_ : None or numpy.ndarray
        Imaginary part of the flow vector for each bin across all events.
    SigmaQnReSub_total_ : None or numpy.ndarray
        Real part of the flow vector for each bin and sub-calculation.
    SigmaQnImSub_total_ : None or numpy.ndarray
        Imaginary part of the flow vector for each bin and sub-calculation.
    VnDelta_total_ : None or numpy.ndarray
        Covariance matrix for flow vectors across all events.
    SigmaVnDelta_total_ : None or numpy.ndarray
        Covariance matrix for flow vectors for each sub-calculation.
    FlowSubCalc_ : None or numpy.ndarray
        An array containing the flow magnitude for sub-calculations.
    Flow_ : None or numpy.ndarray
        An array containing the flow magnitude.
    FlowUncertainty_ : None or numpy.ndarray
        An array containing the uncertainty of the flow.
    Pearson_r_ : None or numpy.ndarray
        An array containing the Pearson correlation between two bins.
    Pearson_r_uncertainty_ : None or numpy.ndarray
        An array containing the Pearson correlation uncertainty between two
        bins.

    Raises
    ------
    TypeError
        If n, alpha, or number_subcalc is not an integer.
    ValueError
        If n is not a positive integer, alpha is less than 1, or
        number_subcalc is less than 2.
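
    Examples
    --------
    A minimal usage sketch; the import path is illustrative and
    ``particle_data`` stands for a nested list of SPARKX particle objects
    (one inner list per event):

    .. code-block:: python

        import numpy as np
        from sparkx.flow.PCAFlow import PCAFlow

        pt_bins = np.linspace(0.0, 3.0, 11)
        pca_flow = PCAFlow(n=2, alpha=2, number_subcalc=4)
        flow, flow_error = pca_flow.differential_flow(
            particle_data, pt_bins, "pt")
        pearson_r, pearson_r_error = pca_flow.Pearson_correlation()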
""" def __init__(self, n=2, alpha=2, number_subcalc=4): # flow harmonic to compute if not isinstance(n, int): raise TypeError('n has to be int') elif n <= 0: raise ValueError( 'n-th harmonic with value n<=0 can not be computed') else: self.n_ = n # order in sub-leading flow up to which the flow is computed if not isinstance(alpha, int): raise TypeError('alpha has to be int') elif alpha < 1: raise ValueError('alpha has to be >= 1') else: self.alpha_ = alpha # number of sub-calculations to estimate the error of the flow if not isinstance(number_subcalc, int): raise TypeError('number_subcalc has to be int') elif number_subcalc < 2: raise ValueError('number_subcalc has to be >= 2') else: self.number_subcalc_ = number_subcalc self.number_events_ = None self.subcalc_counter_ = 0 self.normalization_ = None self.bin_multiplicity_total_ = None self.sigma_multiplicity_total_ = [] self.number_events_subcalc_ = None self.QnRe_total_ = None self.QnIm_total_ = None self.SigmaQnReSub_total_ = None self.SigmaQnImSub_total_ = None self.VnDelta_total_ = None self.SigmaVnDelta_total_ = None self.FlowSubCalc_ = None self.Flow_ = None self.FlowUncertainty_ = None self.Pearson_r_ = None self.Pearson_r_uncertainty_ = None def integrated_flow(self, particle_data): warnings.warn("Integrated flow is not supported for PCAFlow") return None def __compute_normalization(self, bins): self.normalization_ = np.zeros((len(bins) - 1)) bin_widths = np.diff(bins) for bin in range(len(bins) - 1): self.normalization_[bin] = 1. / (2. * np.pi * bin_widths[bin])**2. def __update_event( self, event_data, bins, flow_as_function_of, event_number): """ Update the anisotropic flow calculations based on particle data from a single event. Parameters ---------- event_data : list List of particle data for a single event. bins : list or np.ndarray Bins used for the flow calculation. flow_as_function_of : str Variable on which the flow is calculated ("pt", "rapidity" or "pseudorapidity"). event_number : int Index of the current event. Returns ------- None """ number_bins = len(bins) - 1 bin_multiplicity_event = np.zeros(number_bins) QnRe = np.zeros(number_bins) QnIm = np.zeros(number_bins) SigmaQnReSubTot = np.zeros((number_bins, self.number_subcalc_)) SigmaQnImSubTot = np.zeros((number_bins, self.number_subcalc_)) VnDelta_event = np.zeros((number_bins, number_bins)) SigmaVnDelta_event = np.zeros( (number_bins, number_bins, self.number_subcalc_)) if event_number == 0: self.bin_multiplicity_total_ = np.zeros(number_bins) for i in range(self.number_subcalc_): self.sigma_multiplicity_total_.append(np.zeros(number_bins)) self.number_events_subcalc_ = np.zeros(self.number_subcalc_) self.QnRe_total_ = np.zeros(number_bins) self.QnIm_total_ = np.zeros(number_bins) self.SigmaQnReSub_total_ = np.zeros( (number_bins, self.number_subcalc_)) self.SigmaQnImSub_total_ = np.zeros( (number_bins, self.number_subcalc_)) self.VnDelta_total_ = np.zeros((number_bins, number_bins)) self.SigmaVnDelta_total_ = np.zeros( (number_bins, number_bins, self.number_subcalc_)) # update the sub-calculation counter if needed number_events_subcalc = self.number_events_ // self.number_subcalc_ if (event_number % number_events_subcalc == 0) and (event_number != 0) and ( self.subcalc_counter_ < self.number_subcalc_ - 1): self.subcalc_counter_ += 1 random_reaction_plane = rd.random() * 2. 
        # loop over all event particles and compute the flow vectors
        # in the bins
        for particle in event_data:
            if flow_as_function_of == "pt":
                val = particle.pt_abs()
            elif flow_as_function_of == "rapidity":
                val = particle.momentum_rapidity_Y()
            elif flow_as_function_of == "pseudorapidity":
                val = particle.pseudorapidity()

            bin_index = np.digitize(val, bins, right=False) - 1
            # handle the case in which the particle is not in the binned
            # phase space
            if (bin_index < 0) or (bin_index > number_bins - 1):
                continue

            phi = particle.phi() + random_reaction_plane

            bin_multiplicity_event[bin_index] += 1.
            QnRe[bin_index] += np.cos(self.n_ * phi)
            QnIm[bin_index] += np.sin(self.n_ * phi)
            SigmaQnReSubTot[bin_index][self.subcalc_counter_] += np.cos(
                self.n_ * phi)
            SigmaQnImSubTot[bin_index][self.subcalc_counter_] += np.sin(
                self.n_ * phi)

        # compute the covariance matrix
        for a in range(number_bins):
            for b in range(number_bins):
                if a == b:
                    # correct with multiplicity term
                    VnDelta_event[a][b] += (
                        QnRe[a] * QnRe[b] + QnIm[a] * QnIm[b]
                        - bin_multiplicity_event[a]) / self.normalization_[a]
                    SigmaVnDelta_event[a][b][self.subcalc_counter_] += (
                        QnRe[a] * QnRe[b] + QnIm[a] * QnIm[b]
                        - bin_multiplicity_event[a]) / self.normalization_[a]
                else:
                    VnDelta_event[a][b] += (
                        QnRe[a] * QnRe[b]
                        + QnIm[a] * QnIm[b]) / self.normalization_[a]
                    SigmaVnDelta_event[a][b][self.subcalc_counter_] += (
                        QnRe[a] * QnRe[b]
                        + QnIm[a] * QnIm[b]) / self.normalization_[a]

        # update the class members
        self.bin_multiplicity_total_ += bin_multiplicity_event
        self.sigma_multiplicity_total_[
            self.subcalc_counter_] += bin_multiplicity_event
        self.number_events_subcalc_[self.subcalc_counter_] += 1
        self.QnRe_total_ += QnRe
        self.QnIm_total_ += QnIm
        self.SigmaQnReSub_total_ += SigmaQnReSubTot
        self.SigmaQnImSub_total_ += SigmaQnImSubTot
        self.VnDelta_total_ += VnDelta_event
        self.SigmaVnDelta_total_ += SigmaVnDelta_event

    def __compute_flow_PCA(self, bins):
        """
        Perform Principal Component Analysis (PCA) to compute the
        anisotropic flow.

        Parameters
        ----------
        bins : list or np.ndarray
            Bins used for the flow calculation.

        Returns
        -------
        None
        """
        number_bins = len(bins) - 1

        for a in range(number_bins):
            for b in range(number_bins):
                self.VnDelta_total_[a][b] /= self.number_events_
                self.VnDelta_total_[a][b] -= (
                    self.QnRe_total_[a] * self.QnRe_total_[b]
                    + self.QnIm_total_[a] * self.QnIm_total_[b]
                ) / self.number_events_**2.
                for sub in range(self.number_subcalc_):
                    self.SigmaVnDelta_total_[
                        a][b][sub] /= self.number_events_subcalc_[sub]
                    self.SigmaVnDelta_total_[a][b][sub] -= (
                        self.SigmaQnReSub_total_[a][sub]
                        * self.SigmaQnReSub_total_[b][sub]
                        + self.SigmaQnImSub_total_[a][sub]
                        * self.SigmaQnImSub_total_[b][sub]
                    ) / (self.number_events_subcalc_[sub]**2.
                         * self.normalization_[a])

        # perform sub calculations for error estimation
        self.FlowSubCalc_ = np.zeros(
            (self.number_subcalc_, number_bins, self.alpha_))
        for sub in range(self.number_subcalc_):
            VnDelta_local = np.zeros((number_bins, number_bins))
            for a in range(number_bins):
                for b in range(number_bins):
                    VnDelta_local[a][b] = self.SigmaVnDelta_total_[a][b][sub]

            eigenvalues, eigenvectors = np.linalg.eig(VnDelta_local)
            sort_indices = np.argsort(eigenvalues)[::-1]
            # Apply sorting to eigenvalues and eigenvectors
            sorted_eigenvalues = eigenvalues[sort_indices]
            sorted_eigenvectors = eigenvectors[:, sort_indices]

            for bin in range(number_bins):
                for alpha in range(self.alpha_):
                    eval = sorted_eigenvalues[alpha]
                    evec = sorted_eigenvectors[alpha]
                    if (eval >= 0.) and (
                            self.sigma_multiplicity_total_[sub][bin] > 0.):
                        # compute the flow here for the subcalc
                        self.FlowSubCalc_[sub][bin][alpha] = (
                            np.sqrt(eval) * evec[bin]
                            * self.number_events_subcalc_[sub]
                            / self.sigma_multiplicity_total_[sub][bin])
                    else:
                        self.FlowSubCalc_[sub][bin][alpha] = np.nan

        # compute the anisotropic flow
        VnDelta_local = np.zeros((number_bins, number_bins))
        for a in range(number_bins):
            for b in range(number_bins):
                VnDelta_local[a][b] = self.VnDelta_total_[a][b]

        eigenvalues, eigenvectors = np.linalg.eig(VnDelta_local)
        sort_indices = np.argsort(eigenvalues)[::-1]
        # Apply sorting to eigenvalues and eigenvectors
        sorted_eigenvalues = eigenvalues[sort_indices]
        sorted_eigenvectors = eigenvectors[:, sort_indices]

        self.Flow_ = np.zeros((number_bins, self.alpha_))
        for bin in range(number_bins):
            for alpha in range(self.alpha_):
                eval = sorted_eigenvalues[alpha]
                evec = sorted_eigenvectors[alpha]
                if (eval >= 0.) and (self.bin_multiplicity_total_[bin] > 0.):
                    self.Flow_[bin][alpha] = (
                        np.sqrt(eval) * evec[bin] * self.number_events_
                        / self.bin_multiplicity_total_[bin])
                else:
                    self.Flow_[bin][alpha] = np.nan

    def __compute_uncertainty(self, bins):
        """
        Compute the uncertainty of the anisotropic flow for each bin and
        order alpha.

        Parameters
        ----------
        bins : list or np.ndarray
            Bins used for the flow calculation.

        Returns
        -------
        None
        """
        number_bins = len(bins) - 1
        self.FlowUncertainty_ = np.zeros((number_bins, self.alpha_))

        # sum the squared deviation from the mean
        for sub in range(self.number_subcalc_):
            for bin in range(number_bins):
                for alpha in range(self.alpha_):
                    self.FlowUncertainty_[bin][alpha] += (
                        (self.FlowSubCalc_[sub][bin][alpha]
                         - self.Flow_[bin][alpha])**2.
                        / (self.number_subcalc_ - 1))

        # take the sqrt to obtain the standard deviation
        for bin in range(number_bins):
            for alpha in range(self.alpha_):
                self.FlowUncertainty_[bin][alpha] = np.sqrt(
                    self.FlowUncertainty_[bin][alpha])

        # Conservative ansatz: If the error cannot be computed
        # (e.g., negative eigenvalue), set the flow magnitude to NaN
        for bin in range(number_bins):
            for alpha in range(self.alpha_):
                if np.isnan(self.FlowUncertainty_[bin][alpha]):
                    self.Flow_[bin][alpha] = np.nan

    def __compute_factorization_breaking(self, bins):
        """
        Compute the factorization breaking using the Pearson correlation
        coefficient Eq. (12), Ref. [1].

        Parameters
        ----------
        bins : list or np.ndarray
            Bins used for the flow calculation.

        Returns
        -------
        None
        """
        # compute the Pearson coefficient Eq. (12), Ref. [1]
        number_bins = len(bins) - 1
        r = np.zeros((number_bins, number_bins))
        for a in range(number_bins):
            for b in range(number_bins):
                r[a][b] = self.VnDelta_total_[a][b] / np.sqrt(
                    self.VnDelta_total_[a][a] * self.VnDelta_total_[b][b])
        self.Pearson_r_ = r

        # do this for each sub calculation
        r_sub = np.zeros((number_bins, number_bins, self.number_subcalc_))
        for sub in range(self.number_subcalc_):
            for a in range(number_bins):
                for b in range(number_bins):
                    r_sub[a][b][sub] = self.SigmaVnDelta_total_[a][b][sub] / np.sqrt(
                        self.SigmaVnDelta_total_[a][a][sub]
                        * self.SigmaVnDelta_total_[b][b][sub])

        r_err = np.zeros((number_bins, number_bins))
        # sum the squared deviation from the mean
        for sub in range(self.number_subcalc_):
            for a in range(number_bins):
                for b in range(number_bins):
                    r_err[a][b] += ((r_sub[a][b][sub] - r[a][b])**2.
                                    / (self.number_subcalc_ - 1))

        # take the sqrt to obtain the standard deviation
        for a in range(number_bins):
            for b in range(number_bins):
                r_err[a][b] = np.sqrt(r_err[a][b])

        self.Pearson_r_uncertainty_ = r_err
    def differential_flow(self, particle_data, bins, flow_as_function_of):
        """
        Compute the differential flow.

        Parameters
        ----------
        particle_data : list
            List of particle data for multiple events.
        bins : list or np.ndarray
            Bins used for the differential flow calculation.
        flow_as_function_of : str
            Variable on which the flow is calculated
            ("pt", "rapidity" or "pseudorapidity").

        Returns
        -------
        list of numpy.ndarrays
            A list containing the differential flow and its uncertainty.
            Each array in the list corresponds to:

            - anisotropic flow of order alpha (float):
              Differential flow magnitude for the bin.
            - anisotropic flow error (float):
              Error on the differential flow magnitude for the bin.

        Notes
        -----
        - The flow or the uncertainty can be accessed by:
          `function_return[bin][alpha]`
        - If a bin has no events or the uncertainty could not be computed,
          the corresponding element in the result list is set to `np.nan`.
        """
        if not isinstance(bins, (list, np.ndarray)):
            raise TypeError('bins has to be list or np.ndarray')
        if not isinstance(flow_as_function_of, str):
            raise TypeError('flow_as_function_of is not a string')
        if flow_as_function_of not in ["pt", "rapidity", "pseudorapidity"]:
            raise ValueError(
                "flow_as_function_of must be either 'pt', 'rapidity' or "
                "'pseudorapidity'")

        self.__compute_normalization(bins)

        self.number_events_ = len(particle_data)
        for event in range(self.number_events_):
            self.__update_event(
                particle_data[event], bins, flow_as_function_of, event)

        self.__compute_flow_PCA(bins)
        self.__compute_uncertainty(bins)
        self.__compute_factorization_breaking(bins)

        return [self.Flow_, self.FlowUncertainty_]
    def Pearson_correlation(self):
        """
        Retrieve the Pearson correlation coefficient and its uncertainty
        (Eq. (12), Ref. [1]).

        Returns
        -------
        list
            A list containing the Pearson correlation coefficient and its
            uncertainty.

            - Pearson_r (numpy.ndarray): The Pearson correlation coefficient
              for factorization breaking between each pair of bins.
            - Pearson_r_uncertainty (numpy.ndarray): The uncertainty of the
              Pearson correlation coefficient for factorization breaking
              between each pair of bins.

        Notes
        -----
        The values in the list can be accessed by name[bin1][bin2].
        """
        return [self.Pearson_r_, self.Pearson_r_uncertainty_]
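

# Illustrative self-test sketch, not part of the SPARKX API: it builds toy
# "particles" that only provide the methods PCAFlow actually calls
# (pt_abs, momentum_rapidity_Y, pseudorapidity, phi) and runs the analysis
# on random, flow-free events, so the extracted flow should be compatible
# with zero (or NaN where no positive eigenvalue is found).
if __name__ == "__main__":
    class _ToyParticle:
        """Minimal stand-in for a SPARKX particle object."""

        def __init__(self, pt, phi):
            self._pt = pt
            self._phi = phi

        def pt_abs(self):
            return self._pt

        def momentum_rapidity_Y(self):
            return 0.0

        def pseudorapidity(self):
            return 0.0

        def phi(self):
            return self._phi

    # 40 toy events with 200 uniformly distributed particles each
    toy_events = [
        [_ToyParticle(rd.uniform(0.0, 3.0), rd.uniform(0.0, 2.0 * np.pi))
         for _ in range(200)]
        for _ in range(40)
    ]

    pca_flow = PCAFlow(n=2, alpha=2, number_subcalc=4)
    flow, flow_error = pca_flow.differential_flow(
        toy_events, np.linspace(0.0, 3.0, 7), "pt")
    pearson_r, pearson_r_error = pca_flow.Pearson_correlation()
    print("leading flow in first pt bin:",
          flow[0][0], "+/-", flow_error[0][0])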