Source code for BaseStorer

# ===================================================
#
#    Copyright (c) 2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

from sparkx.Filter import *
import numpy as np
from abc import ABC, abstractmethod
from sparkx.Particle import Particle
from typing import List, Union, Tuple, Optional, Any
from sparkx.loader.BaseLoader import BaseLoader


[docs] class BaseStorer(ABC): """ Defines a generic BaseStorer object. Attributes ---------- num_output_per_event_ : numpy.array Array containing the event number and the number of particles in this event as :code:`num_output_per_event_[event i][num_output in event i]` (updated when filters are applied) num_events_ : int Number of events contained in the Oscar object (updated when filters are applied) loader_: Loader object Loader object that loads the data Methods ------- load: Load data particle_list: Returns current events data as nested list particle_objects_list: Returns current events data as nested list of Particle objects num_events: Get number of events num_output_per_event: Get number of particles in each event particle_species: Keep only particles with given PDG ids remove_particle_species: Remove particles with given PDG ids participants: Keep participants only spectators: Keep spectators only lower_event_energy_cut: Filters out events with total energy lower than a threshold charged_particles: Keep charged particles only uncharged_particles: Keep uncharged particles only strange_particles: Keep strange particles only pT_cut: Apply pT cut to all particles mT_cut: Apply mT cut to all particles rapidity_cut: Apply rapidity cut to all particles pseudorapidity_cut: Apply pseudorapidity cut to all particles spacetime_rapidity_cut: Apply spacetime rapidity cut to all particles multiplicity_cut: Apply multiplicity cut to all particles spacetime_cut: Apply spacetime cut to all particles """ def __init__( self, path: Union[str, List[List[Particle]]], **kwargs: Any ) -> None: """ Parameters ---------- data : list Nested list containing the event data num_events : int Number of events in the data num_output_per_event : numpy.array Array containing the event number and the number of particles in this event as :code:`num_output_per_event_[event i][num_output in event i]` Returns ------- None """ self.loader_: Optional[BaseLoader] = None self.num_output_per_event_: Optional[np.ndarray] = None self.num_events_: Optional[int] = None self.particle_list_: List[List[Particle]] = [[]] self.custom_attr_list: List = [] self.create_loader(path) if self.loader_ is not None: ( self.particle_list_, self.num_events_, self.num_output_per_event_, self.custom_attr_list, ) = self.loader_.load(**kwargs) else: raise ValueError("Loader has not been created properly") def __add__(self, other: "BaseStorer") -> "BaseStorer": """ Adds two BaseStorer objects by combining their particle lists and updating num_output_per_event accordingly. This method ensures that both objects are instances of the same class before combining them. If the objects are not of the same class, a TypeError is raised. Parameters ---------- other : BaseStorer The other BaseStorer object to be added. Raises ------ TypeError If the other object is not an instance of BaseStorer or if the objects are not of the same class. Returns ------- BaseStorer A new BaseStorer object with combined particle lists and updated num_output_per_event. """ if not isinstance(other, BaseStorer): raise TypeError("Can only add BaseStorer objects") # Ensure that both instances are of the same class if type(self) is not type(other): raise TypeError("Can only add objects of the same class") combined_particle_list: list = ( self.particle_list_ + other.particle_list_ ) # Ensure num_output_per_event_ is not None if self.num_output_per_event_ is None: self.num_output_per_event_ = np.empty((0, 2), dtype=int) if other.num_output_per_event_ is None: other.num_output_per_event_ = np.empty((0, 2), dtype=int) if self.num_events_ is None: self.num_events_ = 0 if other.num_events_ is None: other.num_events_ = 0 combined_num_output_per_event: np.ndarray = np.concatenate( (self.num_output_per_event_, other.num_output_per_event_) ) # Adjust event_number for the parts that originally belonged to other combined_num_output_per_event[self.num_events_ :, 0] += self.num_events_ combined_storer: BaseStorer = self.__class__.__new__(self.__class__) combined_storer.__dict__.update( self.__dict__ ) # Inherit all properties from self combined_storer._update_after_merge(other) combined_storer.particle_list_ = combined_particle_list combined_storer.num_output_per_event_ = combined_num_output_per_event combined_storer.num_events_ = self.num_events_ + other.num_events_ combined_storer.loader_ = ( None # Loader is not applicable for combined object ) return combined_storer @abstractmethod def _update_after_merge(self, other: "BaseStorer") -> None: """ Updates the attributes of the current instance after merging with another BaseStorer object. This method should be implemented by subclasses to update the attributes of the current instance after merging with another BaseStorer object. The method raises a NotImplementedError if it is not overridden by a subclass. Parameters ---------- other : BaseStorer The other BaseStorer object that was merged with the current instance. Raises ------ NotImplementedError If the method is not implemented by a subclass. Returns ------- None """ raise NotImplementedError("This method is not implemented yet") @abstractmethod def create_loader(self, arg: Union[str, List[List["Particle"]]]) -> None: raise NotImplementedError("This method is not implemented yet")
[docs] def num_output_per_event(self) -> Optional[np.ndarray]: """ Returns a numpy array containing the event number (starting with 1) and the corresponding number of particles created in this event as :code:`num_output_per_event[event_n, number_of_particles_in_event_n]` :code:`num_output_per_event` is updated with every manipulation e.g. after applying cuts. Returns ------- num_output_per_event_ : numpy.ndarray Array containing the event number and the corresponding number of particles """ return self.num_output_per_event_
[docs] def num_events(self) -> Optional[int]: """ Returns the number of events in :code:`particle_list`. :code:`num_events` is updated with every manipulation e.g. after applying cuts. Returns ------- num_events_ : int Number of events in :code:`particle_list` """ return self.num_events_
[docs] def particle_objects_list(self) -> Optional[List]: """ Returns a nested python list containing all particles from the data as particle objects from :code:`Particle`: | Single Event: :code:`[particle_object]` | Multiple Events: :code:`[event][particle_object]` Returns ------- particle_list_ : list List of particle objects from :code:`Particle` """ return self.particle_list_
[docs] @abstractmethod def _particle_as_list(self, particle: "Particle") -> List: raise NotImplementedError("This method is not implemented yet")
[docs] def particle_list(self) -> List: """ Returns a nested python list containing all quantities from the current data as numerical values with the following shape: | Single Event: :code:`[[output_line][particle_quantity]]` | Multiple Events: :code:`[event][output_line][particle_quantity]` Returns ------- list Nested list containing the current data """ num_events = self.num_events_ if self.num_output_per_event_ is None: raise ValueError("num_output_per_event_ is not set") if self.particle_list_ is None: raise ValueError("particle_list_ is not set") if num_events is None: raise ValueError("num_events_ is not set") if num_events == 1: num_particles = self.num_output_per_event_[0][1] else: num_particles = self.num_output_per_event_[:, 1] particle_array: List[List] = [] if num_events == 1: for i_part in range(0, num_particles): particle = self.particle_list_[0][i_part] particle_array.append(self._particle_as_list(particle)) else: for i_ev in range(0, num_events): event = [] for i_part in range(0, num_particles[i_ev]): particle = self.particle_list_[i_ev][i_part] event.append(self._particle_as_list(particle)) particle_array.append(event) return particle_array
[docs] def charged_particles(self) -> "BaseStorer": """ Keep only charged particles in :code:`particle_list`. Returns ------- self : BaseStorer object Containing charged particles in every event only """ self.particle_list_ = charged_particles(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def uncharged_particles(self) -> "BaseStorer": """ Keep only uncharged particles in :code:`particle_list`. Returns ------- self : BaseStorer object Containing uncharged particles in every event only """ self.particle_list_ = uncharged_particles(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def particle_species( self, pdg_list: Union[int, Union[Tuple[int], List[int], np.ndarray]] ) -> "BaseStorer": """ Keep only particle species given by their PDG ID in every event. Parameters ---------- pdg_list : int To keep a single particle species only, pass a single PDG ID pdg_list : tuple/list/array To keep multiple particle species, pass a tuple or list or array of PDG IDs Returns ------- self : BaseStorer object Containing only particle species specified by :code:`pdg_list` for every event """ self.particle_list_ = particle_species(self.particle_list_, pdg_list) self._update_num_output_per_event_after_filter() return self
[docs] def remove_particle_species( self, pdg_list: Union[int, Union[Tuple[int], List[int], np.ndarray]] ) -> "BaseStorer": """ Remove particle species from :code:`particle_list` by their PDG ID in every event. Parameters ---------- pdg_list : int To remove a single particle species only, pass a single PDG ID pdg_list : tuple/list/array To remove multiple particle species, pass a tuple or list or array of PDG IDs Returns ------- self : BaseStorer object Containing all but the specified particle species in every event """ self.particle_list_ = remove_particle_species( self.particle_list_, pdg_list ) self._update_num_output_per_event_after_filter() return self
[docs] def participants(self) -> "BaseStorer": """ Keep only participants in :code:`particle_list`. Returns ------- self : BaseStorer object Containing participants in every event only """ self.particle_list_ = participants(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def spectators(self) -> "BaseStorer": """ Keep only spectators in :code:`particle_list`. Returns ------- self : BaseStorer object Containing spectators in every event only """ self.particle_list_ = spectators(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def lower_event_energy_cut( self, minimum_event_energy: Union[int, float] ) -> "BaseStorer": """ Filters out events with total energy lower than a threshold. Parameters ---------- minimum_event_energy : int or float The minimum event energy threshold. Should be a positive integer or float. Returns ------- self: BaseStorer object The updated instance of the class contains only events above the energy threshold. Raises ------ TypeError If the :code:`minimum_event_energy` parameter is not an integer or float. ValueError If the :code:`minimum_event_energy` parameter is less than or equal to 0. """ self.particle_list_ = lower_event_energy_cut( self.particle_list_, minimum_event_energy ) self._update_num_output_per_event_after_filter() return self
[docs] def pT_cut( self, cut_value_tuple: Tuple[Union[float, None], Union[float, None]] ) -> "BaseStorer": """ Apply transverse momentum cut to all events by passing an acceptance range by :code:`cut_value_tuple`. All particles outside this range will be removed. Parameters ---------- cut_value_tuple : tuple Tuple with the upper and lower limits of the pT acceptance range :code:`(cut_min, cut_max)`. If one of the limits is not required, set it to :code:`None`, i.e. :code:`(None, cut_max)` or :code:`(cut_min, None)`. Returns ------- self : BaseStorer object Containing only particles complying with the transverse momentum cut for all events """ self.particle_list_ = pT_cut(self.particle_list_, cut_value_tuple) self._update_num_output_per_event_after_filter() return self
[docs] def mT_cut( self, cut_value_tuple: Tuple[Union[float, None], Union[float, None]] ) -> "BaseStorer": """ Apply transverse mass cut to all events by passing an acceptance range by :code:`cut_value_tuple`. All particles outside this range will be removed. Parameters ---------- cut_value_tuple : tuple Tuple with the upper and lower limits of the mT acceptance range :code:`(cut_min, cut_max)`. If one of the limits is not required, set it to :code:`None`, i.e. :code:`(None, cut_max)` or :code:`(cut_min, None)`. Returns ------- self : BaseStorer object Containing only particles complying with the transverse mass cut for all events """ self.particle_list_ = mT_cut(self.particle_list_, cut_value_tuple) self._update_num_output_per_event_after_filter() return self
[docs] def rapidity_cut( self, cut_value: Union[float, Tuple[float, float]] ) -> "BaseStorer": """ Apply rapidity cut to all events and remove all particles with rapidity not complying with :code:`cut_value`. Parameters ---------- cut_value : float If a single value is passed, the cut is applied symmetrically around 0. For example, if :code:`cut_value = 1`, only particles with rapidity in :code:`[-1.0, 1.0]` are kept. cut_value : tuple To specify an asymmetric acceptance range for the rapidity of particles, pass a tuple :code:`(cut_min, cut_max)` Returns ------- self : BaseStorer object Containing only particles complying with the rapidity cut for all events """ self.particle_list_ = rapidity_cut(self.particle_list_, cut_value) self._update_num_output_per_event_after_filter() return self
[docs] def pseudorapidity_cut( self, cut_value: Union[float, Tuple[float, float]] ) -> "BaseStorer": """ Apply pseudo-rapidity cut to all events and remove all particles with pseudo-rapidity not complying with :code:`cut_value`. Parameters ---------- cut_value : float If a single value is passed, the cut is applied symmetrically around 0. For example, if :code:`cut_value = 1`, only particles with rapidity in :code:`[-1.0, 1.0]` are kept. cut_value : tuple To specify an asymmetric acceptance range for the pseudo-rapidity of particles, pass a tuple :code:`(cut_min, cut_max)` Returns ------- self : BaseStorer object Containing only particles complying with the pseudo-rapidity cut for all events """ self.particle_list_ = pseudorapidity_cut(self.particle_list_, cut_value) self._update_num_output_per_event_after_filter() return self
[docs] def spacetime_rapidity_cut( self, cut_value: Union[float, Tuple[float, float]] ) -> "BaseStorer": """ Apply spacetime rapidity (space-time rapidity) cut to all events and remove all particles with spacetime rapidity not complying with cut_value. Parameters ---------- cut_value : float If a single value is passed, the cut is applied symmetrically around 0. For example, if :code:`cut_value = 1`, only particles with spacetime rapidity in :code:`[-1.0, 1.0]` are kept. cut_value : tuple To specify an asymmetric acceptance range for the spacetime rapidity of particles, pass a tuple :code:`(cut_min, cut_max)` Returns ------- self : BaseStorer object Containing only particles complying with the spacetime rapidity cut for all events """ self.particle_list_ = spacetime_rapidity_cut( self.particle_list_, cut_value ) self._update_num_output_per_event_after_filter() return self
[docs] def multiplicity_cut( self, cut_value_tuple: Tuple[Union[float, None], Union[float, None]] ) -> "BaseStorer": """ Apply multiplicity cut. Remove all events with a multiplicity not complying with cut_value. Parameters ---------- cut_value_tuple : tuple Upper and lower bound for multiplicity. If the multiplicity of an event is not in this range, the event is discarded. The range is inclusive on the lower bound and exclusive on the upper bound. Returns ------- self : BaseStorer object Containing only events with a :code:`multiplicity >= min_multiplicity` """ self.particle_list_ = multiplicity_cut( self.particle_list_, cut_value_tuple ) self._update_num_output_per_event_after_filter() return self
[docs] def spacetime_cut( self, dim: str, cut_value_tuple: Tuple[float, float] ) -> "BaseStorer": """ Apply spacetime cut to all events by passing an acceptance range by :code:`cut_value_tuple`. All particles outside this range will be removed. Parameters ---------- dim : string String naming the dimension on which to apply the cut. Options: :code:`t`,:code:`x`,:code:`y`,:code:`z` cut_value_tuple : tuple Tuple with the upper and lower limits of the coordinate space acceptance range :code:`(cut_min, cut_max)`. If one of the limits is not required, set it to :code:`None`, i.e. :code:`(None, cut_max)` or :code:`(cut_min, None)`. Returns ------- self : BaseStorer object Containing only particles complying with the spacetime cut for all events """ self.particle_list_ = spacetime_cut( self.particle_list_, dim, cut_value_tuple ) self._update_num_output_per_event_after_filter() return self
[docs] def particle_status( self, status_list: Union[int, Tuple[int, ...], List[int], np.ndarray] ) -> "BaseStorer": """ Keep only particles with a given particle status. Parameters ---------- status_list : int To keep a particles with a single status only, pass a single status status_list : tuple/list/array To keep hadrons with different hadron status, pass a tuple or list or array Returns ------- self : BaseStorer object Containing only hadrons with status specified by :code:`status_list` for every event """ self.particle_list_ = particle_status(self.particle_list_, status_list) self._update_num_output_per_event_after_filter() return self
[docs] def keep_hadrons(self) -> "BaseStorer": """ Keep only hadrons in :code:`particle_list`. Returns ------- self : BaseStorer object Containing hadrons in every event only """ self.particle_list_ = keep_hadrons(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_leptons(self) -> "BaseStorer": """ Keep only leptons in :code:`particle_list`. Returns ------- self : BaseStorer object Containing leptons in every event only """ self.particle_list_ = keep_leptons(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_quarks(self) -> "BaseStorer": """ Keep only quarks in the :code:`particle_list`. Returns ------- self : BaseStorer object Containing quarks in every event only """ self.particle_list_ = keep_quarks(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_mesons(self) -> "BaseStorer": """ Keep only mesons in :code:`particle_list`. Returns ------- self : BaseStorer object Containing mesons in every event only """ self.particle_list_ = keep_mesons(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_baryons(self) -> "BaseStorer": """ Keep only baryons in :code:`particle_list`. Returns ------- self : BaseStorer object Containing baryons in every event only """ self.particle_list_ = keep_baryons(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_up(self) -> "BaseStorer": """ Keep only hadrons containing up quarks in :code:`particle_list`. Returns ------- self : BaseStorer object Containing hadrons containing up quarks in every event only """ self.particle_list_ = keep_up(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_down(self) -> "BaseStorer": """ Keep only hadrons containing down quarks in :code:`particle_list`. Returns ------- self : BaseStorer object Containing hadrons containing down quarks in every event only """ self.particle_list_ = keep_down(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_strange(self) -> "BaseStorer": """ Keep only hadrons containing strange quarks in :code:`particle_list`. Returns ------- self : BaseStorer object Containing hadrons containing strange quarks in every event only """ self.particle_list_ = keep_strange(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_charm(self) -> "BaseStorer": """ Keep only hadrons containing charm quarks in :code:`particle_list`. Returns ------- self : BaseStorer object Containing hadrons containing charm quarks in every event only """ self.particle_list_ = keep_charm(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_bottom(self) -> "BaseStorer": """ Keep only hadrons containing bottom quarks in :code:`particle_list`. Returns ------- self : BaseStorer object Containing hadrons containing bottom quarks in every event only """ self.particle_list_ = keep_bottom(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def keep_top(self) -> "BaseStorer": """ Keep only hadrons containing top quarks in :code:`particle_list`. Returns ------- self : BaseStorer object Containing hadrons containing top quarks in every event only """ self.particle_list_ = keep_top(self.particle_list_) self._update_num_output_per_event_after_filter() return self
[docs] def remove_photons(self) -> "BaseStorer": """ Remove photons from :code:`particle_list`. Returns ------- self : BaseStorer object Containing all but photons in every event """ self.particle_list_ = remove_photons(self.particle_list_) self._update_num_output_per_event_after_filter() return self
def _update_num_output_per_event_after_filter(self) -> None: if self.num_output_per_event_ is None: raise ValueError("num_output_per_event_ is not set") if self.particle_list_ is None: raise ValueError("particle_list_ is not set") if self.num_output_per_event_.ndim == 1: # Handle the case where num_output_per_event_ is a one-dimensional array self.num_output_per_event_[1] = len(self.particle_list_[0]) elif self.num_output_per_event_.ndim == 2: # Handle the case where num_output_per_event_ is a two-dimensional array updated_num_output_per_event: np.ndarray = np.ndarray( (len(self.particle_list_), 2), dtype=int ) for event in range(len(self.particle_list_)): updated_num_output_per_event[event][0] = ( event + self.num_output_per_event_[0][0] ) updated_num_output_per_event[event][1] = len( self.particle_list_[event] ) self.num_output_per_event_ = updated_num_output_per_event self.num_events_ = len(self.particle_list_) else: raise ValueError( "num_output_per_event_ has an unexpected number of dimensions" ) if self.particle_list_ == []: self.particle_list_ = [[]]
[docs] @abstractmethod def print_particle_lists_to_file(self, output_file: str) -> None: """ Prints the particle lists to a specified file. This method should be implemented by subclasses to print the particle lists to the specified output file. The method raises a :code:`NotImplementedError` if it is not overridden by a subclass. Parameters ---------- output_file : str The path to the file where the particle lists will be printed. Raises ------ NotImplementedError If the method is not implemented by a subclass. Returns ------- None """ raise NotImplementedError("This method is not implemented yet")