Source code for ReactionPlaneFlow

# ===================================================
#
#    Copyright (c) 2023-2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

from sparkx.flow import FlowInterface
from sparkx.Particle import Particle
import numpy as np
from typing import List, Union


[docs] class ReactionPlaneFlow(FlowInterface.FlowInterface): """ This class implements a reaction plane flow analysis algorithm. For this method, the flow is calculated under the assumption that the reaction plane angle is constant throughout all events, i.e., the impact parameter is always oriented in the same direction. The flow is calculated as .. math:: v_n = \\left\\langle \\exp{in\\phi_i}\\right\\rangle, where we average over all particles of all events. We return complex numbers, which contain the information about the magnitude and the angle. If a weight is set for the particles in the `Particle` objects, then this is used in the flow calculation. Parameters ---------- n : int, optional The value of the harmonic. Default is 2. Methods ------- integrated_flow: Computes the integrated flow. differential_flow: Computes the differential flow. Examples -------- A demonstration how to calculate flow with the reaction plane method. .. highlight:: python .. code-block:: python :linenos: >>> from sparkx.Jetscape import Jetscape >>> from sparkx.flow.ReactionPlaneFlow import ReactionPlaneFlow >>> >>> JETSCAPE_FILE_PATH_FLOW = [Jetscape_directory]/particle_lists_flow.dat >>> # Jetscape object containing the particles on which we want to calculate flow >>> jetscape_flow = Jetscape(JETSCAPE_FILE_PATH_FLOW) >>> >>> # Create flow objects for v2 >>> flow2 = ReactionPlaneFlow(n=2).particle_objects_list() >>> >>> # Calculate the integrated flow with error >>> v2 = flow2.integrated_flow(jetscape_flow) >>> >>> # Calculate the differential flow with error >>> pT_bins = [0.0,0.5,1.0,2.0,3.0,4.0] >>> v2_differential = flow2.integrated_flow(jetscape_flow,pT_bins,'pT') """ def __init__(self, n: int = 2) -> None: """ Initialize the ReactionPlaneFlow object. Parameters ---------- n : int, optional The value of the harmonic. Default is 2. """ if not isinstance(n, int): raise TypeError("n has to be int") elif n <= 0: raise ValueError( "n-th harmonic with value n<=0 can not be computed" ) else: self.n_ = n
[docs] def integrated_flow(self, particle_data: List[List[Particle]]) -> complex: """ Compute the integrated flow. Parameters ---------- particle_data : list List of particle data. Returns ------- complex The integrated flow value, represented as a complex number. """ flow_event_average = 0.0 + 0.0j number_particles = 0.0 for event in range(len(particle_data)): flow_event = 0.0 + 0.0j for particle in range(len(particle_data[event])): weight = ( 1.0 if np.isnan(particle_data[event][particle].weight) else particle_data[event][particle].weight ) phi = particle_data[event][particle].phi() flow_event += weight * np.exp(1j * self.n_ * phi) number_particles += weight if number_particles != 0.0: flow_event_average += flow_event else: flow_event_average = 0.0 + 0.0j flow_event_average /= number_particles return flow_event_average
[docs] def differential_flow( self, particle_data: List[List[Particle]], bins: Union[np.ndarray, List[float]], flow_as_function_of: str, ) -> List[complex]: """ Compute the differential flow. Parameters ---------- particle_data : list List of particle data. bins : list or np.ndarray Bins used for the differential flow calculation. flow_as_function_of : str Variable on which the flow is calculated ("pT", "rapidity", or "pseudorapidity"). Returns ------- list A list of complex numbers representing the flow values for each bin. """ if not isinstance(bins, (list, np.ndarray)): raise TypeError("bins has to be list or np.ndarray") if not isinstance(flow_as_function_of, str): raise TypeError("flow_as_function_of is not a string") if flow_as_function_of not in ["pT", "rapidity", "pseudorapidity"]: raise ValueError( "flow_as_function_of must be either 'pT', 'rapidity', 'pseudorapidity'" ) particles_bin = [] for bin in range(len(bins) - 1): events_bin = [] for event in range(len(particle_data)): particles_event = [] for particle in particle_data[event]: val = 0.0 if flow_as_function_of == "pT": val = particle.pT_abs() elif flow_as_function_of == "rapidity": val = particle.rapidity() elif flow_as_function_of == "pseudorapidity": val = particle.pseudorapidity() if val >= bins[bin] and val < bins[bin + 1]: particles_event.append(particle) events_bin.extend([particles_event]) particles_bin.extend([events_bin]) return self.__differential_flow_calculation(particles_bin)
def __differential_flow_calculation( self, binned_particle_data: List[List[List[Particle]]] ) -> List[complex]: flow_differential = [ 0.0 + 0.0j for i in range(len(binned_particle_data)) ] for bin in range(len(binned_particle_data)): number_particles = 0.0 flow_event_average = 0.0 + 0.0j for event in range(len(binned_particle_data[bin])): flow_event = 0.0 + 0.0j for particle in range(len(binned_particle_data[bin][event])): weight = ( 1.0 if np.isnan( binned_particle_data[bin][event][particle].weight ) else binned_particle_data[bin][event][particle].weight ) phi = binned_particle_data[bin][event][particle].phi() flow_event += weight * np.exp(1j * self.n_ * phi) number_particles += weight flow_event_average += flow_event if number_particles != 0.0: flow_event_average /= number_particles else: flow_event_average = 0.0 + 0.0j flow_differential[bin] = flow_event_average return flow_differential