from typing import (
Any,
Dict,
Generator,
List,
Optional,
Sequence,
Tuple,
IO,
Union,
)
import re
import io
import numpy as np
from collections import OrderedDict
from .BaseLoader import BaseLoader, ChannelData
class IGORWaveLoader(BaseLoader):
"""
A loader for the IGOR wave format. This is an event-type format, so events will consistently be built correctly so long as the original event was correctly written.
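
Example usage (a minimal sketch; the file path is hypothetical and it is assumed
that ``ChannelData`` exposes its constructor arguments, e.g. ``channel`` and
``timestamp``, as attributes)::

    loader = IGORWaveLoader("run0_waves.itx")
    batch = loader.loadChannelBatch()
    while batch is not None:
        for channel_data in batch:
            print(channel_data.channel, channel_data.timestamp)
        batch = loader.loadChannelBatch()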
"""
__BYTES_TO_BUFFER = 1024
def __init__(self, fpath: str, rebuild_events_with_window: Optional[int] = None):
self.file_handle: IO[str] = open(fpath)
# First line starts with IGOR
super().__init__(fpath, rebuild_events_with_window)
self.buffer = io.StringIO()
self.event_group_regex = re.compile(r"(?:X\sevt_num.*?X\sProcessOneEvent.*?\))", flags=re.DOTALL | re.IGNORECASE)
self.comment_regex = re.compile(r"X\s//.*\n", flags=re.IGNORECASE)
self.timestamp_regex = re.compile(r"X\stimestamp\s*=\s*(?P<timestamp>[0-9-]{1,})\n",
flags=re.IGNORECASE)
self.evt_num_regex = re.compile(r"X\sevt_num\s*=\s*(?P<evt_num>[0-9-]{1,})\n",
flags=re.IGNORECASE)
self.ps_wave_regex = re.compile(r"WAVES/o/D\/N=\((?P<num_quantities>\d*),(?P<num_channels>\d*)\)\spulse_summaries\sBEGIN\n(?P<data>.*?)\nEND",
flags=re.IGNORECASE | re.DOTALL)
self.ps_dim_label_regex = re.compile(r"X\sSetDimLabel\s*(?P<igor_axis>\d)\s*,\s*(?P<quantity_index>\d)\s*,\s*'(?P<quantity_label>.*?)'\s*,\s*pulse_summaries",
flags=re.IGNORECASE | re.MULTILINE)
self.waveform_wave_regex = re.compile(r"WAVES/o/D\s(?P<channel_names>.*?)\nBEGIN\n(?P<data>.*?)\sEND",
flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
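# For reference, a sketch of the event text block these patterns target. The
# layout is reconstructed from the regexes above and all values/wave names are
# hypothetical, not taken from a real capture:
#
#   X evt_num = 42
#   X timestamp = 1699999999
#   WAVES/o/D/N=(2,2) pulse_summaries BEGIN
#   0 1
#   100 200
#   END
#   X SetDimLabel 0, 0, 'channel', pulse_summaries
#   X SetDimLabel 0, 1, 'max_height', pulse_summaries
#   X SetDimLabel 1, 0, 'det_A', pulse_summaries
#   X SetDimLabel 1, 1, 'det_B', pulse_summaries
#   WAVES/o/D wave0,wave1
#   BEGIN
#   10 11
#   12 13
#   END
#   X ProcessOneEvent(...)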
# _________________________________________________________________________
def __readNextEventTextBlockWithBuffering(self):
# .....................................................................
# Buffer until we detect that the entire content of an event is in memory
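# (i.e. keep appending fixed-size chunks until the buffered text contains a
# complete "X evt_num ... X ProcessOneEvent(...)" span matched by event_group_regex)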
event_text = ""
while True:
tmp_buf = self.file_handle.read(self.__BYTES_TO_BUFFER)
# print(f"reading : {1000*(time.monotonic()-lap_start_t):.1f}ms"); lap_start_t = time.monotonic()
bytes_read = len(tmp_buf)
self.buffer.write(tmp_buf)
# print(f"writing to buffer : {1000*(time.monotonic()-lap_start_t):.1f}ms"); lap_start_t = time.monotonic()
if "ProcessOneEvent" in self.buffer.getvalue():
event_match = re.search(self.event_group_regex, self.buffer.getvalue())
# print(f"event search : {1000*(time.monotonic()-lap_start_t):.1f}ms"); lap_start_t = time.monotonic()
if event_match:
# grab block of text that represents an event in the IGOR format
event_text = event_match.group(0)
# roll the remaining data into a new buffer
self.buffer.seek( event_match.end(0) )
remaining_data = self.buffer.read()
self.buffer = io.StringIO()
self.buffer.write(remaining_data)
# print(f"rolling over buffer : {1000*(time.monotonic()-lap_start_t):.1f}ms"); lap_start_t = time.monotonic()
break
if bytes_read == 0:
break
# idx += 1; print(f"==== loop end {idx} : {1000*(time.monotonic()-lap_start_t):.1f}ms"); lap_start_t = time.monotonic()
# print(f"Total Time is {1000*(time.monotonic()-init_start_t):.2f}ms");
return event_text
# _________________________________________________________________________
def loadChannelBatch(self) -> Optional[Sequence[ChannelData]]:
# .....................................................................
# Get the next block of Event Text
event_text = self.__readNextEventTextBlockWithBuffering()
if event_text == "":
return None
# remove comments
event_text = re.sub(self.comment_regex, "", event_text)
# # print(f"{idx} : {1000*(time.monotonic()-lap_start_t):.1f}ms"); lap_start_t = time.monotonic(); idx += 1
# .....................................................................
# timestamp parsing
timestamp_match = re.search(self.timestamp_regex, event_text)
if timestamp_match:
# use the timestamp specified in the event text; there should only ever be one
timestamp = int( timestamp_match.group('timestamp') )
else:
timestamp = -1
# .....................................................................
# evt_num parsing
evt_num_match = re.search(self.evt_num_regex, event_text)
if evt_num_match:
# use the evt_num specified in the event text; there should only ever be one
evt_num = int( evt_num_match.group('evt_num') )
else:
evt_num = -1
# .....................................................................
# Pulse Summary Parsing
all_ps_dicts = []
ps_data_match = re.search(self.ps_wave_regex, event_text)
ps_dim_matches = list(re.finditer(self.ps_dim_label_regex, event_text))
if ps_data_match and ps_dim_matches:
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# The following block converts the pulse summary WAVE into a numpy array
num_quantities = int(ps_data_match.group('num_quantities'))
num_channels = int(ps_data_match.group('num_channels'))
raw_ps_data = ps_data_match.group('data')
tmp_array = np.fromstring(raw_ps_data, dtype=np.int64, sep=" ")
tmp_array = tmp_array.reshape( (num_quantities,num_channels) )
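# e.g. (illustrative values) a 'data' group of "0 1\n100 200" with N=(2,2)
# becomes [[0, 1], [100, 200]]: quantities run along axis 0, channels along axis 1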
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# The following block decodes which column/row contains the pulse summary data
# that we need. In IGOR this is specified by the "SetDimLabel" command
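# As an illustration (labels other than 'channel' are hypothetical), the two
# axis cases handled below look like:
#   X SetDimLabel 0, 0, 'channel', pulse_summaries    -> row 0 holds the 'channel' quantity
#   X SetDimLabel 1, 1, 'det_B', pulse_summaries      -> column 1 is the channel named 'det_B'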
# create a dictionary storing pulse summary information for each column (ie channel) in the event
all_ps_dicts = [{} for _ in range(tmp_array.shape[1])]
# iterate through every single line in this event that contains "SetDimLabel"
for dim_components in ps_dim_matches:
igor_axis = int( dim_components.group('igor_axis') )
quantity_index = int( dim_components.group('quantity_index') )
quantity_label = dim_components.group('quantity_label').lower()
# if the SetDimLabel axis is 0, then it specifies the name and row index
# of a quantity in the pulse summary.
if igor_axis == 0:
for col_idx in range(tmp_array.shape[1]):
all_ps_dicts[col_idx][quantity_label] = int(tmp_array[quantity_index,col_idx])
# otherwise (axis 1) the SetDimLabel gives the experimenter-defined name
# of the column (i.e. channel) itself
else:
col_idx = quantity_index
all_ps_dicts[col_idx]['channel_name'] = quantity_label
ps_by_channel: Dict[int, Dict] = {int(ps['channel']): ps for ps in all_ps_dicts}
# Not used at this time. Reserved for a future where the channel name can be specified in the FemtoDAQ GUI
names_to_channels: Dict[str, int] = {ps['channel_name']: ps['channel'] for ps in all_ps_dicts}  # type: ignore
# .....................................................................
# Waveform Parsing
wave_by_channel = {}
wavedata = None
waveform_match = re.search(self.waveform_wave_regex, event_text)
if waveform_match:
channel_names = waveform_match.group('channel_names').split(',')
raw_wavedata = waveform_match.group('data')
# NOTE: In future we may want to replace this with a lookup using `names_to_channels`
wavedata_channels = [int(re.sub(r"\D","",name)) for name in channel_names]
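# e.g. a wave named "wave3" (illustrative) reduces to channel number 3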
wavedata = np.fromstring(raw_wavedata, dtype=np.int16, sep=" ")
wavedata = wavedata.reshape((-1, len(wavedata_channels)))
wave_by_channel = {ch:wavedata[:,i] for i,ch in enumerate(wavedata_channels)}
# .....................................................................
# Build the list of Channels and return
ret = None
all_channels = set(wave_by_channel.keys()).union(ps_by_channel.keys())
if all_channels:
channel_data_batch: List[ChannelData] = []
for channel in all_channels:
waveform = wave_by_channel.get(channel, None)
summary = ps_by_channel.get(channel, None)
channel_data_batch.append( ChannelData(channel, timestamp, summary, waveform, other_data={'evt_num':evt_num}) )
ret = channel_data_batch
return ret