from typing import (
Any,
Dict,
Generator,
List,
Optional,
Sequence,
Tuple,
IO,
Union,
)
# Python 3.8 minimum because of this and walrus
from functools import cached_property
import numpy as np
# Prep if we ever move up np minimum requirements to 1.21
# OurNumpyArrType = np.ndarray[tuple[int, ...], np.dtype[np.int16]]
OurNumpyArrType = np.ndarray
###############################################################################
[docs]
class ChannelData:
"""
All data related to a single channel, could be as part of an event or not as part of an event, and may or may not have a pulse summary or wave
Check self.has_wave and self.has_summary to see if either are present
.. autoattribute::
"""
def __init__(
self,
channel: int,
event_timestamp: int,
pulse_summary: Optional[Dict[str, Any]],
wave: Optional[OurNumpyArrType],
other_data:Optional[Dict[str, Any]] = {},
):
# General / Tracking Variables
#: channel ID
self.channel: int = channel
#: FPGA Event Timestamp
self.event_timestamp: int = event_timestamp
#: Boolean indicating whether this channel contains waveform data
self.has_wave: bool = isinstance(wave, np.ndarray)
#: Boolean indicating whether this channel contains DSP summary data
self.has_summary: bool = isinstance(pulse_summary, dict)
#: Numpy Waveform for digitizer signal
self.wave: Optional[OurNumpyArrType] = wave if self.has_wave else None
#: Dictionary of Pulse Summary (DSP) Information
self.pulse_summary: Dict[str, Any] = pulse_summary if isinstance(pulse_summary, dict) else {}
#: Timestamp of this channels trigger. This will be equal to the event timestamp + relative channel offset defined in certain file formats
if ('relative_timestamp' in self.pulse_summary) and ('relative_channel_timestamp' not in self.pulse_summary):
self.pulse_summary['relative_channel_timestamp'] = self.pulse_summary.pop('relative_timestamp')
self.channel_timestamp : int = event_timestamp + self.pulse_summary.get('relative_channel_timestamp', 0)
#: Maximum height of pulse in the pulse height window. Subject to averaging
self.pulse_height: Optional[int] = self.pulse_summary.get("pulse_height", None)
#: Maximum height of trigger in the trigger active window. Subject to averaging
self.trigger_height: Optional[int] = self.pulse_summary.get("trigger_height", None)
if self.trigger_height is None:
self.trigger_height = self.pulse_summary.get("trig_height", None)
#: QuadQDC BASE Integration. Not supported on all digitizer models
self.quadqdc_base: Optional[int] = self.pulse_summary.get("quadqdc_base", None)
#: QuadQDC FAST Integration. Not supported on all digitizer models
self.quadqdc_fast: Optional[int] = self.pulse_summary.get("quadqdc_fast", None)
#: QuadQDC SLOW Integration. Not supported on all digitizer models
self.quadqdc_slow: Optional[int] = self.pulse_summary.get("quadqdc_slow", None)
#: QuadQDC TAIL Integration. Not supported on all digitizer models
self.quadqdc_tail: Optional[int] = self.pulse_summary.get("quadqdc_tail", None)
if self.quadqdc_base is None:
self.quadqdc_base = self.pulse_summary.get("qdc_base_sum", None)
if self.quadqdc_fast is None:
self.quadqdc_fast = self.pulse_summary.get("qdc_fast_sum", None)
if self.quadqdc_slow is None:
self.quadqdc_slow = self.pulse_summary.get("qdc_slow_sum", None)
if self.quadqdc_tail is None:
self.quadqdc_tail = self.pulse_summary.get("qdc_tail_sum", None)
#: Boolean indicating whether this channel triggered
self.triggered: Optional[bool] = self.pulse_summary.get("triggered", None)
#: The number of times the trigger for this channel fired in the trigger window
#: Also known as pileup count
self.trigger_multiplicity: Optional[int] = self.pulse_summary.get("trigger_count", None)
if self.trigger_multiplicity is None:
self.trigger_multiplicity = self.pulse_summary.get("trig_count", None)
# Future:
# self.qdc_rect : int
# self.qdc_tri : int
# self.mwd : int
#: other data not normally included in pulse summaries that may supported be in a special file format
self.other_data : Dict[str,Any] = {}
# _________________________________________________________________________
[docs]
@cached_property
def relative_channel_timestamp(self) -> Optional[int]:
"""
Returns true if there have been multiple triggers in this channel
"""
if self.channel_timestamp is not None:
return (self.channel_timestamp - self.event_timestamp)
return None
# _________________________________________________________________________
[docs]
@cached_property
def pileup(self) -> bool:
"""
Returns true if there have been multiple triggers in this channel
"""
if self.trigger_multiplicity:
return True
return False
# _________________________________________________________________________
[docs]
@cached_property
def num_wave_samples(self) -> int:
"""
Returns the number of samples in the wave, 0 if this has no wave
"""
if self.has_wave:
if self.wave is not None:
return self.wave.size
return 0
# _________________________________________________________________________
def __str__(self) -> str:
return f"<Channel{self.channel} Data: event_timestamp={self.event_timestamp} relative_channel_timestamp={self.relative_channel_timestamp} has_summary={self.has_summary} num_wave_samples={self.num_wave_samples}>"
# _________________________________________________________________________
def __repr__(self) -> str:
return str(self)
###############################################################################
[docs]
class EventInfo:
"""
Information related to a group of channels collated as an "Event"
as defined by either the file format or a rebuilt coincidence window.
"""
def __init__(self, channel_data: Sequence[ChannelData]):
"""
:channel_data: A list of more than one channel constituting an event
"""
#: number of channels in this event
self.num_channels = len(channel_data)
assert self.num_channels > 0, "At least one channel must be provided"
# Ensure channel data is homogenous
if not all(channel_data[0].has_wave == channel_data[i].has_wave for i in range(self.num_channels)):
raise ValueError("Events require every channel to have a waveform OR for none of them to have waveforms")
if not all(channel_data[0].has_summary == channel_data[i].has_summary for i in range(self.num_channels)):
raise ValueError("Events require every channel to have a pulse_summary OR for none of them to have pulse_summaries")
self.channel_data = sorted(channel_data, key=lambda cd: cd.channel)
[docs]
@cached_property
def has_waves(self):
"""True if this Event contains waveform data for ALL of it's channels"""
return all(cd.has_wave for cd in self.channel_data)
[docs]
@cached_property
def has_summary(self):
"""True if this Event contains pulse summary data for ALL of it's channels"""
return all(cd.has_summary for cd in self.channel_data)
# _________________________________________________________________________
# Waveforms Shape
# _________________________________________________________________________
[docs]
def wavedata(self) -> OurNumpyArrType:
"""
An np.ndarray of waves in the event. Rows are samples, columns are the
channels in this event. see `channels` for the list of channel numbers
"""
if self.has_waves:
# stack multiple waves into a single array. Each channel is one column
arrays: List[OurNumpyArrType] = [cd.wave for cd in self.channel_data] # type:ignore
return np.stack(arrays=arrays, axis=1)
raise RuntimeError("No Waveform waves found for this Event!")
[docs]
@cached_property
def num_wave_samples(self) -> Optional[int]:
"""
Returns the number of samples in each channel's waveform or None if
no waveforms exist
"""
if self.has_waves:
return self.channel_data[0].num_wave_samples
return None
[docs]
@cached_property
def shape(self):
"""
Returns the shape of :meth:`.wavedata' or None if no waveforms exist
"""
if self.has_waves:
return (self.num_wave_samples, self.num_channels)
return tuple()
# _________________________________________________________________________
# Channels
# _________________________________________________________________________
[docs]
@cached_property
def channels(self) -> Sequence[int]:
"""
List of all channels in the event in the order they appear
"""
return [cd.channel for cd in self.channel_data]
# _________________________________________________________________________
# Timestamps
# _________________________________________________________________________
[docs]
@cached_property
def timestamp(self) -> int:
"""
The timestamp of this event. Defined as the timestamp of the first trigger
in an event. This quantity is also saved in each channel's data as :py:ref:`ChannelData.event_timestamp`
"""
return min(cd.event_timestamp for cd in self.channel_data)
# .........................................................................
[docs]
@cached_property
def channel_timestamps(self) -> Sequence[Optional[int]]:
"""
Timestamps for each
"""
return [cd.channel_timestamp for cd in self.channel_data]
# _________________________________________________________________________
[docs]
@cached_property
def relative_channel_timestamps(self) -> Sequence[Optional[int]]:
"""
All timestamps found throughout all of the channels we have
"""
return [cd.relative_channel_timestamp for cd in self.channel_data]
# .........................................................................
[docs]
@cached_property
def channel_timestamp_range(self) -> int:
"""
Returns the difference between the maximum and minimum channel timestamp in this event
"""
channel_timestamps = [t for t in self.relative_channel_timestamps if t is not None]
if channel_timestamps:
return max(channel_timestamps) - min(channel_timestamps)
return 0
# _________________________________________________________________________
# Pulse Heights
# _________________________________________________________________________
[docs]
@cached_property
def pulse_heights(self) -> Union[Sequence[int], Sequence[None]]:
"""Returns the pulse heights on each channel."""
return [cd.pulse_height for cd in self.channel_data] # type: ignore
# _________________________________________________________________________
# Trigger Data
# _________________________________________________________________________
[docs]
@cached_property
def trigger_heights(self) -> Union[Sequence[int], Sequence[None]]:
"""Returns the trigger heights trigger on each channel."""
return [cd.trigger_height for cd in self.channel_data] # type: ignore
# .........................................................................
[docs]
@cached_property
def channel_multiplicity(self) -> int:
"""Returns the number of channels that triggered at least once in this event"""
return sum(cd.triggered for cd in self.channel_data) # type: ignore
# .........................................................................
[docs]
@cached_property
def pileup_count(self) -> Union[Sequence[int], Sequence[None]]:
"""Returns a list of the number of triggers fired for each channel"""
return [cd.trigger_multiplicity for cd in self.channel_data if cd.triggered] # type: ignore
# .........................................................................
[docs]
@cached_property
def total_triggers(self) -> int:
"""Returns the total number of triggers that fired across all channels.
AKA Hit Multiplicity
"""
return sum(self.pileup_count) # type: ignore
# .........................................................................
def __str__(self) -> str:
return f"<EventInfo: timestamp={self.timestamp}, channels={self.channels}, has_summary={self.has_summary}, shape={self.shape}>"
# .........................................................................
def __repr__(self) -> str:
return str(self)
# .........................................................................
def __getitem__(self, chan:int) -> ChannelData:
"""grabs the data associated with the channel number `chan`. The first occurance of the channel in this event will be returned"""
if chan not in self.channels:
raise KeyError(f"Channel {chan} is not in this Event")
ch_idx = self.channels.index(chan)
return self.channel_data[ch_idx]
# .........................................................................
def __iter__(self) -> Generator[ChannelData, Any, None]:
for chan_data in self.channel_data:
yield chan_data
###############################################################################
[docs]
class BaseLoader:
"""
The base class that all Loader types are an extension of, Loaders extending this subclass this class and then implement load_channel_batch
All BaseLoader derived classes can be used as a context manager, i.e.:
with <loader>(file) as loader:
# Do whatever
NOTE:
An individual BaseLoader instance is able to run exactly *once* before needing to -reopen the file, please keep this in mind.
"""
# _________________________________________________________________________
def __init__(self, fpath: str, rebuild_events_with_window: Optional[int] = None):
"""
:fpath: filepath to the data file
:rebuild_events_with_window: timestamp window where channel data is considered part of the same event. If None, then it will return events as they are defined in the file.
FUTURE:
Add a parameter `resort_by_timestamp` that resorts all data
by the timestamp in the rare case that data isn't written to disk in
sequence. This is a slow and memory intensive operation that is never required
if your data is generated using the UI of a SkuTek digitizer. So it's not
worth doing at this time
"""
self.fpath = fpath
self.rebuild_events_with_window = rebuild_events_with_window
self.active_event_building = self.rebuild_events_with_window is not None
self.values: Optional[Sequence[ChannelData]] = []
self.current_chan_in_values: int = 0
self.channel_ran: bool = False
self.file_handle: IO # type: ignore
def __enter__(self) -> "BaseLoader":
return self
def __exit__(self, type: Any, value: Any, traceback: Any) -> None:
self.file_handle.close() # type: ignore
# _________________________________________________________________________
[docs]
def loadChannelBatch(self) -> Optional[Sequence[ChannelData]]:
"""
The base method for loading channels, this loads a sequence of channels (events) or individual channels.
This is specialized for all loader types.
"""
# Many file formats save channels in columns and samples in rows
# which means it's nigh impossible to just load in one channel at a time
# For file formats where this is possible this function will return a single
# ChannelData. For file formats (like IGOR or EventCSV) where it's not,
# then we'll return a list of ChannelData
raise NotImplementedError("required overload")
# _________________________________________________________________________
[docs]
def channelByChannel(self) -> Generator[ChannelData, Any, None]:
"""
Get the individual channels, loaded one at a time
"""
# This code is less complex than it looks, the first repeated while statement is basically just the previous one, but again
while True:
# Does a "Not none" check and also is just an entry-point to finish a list for an unfinished generator
while self.channel_ran and self.values is not None and self.current_chan_in_values < len(self.values):
yield self.values[self.current_chan_in_values]
self.current_chan_in_values += 1
# Load in the batch to a class-based variable
self.current_chan_in_values = 0
self.values = self.loadChannelBatch()
self.channel_ran = True
if self.values is None:
return
# Yield as many channels as we can yield before we return
while self.current_chan_in_values < len(self.values):
yield self.values[self.current_chan_in_values]
self.current_chan_in_values += 1
# _________________________________________________________________________
[docs]
def nextEvent(self) -> Optional[EventInfo]:
"""
Obtain the next event by loading the next batch.
"""
# .....................................................................
# if we are not actively event building then we just define
# an event as it is defined in the file as a batch of channels
# i.e. no timestamp coincidence window checking
if not self.active_event_building:
channel_batch = self.loadChannelBatch()
if channel_batch is None:
return None
return EventInfo(channel_batch)
# .....................................................................
# Otherwise we go through the data channel by channel
# and define an event by it's timestamp and coincidence window
else:
# grab the next channel data first to make sure we're not
# at the end of the file
channel_data_generator = self.channelByChannel()
try:
first_cd = next(channel_data_generator)
except StopIteration:
return None
# There's probably a smart walrus operator way to do this
channels_in_event: List[ChannelData] = []
cd = first_cd
if self.rebuild_events_with_window is None:
self.rebuild_events_with_window = 0
while True:
# channel_timestamp = cd.timestamp if cd.timestamp else cd.event_timestamp
# check to make sure the timestamp is in the event's coincidence window
if cd.channel_timestamp <= (first_cd.channel_timestamp + self.rebuild_events_with_window):
channels_in_event.append(cd)
else:
break
try:
cd = next(channel_data_generator)
except StopIteration:
break
return EventInfo(channels_in_event)
# _________________________________________________________________________
[docs]
def lazyLoad(self) -> Generator[EventInfo, Any, None]:
"""
Lazily yield events, returns the next event in a generator fashion for iterating
"""
# The while will be false if it's None, otherwise it's true
while event_tuple := self.nextEvent():
yield event_tuple
# _________________________________________________________________________
def __iter__(self) -> Generator[EventInfo, Any, None]:
"""
Iterate over a lazy-loading of events
"""
return self.lazyLoad()
# _________________________________________________________________________
@property
def bytes_read(self) -> int:
"""
Returns the number of bytes parsed by this loader so far
"""
return self.file_handle.tell()