# ===================================================
#
# Copyright (c) 2023-2024
# SPARKX Team
#
# GNU General Public License (GPLv3 or later)
#
# ===================================================
from sparkx.Filter import *
import numpy as np
import warnings
from sparkx.loader.OscarLoader import OscarLoader
from sparkx.BaseStorer import BaseStorer
from typing import Any, List, Optional, Union, Dict
[docs]
class Oscar(BaseStorer):
"""
Defines an Oscar object.
The Oscar class contains a single .oscar file including all or only chosen
events in either the Oscar2013, Oscar2013Extended, Oscar2013Extended_IC,
Oscar2013Extended_Photons or ASCII format. It's methods
allow to directly act on all contained events as applying acceptance filters
(e.g. un-/charged particles, spectators/participants) to keep/remove particles
by their PDG codes or to apply cuts (e.g. multiplicity, pseudo-/rapidity, pT).
Once these filters are applied, the new data set can be accessed as a
1) nested list containing all quantities of the Oscar format
2) list containing Particle objects from the Particle class
or it may be printed to a file complying with the input format.
.. note::
If filters are applied, be aware that not all cuts commute.
Parameters
----------
OSCAR_FILE : str
Path to Oscar file
Other Parameters
----------------
**kwargs : properties, optional
kwargs are used to specify optional properties like a chunk reading
and must be used like :code:`'property'='value'` where the possible
properties are specified below.
.. list-table::
:header-rows: 1
:widths: 25 75
* - Property
- Description
* - :code:`events` (int)
- From the input Oscar file load only a single event by |br|
specifying :code:`events=i` where i is event number i.
* - :code:`events` (tuple)
- From the input Oscar file load only a range of events |br|
given by the tuple :code:`(first_event, last_event)` |br|
by specifying :code:`events=(first_event, last_event)` |br|
where last_event is included.
* - :code:`filters` (dict)
- Apply filters on an event-by-event basis to directly filter the |br|
particles after the read in of one event. This method saves |br|
memory. The names of the filters are the same as the names of |br|
the filter methods. All filters are applied in the order in |br|
which they appear in the dictionary.
.. |br| raw:: html
<br />
Attributes
----------
PATH_OSCAR_ : str
Path to the Oscar file
oscar_format_ : str
Input Oscar format "Oscar2013", "Oscar2013Extended",
"Oscar2013Extended_IC", "Oscar2013Extended_Photons",
"ASCII" (set automatically)
num_output_per_event_ : numpy.array
Array containing the event number and the number of particles in this
event as :code:`num_output_per_event_[event i][num_output in event i]`
(updated when filters are applied)
num_events_ : int
Number of events contained in the Oscar object (updated when filters
are applied)
event_end_lines_ : list
List containing all comment lines at the end of each event as str.
Needed to print the Oscar object to a file.
Methods
-------
oscar_format:
Get Oscar format of the input files
impact_parameters:
Get impact parameters of the events
print_particle_lists_to_file:
Print current particle data to file with same format
Examples
--------
**1. Initialization**
To create an Oscar object, the path to the Oscar file has to be passed.
By default the Oscar object will contain all events of the input file. If
the Oscar object should only contain certain events, the keyword argument
"events" must be used.
.. highlight:: python
.. code-block:: python
:linenos:
>>> from sparkx.Oscar import Oscar
>>>
>>> OSCAR_FILE_PATH = [Oscar_directory]/particle_lists.oscar
>>>
>>> # Oscar object containing all events
>>> oscar1 = Oscar(OSCAR_FILE_PATH)
>>>
>>> # Oscar object containing only the first event
>>> oscar2 = Oscar(OSCAR_FILE_PATH, events=0)
>>>
>>> # Oscar object containing only events 2, 3, 4 and 5
>>> oscar3 = Oscar(OSCAR_FILE_PATH, events=(2,5))
**2. Method Usage**
All methods that apply filters to the Oscar data return :code:`self`. This
means that methods can be concatenated. To access the Oscar data as list to
store it into a variable, the method :code:`particle_list()` or
:code:`particle_objects_list` must be called in the end.
Let's assume we only want to keep participant pions in events with a
multiplicity > 500:
>>> oscar = Oscar(OSCAR_FILE_PATH)
>>>
>>> pions = oscar.multiplicity_cut(500, None).participants().particle_species((211, -211, 111))
>>>
>>> # save the pions of all events as nested list
>>> pions_list = pions.particle_list()
>>>
>>> # save the pions as list of Particle objects
>>> pions_particle_objects = pions.particle_objects_list()
>>>
>>> # print the pions to an oscar file
>>> pions.print_particle_lists_to_file('./particle_lists.oscar')
**3. Constructor cuts**
Cuts can be performed directly in the constructor by passing a dictionary. This
has the advantage that memory is saved because the cuts are applied after reading
each single event. This is achieved by the keyword argument :code:`filters`, which
contains the filter dictionary. Filters are applied in the order in which they appear.
Let's assume we only want to keep participant pions in events with a
multiplicity > 500:
>>> oscar = Oscar(OSCAR_FILE_PATH, filters={'multiplicity_cut':(500,None), 'participants':True, 'particle_species':(211, -211, 111)})
>>>
>>> # print the pions to an oscar file
>>> oscar.print_particle_lists_to_file('./particle_lists.oscar')
Notes
-----
If the :code:`filters` keyword with the :code:`spacetime_cut` is used, then a list
specifying the dimension to be cut in the first entry and the tuple with the cut
boundaries in the second entry is needed. For all other filters, the dictionary
only needs the filter name as key and the filter argument as value.
All filter functions without arguments need a :code:`True` in the dictionary.
"""
def __init__(self, OSCAR_FILE: str, **kwargs: Any) -> None:
super().__init__(OSCAR_FILE, **kwargs)
self.PATH_OSCAR_: str = OSCAR_FILE
if not isinstance(self.loader_, OscarLoader):
raise TypeError("The loader must be an instance of OscarLoader.")
self.oscar_format_: Union[str, None] = self.loader_.oscar_format()
self.event_end_lines_: List[str] = self.loader_.event_end_lines()
self.impact_parameters_: List[float] = self.loader_.impact_parameter()
del self.loader_
[docs]
def create_loader(self, OSCAR_FILE: str) -> None: # type: ignore[override]
"""
Creates a new OscarLoader object.
This method initializes a new OscarLoader object with the specified OSCAR file
and assigns it to the loader attribute.
Parameters
----------
OSCAR_FILE : str
The path to the OSCAR file to be loaded.
Returns
-------
None
"""
self.loader_ = OscarLoader(OSCAR_FILE)
def _particle_as_list(self, particle: Any) -> List[Union[float, int]]:
particle_list = []
if self.oscar_format_ == "ASCII":
for attr in self.custom_attr_list:
particle_list.append(getattr(particle, attr))
return particle_list
else:
particle_list.append(float(particle.t))
particle_list.append(float(particle.x))
particle_list.append(float(particle.y))
particle_list.append(float(particle.z))
particle_list.append(float(particle.mass))
particle_list.append(float(particle.E))
particle_list.append(float(particle.px))
particle_list.append(float(particle.py))
particle_list.append(float(particle.pz))
particle_list.append(int(particle.pdg))
particle_list.append(int(particle.ID))
particle_list.append(int(particle.charge))
if (
self.oscar_format_ == "Oscar2013Extended"
or self.oscar_format_ == "Oscar2013Extended_IC"
or self.oscar_format_ == "Oscar2013Extended_Photons"
):
particle_list.append(int(particle.ncoll))
particle_list.append(float(particle.form_time))
particle_list.append(float(particle.xsecfac))
particle_list.append(int(particle.proc_id_origin))
particle_list.append(int(particle.proc_type_origin))
particle_list.append(float(particle.t_last_coll))
particle_list.append(int(particle.pdg_mother1))
particle_list.append(int(particle.pdg_mother2))
if self.oscar_format_ != "Oscar2013Extended_Photons":
if not np.isnan(particle.baryon_number):
particle_list.append(int(particle.baryon_number))
if not np.isnan(particle.strangeness):
particle_list.append(int(particle.strangeness))
else:
if not np.isnan(particle.weight):
particle_list.append(int(particle.weight))
elif (
self.oscar_format_ != "Oscar2013"
and self.oscar_format_ != "Oscar2013Extended"
and self.oscar_format_ != "Oscar2013Extended_IC"
and self.oscar_format_ != "Oscar2013Extended_Photons"
):
raise TypeError(
"Input file not in OSCAR2013, OSCAR2013Extended or Oscar2013Extended_IC format"
)
return particle_list
[docs]
def particle_status(
self, status_list: Union[int, Tuple[int, ...], List[int], np.ndarray]
) -> "Oscar":
"""
Raises an error because the method is not implemented for the Oscar
class.
Parameters
----------
status_list : int
To keep a particles with a single status only, pass a single status
status_list : tuple/list/array
To keep hadrons with different hadron status, pass a tuple or list
or array
Returns
-------
NotImplementedError
This method is not implemented for the Oscar class.
"""
raise NotImplementedError(
"particle_status is not implemented for the Oscar class."
)
[docs]
def keep_quarks(self) -> "Oscar":
"""
Raises an error because the method is not implemented for the Oscar
class.
Returns
-------
NotImplementedError
This method is not implemented for the Oscar class.
"""
raise NotImplementedError(
"keep_quarks is not implemented for the Oscar class."
)
def _update_after_merge(self, other: BaseStorer) -> None:
"""
Updates the current instance after merging with another Oscar instance.
This method is called after merging two Oscar instances to update the
attributes of the current instance based on the attributes of the other instance.
Parameters
----------
other : Oscar
The other Oscar instance that was merged with the current instance.
Raises
------
UserWarning
If the Oscar formats of the two instances do not match, a warning is issued.
"""
if not isinstance(other, Oscar):
raise TypeError("Can only add Oscar objects to Oscar.")
if self.oscar_format_ != other.oscar_format_:
warnings.warn(
"Oscar format of the merged instances do not match. Taking the left-hand side Oscar format."
)
self.event_end_lines_ = self.event_end_lines_ + other.event_end_lines_
[docs]
def impact_parameters(self) -> List[float]:
"""
Get the impact parameters for the loaded events.
Returns
-------
impact_parameters : List[float]
Impact parameters of the events
"""
return self.impact_parameters_
[docs]
def print_particle_lists_to_file(self, output_file: str) -> None:
"""
Prints the current Oscar data to an output file specified by
:code:`output_file` with the same format as the input file.
For empty events, only the event header and footer are printed.
Parameters
----------
output_file : str
Path to the output file like :code:`[output_directory]/particle_lists.oscar`
"""
header: List[str] = []
format_oscar2013: str = "%g %g %g %g %g %.9g %.9g %.9g %.9g %d %d %d"
format_oscar2013_extended: str = (
"%g %g %g %g %g %.9g %.9g %.9g %.9g %d %d %d %d %g %g %d %d %g %d %d"
)
format_map: Dict[str, str] = {
"t": "%g",
"x": "%g",
"y": "%g",
"z": "%g",
"mass": "%g",
"p0": "%.9g",
"px": "%.9g",
"py": "%.9g",
"pz": "%.9g",
"pdg": "%d",
"ID": "%d",
"charge": "%d",
"ncoll": "%d",
"form_time": "%g",
"xsecfac": "%g",
"proc_id_origin": "%d",
"proc_type_origin": "%d",
"time_last_coll": "%g",
"pdg_mother1": "%d",
"pdg_mother2": "%d",
"baryon_number": "%d",
"strangeness": "%d",
}
if self.oscar_format_ == "ASCII":
format_custom = " ".join(
[format_map[attr] for attr in self.custom_attr_list]
)
with open(self.PATH_OSCAR_, "r") as oscar_file:
counter_line = 0
while True:
line = oscar_file.readline()
line_splitted = line.replace("\n", "").split(" ")
if counter_line < 3:
header.append(line)
elif line_splitted[0] == "#" and line_splitted[3] == "end":
event_footer = line
break
elif counter_line > 1000000:
err_msg = (
"Unable to find the end of an event in the original"
+ "Oscar file within the first 1000000 lines"
)
raise RuntimeError(err_msg)
counter_line += 1
with open(output_file, "w") as f_out:
for i in range(3):
f_out.write(header[i])
# Open the output file with buffered writing (25 MB)
with open(output_file, "a", buffering=25 * 1024 * 1024) as f_out:
if self.particle_list_ is None:
raise ValueError("The particle list is empty.")
list_of_particles = self.particle_list()
if self.num_output_per_event_ is None:
raise ValueError("The number of output per event is empty.")
if self.num_events_ is None:
raise ValueError("The number of events is empty.")
if self.num_events_ == 1 and self.particle_list_ == [[]]:
warnings.warn("The number of events is zero.")
elif self.num_events_ > 1:
for i in range(self.num_events_):
event = self.num_output_per_event_[i, 0]
num_out = self.num_output_per_event_[i, 1]
particle_output = np.asarray(list_of_particles[i])
f_out.write(
"# event " + str(event) + " out " + str(num_out) + "\n"
)
if len(particle_output) == 0:
f_out.write(self.event_end_lines_[event])
continue
elif (
i == 0
and len(particle_output[0]) > 20
and (
self.oscar_format_ == "Oscar2013Extended"
or self.oscar_format_ == "Oscar2013Extended_IC"
)
):
format_oscar2013_extended = (
format_oscar2013_extended
+ (len(particle_output[0]) - 20) * " %d"
)
if self.oscar_format_ == "Oscar2013":
np.savetxt(
f_out,
particle_output,
delimiter=" ",
newline="\n",
fmt=format_oscar2013,
)
elif (
self.oscar_format_ == "Oscar2013Extended"
or self.oscar_format_ == "Oscar2013Extended_IC"
or self.oscar_format_ == "Oscar2013Extended_Photons"
):
np.savetxt(
f_out,
particle_output,
delimiter=" ",
newline="\n",
fmt=format_oscar2013_extended,
)
elif self.oscar_format_ == "ASCII":
np.savetxt(
f_out,
particle_output,
delimiter=" ",
newline="\n",
fmt=format_custom,
)
f_out.write(self.event_end_lines_[event])
else:
event = 0
num_out = self.num_output_per_event_[0][1]
particle_output = np.asarray(list_of_particles)
f_out.write(
"# event " + str(event) + " out " + str(num_out) + "\n"
)
if len(particle_output) == 0:
f_out.write(self.event_end_lines_[event])
f_out.close()
return
elif len(particle_output[0]) > 20 and (
self.oscar_format_ == "Oscar2013Extended"
or self.oscar_format_ == "Oscar2013Extended_IC"
):
format_oscar2013_extended = (
format_oscar2013_extended
+ (len(particle_output[0]) - 20) * " %d"
)
if self.oscar_format_ == "Oscar2013":
np.savetxt(
f_out,
particle_output,
delimiter=" ",
newline="\n",
fmt=format_oscar2013,
)
elif (
self.oscar_format_ == "Oscar2013Extended"
or self.oscar_format_ == "Oscar2013Extended_IC"
or self.oscar_format_ == "Oscar2013Extended_Photons"
):
np.savetxt(
f_out,
particle_output,
delimiter=" ",
newline="\n",
fmt=format_oscar2013_extended,
)
elif self.oscar_format_ == "ASCII":
np.savetxt(
f_out,
particle_output,
delimiter=" ",
newline="\n",
fmt=format_custom,
)
f_out.write(self.event_end_lines_[event])
f_out.close()