Pulse Matching: introduce caching mechanism

This commit is contained in:
Eric Teunis de Boone 2023-04-25 11:51:24 +02:00
parent a011aee28e
commit 4abb6997b8

View file

@ -6,6 +6,8 @@ from scipy import signal, interpolate, stats
import matplotlib.pyplot as plt
import numpy as np
from itertools import zip_longest
import h5py
from copy import deepcopy
try:
from tqdm import tqdm
@ -125,6 +127,33 @@ def trace_upsampler(trace, template_t, trace_t):
def trace_downsampler(trace, template_t, trace_t, offset):
pass
def read_time_residuals_cache(cache_fname, template_dt, antenna_dt, noise_sigma_factor, N=None):
try:
with h5py.File(cache_fname, 'r') as fp:
pgroup = fp['time_residuals']
pgroup2 = pgroup[f'{template_dt}_{antenna_dt}']
ds_name = str(noise_sigma_factor)
ds = pgroup2[ds_name]
if N is None:
return deepcopy(ds[:])
else:
return deepcopy(ds[:min(N, len(ds))])
except KeyError:
return np.array([])
def write_time_residuals_cache(cache_fname, time_residuals, template_dt, antenna_dt, noise_sigma_factor):
with h5py.File(cache_fname, 'a') as fp:
pgroup = fp.require_group('time_residuals')
pgroup2 = pgroup.require_group(f'{template_dt}_{antenna_dt}')
ds_name = str(noise_sigma_factor)
if ds_name in pgroup2.keys():
del pgroup2[ds_name]
ds = pgroup2.create_dataset(ds_name, (len(time_residuals)), dtype='f', data=time_residuals, maxshape=(None))
if __name__ == "__main__":
import os
@ -176,21 +205,30 @@ if __name__ == "__main__":
if True:
plt.close(fig)
#
# Find time accuracies as a function of signal strength
#
h5_cache_fname = f'11_pulsed_timing.hdf5'
time_accuracies = np.zeros(len(noise_factors))
for k, noise_sigma_factor in tqdm(enumerate(noise_factors)):
print() #separating tqdm
# Read in cached time residuals
cached_time_residuals = read_time_residuals_cache(h5_cache_fname, template.dt, antenna_dt, noise_sigma_factor)
#
# Find difference between true and templated times
#
time_residuals = np.zeros(N_residuals)
for j in tqdm(range(N_residuals)):
time_residuals = np.zeros(max(0, (N_residuals - len(cached_time_residuals))))
for j in tqdm(range(len(time_residuals))):
do_plots = j==0
# receive at antenna
## place the deltapeak signal at a random location
antenna = Waveform(None, dt=antenna_dt, name='Signal')
if not True: # Create antenna trace without template
if False: # Create antenna trace without template
antenna_true_signal, antenna_peak_sample = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna.dt, offset=[0.2, 0.8], rng=rng)
antenna.peak_sample = antenna_peak_sample
@ -371,16 +409,25 @@ if __name__ == "__main__":
plt.close(fig)
print()# separating tqdm
# Make a plot of the time residuals
# Were new time residuals calculated?
# Add them to the cache file
if len(time_residuals) > 1:
time_accuracies[k] = np.std(time_residuals)
# merge cached and calculated time residuals
time_residuals = np.concatenate((cached_time_residuals, time_residuals), axis=None)
write_time_residuals_cache(h5_cache_fname, time_residuals, template_dt, antenna_dt, noise_sigma_factor)
else:
time_residuals = cached_time_residuals
# Make a plot of the time residuals
if N_residuals > 1:
time_accuracies[k] = np.std(time_residuals[:N_residuals])
hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step')
fig, ax = plt.subplots()
ax.set_title(
"Template Correlation Lag finding"
+ f"\n template dt: {template.dt*1e3: .1e}ps"
+ f"; antenna dt: {antenna.dt: .1e}ns"
+ f"\n template dt: {template_dt*1e3: .1e}ps"
+ f"; antenna dt: {antenna_dt: .1e}ns"
+ f"; noise_factor: {noise_sigma_factor: .1e}"
)
ax.set_xlabel("Time Residual [ns]")
@ -428,7 +475,7 @@ if __name__ == "__main__":
ax.text( *(0.02, 0.95), text_str, fontsize=12, ha='left', va='top', transform=ax.transAxes)
fig.savefig("figures/11_time_residual_hist_{noise_sigma_factor: .1e}.pdf")
fig.savefig(f"figures/11_time_residual_hist_tdt{template_dt:0.1e}_n{noise_sigma_factor: .1e}.pdf")
if True:
plt.close(fig)
@ -436,10 +483,16 @@ if __name__ == "__main__":
# SNR time accuracy plot
if True:
fig, ax = plt.subplots()
ax.set_title("Template matching SNR vs time accuracy")
ax.set_title(f"Template matching SNR vs time accuracy")
ax.set_xlabel("Signal to Noise Factor")
ax.set_ylabel("Time Accuracy [ns]")
ax.legend(title="\n".join([
f"N={N_residuals}",
f"template_dt={template_dt:0.1e}ns",
f"antenna_dt={antenna_dt:0.1e}ns",
]))
if True:
ax.set_xscale('log')
ax.set_yscale('log')
@ -454,6 +507,6 @@ if __name__ == "__main__":
ax.grid()
fig.tight_layout()
fig.savefig("figures/11_time_res_vs_snr.pdf")
fig.savefig(f"figures/11_time_res_vs_snr_tdt{template_dt:0.1e}.pdf")
plt.show()