Source code for ScalarProductFlow

# ===================================================
#
#    Copyright (c) 2023-2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

from sparkx.flow import FlowInterface
from sparkx.Particle import Particle
import numpy as np
from typing import List, Tuple, Union


class ScalarProductFlow(FlowInterface.FlowInterface):
    """
    This class implements a scalar product flow analysis algorithm
    `Adler, C., et al. "Elliptic flow from two- and four-particle correlations
    in Au+Au collisions at sqrt(s_NN) = 130 GeV." Physical Review C 66.3
    (2002): 034904
    <https://journals.aps.org/prc/pdf/10.1103/PhysRevC.66.034904?casa_token=lQ6DZfopfxgAAAAA%3ANYaROBYUxtCjJ_2xHDHWLx4tfi9LE6SC92EcH-8Cm0GFhXn-RzpyPIYAyIedFaweDvYjkhSEeaK1K8A>`__.

    For this method, the flow is calculated by correlating the event vector
    :math:`Q` with the conjugated unit momentum vector of the particle. This is
    normalized by the square root of the scalar product of the event vectors of
    two equal-sized sub-events. We choose here to divide the sub-events by
    positive and negative pseudorapidity. Note that for asymmetric systems,
    this will not be sufficient.

    In summary, this class calculates the following:

    .. math::

        v_n = \\frac{\\langle Q_n u_{n,i}^{\\ast} \\rangle}{2\\sqrt{\\langle Q_n^{a} Q_n^{b\\ast} \\rangle}}

    where we average over all particles of all events.

    Parameters
    ----------
    n : int, optional
        The value of the harmonic. Default is 2.
    weight : str, optional
        The weight used for calculating the flow. Default is :code:`pT2`.
    pseudorapidity_gap : float, optional
        The pseudorapidity gap used for dividing the particles into sub-events.
        Default is 0.0.

    Methods
    -------
    integrated_flow:
        Computes the integrated flow.
    differential_flow:
        Computes the differential flow.

    Examples
    --------
    A demonstration of how to calculate the flow with respect to the event
    plane of a separate particle list. The same particle list can also be used
    to determine the event plane and the flow.

    .. highlight:: python
    .. code-block:: python
        :linenos:

        >>> from sparkx.Jetscape import Jetscape
        >>> from sparkx.flow.ScalarProductFlow import ScalarProductFlow
        >>>
        >>> JETSCAPE_FILE_PATH_FLOW = [Jetscape_directory]/particle_lists_flow.dat
        >>> JETSCAPE_FILE_PATH_EVENT_PLANE = [Jetscape_directory]/particle_lists_ep.dat
        >>>
        >>> # Jetscape object containing the particles on which we want to calculate flow
        >>> jetscape_flow = Jetscape(JETSCAPE_FILE_PATH_FLOW).particle_objects_list()
        >>>
        >>> # Jetscape object containing the particles which determine the event plane
        >>> jetscape_event = Jetscape(JETSCAPE_FILE_PATH_EVENT_PLANE).particle_objects_list()
        >>>
        >>> # Create flow objects for v2 and v3, both weighted with pT**2
        >>> flow2 = ScalarProductFlow(n=2, weight="pT2", pseudorapidity_gap=0.1)
        >>> flow3 = ScalarProductFlow(n=3, weight="pT2", pseudorapidity_gap=0.1)
        >>>
        >>> # Calculate the integrated flow with error
        >>> v2, v2_error = flow2.integrated_flow(jetscape_flow, jetscape_event)
        >>> v3, v3_error = flow3.integrated_flow(jetscape_flow, jetscape_event)
    """

    def __init__(
        self, n: int = 2, weight: str = "pT2", pseudorapidity_gap: float = 0.0
    ) -> None:
        """
        Initialize the ScalarProductFlow object.

        Parameters
        ----------
        n : int, optional
            The value of the harmonic. Default is 2.
        weight : str, optional
            The weight used for calculating the flow. Default is "pT2".
        pseudorapidity_gap : float, optional
            The pseudorapidity gap used for dividing the particles into
            sub-events. Default is 0.0.
        """
        if not isinstance(n, int):
            raise TypeError("n has to be int")
        elif n <= 0:
            raise ValueError(
                "n-th harmonic with value n<=0 can not be computed"
            )
        else:
            self.n_ = n

        if not isinstance(weight, str):
            raise TypeError("weight has to be a string")
        elif weight not in ["pT", "pT2", "pTn", "rapidity", "pseudorapidity"]:
            raise ValueError(
                "Invalid weight given, choose one of the following: "
                "'pT', 'pT2', 'pTn', 'rapidity', 'pseudorapidity'"
            )
        else:
            self.weight_ = weight

        if not isinstance(pseudorapidity_gap, (int, float)):
            raise TypeError("pseudorapidity_gap has to be float")
        elif pseudorapidity_gap < 0:
            raise ValueError(
                "pseudorapidity value with gap < 0 can not be computed"
            )
        else:
            self.pseudorapidity_gap_ = pseudorapidity_gap

    def __compute_particle_weights(
        self, particle_data: List[List[Particle]]
    ) -> List[List[float]]:
        event_weights = []
        for event in range(len(particle_data)):
            particle_weights = []
            for particle in particle_data[event]:
                weight = 0.0
                if self.weight_ == "pT":
                    weight = particle.pT_abs()
                elif self.weight_ == "pT2":
                    weight = particle.pT_abs() ** 2.0
                elif self.weight_ == "pTn":
                    weight = particle.pT_abs() ** self.n_
                elif self.weight_ == "rapidity":
                    weight = particle.rapidity()
                elif self.weight_ == "pseudorapidity":
                    weight = particle.pseudorapidity()
                particle_weights.append(weight)
            event_weights.append(particle_weights)
        return event_weights

    def __compute_flow_vectors(
        self, particle_data: List[List[Particle]], weights: List[List[float]]
    ) -> List[complex]:
        # Q vector of the whole event
        Q_vector = []
        for event in range(len(particle_data)):
            Q_vector_val = 0.0 + 0.0j
            for particle in range(len(particle_data[event])):
                Q_vector_val += weights[event][particle] * np.exp(
                    1.0j
                    * float(self.n_)
                    * particle_data[event][particle].phi()
                )
            Q_vector.append(Q_vector_val)

        return Q_vector

    def __compute_event_angles_sub_events(
        self, particle_data: List[List[Particle]], weights: List[List[float]]
    ) -> Tuple[List[complex], List[complex]]:
        # Q vector of sub-event A (positive pseudorapidity)
        Q_vector_A = []
        relevant_weights_A = []
        for event in range(len(particle_data)):
            Q_vector_A_val = 0.0 + 0.0j
            relevant_weights_A_event = []
            for particle in range(len(particle_data[event])):
                if (
                    particle_data[event][particle].pseudorapidity()
                    >= +self.pseudorapidity_gap_
                ):
                    Q_vector_A_val += weights[event][particle] * np.exp(
                        1.0j
                        * float(self.n_)
                        * particle_data[event][particle].phi()
                    )
                    relevant_weights_A_event.append(weights[event][particle])
            Q_vector_A.append(Q_vector_A_val)
            relevant_weights_A.extend([relevant_weights_A_event])

        # Q vector of sub-event B (negative pseudorapidity)
        Q_vector_B = []
        relevant_weights_B = []
        for event in range(len(particle_data)):
            Q_vector_B_val = 0.0 + 0.0j
            relevant_weights_B_event = []
            for particle in range(len(particle_data[event])):
                if (
                    particle_data[event][particle].pseudorapidity()
                    < -self.pseudorapidity_gap_
                ):
                    Q_vector_B_val += weights[event][particle] * np.exp(
                        1.0j
                        * float(self.n_)
                        * particle_data[event][particle].phi()
                    )
                    relevant_weights_B_event.append(weights[event][particle])
            Q_vector_B.append(Q_vector_B_val)
            relevant_weights_B.extend([relevant_weights_B_event])

        return Q_vector_A, Q_vector_B

    def __compute_u_vectors(
        self, particle_data: List[List[Particle]]
    ) -> List[List[complex]]:
        u_vectors = []  # [event][particle]
        for event in particle_data:
            u_vector_event = []
            for particle in event:
                u_vector_event.append(
                    np.exp(1.0j * float(self.n_) * particle.phi())
                )
            u_vectors.extend([u_vector_event])

        return u_vectors

    def __compute_event_plane_resolution(
        self, Q_vector_A: List[complex], Q_vector_B: List[complex]
    ) -> float:
        # implements Eq. 15 from arXiv:0809.2949
        QnSquared = np.asarray(
            [
                (np.conjugate(Q_vector_A[event]) * Q_vector_B[event]).real
                for event in range(len(Q_vector_A))
            ]
        )
        QnSquaredSum = np.mean(QnSquared)

        return 2.0 * np.sqrt(QnSquaredSum)

    def __compute_flow_particles(
        self,
        particle_data: List[List[Particle]],
        weights: List[List[float]],
        Q_vector: List[complex],
        u_vectors: List[List[complex]],
        resolution: float,
        self_corr: bool,
    ) -> List[List[float]]:
        flow_values = []
        for event in range(len(particle_data)):
            flow_values_event = []
            for particle in range(len(particle_data[event])):
                weight_particle = np.abs(weights[event][particle])
                Q_vector_particle = Q_vector[event]
                if self_corr:
                    # subtract the particle's own contribution to avoid
                    # autocorrelation
                    Q_vector_particle -= (
                        weight_particle * u_vectors[event][particle]
                    )
                u_vector = u_vectors[event][particle]
                vn_obs = (np.conjugate(u_vector) * Q_vector_particle).real

                flow_of_particle = vn_obs / resolution
                flow_values_event.append(flow_of_particle)
            flow_values.extend([flow_values_event])
        return flow_values

    def __calculate_reference(
        self, particle_data_event_plane: List[List[Particle]]
    ) -> Tuple[float, List[complex]]:
        event_weights_event_plane = self.__compute_particle_weights(
            particle_data_event_plane
        )
        Q_vector_A, Q_vector_B = self.__compute_event_angles_sub_events(
            particle_data_event_plane, event_weights_event_plane
        )
        resolution = self.__compute_event_plane_resolution(
            Q_vector_A, Q_vector_B
        )
        Q_vector = self.__compute_flow_vectors(
            particle_data_event_plane, event_weights_event_plane
        )

        return resolution, Q_vector

    def __calculate_particle_flow(
        self,
        particle_data: List[List[Particle]],
        resolution: float,
        Q_vector: List[complex],
        self_corr: bool,
    ) -> List[List[float]]:
        event_weights = self.__compute_particle_weights(particle_data)
        u_vectors = self.__compute_u_vectors(particle_data)

        return self.__compute_flow_particles(
            particle_data,
            event_weights,
            Q_vector,
            u_vectors,
            resolution,
            self_corr,
        )

    def __calculate_flow_event_average(
        self,
        particle_data: List[List[Particle]],
        flow_particle_list: List[List[float]],
    ) -> Tuple[float, float]:
        # compute the integrated flow
        number_of_particles = 0.0
        flowvalue = 0.0
        flowvalue_squared = 0.0
        for event in range(len(flow_particle_list)):
            for particle in range(len(flow_particle_list[event])):
                weight = (
                    1.0
                    if np.isnan(particle_data[event][particle].weight)
                    else particle_data[event][particle].weight
                )
                number_of_particles += weight
                flowvalue += flow_particle_list[event][particle] * weight
                flowvalue_squared += (
                    flow_particle_list[event][particle] ** 2.0 * weight**2.0
                )

        vn_integrated = 0.0
        sigma = 0.0
        if number_of_particles == 0:
            vn_integrated = 0.0
            sigma = 0.0
        else:
            vn_integrated = flowvalue / number_of_particles
            vn_squared = flowvalue_squared / number_of_particles**2.0
            std_deviation = np.sqrt(vn_integrated**2.0 - vn_squared)
            sigma = std_deviation / np.sqrt(number_of_particles)

        return vn_integrated, sigma
    def integrated_flow(
        self,
        particle_data: List[List[Particle]],
        particle_data_event_plane: List[List[Particle]],
        self_corr: bool = True,
    ) -> Tuple[float, float]:
        """
        Compute the integrated flow.

        Parameters
        ----------
        particle_data : list
            List of particle data of which the flow is calculated.
        particle_data_event_plane : list
            List of particle data for the event plane calculation.
        self_corr : bool, optional
            Whether to remove the self-correlation in the flow calculation.
            Default is True.

        Returns
        -------
        tuple
            A tuple containing the integrated flow value and the corresponding
            uncertainty.
        """
        if not isinstance(self_corr, bool):
            raise TypeError("self_corr has to be bool")
        resolution, Q_vector = self.__calculate_reference(
            particle_data_event_plane
        )
        return self.__calculate_flow_event_average(
            particle_data,
            self.__calculate_particle_flow(
                particle_data, resolution, Q_vector, self_corr
            ),
        )
    def differential_flow(
        self,
        particle_data: List[List[Particle]],
        bins: Union[np.ndarray, List[float]],
        flow_as_function_of: str,
        particle_data_event_plane: List[List[Particle]],
        self_corr: bool = True,
    ) -> List[Tuple[float, float]]:
        """
        Compute the differential flow.

        Parameters
        ----------
        particle_data : list
            List of particle data of which the flow is calculated.
        bins : list or np.ndarray
            Bins used for the differential flow calculation.
        flow_as_function_of : str
            Variable on which the flow is calculated
            ("pT", "rapidity", or "pseudorapidity").
        particle_data_event_plane : list
            List of particle data for the event plane calculation.
        self_corr : bool, optional
            Whether to remove the self-correlation in the flow calculation.
            Default is True.

        Returns
        -------
        list
            A list of tuples containing the flow values and uncertainties for
            each bin.
        """
        if not isinstance(self_corr, bool):
            raise TypeError("self_corr has to be bool")
        if not isinstance(bins, (list, np.ndarray)):
            raise TypeError("bins has to be list or np.ndarray")
        if not isinstance(flow_as_function_of, str):
            raise TypeError("flow_as_function_of is not a string")
        if flow_as_function_of not in ["pT", "rapidity", "pseudorapidity"]:
            raise ValueError(
                "flow_as_function_of must be either 'pT', 'rapidity' or 'pseudorapidity'"
            )

        # sort the particles of each event into the requested bins
        particles_bin = []
        for bin in range(len(bins) - 1):
            events_bin = []
            for event in range(len(particle_data)):
                particles_event = []
                for particle in particle_data[event]:
                    val = 0.0
                    if flow_as_function_of == "pT":
                        val = particle.pT_abs()
                    elif flow_as_function_of == "rapidity":
                        val = particle.rapidity()
                    elif flow_as_function_of == "pseudorapidity":
                        val = particle.pseudorapidity()
                    if val >= bins[bin] and val < bins[bin + 1]:
                        particles_event.append(particle)
                events_bin.extend([particles_event])
            particles_bin.extend([events_bin])

        resolution, Q_vector = self.__calculate_reference(
            particle_data_event_plane
        )

        flow_bin = []
        for bin in range(len(bins) - 1):
            flow_bin.append(
                self.__calculate_flow_event_average(
                    particles_bin[bin],
                    self.__calculate_particle_flow(
                        particles_bin[bin], resolution, Q_vector, self_corr
                    ),
                )
            )

        return flow_bin
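
As a complement to the integrated-flow example in the class docstring, the sketch below shows how differential_flow can be called to obtain v2 as a function of transverse momentum. It only uses the signatures defined above; the file paths and the pT binning are placeholders chosen for illustration and are not shipped with SPARKX.

# Usage sketch (not part of the module): v2(pT) from the scalar product method.
# The input files and the pT binning below are placeholder assumptions.
import numpy as np

from sparkx.Jetscape import Jetscape
from sparkx.flow.ScalarProductFlow import ScalarProductFlow

JETSCAPE_FILE_PATH_FLOW = "particle_lists_flow.dat"        # placeholder path
JETSCAPE_FILE_PATH_EVENT_PLANE = "particle_lists_ep.dat"   # placeholder path

# Particles for which the flow is computed and particles defining the reference
jetscape_flow = Jetscape(JETSCAPE_FILE_PATH_FLOW).particle_objects_list()
jetscape_event = Jetscape(JETSCAPE_FILE_PATH_EVENT_PLANE).particle_objects_list()

# v2 weighted with pT**2 and a pseudorapidity gap of 0.1 between the sub-events
flow2 = ScalarProductFlow(n=2, weight="pT2", pseudorapidity_gap=0.1)

# Example pT bin edges in GeV (assumed binning); len(bins) - 1 results are returned
pt_bins = np.linspace(0.0, 3.0, 7)

# One (v_n, uncertainty) tuple per pT bin
v2_vs_pt = flow2.differential_flow(jetscape_flow, pt_bins, "pT", jetscape_event)

for (low, high), (v2, v2_err) in zip(
    zip(pt_bins[:-1], pt_bins[1:]), v2_vs_pt
):
    print(f"{low:.2f} <= pT < {high:.2f} GeV: v2 = {v2:.4f} +/- {v2_err:.4f}")

The same particle list may be passed as both arguments; with self_corr=True (the default) the particle's own contribution is subtracted from the Q vector before correlating, as done in __compute_flow_particles above.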