Source code for JetscapeLoader

# ===================================================
#
#    Copyright (c) 2024
#      SPARKX Team
#
#    GNU General Public License (GPLv3 or later)
#
# ===================================================

from sparkx.loader.BaseLoader import BaseLoader
from sparkx.Filter import *
from typing import Dict, List, Tuple, Union, Any


[docs] class JetscapeLoader(BaseLoader): """ A class to load and process JETSCAPE data files. Attributes ---------- PATH_JETSCAPE_ : str The path to the JETSCAPE data file. particle_type_ : str The type of particles to load ('hadron' or 'parton'). Methods ------- load(**kwargs) Loads the data from the JETSCAPE file based on the specified optional arguments. set_particle_list(kwargs) Sets the list of particles based on the specified optional arguments. set_num_output_per_event() Sets the number of output lines per event in the JETSCAPE data file. get_last_line(file_path) Returns the last line of a file. get_sigmaGen() Retrieves the sigmaGen values with error from the last line of a file. get_particle_type_defining_string() Returns the string which defines the particle type in the JETSCAPE file. get_particle_type() Returns the particle type of the JETSCAPE file. """ def __init__(self, JETSCAPE_FILE: str): """ Sets the number of output lines per event and the event footers in the OSCAR data file. This method reads the OSCAR data file line by line and determines the number of output lines for each event and the event footers. The method behaves differently depending on the OSCAR format of the data file. If the format is 'Oscar2013Extended_IC' or 'Oscar2013Extended_Photons', it counts the number of lines between 'in' and 'end' lines. Otherwise, it counts the number of lines between 'out' and 'end' lines. Parameters ---------- None Raises ------ FileNotFoundError If the file is not found or does not end with '.dat'. ValueError If the last line of the JETSCAPE file does not contain the string 'sigmaGen'. Returns ------- None """ if not ".dat" in JETSCAPE_FILE: raise FileNotFoundError("File not found or does not end with .dat") # Check that the last line contains the string 'sigmaGen' if "sigmaGen" not in self.get_last_line(JETSCAPE_FILE): raise ValueError( "The last line of the Jetscape file does not contain the string 'sigmaGen'" ) self.PATH_JETSCAPE_ = JETSCAPE_FILE self.particle_type_ = "hadron" self.particle_type_defining_string_ = "N_hadrons" self.optional_arguments_: Any = {} self.event_end_lines_: List[str] = [] self.num_output_per_event_: np.ndarray = np.array([]) self.num_events_: int = 0
[docs] def load( self, **kwargs: Any ) -> Tuple[List[List[Particle]], int, np.ndarray, List[str]]: """ Loads the data from the JETSCAPE file based on the specified optional arguments. This method reads the JETSCAPE data file and applies any filters specified in the 'filters' key of the kwargs dictionary. It also adjusts the number of events and the number of output lines per event based on the 'events' key in the kwargs dictionary. If the 'particletype' key is specified, it sets the particle type to either 'hadron' or 'parton'. If any other keys are specified in the kwargs dictionary, it raises a ValueError. Parameters ---------- kwargs : dict A dictionary of optional arguments. The following keys are recognized: - 'events': Either a tuple of two integers specifying the range of events to load, or a single integer specifying a single event to load. - 'filters': A list of filters to apply to the data. - 'particletype': A string specifying the type of particles to load ('hadron' or 'parton'). Raises ------ ValueError If an unknown keyword argument is used, if the first value of the 'events' tuple is larger than the second value, if an event number is negative, or if the 'particletype' is not 'hadron' or 'parton'. TypeError If the 'events' key is not a tuple or an integer, or if the 'particletype' key is not a string. Returns ------- tuple A tuple containing the list of Particle objects loaded from the JETSCAPE data file, the number of events, and the number of output lines per event. """ self.optional_arguments_ = kwargs self.event_end_lines_ = [] for keys in self.optional_arguments_.keys(): if keys not in ["events", "filters", "particletype"]: raise ValueError("Unknown keyword argument used in constructor") if "events" in self.optional_arguments_.keys() and isinstance( self.optional_arguments_["events"], tuple ): self._check_that_tuple_contains_integers_only( self.optional_arguments_["events"] ) if ( self.optional_arguments_["events"][0] > self.optional_arguments_["events"][1] ): raise ValueError( "First value of event number tuple must be smaller than second value" ) elif ( self.optional_arguments_["events"][0] < 0 or self.optional_arguments_["events"][1] < 0 ): raise ValueError("Event numbers must be non-negative") elif "events" in self.optional_arguments_.keys() and isinstance( self.optional_arguments_["events"], int ): if self.optional_arguments_["events"] < 0: raise ValueError("Event number must be non-negative") if "particletype" in self.optional_arguments_.keys() and isinstance( self.optional_arguments_["particletype"], str ): if (self.optional_arguments_["particletype"] == "hadron") or ( self.optional_arguments_["particletype"] == "parton" ): self.particle_type_ = self.optional_arguments_["particletype"] else: raise ValueError( "'particletype' has to be 'hadron' or 'parton'" ) elif ( "particletype" in self.optional_arguments_.keys() and not isinstance(self.optional_arguments_["particletype"], str) ): raise TypeError("'particletype' is not given as a string value") if self.particle_type_ == "hadron": self.particle_type_defining_string_ = "N_hadrons" else: self.particle_type_defining_string_ = "N_partons" self.set_num_output_per_event() return ( self.set_particle_list(kwargs), self.num_events_, self.num_output_per_event_, [], )
# PRIVATE CLASS METHODS def _get_num_skip_lines(self) -> int: """ Get number of initial lines in Jetscape file that are header or comment lines and need to be skipped in order to read the particle output. Returns ------- skip_lines : int Number of initial lines before data. """ if ( not self.optional_arguments_ or "events" not in self.optional_arguments_.keys() ): skip_lines = 1 elif isinstance(self.optional_arguments_["events"], int): if self.optional_arguments_["events"] == 0: skip_lines = 1 else: cumulate_lines = 0 for i in range(0, self.optional_arguments_["events"]): cumulate_lines += self.num_output_per_event_[i, 1] + 1 skip_lines = 1 + cumulate_lines elif isinstance(self.optional_arguments_["events"], tuple): line_start = self.optional_arguments_["events"][0] if line_start == 0: skip_lines = 1 else: cumulate_lines = 0 for i in range(0, line_start): cumulate_lines += self.num_output_per_event_[i, 1] + 1 skip_lines = 1 + cumulate_lines else: raise TypeError( 'Value given as flag "events" is not of type ' + "int or a tuple of two int values" ) return skip_lines
[docs] def event_end_lines(self) -> List[str]: """ Returns the list of event end lines. This method returns the list of strings that mark the end of events in the Jetscape data. Returns ------- List[str] A list of strings that mark the end of events in the Jetscape data. """ return self.event_end_lines_
def __get_num_read_lines(self) -> int: """ Gets the number of lines to read from the JETSCAPE data file. This method calculates the number of lines to read from the JETSCAPE data file based on the 'events' key in the optional_arguments_ dictionary. If the 'events' key is not specified, it sums up the number of output lines for all events and adds the number of comment lines. If the 'events' key is an integer, it gets the number of output lines for the specified event. If the 'events' key is a tuple, it sums up the number of output lines for the range of events specified by the tuple. If the 'events' key is not an integer or a tuple, it raises a TypeError. Parameters ---------- None Raises ------ TypeError If the 'events' key in the optional_arguments_ dictionary is not an integer or a tuple. Returns ------- cumulated_lines : int The number of lines to read from the JETSCAPE data file. """ if ( not self.optional_arguments_ or "events" not in self.optional_arguments_.keys() ): cumulated_lines = np.sum(self.num_output_per_event_, axis=0)[1] # add number of comments cumulated_lines += int(len(self.num_output_per_event_)) elif isinstance(self.optional_arguments_["events"], int): read_event = self.optional_arguments_["events"] cumulated_lines = int(self.num_output_per_event_[read_event][1] + 1) elif isinstance(self.optional_arguments_["events"], tuple): cumulated_lines = 0 event_start = self.optional_arguments_["events"][0] event_end = self.optional_arguments_["events"][1] for i in range(event_start, event_end + 1): cumulated_lines += int(self.num_output_per_event_[i, 1] + 1) else: raise TypeError( "Value given as flag events is not of type int or a tuple" ) # +1 for the end line in Jetscape format return cumulated_lines + 1 def __apply_kwargs_filters( self, event: List[List[Particle]], filters_dict: Any ) -> List[List[Particle]]: """ Applies the specified filters to the event. This method applies a series of filters to the event based on the keys in the filters_dict dictionary. The filters include 'charged_particles', 'uncharged_particles', 'particle_species', 'remove_particle_species', 'lower_event_energy_cut', 'pT_cut', 'mT_cut', 'rapidity_cut', 'pseudorapidity_cut', 'multiplicity_cut', 'particle_status', 'keep_hadrons', 'keep_leptons', 'keep_quarks', 'keep_mesons', 'keep_baryons', 'keep_up', 'keep_down', 'keep_strange', 'keep_charm', 'keep_bottom', 'keep_top' and 'remove_photons'. If a key in the filters_dict dictionary does not match any of these filters, a ValueError is raised. Parameters ---------- event : list The event to which the filters are applied. filters_dict : dict A dictionary of filters to apply to the event. The keys are the names of the filters and the values are the parameters for the filters. Raises ------ ValueError If a key in the filters_dict dictionary does not match any of the supported filters. Returns ------- event : list The event after the filters have been applied. """ if not isinstance(filters_dict, dict) or len(filters_dict.keys()) == 0: return event for i in filters_dict.keys(): if i == "charged_particles": if filters_dict["charged_particles"]: event = charged_particles(event) elif i == "uncharged_particles": if filters_dict["uncharged_particles"]: event = uncharged_particles(event) elif i == "particle_species": event = particle_species( event, filters_dict["particle_species"] ) elif i == "remove_particle_species": event = remove_particle_species( event, filters_dict["remove_particle_species"] ) elif i == "lower_event_energy_cut": event = lower_event_energy_cut( event, filters_dict["lower_event_energy_cut"] ) elif i == "pT_cut": event = pT_cut(event, filters_dict["pT_cut"]) elif i == "mT_cut": event = mT_cut(event, filters_dict["mT_cut"]) elif i == "rapidity_cut": event = rapidity_cut(event, filters_dict["rapidity_cut"]) elif i == "pseudorapidity_cut": event = pseudorapidity_cut( event, filters_dict["pseudorapidity_cut"] ) elif i == "multiplicity_cut": event = multiplicity_cut( event, filters_dict["multiplicity_cut"] ) elif i == "particle_status": event = particle_status(event, filters_dict["particle_status"]) elif i == "keep_hadrons": if filters_dict["keep_hadrons"]: event = keep_hadrons(event) elif i == "keep_leptons": if filters_dict["keep_leptons"]: event = keep_leptons(event) elif i == "keep_quarks": if filters_dict["keep_quarks"]: event = keep_quarks(event) elif i == "keep_mesons": if filters_dict["keep_mesons"]: event = keep_mesons(event) elif i == "keep_baryons": if filters_dict["keep_baryons"]: event = keep_baryons(event) elif i == "keep_up": if filters_dict["keep_up"]: event = keep_up(event) elif i == "keep_down": if filters_dict["keep_down"]: event = keep_down(event) elif i == "keep_strange": if filters_dict["keep_strange"]: event = keep_strange(event) elif i == "keep_charm": if filters_dict["keep_charm"]: event = keep_charm(event) elif i == "keep_bottom": if filters_dict["keep_bottom"]: event = keep_bottom(event) elif i == "keep_top": if filters_dict["keep_top"]: event = keep_top(event) elif i == "remove_photons": if filters_dict["remove_photons"]: event = remove_photons(event) else: raise ValueError("The cut is unknown!") return event # PUBLIC CLASS METHODS
[docs] def set_particle_list( self, kwargs: Dict[ str, Union[int, Tuple[int, int], List[Dict[str, Union[str, float]]]] ], ) -> List[List[Particle]]: """ Sets the list of particles based on the specified optional arguments. This method reads the JETSCAPE data file and creates a list of Particle objects based on the data in the file. It applies any filters specified in the 'filters' key of the kwargs dictionary. It also adjusts the number of events and the number of output lines per event based on the 'events' key in the kwargs dictionary. If any other keys are specified in the kwargs dictionary, it raises a ValueError. Parameters ---------- kwargs : dict A dictionary of optional arguments. The following keys are recognized: - 'events': Either a tuple of two integers specifying the range of events to load, or a single integer specifying a single event to load. - 'filters': A list of filters to apply to the data. Raises ------ IndexError If the end of the JETSCAPE file is reached before the specified number of lines is read, or if the number of events in the JETSCAPE file does not match the number of events specified by the comments in the file. ValueError If the first line of the event is not a comment line or does not contain "weight", or if an unknown keyword argument is used. Returns ------- particle_list : list A list of Particle objects loaded from the JETSCAPE data file. """ particle_list: List[List[Particle]] = [] data: List[Particle] = [] num_read_lines = self.__get_num_read_lines() cut_events = 0 with open(self.PATH_JETSCAPE_, "r") as jetscape_file: self._skip_lines(jetscape_file) for i in range(0, num_read_lines): line = jetscape_file.readline() if not line: raise IndexError("Index out of range of JETSCAPE file") elif "#" in line and "sigmaGen" in line: old_data_len = len(data) if "filters" in self.optional_arguments_.keys(): data = self.__apply_kwargs_filters( [data], kwargs["filters"] )[0] if len(data) != 0 or old_data_len == 0: self.num_output_per_event_[len(particle_list)] = ( len(particle_list) + 1, len(data), ) else: self.num_output_per_event_ = np.atleast_2d( np.delete( self.num_output_per_event_, len(particle_list), axis=0, ) ) if self.num_output_per_event_.shape[0] == 0: self.num_output_per_event_ = np.array([]) elif ( len(particle_list) < self.num_output_per_event_.shape[0] ): self.num_output_per_event_[ len(particle_list) :, 0 ] -= 1 if len(data) != 0 or old_data_len == 0: particle_list.append(data) else: cut_events = cut_events + 1 elif i == 0 and "#" not in line and "weight" not in line: raise ValueError( "First line of the event is not a comment " + 'line or does not contain "weight"' ) elif "Event" in line and "weight" in line: line_list = ( line.replace("\n", "").replace("\t", " ").split(" ") ) first_event_header = 1 if "events" in self.optional_arguments_.keys(): if isinstance(kwargs["events"], int): first_event_header += int(kwargs["events"]) else: if not ( isinstance(kwargs["events"], tuple) or isinstance(kwargs["events"], int) ): raise ValueError( "Events should be an integer or tuple of two integers" ) first_event_header += int(kwargs["events"][0]) if int(line_list[2]) == first_event_header: continue else: old_data_len = len(data) if "filters" in self.optional_arguments_.keys(): data = self.__apply_kwargs_filters( [data], kwargs["filters"] )[0] if len(data) != 0 or old_data_len == 0: self.num_output_per_event_[ len(particle_list) ] = ( len(particle_list) + 1, len(data), ) else: self.num_output_per_event_ = np.atleast_2d( np.delete( self.num_output_per_event_, len(particle_list), axis=0, ) ) if self.num_output_per_event_.shape[0] == 0: self.num_output_per_event_ = np.array([]) elif ( len(particle_list) < self.num_output_per_event_.shape[0] ): self.num_output_per_event_[ len(particle_list) :, 0 ] -= 1 if len(data) != 0 or old_data_len == 0: particle_list.append(data) else: cut_events = cut_events + 1 data = [] else: line_list = ( line.replace("\n", "").replace("\t", " ").split(" ") ) particle = Particle("JETSCAPE", np.asarray(line_list)) data.append(particle) self.num_events_ = self.num_events_ - cut_events # Correct num_output_per_event and num_events if not kwargs or "events" not in self.optional_arguments_.keys(): if len(particle_list) != self.num_events_: raise IndexError( "Number of events in Jetscape file does not match the " + "number of events specified by the comments in the " + "Jetscape file!" ) elif isinstance(kwargs["events"], int): update = self.num_output_per_event_[kwargs["events"]] self.num_output_per_event_ = np.array(update) self.num_events_ = int(1) elif isinstance(kwargs["events"], tuple): event_start = kwargs["events"][0] event_end = kwargs["events"][1] update = self.num_output_per_event_[event_start : event_end + 1] self.num_output_per_event_ = update self.num_events_ = int(event_end - event_start + 1) if particle_list == []: particle_list = [[]] return particle_list
[docs] def set_num_output_per_event(self) -> None: """ Sets the number of output lines per event in the JETSCAPE data file. This method reads the JETSCAPE data file line by line and determines the number of output lines for each event. It does this by looking for lines that contain a '#' and the ``particle_type_defining_string_``. For each such line, it extracts the event number and the number of output lines from the line and appends them to a list. After reading the entire file, it converts the list to a numpy array and stores it in the ``num_output_per_event_`` attribute. It also sets the ``num_events_`` attribute to the length of the list. Parameters ---------- None Raises ------ None Returns ------- None """ with open(self.PATH_JETSCAPE_, "r") as jetscape_file: event_output = [] while True: line = jetscape_file.readline() if not line: break elif ( "#" in line and self.particle_type_defining_string_ in line ): line_str = ( line.replace("\n", "").replace("\t", " ").split(" ") ) event = line_str[2] num_output = line_str[8] event_output.append([event, num_output]) else: continue self.num_output_per_event_ = np.array(event_output, dtype=np.int32) self.num_events_ = len(event_output)
[docs] def get_last_line(self, file_path: str) -> str: """ Returns the last line of a file. Parameters ---------- file_path : str The path to the file. Returns ------- str The last line of the file, stripped of leading and trailing whitespace. """ with open(file_path, "rb") as file: file.seek(-2, 2) while file.read(1) != b"\n": file.seek(-2, 1) last_line = file.readline().decode().strip() return last_line
[docs] def get_sigmaGen(self) -> Tuple[float, float]: """ Retrieves the sigmaGen values with error from the last line of a file. Returns ------- tuple A tuple containing the first and second sigmaGen values as floats. """ last_line = self.get_last_line(self.PATH_JETSCAPE_) words = last_line.split() numbers = [] for word in words: try: number = float(word) numbers.append(number) if len(numbers) == 2: break except ValueError: continue return (numbers[0], numbers[1])
[docs] def get_particle_type_defining_string(self) -> str: """ Returns the string which defines the particle type in the Jetscape file. Returns ------- string The particle type defining string. """ return self.particle_type_defining_string_
[docs] def get_particle_type(self) -> str: """ Returns the particle type of the Jetscape file. Returns ------- string The particle type. """ return self.particle_type_