# ===================================================
#
# Copyright (c) 2023-2024
# SPARKX Team
#
# GNU General Public License (GPLv3 or later)
#
# ===================================================
from sparkx.flow import FlowInterface
import numpy as np # type: ignore
from scipy import special # type: ignore
import scipy.optimize as optimize # type: ignore
import warnings
from sparkx.Particle import Particle
from typing import List, Tuple, Union
[docs]
class EventPlaneFlow(FlowInterface.FlowInterface):
"""
This class implements a event plane flow analysis algorithm
`Poskanzer, Arthur M., and Sergei A. Voloshin. "Methods for analyzing anisotropic flow in relativistic nuclear collisions." Physical Review C 58.3 (1998): 1671. <https://link.aps.org/pdf/10.1103/PhysRevC.58.1671?casa_token=rinfpV-zLrYAAAAA:Mv0gWZZBE8I9zpBPLog8myr7uKzLHO1TelYz0BN7boUORUWnP3Fyybuc0OKNKK5YH5ceNy5qVNO8rws>`__.
For this method, the flow is calculated by calculating the nth harmonic of
the angle the event plane :math:`Q`, relative to the event plane angle
determined from the flow itself.
Since finite multiplicity limits the estimation of the angle of the reaction
plane, the flow is corrected for the event plane resolution for each harmonic.
In summary, this class calculates the following:
.. math::
v_n = \\frac{v_{n}^{\\mathrm{obs}}}{R_n}
with
.. math::
v_{n}^{\\mathrm{obs}} = \\langle \\cos(n (\\phi_i -\\Psi_n)) \\rangle
where we average over all particles of all events. :math:`R_n` is the
resolution, which we calculate in an iterative method according to the
mentioned reference. This is done by using two sub-events, which we assume
to be of same size and which are determined by splitting the event into
subsets of positive and negative pseudorapidity.
Parameters
----------
n : int, optional
The value of the harmonic. Default is 2.
weight : str, optional
The weight used for calculating the flow. Default is :code:`pt2`.
pseudorapidity_gap : float, optional
The pseudorapidity gap used for dividing the particles into sub-events.
Default is 0.0.
Methods
-------
integrated_flow:
Computes the integrated flow.
differential_flow:
Computes the differential flow.
Examples
--------
A demonstration how to calculate flow according to the event plane of a
separate particle list.
The same particle list can also be used to determine the event plane and
the flow.
.. highlight:: python
.. code-block:: python
:linenos:
>>> from sparkx.Jetscape import Jetscape
>>> from sparkx.flow.EventPlaneFlow import EventPlaneFlow
>>>
>>> JETSCAPE_FILE_PATH_FLOW = [Jetscape_directory]/particle_lists_flow.dat
>>> JETSCAPE_FILE_PATH_EVENT_PLANE = [Jetscape_directory]/particle_lists_ep.dat
>>>
>>> # Jetscape object containing the particles on which we want to calculate flow
>>> jetscape_flow = Jetscape(JETSCAPE_FILE_PATH_FLOW).particle_objects_list()
>>>
>>> # Jetscape object containing the particles which determine the event plane
>>> jetscape_event = Jetscape(JETSCAPE_FILE_EVENT_PLANE).particle_objects_list()
>>>
>>> # Create flow objects for v2, weighted with pT**2 and v3 weighted with pT**2
>>> flow2 = EventPlaneFlow(n=2, weight="pt2",pseudorapidity_gap=0.1)
>>> flow3 = EventPlaneFlow(n=3, weight="pt2",pseudorapidity_gap=0.1)
>>>
>>> # Calculate the integrated flow with error
>>> v2, v2_error = flow2.integrated_flow(jetscape_flow,jetscape_event)
>>> v3, v3_error = flow3.integrated_flow(jetscape_flow,jetscape_event)
"""
def __init__(
self, n: int = 2, weight: str = "pT2", pseudorapidity_gap: float = 0.0
) -> None:
"""
Initialize the ScalarProductFlow object.
Parameters
----------
n : int, optional
The value of the harmonic. Default is 2.
weight : str, optional
The weight used for calculating the flow. Default is "pT2".
pseudorapidity_gap : float, optional
The pseudorapidity gap used for dividing the particles into sub-events.
Default is 0.0.
"""
if not isinstance(n, int):
raise TypeError("n has to be int")
elif n <= 0:
raise ValueError(
"n-th harmonic with value n<=0 can not be computed"
)
else:
self.n_ = n
if not isinstance(weight, str):
raise TypeError("weight has to be a string")
elif weight not in ["pT", "pT2", "pTn", "rapidity", "pseudorapidity"]:
raise ValueError(
"Invalid weight given, choose one of the following: 'pT', 'pT2', 'pTn', 'rapidity', 'pseudorapidity'"
)
else:
self.weight_ = weight
if not isinstance(pseudorapidity_gap, (int, float)):
raise TypeError("n has to be int")
elif pseudorapidity_gap < 0:
raise ValueError(
"pseudorapidity value with gap < 0 can not be computed"
)
else:
self.pseudorapidity_gap_ = pseudorapidity_gap
def __compute_particle_weights(
self, particle_data: List[List[Particle]]
) -> List[List[float]]:
event_weights = []
for event in range(len(particle_data)):
particle_weights = []
for particle in particle_data[event]:
weight = 0.0
if self.weight_ == "pT":
weight = particle.pT_abs()
elif self.weight_ == "pT2":
weight = particle.pT_abs() ** 2.0
elif self.weight_ == "pTn":
weight = particle.pT_abs() ** self.n_
elif self.weight_ == "rapidity":
weight = particle.rapidity()
elif self.weight_ == "pseudorapidity":
weight = particle.pseudorapidity()
particle_weights.append(weight)
event_weights.append(particle_weights)
return event_weights
def __compute_flow_vectors(
self, particle_data: List[List[Particle]], weights: List[List[float]]
) -> List[complex]:
# Q vector whole event
Q_vector = []
for event in range(len(particle_data)):
Q_vector_val = 0.0 + 0.0j
for particle in range(len(particle_data[event])):
Q_vector_val += weights[event][particle] * np.exp(
1.0j * float(self.n_) * particle_data[event][particle].phi()
)
Q_vector.append(Q_vector_val)
return Q_vector
def __sum_weights(self, weights: List[List[float]]) -> List[float]:
sum_weights = []
for event in weights:
weight_val = np.sum(np.square(event))
sum_weights.append(weight_val)
return sum_weights
def __compute_event_angles_sub_events(
self, particle_data: List[List[Particle]], weights: List[List[float]]
) -> Tuple[List[float], List[float]]:
# Q vector sub-event A
Q_vector_A = []
relevant_weights_A = []
for event in range(len(particle_data)):
Q_vector_A_val = 0.0 + 0.0j
relevant_weights_A_event = []
for particle in range(len(particle_data[event])):
if (
particle_data[event][particle].pseudorapidity()
>= +self.pseudorapidity_gap_
):
Q_vector_A_val += weights[event][particle] * np.exp(
1.0j
* float(self.n_)
* particle_data[event][particle].phi()
)
relevant_weights_A_event.append(weights[event][particle])
Q_vector_A.append(Q_vector_A_val)
relevant_weights_A.extend([relevant_weights_A_event])
# Q vector sub-event B
Q_vector_B = []
relevant_weights_B = []
for event in range(len(particle_data)):
Q_vector_B_val = 0.0 + 0.0j
relevant_weights_B_event = []
for particle in range(len(particle_data[event])):
if (
particle_data[event][particle].pseudorapidity()
< -self.pseudorapidity_gap_
):
Q_vector_B_val += weights[event][particle] * np.exp(
1.0j
* float(self.n_)
* particle_data[event][particle].phi()
)
relevant_weights_B_event.append(weights[event][particle])
Q_vector_B.append(Q_vector_B_val)
relevant_weights_B.extend([relevant_weights_B_event])
sum_weights_A = self.__sum_weights(relevant_weights_A)
sum_weights_B = self.__sum_weights(relevant_weights_B)
for event in range(len(particle_data)):
# avoid division by 0, if there is no weight, there is also no
# particle and no flow for an event
if sum_weights_A[event] == 0.0:
Q_vector_A[event] = 0.0
else:
Q_vector_A[event] /= np.sqrt(sum_weights_A[event])
if sum_weights_B[event] == 0.0:
Q_vector_B[event] = 0.0
else:
Q_vector_B[event] /= np.sqrt(sum_weights_B[event])
# compute event plane angles of sub-events
Psi_A = [
(1.0 / float(self.n_))
* np.arctan2(Q_vector_A[event].imag, Q_vector_A[event].real)
for event in range(len(particle_data))
]
Psi_B = [
(1.0 / float(self.n_))
* np.arctan2(Q_vector_B[event].imag, Q_vector_B[event].real)
for event in range(len(particle_data))
]
return Psi_A, Psi_B
def __compute_u_vectors(
self, particle_data: List[List[Particle]]
) -> List[List[complex]]:
u_vectors = [] # [event][particle]
for event in particle_data:
u_vector_event = []
for particle in event:
u_vector_event.append(
np.exp(1.0j * float(self.n_) * particle.phi())
)
u_vectors.extend([u_vector_event])
return u_vectors
def __compute_event_plane_resolution(
self, Psi_A: List[float], Psi_B: List[float]
) -> float:
RnSquared = np.asarray(
[
np.cos(self.n_ * (Psi_A[event] - Psi_B[event]))
for event in range(len(Psi_A))
]
)
RnSquaredSum = np.sum(RnSquared)
Rn = np.sqrt(RnSquaredSum / len(Psi_A))
# calculate x for given Rn, then update R with x value:
# x_new = sqrt(2) * x
# alternative: if Rn < 0.5: R = Rn * sqrt(2), we don't do that
# implements: arXiv:0809.2949
def resolution(x: float) -> float:
R = (
(np.sqrt(np.pi) / 2.0)
* x
* np.exp(-0.5 * x * x)
* (special.i0(0.5 * x * x) + special.i1(0.5 * x * x))
)
return R
def f1(x: float, Rn: float) -> float:
return resolution(x) - Rn
def f1_wrapper(x: float) -> float:
return f1(x, Rn)
try:
xi = optimize.root_scalar(
f1_wrapper, bracket=[0, 20], method="brentq"
).root
except BaseException:
warnings.warn(
"Could not find solution of resolution equation. Use approximation instead."
)
return Rn
xi_new = np.sqrt(2) * xi
return resolution(xi_new)
def __compute_flow_particles(
self,
particle_data: List[List[Particle]],
weights: List[List[float]],
Q_vector: List[complex],
u_vectors: List[List[complex]],
resolution: float,
self_corr: bool,
) -> Tuple[List[List[float]], List[List[float]]]:
flow_values = []
psi_values = []
for event in range(len(particle_data)):
flow_values_event = []
psi_values_event = []
for particle in range(len(particle_data[event])):
weight_particle = np.abs(weights[event][particle])
Q_vector_particle = Q_vector[event]
if self_corr:
Q_vector_particle -= (
weight_particle * u_vectors[event][particle]
) # avoid autocorrelation
Psi_n = (1.0 / float(self.n_)) * np.arctan2(
Q_vector_particle.imag, Q_vector_particle.real
)
vn_obs = np.cos(
float(self.n_)
* (particle_data[event][particle].phi() - Psi_n)
)
flow_of_particle = vn_obs / resolution
flow_values_event.append(flow_of_particle)
psi_values_event.append(Psi_n)
flow_values.extend([flow_values_event])
psi_values.extend([psi_values_event])
return flow_values, psi_values
def __calculate_reference(
self, particle_data_event_plane: List[List[Particle]]
) -> Tuple[float, List[complex]]:
event_weights_event_plane = self.__compute_particle_weights(
particle_data_event_plane
)
Psi_A, Psi_B = self.__compute_event_angles_sub_events(
particle_data_event_plane, event_weights_event_plane
)
resolution = self.__compute_event_plane_resolution(Psi_A, Psi_B)
Q_vector = self.__compute_flow_vectors(
particle_data_event_plane, event_weights_event_plane
)
return resolution, Q_vector
def __calculate_particle_flow(
self,
particle_data: List[List[Particle]],
resolution: float,
Q_vector: List[complex],
self_corr: bool,
) -> Tuple[List[List[float]], List[List[float]]]:
event_weights = self.__compute_particle_weights(particle_data)
u_vectors = self.__compute_u_vectors(particle_data)
flow_values, psi_values = self.__compute_flow_particles(
particle_data,
event_weights,
Q_vector,
u_vectors,
resolution,
self_corr,
)
return flow_values, psi_values
def __calculate_flow_event_average(
self,
particle_data: List[List[Particle]],
flow_particle_list: List[List[float]],
psi_particle_list: List[List[float]],
) -> Tuple[float, float, float, float]:
# compute the integrated flow
number_of_particles = 0.0
flowvalue = 0.0
flowvalue_squared = 0.0
psivalue = 0.0
psivalue_squared = 0.0
for event in range(len(flow_particle_list)):
for particle in range(len(flow_particle_list[event])):
weight = (
1.0
if np.isnan(particle_data[event][particle].weight)
else particle_data[event][particle].weight
)
number_of_particles += weight
flowvalue += flow_particle_list[event][particle] * weight
flowvalue_squared += (
flow_particle_list[event][particle] ** 2.0 * weight**2.0
)
psivalue += psi_particle_list[event][particle] * weight
psivalue_squared += (
psi_particle_list[event][particle] ** 2.0 * weight**2.0
)
vn_integrated = 0.0
Psi_n = 0.0
sigma = 0.0
sigma_Psi = 0.0
if number_of_particles == 0:
vn_integrated = 0.0
Psi_n = 0.0
sigma = 0.0
sigma_Psi = 0.0
else:
vn_integrated = flowvalue / number_of_particles
Psi_n = psivalue / number_of_particles
vn_squared = flowvalue_squared / number_of_particles**2.0
Psi_n_squared = psivalue_squared / number_of_particles**2.0
std_deviation = np.sqrt(vn_integrated**2.0 - vn_squared)
std_deviation_Psi = np.sqrt(Psi_n**2.0 - Psi_n_squared)
sigma = std_deviation / np.sqrt(number_of_particles)
sigma_Psi = std_deviation_Psi / np.sqrt(number_of_particles)
return vn_integrated, sigma, Psi_n, sigma_Psi
[docs]
def integrated_flow(
self,
particle_data: List[List[Particle]],
particle_data_event_plane: List[List[Particle]],
self_corr: bool = True,
) -> Tuple[float, float, float, float]:
"""
Compute the integrated flow.
Parameters
----------
particle_data : list
List of particle data of which the flow is calculated.
particle_data_event_plane : list
List of particle data for the event plane calculation.
self_corr : bool, optional
Whether to consider self-correlation in the flow calculation.
Default is True.
Returns
-------
tuple
A tuple containing the integrated flow value and the corresponding
uncertainty. The third and fourth element are the event plane angle
and the corresponding uncertainty.
"""
if not isinstance(self_corr, bool):
raise TypeError("self_corr has to be bool")
resolution, Q_vector = self.__calculate_reference(
particle_data_event_plane
)
flow_values, psi_values = self.__calculate_particle_flow(
particle_data, resolution, Q_vector, self_corr
)
return self.__calculate_flow_event_average(
particle_data, flow_values, psi_values
)
[docs]
def differential_flow(
self,
particle_data: List[List[Particle]],
bins: Union[np.ndarray, List[float]],
flow_as_function_of: str,
particle_data_event_plane: List[List[Particle]],
self_corr: bool = True,
) -> List[Tuple[float, float, float, float]]:
"""
Compute the differential flow.
Parameters
----------
particle_data : list
List of particle data of which the flow is calculated.
bins : list or np.ndarray
Bins used for the differential flow calculation.
flow_as_function_of : str
Variable on which the flow is calculated
("pT", "rapidity", or "pseudorapidity").
particle_data_event_plane : list
List of particle data for the event plane calculation.
self_corr : bool, optional
Whether to consider self-correlation in the flow calculation.
Default is True.
Returns
-------
list
A list of tuples containing the flow values and uncertainties for
each bin. The third and fourth element are the event plane angle
and the corresponding uncertainty.
"""
if not isinstance(self_corr, bool):
raise TypeError("self_corr has to be bool")
if not isinstance(bins, (list, np.ndarray)):
raise TypeError("bins has to be list or np.ndarray")
if not isinstance(flow_as_function_of, str):
raise TypeError("flow_as_function_of is not a string")
if flow_as_function_of not in ["pT", "rapidity", "pseudorapidity"]:
raise ValueError(
"flow_as_function_of must be either 'pT', 'rapidity', 'pseudorapidity'"
)
particles_bin = []
for bin in range(len(bins) - 1):
events_bin = []
for event in range(len(particle_data)):
particles_event = []
for particle in particle_data[event]:
val = 0.0
if flow_as_function_of == "pT":
val = particle.pT_abs()
elif flow_as_function_of == "rapidity":
val = particle.rapidity()
elif flow_as_function_of == "pseudorapidity":
val = particle.pseudorapidity()
if val >= bins[bin] and val < bins[bin + 1]:
particles_event.append(particle)
events_bin.extend([particles_event])
particles_bin.extend([events_bin])
resolution, Q_vector = self.__calculate_reference(
particle_data_event_plane
)
flow_bin = []
for bin in range(len(bins) - 1):
flow_values, psi_values = self.__calculate_particle_flow(
particles_bin[bin], resolution, Q_vector, self_corr
)
flow_bin.append(
self.__calculate_flow_event_average(
particle_data, flow_values, psi_values
)
)
return flow_bin