mirror of
https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
synced 2025-01-03 00:33:33 +01:00
810 lines
29 KiB
Python
Executable file
810 lines
29 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
|
|
from lib import util
|
|
from scipy import signal, interpolate, stats
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
from itertools import zip_longest
|
|
import h5py
|
|
from copy import deepcopy
|
|
|
|
try:
|
|
from itertools import pairwise
|
|
except: # pairwise only introduced since python 3.10
|
|
from itertools import tee
|
|
def pairwise(iterable):
|
|
# pairwise('ABCDEFG') --> AB BC CD DE EF FG
|
|
a, b = tee(iterable)
|
|
next(b, None)
|
|
return zip(a, b)
|
|
|
|
try:
|
|
from tqdm import tqdm
|
|
except:
|
|
tqdm = lambda x: x
|
|
|
|
rng = np.random.default_rng()
|
|
|
|
class Waveform:
|
|
name = None
|
|
signal = None
|
|
dt = None
|
|
|
|
_t = None
|
|
|
|
def __init__(self,signal=None, dt=None, t=None, name=None):
|
|
self.signal = signal
|
|
self.name = name
|
|
|
|
if t is not None:
|
|
assert len(t) == len(signal)
|
|
|
|
self._t = t
|
|
self.dt = t[1] - t[0]
|
|
elif dt is not None:
|
|
self.dt = dt
|
|
|
|
# Lazy evaluation of time
|
|
@property
|
|
def t(self):
|
|
if self._t is None:
|
|
return self.dt * np.arange(0, len(self.signal))
|
|
return self._t
|
|
|
|
@t.setter
|
|
def t(self, value):
|
|
self._t = value
|
|
|
|
@t.deleter
|
|
def t(self):
|
|
del self._t
|
|
|
|
def __len__():
|
|
return len(self.signal)
|
|
|
|
def white_noise_realisation(N_samples, noise_sigma=1, rng=rng, normalise=False):
|
|
noise = rng.normal(0, noise_sigma or 0, size=N_samples)
|
|
|
|
if normalise:
|
|
noise /= max(noise)
|
|
|
|
return noise
|
|
|
|
def antenna_bp(trace, low_bp, high_bp, dt, order=3):
|
|
fs = 1/dt
|
|
|
|
bp_filter = signal.butter(order, [low_bp, high_bp], 'band', fs=fs, output='sos')
|
|
bandpassed = signal.sosfilt(bp_filter, trace)
|
|
|
|
return bandpassed
|
|
|
|
def my_correlation(in1, template, lags=None):
|
|
template_length = len(template)
|
|
in1_length = len(in1)
|
|
|
|
if lags is None:
|
|
lags = np.arange(-template_length+1, in1_length + 1)
|
|
|
|
# do the correlation jig
|
|
corrs = np.zeros_like(lags, dtype=float)
|
|
for i, l in enumerate(lags):
|
|
if l <= 0: # shorten template at the front
|
|
in1_start = 0
|
|
template_end = template_length
|
|
|
|
template_start = -template_length - l
|
|
in1_end = max(0, min(in1_length, -template_start)) # 0 =< l + template_length =< in1_lengt
|
|
|
|
elif l > in1_length - template_length:
|
|
# shorten template from the back
|
|
in1_end = in1_length
|
|
template_start = 0
|
|
|
|
in1_start = min(l, in1_length)
|
|
template_end = max(0, in1_length - l)
|
|
|
|
else:
|
|
in1_start = min(l, in1_length)
|
|
in1_end = min(in1_start + template_length, in1_length)
|
|
|
|
# full template
|
|
template_start = 0
|
|
template_end = template_length
|
|
|
|
# Slice in1 and template
|
|
in1_slice = in1[in1_start:in1_end]
|
|
template_slice = template[template_start:template_end]
|
|
|
|
corrs[i] = np.dot(in1_slice, template_slice)
|
|
|
|
return corrs, (in1, template, lags)
|
|
|
|
def trace_upsampler(trace, template_t, trace_t):
|
|
if not hasattr(template_t, '__len__'):
|
|
template_dt = template_t
|
|
else:
|
|
template_dt = template_t[1] - template_t[0]
|
|
|
|
trace_dt = trace_t[1] - trace_t[0]
|
|
|
|
upsample_factor = trace_dt/template_dt
|
|
upsampled_trace_N = np.ceil(len(trace) * upsample_factor)
|
|
|
|
upsample_factor = int(upsample_factor)
|
|
upsampled_trace_N = int(upsampled_trace_N)
|
|
|
|
# upsample trace
|
|
upsampled_trace = np.zeros(upsampled_trace_N)
|
|
upsampled_trace[::upsample_factor] = trace
|
|
|
|
#upsampled_t = np.arange(trace_t[0], trace_t[-1], template_dt)
|
|
upsampled_t = template_dt * np.arange(len(upsampled_trace)) + trace_t[0]
|
|
if False:
|
|
upsampled_t = np.linspace(0, template_dt*len(upsampled_trace), len(upsampled_trace), endpoint=False) + trace_t[0]
|
|
|
|
return upsampled_trace, upsampled_t
|
|
|
|
def trace_downsampler(trace, template_t, trace_t, offset):
|
|
pass
|
|
|
|
|
|
def hilbert_envelope_max_amplitude_time(trace, trace_t, do_plot=False, fname_distinguish=None):
|
|
analytic_signal = signal.hilbert(trace)
|
|
envelope = abs(analytic_signal)
|
|
|
|
max_idx = np.argmax(envelope)
|
|
|
|
t_max = trace_t[max_idx]
|
|
|
|
# make a plot
|
|
if do_plot:
|
|
fig, ax = plt.subplots()
|
|
|
|
ax.set_xlabel("Time [ns]")
|
|
ax.set_ylabel("Amplitude")
|
|
|
|
ax.plot(trace_t, trace, label='Waveform')
|
|
ax.plot(trace_t, envelope, ls='dashed', label='Envelope')
|
|
|
|
# indicate maximum and t_max
|
|
ax.axhline(envelope[max_idx], ls='dotted', color='g')
|
|
ax.axvline(t_max, ls='dotted', color='g')
|
|
|
|
if True:
|
|
ax.legend()
|
|
|
|
fig.tight_layout()
|
|
fig.savefig(f'figures/11_hilbert_timing{fname_distinguish}.pdf')
|
|
|
|
plt.close(fig)
|
|
|
|
return t_max, (analytic_signal, max_idx)
|
|
|
|
|
|
def read_time_residuals_cache(cache_fname, template_dt, antenna_dt, snr_sigma_factor, N=None):
|
|
try:
|
|
with h5py.File(cache_fname, 'r') as fp:
|
|
pgroup = fp['time_residuals']
|
|
pgroup2 = pgroup[f'{template_dt}_{antenna_dt}']
|
|
|
|
ds_name = str(snr_sigma_factor)
|
|
|
|
ds = pgroup2[ds_name]
|
|
if N is None:
|
|
ret = deepcopy(ds[:])
|
|
else:
|
|
ret = deepcopy(ds[:min(N, len(ds))])
|
|
|
|
if len(ret.shape) > 2:
|
|
return ret[0,:], ret[1,:], ret[2,:]
|
|
elif len(ret.shape) > 1:
|
|
return ret[0,:], ret[1,:], np.array([np.nan]*len(ret))
|
|
else:
|
|
return ret[:], np.array([np.nan]*len(ret)), np.array([np.nan]*len(ret))
|
|
except (KeyError, FileNotFoundError):
|
|
return np.array([]), np.array([]), np.array([])
|
|
|
|
def write_time_residuals_cache(cache_fname, data, template_dt, antenna_dt, noise_sigma_factor):
|
|
with h5py.File(cache_fname, 'a') as fp:
|
|
pgroup = fp.require_group('time_residuals')
|
|
pgroup2 = pgroup.require_group(f'{template_dt}_{antenna_dt}')
|
|
|
|
ds_name = str(noise_sigma_factor)
|
|
|
|
if ds_name in pgroup2.keys():
|
|
del pgroup2[ds_name]
|
|
|
|
ds = pgroup2.create_dataset(ds_name, (3, len(time_residuals)), dtype='f', maxshape=(None))
|
|
ds[0] = data[0]
|
|
ds[1] = data[1]
|
|
ds[2] = data[2]
|
|
|
|
def create_template(dt=1, timelength=1, bp_freq=(0, np.inf), name=None, normalise=False):
|
|
template = Waveform(None, dt=dt, name=name)
|
|
_deltapeak = util.deltapeak(timelength=timelength, samplerate=1/dt, offset=0)
|
|
template.signal = antenna_bp(_deltapeak[0], *bp_freq, dt)
|
|
template.peak_sample = _deltapeak[1]
|
|
template.peak_time = template.dt * template.peak_sample
|
|
|
|
if normalise:
|
|
template.signal /= max(template.signal)
|
|
|
|
return template, _deltapeak
|
|
|
|
def get_time_residuals_for_template(
|
|
N_residuals, template, interpolation_template=None,
|
|
antenna_dt=1, antenna_timelength=100,
|
|
snr_sigma_factor=10,bp_freq=(0,np.inf),
|
|
normalise_noise=False, h5_cache_fname=None, read_cache=True, write_cache=None,
|
|
rng=rng, tqdm=tqdm,
|
|
peak_window=[0.2, 0.8],
|
|
):
|
|
# Read in cached time residuals
|
|
if read_cache:
|
|
cached_time_residuals, cached_snrs, cached_hilbert_time_residuals = read_time_residuals_cache(h5_cache_fname, template.dt, antenna_dt, snr_sigma_factor)
|
|
|
|
else:
|
|
cached_time_residuals, cached_snrs, cached_hilbert_time_residuals = np.array([]), np.array([]), np.array([])
|
|
|
|
#
|
|
# Find difference between true and templated times
|
|
#
|
|
|
|
hilbert_interp_t_max, _ = hilbert_envelope_max_amplitude_time(interp_template.signal, interp_template.t)
|
|
|
|
time_residuals = np.zeros(max(0, (N_residuals - len(cached_time_residuals))))
|
|
snrs = np.zeros_like(time_residuals)
|
|
hilbert_time_residuals = np.zeros_like(time_residuals)
|
|
for j in tqdm(range(len(time_residuals))):
|
|
do_plots = j==0
|
|
|
|
# receive at antenna
|
|
## place the deltapeak signal at a random location
|
|
antenna = Waveform(None, dt=antenna_dt, name='Signal')
|
|
if interpolation_template is None: # Create antenna trace without interpolation template
|
|
antenna_true_signal, antenna_peak_sample = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna.dt, offset=peak_window, rng=rng)
|
|
|
|
antenna.peak_sample = antenna_peak_sample
|
|
antenna.peak_time = antenna.dt * antenna.peak_sample
|
|
antenna.signal = antenna_bp(antenna.signal, *bp_freq, antenna.dt)
|
|
print(f"Antenna Peak Time: {antenna.peak_time}")
|
|
print(f"Antenna Peak Sample: {antenna.peak_sample}")
|
|
|
|
else: # Sample the interpolation template at some offset
|
|
peak_window_length = peak_window[-1] - peak_window[0]
|
|
antenna.peak_time = antenna_timelength * (peak_window_length*rng.random(1) + peak_window[0])
|
|
sampling_offset = rng.random(1)*antenna.dt
|
|
|
|
antenna.t = util.sampled_time(1/antenna.dt, start=0, end=antenna_timelength)
|
|
|
|
# Sample the interpolation template
|
|
antenna.signal = interpolation_template.interpolate(antenna.t - antenna.peak_time)
|
|
|
|
antenna.peak_sample = antenna.peak_time/antenna.dt
|
|
antenna_true_signal = antenna.signal
|
|
|
|
true_time_offset = antenna.peak_time - template.peak_time
|
|
|
|
antenna.signal_level = np.max(antenna.signal)
|
|
|
|
if False: # flip polarisation
|
|
antenna.signal *= -1
|
|
|
|
## Add noise
|
|
noise_amplitude = max(template.signal) * 1/snr_sigma_factor
|
|
noise_realisation = noise_amplitude * white_noise_realisation(len(antenna.signal), normalise=normalise_noise)
|
|
filtered_noise = antenna_bp(noise_realisation, *bp_freq, antenna.dt)
|
|
|
|
antenna.signal += filtered_noise
|
|
|
|
antenna.noise_level = np.sqrt(np.mean(filtered_noise**2))
|
|
|
|
antenna.signal_to_noise = antenna.signal_level/antenna.noise_level
|
|
|
|
# Show signals
|
|
if do_plots:
|
|
fig, axs = plt.subplots(2, sharex=True)
|
|
if not hasattr(axs, '__len__'):
|
|
axs = [axs]
|
|
|
|
axs[0].set_title("Antenna Waveform")
|
|
axs[-1].set_xlabel("Time [ns]")
|
|
axs[0].set_ylabel("Amplitude")
|
|
l1 = axs[0].plot(antenna.t, antenna.signal, label='Filtered w/ noise', alpha=0.7)
|
|
l2 = axs[0].plot(antenna.t, antenna.signal - filtered_noise, label='Filtered w/o noise', alpha=0.7)
|
|
l3 = axs[0].plot(antenna.t, filtered_noise, label='Noise', alpha=0.7)
|
|
|
|
if True: # indicate signal and noise levels
|
|
level_kwargs = dict(ls='dashed', alpha=0.4)
|
|
|
|
axs[0].axhline(antenna.signal_level, color=l2[0].get_color(), **level_kwargs, label='Signal Level')
|
|
axs[0].axhline(antenna.noise_level, color=l3[0].get_color(), **level_kwargs, label='Noise Level')
|
|
|
|
|
|
axs[0].legend(title=f'SNR = {antenna.signal_to_noise:.1g}')
|
|
axs[0].grid()
|
|
|
|
if len(axs) > 1:
|
|
axs[1].set_title("Template")
|
|
axs[1].set_ylabel("Amplitude")
|
|
axs[1].plot(template.t, template.signal, label='orig')
|
|
axs[1].plot(template.t + true_time_offset, template.signal, label='true moved orig')
|
|
axs[1].legend()
|
|
axs[1].grid()
|
|
|
|
fig.tight_layout()
|
|
fig.savefig(f'figures/11_antenna_signals_tdt{template.dt:.1g}.pdf')
|
|
|
|
if True: # zoom
|
|
wx = 100
|
|
x0 = true_time_offset
|
|
|
|
old_xlims = axs[0].get_xlim()
|
|
axs[0].set_xlim( x0-wx, x0+wx)
|
|
fig.savefig(f'figures/11_antenna_signals_tdt{template.dt:.1g}_zoom.pdf')
|
|
|
|
# restore
|
|
axs[0].set_xlim(*old_xlims)
|
|
|
|
if True:
|
|
plt.close(fig)
|
|
|
|
axs2 = None
|
|
if True: # simple and dumb trace upsampling
|
|
upsampled_trace, upsampled_t = trace_upsampler(antenna.signal, template.t, antenna.t)
|
|
if do_plots: # Show upsampled traces
|
|
fig2, axs2 = plt.subplots(1, sharex=True)
|
|
if not hasattr(axs2, '__len__'):
|
|
axs2 = [axs2]
|
|
|
|
axs2[-1].set_xlabel("Time [ns]")
|
|
axs2[0].set_ylabel("Amplitude")
|
|
axs2[0].plot(antenna.t, antenna.signal, marker='o', label='orig')
|
|
axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled')
|
|
axs2[0].legend(loc='upper right')
|
|
|
|
fig2.savefig(f'figures/11_upsampled_tdt{template.dt:.1g}.pdf')
|
|
|
|
wx = 1e2
|
|
x0 = upsampled_t[0] + wx - 5
|
|
axs2[0].set_xlim(x0-wx, x0+wx)
|
|
fig2.savefig(f'figures/11_upsampled_tdt{template.dt:.1g}_zoom.pdf')
|
|
|
|
if True:
|
|
plt.close(fig2)
|
|
|
|
# determine correlations with arguments
|
|
lag_dt = upsampled_t[1] - upsampled_t[0]
|
|
corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template.signal)
|
|
|
|
# Determine best correlation time
|
|
idx = np.argmax(abs(corrs))
|
|
best_sample_lag = lags[idx]
|
|
best_time_lag = best_sample_lag * lag_dt
|
|
|
|
# Find Hilbert Envelope t0
|
|
hilbert_best_time_lag, _ = hilbert_envelope_max_amplitude_time(upsampled_trace, upsampled_t, do_plot=do_plots)
|
|
|
|
else: # downsampled template
|
|
raise NotImplementedError
|
|
|
|
corrs, (_, _, lags) = my_downsampling_correlation(antenna.signal, antenna.t, template.signal, template.t)
|
|
lag_dt = upsampled_t[1] - upsampled_t[0]
|
|
|
|
# Calculate the time residual
|
|
time_residuals[j] = best_time_lag - true_time_offset
|
|
snrs[j] = antenna.signal_to_noise
|
|
hilbert_time_residuals[j] = hilbert_best_time_lag - hilbert_interp_t_max
|
|
|
|
if not do_plots:
|
|
continue
|
|
|
|
if do_plots and axs2:
|
|
axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2)
|
|
axs2[-1].axvline(true_time_offset, color='g', alpha=0.5, linewidth=2)
|
|
|
|
# Show the final signals correlated
|
|
if do_plots:
|
|
# amplitude scaling required for single axis plotting
|
|
template_amp_scaler = max(abs(template.signal)) / max(abs(antenna.signal))
|
|
|
|
# start the figure
|
|
fig, axs = plt.subplots(2, sharex=True)
|
|
ylabel_kwargs = dict(
|
|
#rotation=0,
|
|
ha='right',
|
|
va='center'
|
|
)
|
|
axs[-1].set_xlabel("Time [ns]")
|
|
|
|
offset_list = [
|
|
[best_time_lag, dict(label=template.name, color='orange')],
|
|
[true_time_offset, dict(label='True offset', ls='dashed', color='green')],
|
|
]
|
|
|
|
# Signal
|
|
i=0
|
|
axs[i].set_ylabel("Amplitude", **ylabel_kwargs)
|
|
axs[i].plot(antenna.t, antenna.signal, label=antenna.name)
|
|
|
|
# Plot the template
|
|
for offset_args in offset_list:
|
|
this_kwargs = offset_args[1]
|
|
offset = offset_args[0]
|
|
|
|
l = axs[i].plot(offset + template.t, template_amp_scaler * template.signal, **this_kwargs)
|
|
|
|
axs[i].legend()
|
|
|
|
# Correlation
|
|
i=1
|
|
axs[i].set_ylabel("Correlation", **ylabel_kwargs)
|
|
axs[i].plot(lags * lag_dt, corrs)
|
|
|
|
# Lines across both axes
|
|
for offset_args in offset_list:
|
|
this_kwargs = offset_args[1]
|
|
offset = offset_args[0]
|
|
|
|
for i in [0,1]:
|
|
axs[i].axvline(offset, ls='--', color=this_kwargs['color'], alpha=0.7)
|
|
|
|
axs[0].axvline(offset + len(template.signal) * (template.t[1] - template.t[0]), color=this_kwargs['color'], alpha=0.7)
|
|
|
|
|
|
if True: # zoom
|
|
wx = len(template.signal) * (min(1,template.dt))/4
|
|
t0 = true_time_offset
|
|
|
|
old_xlims = axs[0].get_xlim()
|
|
axs[i].set_xlim( x0-wx, x0+3*wx)
|
|
fig.savefig(f'figures/11_corrs_tdt{template.dt:.1g}_zoom.pdf')
|
|
|
|
# restore
|
|
axs[i].set_xlim(*old_xlims)
|
|
|
|
fig.tight_layout()
|
|
fig.savefig(f'figures/11_corrs_tdt{template.dt:.1g}.pdf')
|
|
|
|
if True:
|
|
plt.close(fig)
|
|
|
|
# Were new time residuals calculated?
|
|
# Add them to the cache file
|
|
if len(time_residuals) > 1:
|
|
# merge cached and calculated time residuals
|
|
time_residuals = np.concatenate((cached_time_residuals, time_residuals), axis=None)
|
|
snrs = np.concatenate( (cached_snrs, snrs), axis=None)
|
|
hilbert_time_residuals = np.concatenate((cached_hilbert_time_residuals, hilbert_time_residuals), axis=None)
|
|
|
|
if write_cache or read_cache and write_cache is None: # write the cache
|
|
write_time_residuals_cache(h5_cache_fname, (time_residuals, snrs, hilbert_time_residuals), template_dt, antenna_dt, snr_sigma_factor)
|
|
else:
|
|
time_residuals = cached_time_residuals
|
|
snrs = cached_snrs
|
|
hilbert_time_residuals = cached_hilbert_time_residuals
|
|
|
|
# Only return N_residuals (even if more have been cached)
|
|
return time_residuals[:N_residuals], snrs[:N_residuals], hilbert_time_residuals[:N_residuals]
|
|
|
|
if __name__ == "__main__":
|
|
import os
|
|
import matplotlib
|
|
import sys
|
|
if os.name == 'posix' and "DISPLAY" not in os.environ:
|
|
matplotlib.use('Agg')
|
|
|
|
bp_freq = (30e-3, 80e-3) # GHz
|
|
interp_template_dt = 5e-5 # ns
|
|
template_length = 200 # ns
|
|
|
|
antenna_dt = 2 # ns
|
|
antenna_timelength = 1024 # ns
|
|
|
|
N_residuals = 50*3 if len(sys.argv) < 2 else int(sys.argv[1])
|
|
template_dts = np.array([antenna_dt, 5e-1, 5e-2]) # ns
|
|
snr_factors = np.concatenate( # 1/noise_amplitude factor
|
|
(
|
|
#[0.25, 0.5, 0.75],
|
|
[1, 1.5, 2, 2.5, 3, 4, 5, 7],
|
|
[10, 20, 30, 50],
|
|
[100, 200, 300, 500]
|
|
),
|
|
axis=None, dtype=float)
|
|
|
|
|
|
cut_wrong_peak_matches = True
|
|
normalise_noise = False
|
|
|
|
h5_cache_fname = f'11_pulsed_timing.hdf5'
|
|
use_cache = True
|
|
write_cache = None # Leave None for default action
|
|
|
|
#
|
|
# Interpolation Template
|
|
# to create an 'analog' sampled antenna
|
|
interp_template, _deltapeak = create_template(dt=interp_template_dt, timelength=template_length, bp_freq=bp_freq, name='Interpolation Template', normalise=True)
|
|
|
|
interp_template.interpolate = interpolate.interp1d(interp_template.t, interp_template.signal, assume_sorted=True, fill_value=0, bounds_error=False, copy=False)
|
|
|
|
if True: # show interpolation template
|
|
fig, ax = plt.subplots()
|
|
ax.set_title("Filter Response")
|
|
ax.set_xlabel("Time [ns]")
|
|
ax.set_ylabel("Amplitude")
|
|
ax.plot(interp_template.t, max(interp_template.signal)*_deltapeak[0], label='Impulse')
|
|
ax.plot(interp_template.t, interp_template.signal, label='Filtered Signal')
|
|
ax.legend()
|
|
fig.savefig('figures/11_filter_response.pdf')
|
|
|
|
if True: # show filtering equivalence samplerates
|
|
_deltapeak = util.deltapeak(timelength=template_length, samplerate=1/antenna_dt, offset=0)
|
|
_time = util.sampled_time(end=template_length, sample_rate=1/antenna_dt)
|
|
_bandpassed = antenna_bp(_deltapeak[0], *bp_freq, antenna_dt)
|
|
|
|
ax.plot(_time, max(_bandpassed)*_deltapeak[0], label='Impulse Antenna')
|
|
ax.plot(_time, _bandpassed, label='Filtered Antenna')
|
|
|
|
ax.legend()
|
|
fig.savefig('figures/11_interpolation_deltapeak+antenna.pdf')
|
|
|
|
if True:
|
|
plt.close(fig)
|
|
|
|
if True: # show hilbert interpolation method
|
|
_ = hilbert_envelope_max_amplitude_time(interp_template.signal, interp_template.t, do_plot=True, fname_distinguish="_interpolation_template")
|
|
|
|
#
|
|
# Find time accuracies as a function of signal strength
|
|
#
|
|
time_residuals_data = []
|
|
|
|
for a, template_dt in tqdm(enumerate(template_dts)):
|
|
|
|
time_residuals_data.append(np.zeros( (len(snr_factors), 4, N_residuals)))# res, snr, masked, hilber_res
|
|
|
|
# Create the template
|
|
# This is sampled at a lower samplerate than the interpolation template
|
|
template, _ = create_template(dt=template_dt, timelength=template_length, bp_freq=bp_freq, name='Template', normalise=True)
|
|
|
|
for k, snr_sigma_factor in tqdm(enumerate(snr_factors)):
|
|
|
|
# get the time residuals
|
|
time_residuals, snrs, hilbert_time_residuals = get_time_residuals_for_template(
|
|
N_residuals, template, interpolation_template=interp_template,
|
|
antenna_dt=antenna_dt, antenna_timelength=antenna_timelength,
|
|
snr_sigma_factor=snr_sigma_factor, bp_freq=bp_freq, normalise_noise=normalise_noise,
|
|
h5_cache_fname=h5_cache_fname, rng=rng, tqdm=tqdm, read_cache=use_cache, write_cache=write_cache)
|
|
|
|
print()# separating tqdm
|
|
print()# separating tqdm
|
|
|
|
wrong_peak_condition = lambda t_res: abs(t_res) > antenna_dt*4
|
|
mask = wrong_peak_condition(time_residuals)
|
|
|
|
# Save directly to large data array
|
|
time_residuals_data[a][k] = time_residuals, snrs, ~mask, hilbert_time_residuals
|
|
|
|
# Make a plot of the time residuals <<<
|
|
if True and N_residuals > 1:
|
|
for i in range(1 + cut_wrong_peak_matches):
|
|
mask_count = 0
|
|
|
|
if i==1: # if cut_wrong_peak_matches:
|
|
mask_count = np.count_nonzero(mask)
|
|
|
|
time_residuals = time_residuals[~mask]
|
|
|
|
# None masked
|
|
if not mask_count:
|
|
continue
|
|
|
|
# All masked
|
|
if not len(time_residuals):
|
|
continue
|
|
|
|
hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step')
|
|
fig, ax = plt.subplots()
|
|
ax.set_title(
|
|
"Template Correlation Lag finding"
|
|
+ f"\n template dt: {template_dt: .1e}ns"
|
|
+ f"; antenna dt: {antenna_dt: .1e}ns"
|
|
+ ";" if not mask_count else "\n"
|
|
+ f"snr_factor: {snr_sigma_factor: .1e}"
|
|
+ "" if not mask_count else f"; N_masked: {mask_count}"
|
|
)
|
|
ax.set_xlabel("Time Residual [ns]")
|
|
ax.set_ylabel("#")
|
|
|
|
if True:
|
|
# indicate boxcar accuracy limits
|
|
for sign in [-1, 1]:
|
|
ax.axvline( sign*template_dt/np.sqrt(12), ls='--', alpha=0.5, color='green')
|
|
|
|
counts, bins, _patches = ax.hist(time_residuals, **hist_kwargs)
|
|
if True: # fit gaussian to histogram
|
|
min_x = min(time_residuals)
|
|
max_x = max(time_residuals)
|
|
|
|
dx = bins[1] - bins[0]
|
|
scale = len(time_residuals) * dx
|
|
|
|
xs = np.linspace(min_x, max_x)
|
|
|
|
# do the fit
|
|
name = "Norm"
|
|
param_names = [ "$\\mu$", "$\\sigma$" ]
|
|
distr_func = stats.norm
|
|
|
|
label = name +"(" + ','.join(param_names) + ')'
|
|
|
|
# plot
|
|
fit_params = distr_func.fit(time_residuals)
|
|
fit_ys = scale * distr_func.pdf(xs, *fit_params)
|
|
ax.plot(xs, fit_ys, label=label)
|
|
|
|
# chisq
|
|
ct = np.diff(distr_func.cdf(bins, *fit_params))*np.sum(counts)
|
|
if True:
|
|
ct *= np.sum(counts)/np.sum(ct)
|
|
c2t = stats.chisquare(counts, ct, ddof=len(fit_params))
|
|
chisq_strs = [
|
|
f"$\\chi^2$/dof = {c2t[0]: .2g}/{len(fit_params)}"
|
|
]
|
|
|
|
# text on plot
|
|
text_str = "\n".join(
|
|
[label]
|
|
+
|
|
[ f"{param} = {value: .2e}" for param, value in zip_longest(param_names, fit_params, fillvalue='?') ]
|
|
+
|
|
chisq_strs
|
|
)
|
|
|
|
ax.text( *(0.02, 0.95), text_str, fontsize=12, ha='left', va='top', transform=ax.transAxes)
|
|
|
|
if mask_count:
|
|
fig.savefig(f"figures/11_time_residual_hist_tdt{template_dt:0.1e}_n{snr_sigma_factor:.1e}_masked.pdf")
|
|
else:
|
|
fig.savefig(f"figures/11_time_residual_hist_tdt{template_dt:0.1e}_n{snr_sigma_factor:.1e}.pdf")
|
|
|
|
if True:
|
|
plt.close(fig)
|
|
# >>> End of plot
|
|
|
|
#
|
|
# SNR time accuracy plot
|
|
#
|
|
if True:
|
|
threshold_markers = ['^', 'v', '8', 'X'] # make sure to have filled markers here
|
|
mask_thresholds = np.array([np.inf, N_residuals*0.5, N_residuals*0.1, 1, 0])
|
|
|
|
fig, ax = plt.subplots()
|
|
ax.set_title(f"Template matching SNR vs time accuracy")
|
|
ax.set_xlabel("Signal to Noise Factor")
|
|
ax.set_ylabel("Time Accuracy [ns]")
|
|
ax.grid()
|
|
|
|
ax.legend(title="\n".join([
|
|
f"N={N_residuals}",
|
|
#f"template_dt={template_dt:0.1e}ns",
|
|
f"antenna_dt={antenna_dt:0.1e}ns",
|
|
]))
|
|
|
|
# plot the values per template_dt slice
|
|
template_dt_colors = [None]*len(template_dts)
|
|
for a, template_dt in enumerate(template_dts):
|
|
for k, snr_sigma_factor in enumerate(snr_factors):
|
|
time_residuals, snrs, valid_mask, hilbert_time_residuals = time_residuals_data[a][k]
|
|
|
|
valid_mask = np.array(valid_mask, dtype=bool)
|
|
|
|
mean_residual = np.mean(time_residuals[valid_mask])
|
|
time_accuracy = np.std(time_residuals[valid_mask])
|
|
|
|
# calculate absolute deviation from the mean
|
|
residual_mean_deviation = np.sqrt( (time_residuals - mean_residual)**2 )
|
|
|
|
snr_std = np.std(snrs)
|
|
time_accuracy_std = np.std(residual_mean_deviation)
|
|
|
|
scatter_kwargs = dict(
|
|
ls='none',
|
|
marker='.',
|
|
alpha=0.3,
|
|
zorder=1.8,
|
|
)
|
|
|
|
y_values = residual_mean_deviation
|
|
|
|
# snr_sigma_factor is a factor 2 too low
|
|
snr_sigma_factor *= 2
|
|
|
|
# plot all invalid datapoints
|
|
if True:
|
|
ax.plot(snrs[~valid_mask], y_values[~valid_mask], color='grey', **scatter_kwargs)
|
|
|
|
# plot valid datapoints
|
|
if True:
|
|
if template_dt_colors[a] is not None:
|
|
scatter_kwargs['color'] = template_dt_colors[a]
|
|
|
|
l = ax.plot(snrs[valid_mask], y_values[valid_mask], **scatter_kwargs)
|
|
|
|
template_dt_colors[a] = l[0].get_color()
|
|
|
|
masked_count = np.count_nonzero(~valid_mask)
|
|
|
|
# plot accuracy indicating masking counts
|
|
kwargs = dict(
|
|
ls='none',
|
|
color= None if template_dt_colors[a] is None else template_dt_colors[a],
|
|
marker=threshold_markers[np.argmin( masked_count <= mask_thresholds)-1],
|
|
ms=10,
|
|
markeredgecolor='white',
|
|
markeredgewidth=1,
|
|
)
|
|
|
|
#l = ax.plot(snr_sigma_factor, np.sqrt(np.mean(y_values[valid_mask])**2), **{**kwargs, **dict(ms=50)})
|
|
|
|
if False:
|
|
l = ax.errorbar(snr_sigma_factor, time_accuracy, yerr=time_accuracy_std, xerr=snr_std, **kwargs)
|
|
else:
|
|
l = ax.plot(snr_sigma_factor, time_accuracy, **kwargs)
|
|
|
|
|
|
# set color if not yet set
|
|
template_dt_colors[a] = l[0].get_color()
|
|
|
|
|
|
# indicate boxcar threshold
|
|
if True:
|
|
ax.axhline(template_dt/np.sqrt(12), ls='--', alpha=0.7, color=template_dt_colors[a], label=f'Template dt:{template_dt:0.1e}ns')
|
|
|
|
|
|
# Set horizontal line at 1 ns
|
|
if True:
|
|
ax.axhline(1, ls='--', alpha=0.8, color='g')
|
|
|
|
ax.legend()
|
|
|
|
fig.tight_layout()
|
|
fig.savefig(f"figures/11_time_res_vs_snr_full_linear.pdf")
|
|
|
|
# logscaling
|
|
if True:
|
|
ax.set_xscale('log')
|
|
ax.set_yscale('log')
|
|
|
|
# limit y-axis upper limit to 1e1
|
|
if True:
|
|
this_lim = 1e1
|
|
ax.set_ylim([None, this_lim])
|
|
|
|
# require y-axis lower limit to be at least 1e-1
|
|
if True:
|
|
this_lim = 1e-1
|
|
low_ylims = ax.get_ylim()[0]
|
|
if low_ylims >= this_lim:
|
|
ax.set_ylim([this_lim, None])
|
|
|
|
# .. but keep it above 1e-3
|
|
if True:
|
|
this_lim = 1e-3
|
|
low_ylims = ax.get_ylim()[0]
|
|
if low_ylims <= this_lim:
|
|
ax.set_ylim([this_lim, None])
|
|
|
|
if True: # require y-axis lower limit to be at least 1e-1
|
|
low_ylims = ax.get_ylim()[0]
|
|
if low_ylims >= 1e-1:
|
|
ax.set_ylim([1e-1, None])
|
|
|
|
fig.tight_layout()
|
|
if len(template_dts) == 1:
|
|
fig.savefig(f"figures/11_time_res_vs_snr_tdt{template_dt:0.1e}.pdf")
|
|
else:
|
|
fig.savefig(f"figures/11_time_res_vs_snr.pdf")
|
|
|
|
plt.show()
|