# ===================================================
#
# Copyright (c) 2024
# SPARKX Team
#
# GNU General Public License (GPLv3 or later)
#
# ===================================================
from sparkx.Histogram import Histogram
from sparkx.Particle import Particle
from typing import List, Tuple, Union, Optional, Iterator
import warnings
# This class ensures a list to be read-only
class ReadOnlyList:
def __init__(self, nested_list: List[List[Particle]]) -> None:
self._nested_list = nested_list # Store reference to the original list
def __getitem__(self, index: int) -> List[Particle]:
return self._nested_list[index] # Allow read access to list items
def __len__(self) -> int:
return len(self._nested_list) # Allow getting the length of the list
def __iter__(self) -> Iterator[List[Particle]]:
return iter(self._nested_list) # Allow iteration over the list
def __repr__(self) -> str:
return repr(self._nested_list) # Allow printing the list
# Prevent modification by raising exceptions
def __setitem__(self, index: int, value: List[Particle]) -> None:
raise TypeError("This list is read-only.")
def append(self, value: List[Particle]) -> None:
raise TypeError("This list is read-only.")
def extend(self, values: List[List[Particle]]) -> None:
raise TypeError("This list is read-only.")
def insert(self, index: int, value: List[Particle]) -> None:
raise TypeError("This list is read-only.")
def remove(self, value: List[Particle]) -> None:
raise TypeError("This list is read-only.")
def pop(self, index: int = -1) -> None:
raise TypeError("This list is read-only.")
def clear(self) -> None:
raise TypeError("This list is read-only.")
[docs]
class BulkObservables:
"""
Class to calculate bulk observables from a list of Particle objects. It is
assumed that all necessary cuts were performed to the particle list before.
Attributes
----------
particle_objects: ReadOnlyList
A read-only list of lists of Particle objects.
Methods
-------
dNdy:
Calculate the event averaged yield :math:`\\frac{dN}{dy}`.
dNdpT:
Calculate the event averaged yield :math:`\\frac{dN}{dp_T}`.
dNdEta:
Calculate the event averaged yield :math:`\\frac{dN}{d\\eta}`
dNdmT:
Calculate the event averaged yield :math:`\\frac{dN}{dm_T}`.
mid_rapidity_yield:
Calculate the event-averaged particle yield at mid-rapidity.
mid_rapidity_mean_pT:
Calculate the event-averaged mean transverse momentum :math:`p_T` at mid-rapidity.
mid_rapidity_mean_mT:
Calculate the event-averaged mean transverse mass :math:`m_T` at mid-rapidity.
Examples
--------
.. highlight:: python
.. code-block:: python
:linenos:
>>> from sparkx.BulkObservables import BulkObservables
>>> # Initialize the BulkObservables class
>>> bulk_observables = BulkObservables(particle_objects_list)
>>> # Calculate dN/dy
>>> histogram_dNdy = bulk_observables.dNdy()
>>> # Calculate dN/dpT
>>> histogram_dNdpT = bulk_observables.dNdpT()
>>> # Calculate dN/dη
>>> histogram_dNdEta = bulk_observables.dNdEta()
>>> # Calculate dN/dmT
>>> histogram_dNdmT = bulk_observables.dNdmT()
>>> # Calculate mid-rapidity yield
>>> mid_rapidity_yield = bulk_observables.mid_rapidity_yield()
>>> print(mid_rapidity_yield)
>>> # Calculate mid-rapidity mean pT
>>> mid_rapidity_mean_pT = bulk_observables.mid_rapidity_mean_pT()
>>> print(mid_rapidity_mean_pT)
>>> # Calculate mid-rapidity mean mT
>>> mid_rapidity_mean_mT = bulk_observables.mid_rapidity_mean_mT()
>>> print(mid_rapidity_mean_mT)
"""
def __init__(self, particle_objects_list: List[List[Particle]]) -> None:
# Wrapping in ReadOnlyList prevents particles from being modified
self.particle_objects = ReadOnlyList(particle_objects_list)
# PRIVATE CLASS METHODS
"""
This is the base method for all differential yields. It is called by all
corresponding methods for each specific differential yield
"""
def _differential_yield(
self,
quantity: str,
bin_properties: Union[
Tuple[Union[int, float], Union[int, float], int],
List[Union[int, float]],
],
) -> Histogram:
if not isinstance(quantity, str):
raise TypeError("quantity must be of type str")
elif not isinstance(bin_properties, (tuple, list)):
raise TypeError("bin_properties must be of type tuple or list")
# Check if bin_properties is a tuple
if isinstance(bin_properties, tuple):
if (
len(bin_properties) != 3
or not isinstance(bin_properties[0], (int, float))
or not isinstance(bin_properties[1], (int, float))
or not isinstance(bin_properties[2], int)
):
raise ValueError(
"If bin_properties is a tuple, it must be of the form (int/float, int/float, int)."
)
# Check if bin_properties is a list
elif isinstance(bin_properties, list):
if not all(isinstance(x, (int, float)) for x in bin_properties):
raise ValueError(
"If bin_properties is a list, all elements must be of type int or float"
)
hist = Histogram(bin_properties)
num_events = len(self.particle_objects)
inverse_bin_width = 1 / hist.bin_width()
# Fill histograms
for event in range(num_events):
for particle in self.particle_objects[event]:
"""
Call the corresponding method in Particle given by
'quantity' as string
"""
particle_method = getattr(particle, quantity)
if callable(particle_method):
hist.add_value(particle_method())
else:
raise AttributeError(
f"'{quantity}' is not a callable method of Particle"
)
# Do not add an empty histogram at the end
if num_events > 1 and not event == (num_events - 1):
hist.add_histogram()
hist.average()
hist.scale_histogram(inverse_bin_width)
return hist
# PUBLIC CLASS METHODS
[docs]
def dNdy(
self,
bin_properties: Optional[
Union[
Tuple[Union[int, float], Union[int, float], int],
List[Union[int, float]],
]
] = None,
) -> Histogram:
"""
Calculate the event averaged yield :math:`\\frac{dN}{dy}`
Parameters
----------
bin_properties: tuple, list
Optional tuple (start, stop, num) for histogram binning.
If not given, a default will be used
Returns
-------
Histogram
1D histogram containing the event averaged particle counts per
rapidity bin.
"""
if bin_properties is None:
return self._differential_yield("rapidity", (-2, 2, 11))
else:
return self._differential_yield("rapidity", bin_properties)
[docs]
def dNdpT(
self,
bin_properties: Optional[
Union[
Tuple[Union[int, float], Union[int, float], int],
List[Union[int, float]],
]
] = None,
) -> Histogram:
"""
Calculate the event averaged yield :math:`\\frac{dN}{dp_T}`
Parameters
----------
bin_properties: tuple, list
Optional tuple (start, stop, num) for histogram binning.
If not given, a default will be used
Returns
-------
Histogram
1D histogram containing the event averaged particle counts per
transverse momentum bin.
"""
if isinstance(bin_properties, tuple) and (
bin_properties[0] < 0 or bin_properties[1] < 0
):
warn_msg = "Bins must be positive for dNdpT! All negative bins will be empty."
warnings.warn(warn_msg)
elif isinstance(bin_properties, list) and any(
bin_edge < 0 for bin_edge in bin_properties
):
warn_msg = "Bins must be positive for dNdpT! All negative bins will be empty."
warnings.warn(warn_msg)
if bin_properties is None:
return self._differential_yield("pT_abs", (0, 4, 11))
else:
return self._differential_yield("pT_abs", bin_properties)
[docs]
def dNdEta(
self,
bin_properties: Optional[
Union[
Tuple[Union[int, float], Union[int, float], int],
List[Union[int, float]],
]
] = None,
) -> Histogram:
"""
Calculate the event averaged yield :math:`\\frac{dN}{d\\eta}`
Parameters
----------
bin_properties: tuple, list
Optional tuple (start, stop, num) for histogram binning.
If not given, a default will be used
Returns
-------
Histogram
1D histogram containing the event averaged particle counts per
pseudo-rapidity bin.
"""
if bin_properties is None:
return self._differential_yield("pseudorapidity", (-2, 2, 11))
else:
return self._differential_yield("pseudorapidity", bin_properties)
[docs]
def dNdmT(
self,
bin_properties: Optional[
Union[
Tuple[Union[int, float], Union[int, float], int],
List[Union[int, float]],
]
] = None,
) -> Histogram:
"""
Calculate the event averaged yield :math:`\\frac{dN}{dm_T}`
Parameters
----------
bin_properties: tuple, list
Optional tuple (start, stop, num) for histogram binning.
If not given, a default will be used
Returns
-------
Histogram
1D histogram containing the event averaged particle counts per
transverse mass bin.
"""
if isinstance(bin_properties, tuple) and (
bin_properties[0] < 0 or bin_properties[1] < 0
):
warn_msg = "Bins must be positive for dNdmT! All negative bins will be empty."
warnings.warn(warn_msg)
elif isinstance(bin_properties, list) and any(
bin_edge < 0 for bin_edge in bin_properties
):
warn_msg = "Bins must be positive for dNdmT! All negative bins will be empty."
warnings.warn(warn_msg)
if bin_properties is None:
return self._differential_yield("mT", (0, 4, 11))
else:
return self._differential_yield("mT", bin_properties)
[docs]
def mid_rapidity_yield(
self, y_width: float = 1.0, quantity: str = "rapidity"
) -> float:
"""
Calculate the event-averaged particle yield at mid-rapidity.
Parameters
----------
y_width: float
The rapidity window width, centered at 0, within which
particles are counted. The default value is 1, meaning the function
will count particles with rapidity between -0.5 and 0.5.
quantity: str
The quantity to be used for the rapidity calculation
(rapidity, pseudorapidity, spacetime_rapidity).
Returns
-------
particle_counter / num_events: float
The average number of particles per event that fall within the
specified rapidity range.
"""
if not isinstance(y_width, (int, float)):
raise TypeError("y_width must be of type int or float")
if y_width <= 0:
raise ValueError("y_width must be a positive number.")
num_events = len(self.particle_objects)
if num_events == 0:
return 0
particle_method = getattr(self.particle_objects[0][0], quantity)
if not callable(particle_method):
raise AttributeError(
f"'{quantity}' is not a callable method of Particle"
)
particle_counter = 0
# Fill histograms
for event in self.particle_objects:
for particle in event:
if -y_width / 2 <= particle_method() <= y_width / 2:
particle_counter += 1
return particle_counter / num_events
[docs]
def mid_rapidity_mean_pT(
self, y_width: float = 1.0, quantity: str = "rapidity"
) -> float:
"""
Calculate the event-averaged mean transverse momentum :math:`p_T` at
mid-rapidity.
It is assumed that detector cuts have been performed on the particle
list.
Parameters
----------
y_width: float
The rapidity window width, centered at 0, within which
particles are counted. The default value is 1, meaning the function
will count particles with rapidity between -0.5 and 0.5.
quantity: str
The quantity to be used for the rapidity calculation
(rapidity, pseudorapidity, spacetime_rapidity).
Returns
-------
particle_counter / num_events: float
The average pT of particles per event that fall within the
specified rapidity range.
"""
if not isinstance(y_width, (int, float)):
raise TypeError("y_width must be of type int or float")
if y_width <= 0:
raise ValueError("y_width must be a positive number.")
num_events = len(self.particle_objects)
if num_events == 0:
return 0
pT_sum = 0.0
particle_counter = 0
particle_method = getattr(self.particle_objects[0][0], quantity)
if not callable(particle_method):
raise AttributeError(
f"'{quantity}' is not a callable method of Particle"
)
# Fill histograms
for event in self.particle_objects:
for particle in event:
particle_counter += 1
if -y_width / 2 <= particle_method() <= y_width / 2:
pT_sum += particle.pT_abs()
pT_sum /= particle_counter
particle_counter = 0
return pT_sum / num_events
[docs]
def mid_rapidity_mean_mT(
self, y_width: float = 1.0, quantity: str = "rapidity"
) -> float:
"""
Calculate the event-averaged mean transverse mass :math:`m_T` at
mid-rapidity.
Parameters
----------
y_width: float
The rapidity window width, centered at 0, within which
particles are counted. The default value is 1, meaning the function
will count particles with rapidity between -0.5 and 0.5.
quantity: str
The quantity to be used for the rapidity calculation
(rapidity, pseudorapidity, spacetime_rapidity).
Returns
-------
particle_counter / num_events: float
The average mT of particles per event that fall within the
specified rapidity range.
"""
if not isinstance(y_width, (int, float)):
raise TypeError("y_width must be of type int or float")
if y_width <= 0:
raise ValueError("y_width must be a positive number.")
num_events = len(self.particle_objects)
if num_events == 0:
return 0
pT_sum = 0.0
particle_counter = 0
particle_method = getattr(self.particle_objects[0][0], quantity)
if not callable(particle_method):
raise AttributeError(
f"'{quantity}' is not a callable method of Particle"
)
# Fill histograms
for event in self.particle_objects:
for particle in event:
particle_counter += 1
if -y_width / 2 <= particle_method() <= y_width / 2:
pT_sum += particle.mT()
pT_sum /= particle_counter
particle_counter = 0
return pT_sum / num_events