Source code for skutils.GretinaLoader

from collections import OrderedDict
import numpy as np
# import pandas as pd
import mmap
import bitstruct
import time
import sys

from .constants import *

from typing_extensions import deprecated

################################################################################
class EventMetadata:
    def __init__(self, packets, number):
        # make sure all packets have the same timestamp
        if not all(packets[0].timestamp == p.timestamp for p in packets):
            raise RuntimeError("invalid packet list. packets have different timestamps")

        # sort packets by channel - smallest first
        self.packets = sorted(packets, key=lambda p: (p.channel, p.wave_type))
        self.number = number
        self.channels = list(set([p.channel for p in packets]))
        self.start = min(p.start for p in self.packets)
        self.end = max(p.end for p in self.packets)
        self.size = self.end - self.start

        self.trace_packets = [p for p in self.packets if (p.wave_type == GebTypes.raw_waveform)]
        self.hist_packets = [p for p in self.packets if (p.wave_type == GebTypes.raw_histogram)]
        self.ps_packets = [p for p in self.packets if (p.wave_type == GebTypes.raw_pulse_summary)]

        self.timestamp = self.packets[0].timestamp
        self.version = self.packets[0].version
        self.module = self.packets[0].module
        self.signed = self.packets[0].signed
        self.bitdepth = self.packets[0].bitdepth
        self.wave_type = self.packets[0].wave_type
        self.wave_type_string = self.packets[0].wave_type_string

        if self.trace_packets:
            self.sample_offset = self.trace_packets[0].sample_offset
            self.number_samples = self.trace_packets[0].number_samples
        else:
            self.sample_offset = 0
            self.number_samples = 0

    # --------------------------------------------------------------------------
    def as_dict(self):
        metadata = {
            'sample_offset':     self.sample_offset,
            'number_samples':    self.number_samples,
            'event_number':      self.number,
            'packets':           self.packets,
            'channels':          self.channels,
            'timestamp':         self.timestamp,
            'version_num':       self.version,
            'module_num':        self.module,
            'signed':            self.signed,
            'bitdepth':          self.bitdepth,
            'wave_types':        self.wave_type,
            'wave_type_strings': self.wave_type_string,
        }
        return metadata

    # --------------------------------------------------------------------------
    def load_arrays_from_file(self, fp, big_endian, wave_type):
        """Loads hists or traces for this event into a 2D numpy array
        (rows are sample indices, columns are channels, smallest channel first)"""
        pos = fp.tell()
        fp.seek(self.start)
        event_buffer = fp.read(self.size)

        packets = [p for p in self.packets if (p.wave_type == wave_type)]
        raw_arrays = [p.extract_data_from_event_buffer(event_buffer, self.start, big_endian, wave_type)
                      for p in packets]
        channels = [p.channel for p in packets]

        if len(raw_arrays) == 0:
            # event = pd.DataFrame()
            event = np.asarray([])
        else:
            n_samples = packets[0].number_samples
            sample_offset = packets[0].sample_offset
            # NOTE: in testing this system, stacking or building the dataframe
            # usually took 0us, but occasionally took about 1000us; never between the two.
            # The reason for the inconsistency is unknown, but I'm assuming it's more
            # OS/python related than this code -JM
            # start = time.time() # DEBUG
            # stack the channel arrays. Rows are sample indices. Columns are channels
            event = np.stack(raw_arrays, axis=1)
            # index = np.arange(sample_offset, (sample_offset+n_samples))
            # event = pd.DataFrame(stacked_data, columns=channels, index=index, dtype=stacked_data.dtype)
            # end = time.time() # DEBUG
            # print(f"stacked and DF built in {round(1e6*(end-start),0)}us") # DEBUG

        fp.seek(pos)
        return event

    # --------------------------------------------------------------------------
    def load_histograms_from_file(self, fp, big_endian):
        """Loads HIST data for this event into a 2D numpy array"""
        return self.load_arrays_from_file(fp, big_endian, GebTypes.raw_histogram)

    # --------------------------------------------------------------------------
    def load_traces_from_file(self, fp, big_endian):
        """Loads TRACE data for this event into a 2D numpy array"""
        return self.load_arrays_from_file(fp, big_endian, GebTypes.raw_waveform)

    # --------------------------------------------------------------------------
    def load_summaries_from_file(self, fp, big_endian):
        """Loads PULSE SUMMARY data for this event into a list of dicts"""
        pos = fp.tell()
        fp.seek(self.start)

        packets = [p for p in self.packets if (p.wave_type == GebTypes.raw_pulse_summary)]
        summary_struct = PULSE_SUMMARY0_BE if big_endian else PULSE_SUMMARY0_LE

        summaries = []
        for p in packets:
            fp.seek(p.data_start)
            raw = fp.read(p.data_length)
            summary = PulseSummary0(*summary_struct.unpack(raw))
            summaries.append(summary._asdict())

        fp.seek(pos)
        # df = pd.DataFrame.from_records(summaries, columns=PulseSummary0._fields)
        return summaries
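# Hedged usage sketch (not part of the original module): how EventMetadata is
# typically consumed together with a loader. `loader` is assumed to be an open
# GretinaLoader-style object exposing `fp` and `big_endian` as documented below;
# the helper name `_example_read_event_traces` is hypothetical.
def _example_read_event_traces(loader):
    ev = loader.peek_at_next_event_metadata()   # EventMetadata, or None at EOF
    if ev is None:
        return None
    # traces is a 2D numpy array: rows are sample indices, columns are channels
    traces = ev.load_traces_from_file(loader.fp, loader.big_endian)
    print(ev.number, ev.channels, ev.timestamp, traces.shape)
    return ev.as_dict(), traces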
################################################################################
class PacketMetadata:
    TRACE_DTYPES = [[np.dtype('<u2'), np.dtype('<i2')],   # little endian: unsigned, signed
                    [np.dtype('>u2'), np.dtype('>i2')]]   # big endian:    unsigned, signed
    HIST_DTYPES  = [[np.dtype('<u4'), np.dtype('<i4')],   # little endian: unsigned, signed
                    [np.dtype('>u4'), np.dtype('>i4')]]   # big endian:    unsigned, signed
    PS_DTYPES    = [[np.dtype('<u1')],                    # little endian: unsigned
                    [np.dtype('>u1')]]                    # big endian:    unsigned
    ALL_DTYPES = {GebTypes.raw_waveform:      TRACE_DTYPES,
                  GebTypes.raw_histogram:     HIST_DTYPES,
                  GebTypes.raw_pulse_summary: PS_DTYPES}

    def __init__(self, packet_header, word1, word2, word3, start):
        self.start = start
        self.wave_type = packet_header.type
        self.length = packet_header.length
        self.timestamp = packet_header.timestamp
        self.version = word1.version
        self.module = word1.module
        self.signed = word1.signed if (self.wave_type != GebTypes.raw_pulse_summary) else False
        self.channel = word1.channel

        self.pre_payload_size = 0
        self.pre_payload_size += GEB_HEADER_SIZE
        self.pre_payload_size += SKUTEK_WORD_SIZE

        if word2:
            self.bitdepth = word2.bitdepth + 1  # need to add one to bitdepth per DDC standard
            self.number_samples = word2.number_samples
            self.pre_payload_size += SKUTEK_WORD_SIZE
        else:
            self.bitdepth = -1
            self.number_samples = self.length - self.pre_payload_size

        if word3:
            self.sample_offset = word3.sample_offset
            self.pre_payload_size += SKUTEK_WORD_SIZE
        else:
            self.sample_offset = 0

        # Calculated values
        self.wave_type_string = GebTypes.WAVE_TYPE_STRINGS.get(self.wave_type, "unknown")  # "unknown" if unrecognized type
        self.full_length = packet_header.length + GEB_HEADER_SIZE
        # get size of each sample. Default to 1 Byte
        self.sample_size = self.ALL_DTYPES[self.wave_type][0][0].itemsize if (self.wave_type in self.ALL_DTYPES) else 1
        if self.wave_type == GebTypes.raw_pulse_summary:
            self.data_length = PULSE_SUMMARY0_LE.size
        else:
            self.data_length = self.number_samples * self.sample_size
        self.data_start = self.start + self.pre_payload_size
        self.end = self.start + self.full_length
        self.null_padding_size = self.full_length - (self.pre_payload_size + self.data_length)

    # --------------------------------------------------------------------------
    def extract_data_from_event_buffer(self, buf, event_start, big_endian, wave_type=GebTypes.raw_waveform):
        """pulls the packet data from the byte array of event data"""
        wave_start = self.data_start - event_start
        wave_end = wave_start + self.data_length
        # finally read out the data and cast it to the required type
        dtype = self.ALL_DTYPES[wave_type][big_endian][self.signed]
        data = np.frombuffer(buf[wave_start:wave_end], dtype=dtype)
        return data

    # --------------------------------------------------------------------------
    def __str__(self):
        return f"PacketMetadataCh{self.channel}"

    def __repr__(self):
        return str(self)
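# Hedged illustration (not part of the original module) of the dtype lookup
# convention used by PacketMetadata above: ALL_DTYPES[wave_type][big_endian][signed],
# where the boolean flags index the nested lists (False -> 0, True -> 1). For example,
# a little-endian unsigned waveform sample maps to np.dtype('<u2') and a big-endian
# signed waveform sample maps to np.dtype('>i2'):
#
#   PacketMetadata.ALL_DTYPES[GebTypes.raw_waveform][False][False]  # dtype('<u2')
#   PacketMetadata.ALL_DTYPES[GebTypes.raw_waveform][True][True]    # dtype('>i2')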
################################################################################
@deprecated("This class is deprecated, it will be removed in a future update, please use LegacyGretinaLoader for future support or switch to WrappedGretinaLoader")
class GretinaLoader:
    """Object to load in event data from Skutek Gretina formats. Unrecognized
    packet types will be ignored.

    UNTESTED WITH BIG ENDIAN DATA

    See the example usage sketch at the bottom of this module.

    Attributes:
        filename(str): filename being loaded
        memmap(bool): whether or not the whole file was loaded into memory.
        cache_event_metadata(bool): if True, then cache the positions of events as
            they are loaded. This makes traversing backwards through the file much
            faster. default is True
        fp(IoBuffer): open file or memory mapped file object
        big_endian(bool): whether or not the file is big endian or little endian.
            defaults to system byte order, but is updated as soon as an endian
            packet is parsed
        ascii_header(list): list of ascii version data. Typically
            [ddc_apps_version, git_branch_name, firmware_revision]
            empty list `[]` if no ascii version packet is parsed
        event_number(int): number of events parsed so far, i.e. the index of the
            next event to be parsed
        packet_parser_functions(dict): dictionary of functions to parse packet
            contents. wave and histogram packets are parsed independently of this
            dictionary. Functions are called with arguments
            (file_pointer, packet_start, packet_end) and are expected not to
            advance the file position.
    """
    # --------------------------------------------------------------------------
    def __init__(self, filename, memmap=False, cache_event_metadata=True):
        """Instantiates the Gretina File Loader

        Args:
            filename(str): filename to load in
            memmap(bool): whether or not to load the file into memory. Can be much
                faster especially when seeking events, but at the cost of extra memory
            cache_event_metadata(bool): if True, then cache the positions of events
                as they are loaded. This makes traversing backwards through the file
                much faster. default is True
        """
        self.filename = filename
        self.memmap = None  # assigned in __make_memmap()
        self.cache_event_metadata = cache_event_metadata
        self.cache = {}
        self.fp = open(filename, 'rb')
        # assumed to be true by default, but will be updated to reflect the
        # contents of endian indicator packets
        self.big_endian = (sys.byteorder == 'big')  # updated if/when endian packet is parsed
        self.ascii_header = []    # assigned if/when ascii version packet is parsed
        self.event_number = None  # assigned in __reset()

        self.packet_parser_functions = {
            GebTypes.general_ascii:              self.__parse_version_packet,
            GebTypes.version_info_ascii:         self.__parse_version_packet,
            GebTypes.endian_indicator:           self.__parse_endian_packet,
            GebTypes.endian_indicator_nonnative: self.__parse_endian_packet,
        }

        # if we want to memory map this file, then we'll replace the file IOBuffer
        # with a memory mapped I/O buffer. This will load the file into memory
        # but we can still use standard file I/O operations
        if memmap:
            self.__make_memmap()

        # reset event number and file position
        self.__reset()
        # peek at the first packet. This will usually load endian and version info
        self.peek_at_next_packet_metadata()

    # --------------------------------------------------------------------------
    def __reset(self):
        """resets file position and event number"""
        self.fp.seek(0)
        self.event_number = 0

    # --------------------------------------------------------------------------
    def __make_memmap(self):
        """sets up the memory mapped I/O buffer"""
        self.memmap = True
        # 2nd arg as 0 loads the whole file into memory
        mmap_fp = mmap.mmap(self.fp.fileno(), 0, access=mmap.ACCESS_READ)
        self.old_fp = self.fp
        # self.fp.close() # close the file on disk
        self.fp = mmap_fp  # replace the file pointer with the mmap

    # --------------------------------------------------------------------------
    def __parse_nondata_packet(self, packet_header, start, end):
        """parses non-data packets. calls the contents of self.packet_parser_functions"""
        func = self.packet_parser_functions.get(packet_header.type, None)
        if func:
            func(self.fp, start, end)
        else:
            if packet_header.type == GebTypes.raw_histogram:
                print(f"currently unable to parse histograms")
            elif packet_header.type == GebTypes.raw_pulse_summary:
                print(f"currently unable to parse pulse summaries")
            else:
                print(f"unable to parse packet type {hex(packet_header.type)}")

    # --------------------------------------------------------------------------
    def __parse_endian_packet(self, fp, start, end):
        """parses the endian packet type and updates the self.big_endian attribute"""
        pos = fp.tell()
        fp.seek(start)
        raw_geb = self.fp.read(GEB_HEADER_SIZE)  # read the endian indicator
        # parse as little endian and check if the timestamp is correct
        packet_header = GebPacketHeader(*GEB_HEADER_LE.unpack(raw_geb))
        if packet_header.timestamp == ENDIAN_INDICATOR_TIMESTAMP:
            self.big_endian = False
        else:
            self.big_endian = True
        fp.seek(pos)

    # --------------------------------------------------------------------------
    def __parse_version_packet(self, fp, start, end):
        """parses version_info_ascii packets and updates the version info attributes"""
        pos = fp.tell()
        fp.seek(start)
        # read the geb header at the start of the packet
        raw_geb = self.fp.read(GEB_HEADER_SIZE)
        header_type = GEB_HEADER_BE if self.big_endian else GEB_HEADER_LE
        packet_header = GebPacketHeader(*header_type.unpack(raw_geb))
        try:
            # read and decode the skutek word which contains the length of the string
            # unused currently, but here for completeness
            raw_length = fp.read(SKUTEK_WORD_SIZE)
            strlength = int.from_bytes(raw_length, ['little', 'big'][self.big_endian])
            # read the ascii version data
            raw = fp.read(packet_header.length - SKUTEK_WORD_SIZE)
            raw = raw[:strlength]  # remove padding
            text = raw.decode('utf-8', errors='ignore').splitlines()
            self.ascii_header.extend(text)
        except IndexError:
            print(f"unable to parse ascii version packet at file index={start}")

        fp.seek(pos)

    # --------------------------------------------------------------------------
    def peek_at_next_packet_metadata(self):
        """returns the metadata for the next data packet in the file without advancing
        the file position. Returns None if the end of the file is hit. If it encounters
        non-data packets during its peeking, then it will pass positional information to
        whatever function is defined for that packet in `packet_parser_functions`

        Args:
            None

        Returns:
            PacketMetadata: metadata and info about the next data packet. This includes
                GEB Headers, Skutek words, and position information in the file.
                OR None if we've reached the end of the file
        """
        pos = self.fp.tell()
        raw_geb = self.fp.read(GEB_HEADER_SIZE)
        # Check if we've reached EOF, and return None if true
        if len(raw_geb) != GEB_HEADER_SIZE:
            self.fp.seek(pos)
            return None

        # read the geb header at the start of the packet
        header_type = GEB_HEADER_BE if self.big_endian else GEB_HEADER_LE
        packet_header = GebPacketHeader(*header_type.unpack(raw_geb))

        # NON-DATA PACKET PARSING
        # If this isn't a data packet, then parse the contents in a separate function
        # and continue onto the next data packet
        if (packet_header.type not in (GebTypes.raw_waveform, GebTypes.raw_histogram, GebTypes.raw_pulse_summary)):
            # run the function meant to handle this packet type. assuming it
            # won't change file position
            end = pos + GEB_HEADER_SIZE + packet_header.length
            self.__parse_nondata_packet(packet_header, pos, end)
            # meanwhile advance to the start of next packet and return the next data packet
            self.fp.seek(end)
            return self.peek_at_next_packet_metadata()

        elif packet_header.type == GebTypes.raw_pulse_summary:
            word1_raw = self.fp.read(SKUTEK_WORD_SIZE)
            word1_raw = bitstruct.byteswap('4', word1_raw)
            skutek_word1 = SkutekWord1(*SKUTEK_WORD1.unpack(word1_raw))
            # no other words in pulse summary
            skutek_word2 = None
            skutek_word3 = None

        else:
            # DATA PACKET PARSING
            # this is a data packet, we continue to read the skutek words
            # and then the data.
            raw_skutek = self.fp.read(2 * SKUTEK_WORD_SIZE)
            # Check if we've reached EOF, and return None if true
            if len(raw_skutek) != 2 * SKUTEK_WORD_SIZE:
                self.fp.seek(pos)
                return None

            # read the two skutek-defined metadata words
            # skutek words are always big endian?
            word1_raw = bitstruct.byteswap('4', raw_skutek[0:4])
            word2_raw = bitstruct.byteswap('4', raw_skutek[4:8])
            skutek_word1 = SkutekWord1(*SKUTEK_WORD1.unpack(word1_raw))
            skutek_word2 = SkutekWord2(*SKUTEK_WORD2.unpack(word2_raw))

            # version 1 packets have an extra word
            if skutek_word1.version == 1:
                tmp = self.fp.read(SKUTEK_WORD_SIZE)
                if len(tmp) != SKUTEK_WORD_SIZE:
                    self.fp.seek(pos)
                    return None
                word3_raw = bitstruct.byteswap('4', tmp)
                skutek_word3 = SkutekWord3(*SKUTEK_WORD3.unpack(word3_raw))
            else:
                skutek_word3 = None

        packet = PacketMetadata(packet_header, skutek_word1, skutek_word2, skutek_word3, pos)
        self.fp.seek(pos)
        return packet

    # --------------------------------------------------------------------------
    def peek_at_next_event_metadata(self):
        """Finds all packets associated with the next event and returns metadata about
        the next event which can be used to read it. Does not advance file position.

        Returns None if there are no more events in the file
        """
        # If the metadata is already in the cache, then don't bother parsing the file
        if self.cache_event_metadata:
            if self.event_number in self.cache:
                return self.cache[self.event_number]

        pos = self.fp.tell()
        # parse the first packet and use its timestamp to identify event association
        packet1 = self.peek_at_next_packet_metadata()

        # If there are no more packets, then we are at the end of the file and
        # out of events. So we return None
        if packet1 is None:
            self.fp.seek(pos)
            return None

        packets = [packet1]
        self.fp.seek(packet1.end)

        while True:
            next_packet = self.peek_at_next_packet_metadata()
            # If there are no more packets, then we are at the end of the file
            # and thus also the end of this event
            if next_packet is None:
                break

            # If true, then we are in the same event and can advance to the next packet
            if packet1.timestamp == next_packet.timestamp:
                packets.append(next_packet)
                self.fp.seek(next_packet.end)
            else:
                break

        event_metadata = EventMetadata(packets, self.event_number)
        if self.cache_event_metadata:
            self.cache[self.event_number] = event_metadata

        self.fp.seek(pos)
        return event_metadata

    # --------------------------------------------------------------------------
    def seek_event(self, desired_event):
        """goes to the specified event in the file

        Args:
            desired_event(int): the desired event number

        Returns:
            None

        Raises:
            RuntimeError if event does not exist in this file
        """
        pos = self.fp.tell()           # only used for failure conditions
        event_num = self.event_number  # only used for failure conditions

        # if the next event is the desired event, then we don't have to do anything
        if desired_event == self.event_number:
            return

        if self.cache_event_metadata:
            # Check if this event already exists in cache. load it if it exists
            if desired_event in self.cache:
                self.fp.seek(self.cache[desired_event].start)
                self.event_number = desired_event
                return
            # If we are caching, but haven't cached across the one we want
            # then seek to the closest cached event prior to the desired event
            else:
                # gambling that descending order will be slightly faster for most applications
                for num in sorted(self.cache.keys(), reverse=True):
                    if (desired_event - num) > 0:
                        self.seek_event(num)
                        break

        # if we are past the desired event, then reset file location to zero
        if desired_event < self.event_number:
            self.__reset()

        # iterate through the file event by event until we get to the desired one
        while self.event_number < desired_event:
            event_metadata = self.peek_at_next_event_metadata()
            # if we hit the end of the file and still haven't found the event
            # then throw a runtime error
            if event_metadata is None:
                # reset position so other operations aren't affected if this
                # error is ignored by the calling process
                self.fp.seek(pos)
                self.event_number = event_num
                raise RuntimeError(f"event {desired_event} does not exist")
            self.fp.seek(event_metadata.end)
            self.event_number += 1

    # --------------------------------------------------------------------------
    def fetch_event(self, desired_event):
        """fetches the specified event in the file. Does not advance the event number
        or file position.

        Returns:
            Tuple:
                dict: dictionary of metadata about the event. keys include 'packets',
                    'channels', 'wave_type_strings', 'timestamp', 'event_number'
                numpy.ndarray: The event data. Columns are channels (smallest first),
                    rows are the sample indices of the event

        Raises:
            RuntimeError if event does not exist in this file
        """
        pos = self.fp.tell()
        event_num = self.event_number

        self.seek_event(desired_event)
        metadata, event = self.next_event()

        self.fp.seek(pos)
        self.event_number = event_num
        return metadata, event

    # --------------------------------------------------------------------------
    def next_event(self):
        """Reads the next event from the current file location and returns a metadata
        dictionary and a numpy array of the event data. This will advance the location
        in the file. Events are defined as a series of contiguous packets with the same
        timestamp.

        Warning: if the end of file has been reached, then this will return (None, None)

        Returns:
            Tuple:
                dict: dictionary of metadata about the event. keys include 'packets',
                    'channels', 'wave_type_strings', 'timestamp', 'event_number',
                    'histograms', 'summaries'. OR None if EOF
                numpy.ndarray: The event data. Columns are channels (smallest first),
                    rows are the sample indices of the event. OR None if EOF
        """
        event_metadata = self.peek_at_next_event_metadata()
        # we've reached the end of the file. Return None in this case
        if event_metadata is None:
            return None, None

        event = event_metadata.load_traces_from_file(self.fp, self.big_endian)
        self.event_number += 1
        self.fp.seek(event_metadata.end)

        metadata = event_metadata.as_dict()
        metadata['histograms'] = event_metadata.load_histograms_from_file(self.fp, self.big_endian)
        metadata['summaries'] = event_metadata.load_summaries_from_file(self.fp, self.big_endian)
        return metadata, event

    # --------------------------------------------------------------------------
    def load_and_sort_all_events(self):
        """Retrieves a list of all events sorted by timestamp. Unlike fetch_event and
        seek_event, all packets with the same timestamp are defined as an event even if
        the packets are non-contiguous in the file.

        Regardless of instantiation parameters, this function will enable memmap to
        speed the process up.

        Returns:
            tuple:
                list of dicts: metadata for all events, ordered by timestamp (smallest first)
                list of numpy arrays: data for all events, ordered by timestamp (smallest first)
        """
        # force memory mapping mode. No matter what, this data will be loaded
        # into memory, so we aren't gaining anything with memmap=False
        if not self.memmap:
            self.__make_memmap()

        # return to the beginning of the file
        self.__reset()

        # Assume that timestamps are mostly in order from smallest to greatest.
        # we can use an ordered dictionary to retain timestamp keys that are
        # already mostly or fully sorted
        all_packets = OrderedDict()

        # iterate through all packets and add them to the timestamp dictionary
        packet = self.peek_at_next_packet_metadata()
        while packet:
            if packet.timestamp in all_packets:
                all_packets[packet.timestamp].append(packet)
            else:
                all_packets[packet.timestamp] = [packet]

            self.fp.seek(packet.end)
            packet = self.peek_at_next_packet_metadata()

        # build list of all events sorted by timestamp
        event_metadata = []
        for i, t in enumerate(sorted(all_packets)):
            p = all_packets[t]
            event_metadata.append(EventMetadata(p, i))

        # generate metadata dictionaries
        metadata_dicts = [m.as_dict() for m in event_metadata]
        # metadata_dicts = []
        # for em in event_metadata:
        #     em_dict = em.as_dict()
        #     # add histograms if they exist to the dict
        #     em_dict['hists'] = em.load_histograms_from_file(self.fp, self.big_endian)
        #     metadata_dicts.append(em_dict)

        # generate event data
        events = [m.load_traces_from_file(self.fp, self.big_endian) for m in event_metadata]
        return metadata_dicts, events

    # --------------------------------------------------------------------------
    def calculate_number_of_events(self):
        pos = self.fp.tell()
        ev_num = self.event_number

        # if we're already at the end of the file, then event_number already equals
        # the number of events parsed, which is the total number of events
        ev = self.peek_at_next_event_metadata()
        if ev is None:
            return self.event_number

        # otherwise we have to iterate through all event headers until we find the last one
        while True:
            next_ev = self.peek_at_next_event_metadata()
            if next_ev is None:
                break
            self.event_number += 1
            self.fp.seek(next_ev.end)
            ev = next_ev

        num_events = ev.number + 1  # because events are 0-based indexing

        # reset file back to old state
        self.fp.seek(pos)
        self.event_number = ev_num
        return num_events

    # --------------------------------------------------------------------------
    def reopen(self):
        if self.fp.closed:
            self.fp = open(self.filename, 'rb')

    # --------------------------------------------------------------------------
    def close(self):
        if getattr(self, "old_fp", None) is not None:
            if not self.old_fp.closed:
                self.old_fp.close()
        self.fp.close()
    # --------------------------------------------------------------------------
    @property
    def next_event_num(self):
        return self.event_number + 1

    # --------------------------------------------------------------------------
    def __del__(self):
        if hasattr(self, "fp"):
            self.fp.close()

    # --------------------------------------------------------------------------
    def __iter__(self):
        metadata, event = self.next_event()
        while event is not None:
            yield metadata, event
            # event will become None if EOF
            metadata, event = self.next_event()

    # --------------------------------------------------------------------------
    def __enter__(self):
        return self

    # --------------------------------------------------------------------------
    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
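################################################################################
# Hedged usage sketch (not part of the original module): a minimal end-to-end
# example of the context-manager / iterator API defined above. The filename
# "example_run.bin" and the helper name `_example_usage` are hypothetical
# placeholders; any file written in the Skutek Gretina format should work.
def _example_usage(filename="example_run.bin"):
    with GretinaLoader(filename, memmap=True) as loader:
        print("ascii header:", loader.ascii_header)
        print("events in file:", loader.calculate_number_of_events())

        # sequential access: iterate every event in file order
        for metadata, event in loader:
            # event is a 2D numpy array: rows are samples, columns are channels
            print(metadata['event_number'], metadata['channels'], event.shape)

        # random access: fetch a specific event without disturbing the
        # loader's position or event counter
        metadata0, event0 = loader.fetch_event(0)
        return metadata0, event0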