#!/usr/bin/env python3 # vim: fdm=marker fmr=<<<,>>> # TODO: compare with Peak Hilbert Envelope Timing # Remove non-cross Points in SNR plot # extrapolate exponential to lower snr values from lib import util from scipy import signal, interpolate, stats import matplotlib.pyplot as plt import numpy as np from itertools import zip_longest import h5py from copy import deepcopy try: from itertools import pairwise except: # pairwise only introduced since python 3.10 from itertools import tee def pairwise(iterable): # pairwise('ABCDEFG') --> AB BC CD DE EF FG a, b = tee(iterable) next(b, None) return zip(a, b) try: from tqdm import tqdm except: tqdm = lambda x: x rng = np.random.default_rng() class Waveform: name = None signal = None dt = None _t = None def __init__(self,signal=None, dt=None, t=None, name=None): self.signal = signal self.name = name if t is not None: assert len(t) == len(signal) self._t = t self.dt = t[1] - t[0] elif dt is not None: self.dt = dt # Lazy evaluation of time @property def t(self): if self._t is None: return self.dt * np.arange(0, len(self.signal)) return self._t @t.setter def t(self, value): self._t = value @t.deleter def t(self): del self._t def __len__(): return len(self.signal) def white_noise_realisation(N_samples, noise_sigma=1, rng=rng, normalise=False): noise = rng.normal(0, noise_sigma or 0, size=N_samples) if normalise: noise /= max(noise) return noise def antenna_bp(trace, low_bp, high_bp, dt, order=3): fs = 1/dt bp_filter = signal.butter(order, [low_bp, high_bp], 'band', fs=fs, output='sos') bandpassed = signal.sosfilt(bp_filter, trace) return bandpassed def my_correlation(in1, template, lags=None, normalise=True): template_length = len(template) in1_length = len(in1) if lags is None: lags = np.arange(-template_length+1, in1_length + 1) # do the correlation jig corrs = np.zeros_like(lags, dtype=float) for i, l in enumerate(lags): if l <= 0: # shorten template at the front in1_start = 0 template_end = template_length template_start = -template_length - l in1_end = max(0, min(in1_length, -template_start)) # 0 =< l + template_length =< in1_lengt elif l > in1_length - template_length: # shorten template from the back in1_end = in1_length template_start = 0 in1_start = min(l, in1_length) template_end = max(0, in1_length - l) else: in1_start = min(l, in1_length) in1_end = min(in1_start + template_length, in1_length) # full template template_start = 0 template_end = template_length # Slice in1 and template in1_slice = in1[in1_start:in1_end] template_slice = template[template_start:template_end] corrs[i] = np.dot(in1_slice, template_slice) if normalise: corrs /= np.amax(corrs) return corrs, (in1, template, lags) def trace_upsampler(trace, template_t, trace_t): if not hasattr(template_t, '__len__'): template_dt = template_t else: template_dt = template_t[1] - template_t[0] trace_dt = trace_t[1] - trace_t[0] upsample_factor = trace_dt/template_dt upsampled_trace_N = np.ceil(len(trace) * upsample_factor) upsample_factor = int(upsample_factor) upsampled_trace_N = int(upsampled_trace_N) # upsample trace upsampled_trace = np.zeros(upsampled_trace_N) upsampled_trace[::upsample_factor] = trace #upsampled_t = np.arange(trace_t[0], trace_t[-1], template_dt) upsampled_t = template_dt * np.arange(len(upsampled_trace)) + trace_t[0] if False: upsampled_t = np.linspace(0, template_dt*len(upsampled_trace), len(upsampled_trace), endpoint=False) + trace_t[0] return upsampled_trace, upsampled_t def trace_downsampler(trace, template_t, trace_t, offset): pass def hilbert_envelope_max_amplitude_time(trace, trace_t, do_plot=False, fname_distinguish='', zoom_wx=50, inset_zoom_extent=(0.03, 0.4, 0.53, 0.57)): analytic_signal = signal.hilbert(trace) envelope = abs(analytic_signal) max_idx = np.argmax(envelope) t_max = trace_t[max_idx] # make a plot if do_plot: fig, ax = plt.subplots() ax.set_xlabel("Time [ns]") ax.set_ylabel("Amplitude") ax.plot(trace_t, trace, label='Waveform') ax.plot(trace_t, envelope, ls='dashed', label='Envelope') # indicate maximum and t_max ax.axhline(envelope[max_idx], ls='dotted', color='g') ax.axvline(t_max, ls='dotted', color='g') if True: ax.legend() ax.grid() fig.tight_layout() fig.savefig(f'figures/11_hilbert_timing{fname_distinguish}.pdf') if zoom_wx: xlims = ax.get_xlim() if not hasattr(zoom_wx, '__len__'): zoom_wx = (zoom_wx, zoom_wx) if inset_zoom_extent: # do inset axes orig_ax = ax axins = orig_ax.inset_axes(inset_zoom_extent) axins.patch.set_alpha(0.9) axins.set_yticklabels([]) axins.set_xlim(t_max - zoom_wx[0], t_max + zoom_wx[-1]) axins.grid() # replot data axins.plot(trace_t, trace, label='Waveform') axins.plot(trace_t, envelope, ls='dashed', label='Envelope') # indicate maximum and t_max axins.axhline(envelope[max_idx], ls='dotted', color='g') axins.axvline(t_max, ls='dotted', color='g') # increase margins and indicate inset zoom orig_ax.margins(y=0.09) orig_ax.indicate_inset_zoom(axins) else: ax.set_xlim(t_max - zoom_wx[0], t_max + zoom_wx[-1]) fig.tight_layout() fig.savefig(f'figures/11_hilbert_timing{fname_distinguish}_zoom.pdf') ax.set_xlim(*xlims) plt.close(fig) return t_max, (analytic_signal, max_idx) def read_time_residuals_cache(cache_fname, template_dt, antenna_dt, snr_sigma_factor, N=None): try: with h5py.File(cache_fname, 'r') as fp: pgroup = fp['time_residuals'] pgroup2 = pgroup[f'{template_dt}_{antenna_dt}'] ds_name = str(snr_sigma_factor) ds = pgroup2[ds_name] if N is None: ret = deepcopy(ds[:]) else: ret = deepcopy(ds[:min(N, len(ds))]) if len(ret.shape) > 2: return ret[0,:], ret[1,:], ret[2,:] elif len(ret.shape) > 1: return ret[0,:], ret[1,:], np.array([np.nan]*len(ret[0])) else: return ret[:], np.array([np.nan]*len(ret[0])), np.array([np.nan]*len(ret[0])) except (KeyError, FileNotFoundError): return np.array([]), np.array([]), np.array([]) def write_time_residuals_cache(cache_fname, data, template_dt, antenna_dt, noise_sigma_factor): with h5py.File(cache_fname, 'a') as fp: pgroup = fp.require_group('time_residuals') pgroup2 = pgroup.require_group(f'{template_dt}_{antenna_dt}') ds_name = str(noise_sigma_factor) if ds_name in pgroup2.keys(): del pgroup2[ds_name] ds = pgroup2.create_dataset(ds_name, (3, len(data[0])), dtype='f', maxshape=(None)) ds[0] = data[0] ds[1] = data[1] ds[2] = data[2] def create_template(dt=1, timelength=1, bp_freq=(0, np.inf), name=None, normalise=False): template = Waveform(None, dt=dt, name=name) _deltapeak = util.deltapeak(timelength=timelength, samplerate=1/dt, offset=0) template.signal = antenna_bp(_deltapeak[0], *bp_freq, dt) template.peak_sample = _deltapeak[1] template.peak_time = template.dt * template.peak_sample if normalise: template.signal /= max(template.signal) return template, _deltapeak def get_time_residuals_for_template( N_residuals, template, interpolation_template=None, antenna_dt=1, antenna_timelength=100, snr_sigma_factor=10,bp_freq=(0,np.inf), normalise_noise=False, h5_cache_fname=None, read_cache=True, write_cache=None, rng=rng, tqdm=tqdm, peak_window=[0.6, 0.65], ): # Read in cached time residuals if read_cache: cached_time_residuals, cached_snrs, cached_hilbert_time_residuals = read_time_residuals_cache(h5_cache_fname, template.dt, antenna_dt, snr_sigma_factor) else: cached_time_residuals, cached_snrs, cached_hilbert_time_residuals = np.array([]), np.array([]), np.array([]) print(cached_hilbert_time_residuals.shape) print(cached_time_residuals.shape) # # Find difference between true and templated times # hilbert_interp_t_max, _ = hilbert_envelope_max_amplitude_time(interp_template.signal, interp_template.t, zoom_wx=None) time_residuals = np.zeros(max(0, (N_residuals - len(cached_time_residuals)))) snrs = np.zeros_like(time_residuals) hilbert_time_residuals = np.zeros_like(time_residuals) for j in tqdm(range(len(time_residuals))): do_plots = j==0 # receive at antenna ## place the deltapeak signal at a random location antenna = Waveform(None, dt=antenna_dt, name='Signal') if interpolation_template is None: # Create antenna trace without interpolation template antenna_true_signal, antenna_peak_sample = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna.dt, offset=peak_window, rng=rng) antenna.peak_sample = antenna_peak_sample antenna.peak_time = antenna.dt * antenna.peak_sample antenna.signal = antenna_bp(antenna.signal, *bp_freq, antenna.dt) print(f"Antenna Peak Time: {antenna.peak_time}") print(f"Antenna Peak Sample: {antenna.peak_sample}") else: # Sample the interpolation template at some offset peak_window_length = peak_window[-1] - peak_window[0] antenna.peak_time = antenna_timelength * (peak_window_length*rng.random(1) + peak_window[0]) sampling_offset = rng.random(1)*antenna.dt antenna.t = util.sampled_time(1/antenna.dt, start=0, end=antenna_timelength) # Sample the interpolation template antenna.signal = interpolation_template.interpolate(antenna.t - antenna.peak_time) antenna.peak_sample = antenna.peak_time/antenna.dt antenna_true_signal = antenna.signal true_time_offset = antenna.peak_time - template.peak_time antenna.signal_level = np.max(antenna.signal) if False: # flip polarisation antenna.signal *= -1 ## Add noise noise_amplitude = max(template.signal) * 1/snr_sigma_factor noise_realisation = noise_amplitude * white_noise_realisation(len(antenna.signal), normalise=normalise_noise) filtered_noise = antenna_bp(noise_realisation, *bp_freq, antenna.dt) antenna.signal += filtered_noise antenna.noise_level = np.sqrt(np.mean(filtered_noise**2)) antenna.signal_to_noise = antenna.signal_level/antenna.noise_level # Show signals if do_plots: fig, axs = plt.subplots(1, sharex=True) if not hasattr(axs, '__len__'): axs = [axs] axs[0].set_title("Antenna Waveform") axs[-1].set_xlabel("Time [ns]") axs[0].set_ylabel("Amplitude") l1 = axs[0].plot(antenna.t, antenna.signal, label='Filtered w/ noise', alpha=0.7) l2 = axs[0].plot(antenna.t, antenna.signal - filtered_noise, label='Filtered w/o noise', alpha=0.7) l3 = axs[0].plot(antenna.t, filtered_noise, label='Noise', alpha=0.7) if True: # indicate signal and noise levels level_kwargs = dict(ls='dashed', alpha=0.4) axs[0].axhline(antenna.signal_level, color=l2[0].get_color(), **level_kwargs)#, label='Signal Level') axs[0].axhline(antenna.noise_level, color=l3[0].get_color(), **level_kwargs)#, label='Noise Level') axs[0].legend(title=f'SNR = {antenna.signal_to_noise:.2g}', loc='lower right') axs[0].grid() if len(axs) > 1: axs[1].set_ylabel("Amplitude") axs[1].plot(template.t + true_time_offset, template.signal, label='Template') axs[1].legend() axs[1].grid() fig.tight_layout() fig.savefig(f'figures/11_antenna_signals_tdt{template.dt:.1g}.pdf') if True: # zoom wx = 50 x0 = true_time_offset + wx/2 old_xlims = axs[0].get_xlim() if True: # do inset axes extent = [0.03, 0.4, 0.53, 0.57] orig_ax = axs[0] axins = orig_ax.inset_axes(extent) axins.patch.set_alpha(0.9) axins.set_yticklabels([]) axins.set_xlim(x0-wx, x0+wx) axins.grid() # replot data l1 = axins.plot(antenna.t, antenna.signal, label='Filtered w/ noise', alpha=0.7) l2 = axins.plot(antenna.t, antenna.signal - filtered_noise, label='Filtered w/o noise', alpha=0.7) l3 = axins.plot(antenna.t, filtered_noise, label='Noise', alpha=0.7) if True: # indicate signal and noise levels level_kwargs = dict(ls='dashed', alpha=0.4) axins.axhline(antenna.signal_level, color=l2[0].get_color(), **level_kwargs)#, label='Signal Level') axins.axhline(antenna.noise_level, color=l3[0].get_color(), **level_kwargs)#, label='Noise Level') # increase margins and indicate inset zoom orig_ax.margins(y=0.09) orig_ax.indicate_inset_zoom(axins) if len(axs) > 1: orig_ax = axs[1] axins2 = orig_ax.inset_axes(extent) axins2.patch.set_alpha(axins.patch.get_alpha()) axins2.set_yticklabels([]) axins2.set_xlim(x0-wx, x0+wx) axins2.grid() # replot data axins2.plot(template.t + true_time_offset, template.signal) # increase margins and indicate inset zoom orig_ax.margins(y=0.1) orig_ax.indicate_inset_zoom(axins2) else: axs[0].set_xlim( x0-wx, x0+wx) fig.tight_layout() fig.savefig(f'figures/11_antenna_signals_tdt{template.dt:.1g}_zoom.pdf') # restore axs[0].set_xlim(*old_xlims) if True: plt.close(fig) axs2 = None if True: # simple and dumb trace upsampling upsampled_trace, upsampled_t = trace_upsampler(antenna.signal, template.t, antenna.t) if do_plots: # Show upsampled traces fig2, axs2 = plt.subplots(1, sharex=True) if not hasattr(axs2, '__len__'): axs2 = [axs2] axs2[-1].set_xlabel("Time [ns]") axs2[0].set_ylabel("Amplitude") axs2[0].plot(antenna.t, antenna.signal, marker='o', label='waveform') axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled') axs2[0].legend(loc='upper right') axs2[0].grid() fig2.tight_layout() fig2.savefig(f'figures/11_upsampled_tdt{template.dt:.1g}.pdf') wx = 0.25e2 x0 = upsampled_t[np.argmax(upsampled_trace)] - 5 if True: # do inset axes extent = [0.03, 0.4, 0.47, 0.57] orig_ax = axs2[0] axins = orig_ax.inset_axes(extent) axins.patch.set_alpha(0.9) axins.set_yticklabels([]) axins.set_xlim(x0-wx, x0+wx) axins.grid() # replot data axins.plot(antenna.t, antenna.signal, marker='o') axins.plot(upsampled_t, upsampled_trace) # increase margins and indicate inset zoom orig_ax.margins(y=0.1) orig_ax.indicate_inset_zoom(axins) else: axs2[0].set_xlim(x0-wx, x0+wx) fig2.tight_layout() fig2.savefig(f'figures/11_upsampled_tdt{template.dt:.1g}_zoom.pdf') if True: plt.close(fig2) # determine correlations with arguments lag_dt = upsampled_t[1] - upsampled_t[0] corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template.signal) # Determine best correlation time idx = np.argmax(abs(corrs)) best_sample_lag = lags[idx] best_time_lag = best_sample_lag * lag_dt # Find Hilbert Envelope t0 hilbert_best_time_lag, _ = hilbert_envelope_max_amplitude_time(upsampled_trace, upsampled_t, do_plot=do_plots, zoom_wx=(6,12)) else: # downsampled template raise NotImplementedError corrs, (_, _, lags) = my_downsampling_correlation(antenna.signal, antenna.t, template.signal, template.t) lag_dt = upsampled_t[1] - upsampled_t[0] # Calculate the time residual time_residuals[j] = best_time_lag - true_time_offset snrs[j] = antenna.signal_to_noise hilbert_time_residuals[j] = hilbert_best_time_lag - hilbert_interp_t_max if not do_plots: continue if do_plots and axs2: axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2) axs2[-1].axvline(true_time_offset, color='g', alpha=0.5, linewidth=2) # Show the final signals correlated if do_plots: # amplitude scaling required for single axis plotting template_amp_scaler = max(abs(template.signal)) / max(abs(antenna.signal)) # start the figure fig, axs = plt.subplots(2, sharex=True, figsize=(9,6)) ylabel_kwargs = dict( #rotation=0, #ha='right', va='center' ) axs[-1].set_xlabel("Time [ns]") axs[-1].set_xlabel("Time [ns]") offset_list = [ [best_time_lag, dict(label=template.name, color='orange')], [true_time_offset, dict(label='True offset', ls='dashed', color='green')], ] # Signal i=0 axs[i].set_ylabel("Amplitude", **ylabel_kwargs) axs[i].plot(antenna.t, antenna.signal, label=antenna.name) # Plot the template for offset_args in offset_list: this_kwargs = offset_args[1] offset = offset_args[0] l = axs[i].plot(offset + template.t, template_amp_scaler * template.signal, **this_kwargs) axs[i].legend() axs[i].grid() # Correlation i=1 axs[i].set_ylabel("Correlation", **ylabel_kwargs) axs[i].grid() axs[i].plot(lags * lag_dt, corrs) # Lines across both axes for offset_args in offset_list: this_kwargs = offset_args[1] offset = offset_args[0] for i in [0,1]: axs[i].axvline(offset, ls='--', color=this_kwargs['color'], alpha=0.7) axs[0].axvline(offset + len(template.signal) * (template.t[1] - template.t[0]), color=this_kwargs['color'], alpha=0.7) # separated axes for i, myax in enumerate(axs): [ axes.set_visible(False) for axes in axs] myax.set_visible(True) fig.tight_layout() fig.savefig(f'figures/11_corrs_tdt{template.dt:.1g}_axes{i}.pdf') # re enable all axes [ axes.set_visible(True) for axes in axs] fig.tight_layout() fig.savefig(f'figures/11_corrs_tdt{template.dt:.1g}.pdf') if True: # zoom wx = len(template.signal) * (min(1,template.dt))/4 t0 = true_time_offset old_xlims = axs[0].get_xlim() if True: # do inset axes extent = [0.03, 0.4, 0.47, 0.57] axins = [] for i in [0,1]: orig_ax = axs[i] axins.append(orig_ax.inset_axes(extent)) axins[i].patch.set_alpha(0.9) axins[i].set_yticklabels([]) axins[i].set_xlim(x0-wx, x0+wx) axins[i].grid() # replot data if i == 0: axins[i].plot(antenna.t, antenna.signal, label=antenna.name) # Plot the template for offset_args in offset_list: this_kwargs = offset_args[1] offset = offset_args[0] l = axins[i].plot(offset + template.t, template_amp_scaler * template.signal, **this_kwargs) elif i == 1: # correlation axins[i].plot(lags*lag_dt, corrs) # Lines across both axes for offset_args in offset_list: this_kwargs = offset_args[1] offset = offset_args[0] for j in [0,1]: axins[j].axvline(offset, ls='--', color=this_kwargs['color'], alpha=0.7) axins[i].axvline(offset + len(template.signal) * (template.t[1] - template.t[0]), color=this_kwargs['color'], alpha=0.7) # increase margins and indicate inset zoom orig_ax.margins(y=0.1) orig_ax.indicate_inset_zoom(axins[i]) else: axs[i].set_xlim( t0-wx, t0+2*wx) # separated axes for i, myax in enumerate(axs): [ axes.set_visible(False) for axes in axs] myax.set_visible(True) fig.tight_layout() fig.savefig(f'figures/11_corrs_tdt{template.dt:.1g}_axes{i}_zoom.pdf') # re enable all axes [ axes.set_visible(True) for axes in axs] fig.tight_layout() fig.savefig(f'figures/11_corrs_tdt{template.dt:.1g}_zoom.pdf') # restore axs[i].set_xlim(*old_xlims) if True: plt.close(fig) # Were new time residuals calculated? # Add them to the cache file if len(time_residuals) >= 1: # merge cached and calculated time residuals time_residuals = np.concatenate((cached_time_residuals, time_residuals), axis=None) snrs = np.concatenate( (cached_snrs, snrs), axis=None) hilbert_time_residuals = np.concatenate((cached_hilbert_time_residuals, hilbert_time_residuals), axis=None) if write_cache or read_cache and write_cache is None: # write the cache write_time_residuals_cache(h5_cache_fname, (time_residuals, snrs, hilbert_time_residuals), template_dt, antenna_dt, snr_sigma_factor) else: time_residuals = cached_time_residuals snrs = cached_snrs hilbert_time_residuals = cached_hilbert_time_residuals # Only return N_residuals (even if more have been cached) return time_residuals[:N_residuals], snrs[:N_residuals], hilbert_time_residuals[:N_residuals] if __name__ == "__main__": import os import matplotlib import sys if os.name == 'posix' and "DISPLAY" not in os.environ: matplotlib.use('Agg') figsize = (8,6) fontsize = 12 if True: from matplotlib import rcParams #rcParams["text.usetex"] = True rcParams["font.family"] = "serif" rcParams["font.size"] = fontsize if not True:# small figsize = (6, 4) rcParams["font.size"] = "15" # 15 at 6,4 looks fine elif True: # large figsize = (9, 6) rcParams["font.size"] = "16" # 15 at 9,6 looks fine rcParams["grid.linestyle"] = 'dotted' rcParams["figure.figsize"] = figsize fontsize = rcParams['font.size'] bp_freq = (30e-3, 80e-3) # GHz interp_template_dt = 5e-5 # ns template_length = 200 # ns antenna_dt = 2 # ns antenna_timelength = 2048 # ns N_residuals = 50*3 if len(sys.argv) < 2 else int(sys.argv[1]) if True: template_dts = np.array([ 5e-1, 1e-1, 1e-2]) # ns elif True: template_dts = np.array([1e-2]) # ns else: template_dts = np.array([antenna_dt, 5e-1]) # ns snr_factors = np.concatenate( # 1/noise_amplitude factor ( #[0.25, 0.5, 0.75], [1, 1.5, 2, 2.5, 3, 4, 5, 7], [10, 20, 30, 50], [100, 200, 300, 500] #[5, 50] ), axis=None, dtype=float) cut_wrong_peak_matches = True normalise_noise = False h5_cache_fname = f'11_pulsed_timing.hdf5' use_cache = True write_cache = None # Leave None for default action wrong_peak_condition_multiple = 2 wrong_peak_condition = lambda t_res: abs(t_res) > antenna_dt*wrong_peak_condition_multiple # # Interpolation Template # to create an 'analog' sampled antenna interp_template, _deltapeak = create_template(dt=interp_template_dt, timelength=template_length, bp_freq=bp_freq, name='Interpolation Template', normalise=True) interp_template.interpolate = interpolate.interp1d(interp_template.t, interp_template.signal, assume_sorted=True, fill_value=0, bounds_error=False, copy=False)#, kind='nearest') if not True: # show interpolation template fig, ax = plt.subplots() ax.set_title("Filter Response") ax.set_xlabel("Time [ns]") ax.set_ylabel("Amplitude") ax.plot(interp_template.t, max(interp_template.signal)*_deltapeak[0], label='Impulse') ax.plot(interp_template.t, interp_template.signal, label='Filtered Signal') ax.legend(loc='upper right') ax.grid() fig.tight_layout() fig.savefig('figures/11_filter_response.pdf') if True: # show filtering equivalence samplerates _deltapeak = util.deltapeak(timelength=template_length, samplerate=1/antenna_dt, offset=0) _time = util.sampled_time(end=template_length, sample_rate=1/antenna_dt) _bandpassed = antenna_bp(_deltapeak[0], *bp_freq, antenna_dt) ax.plot(_time, max(_bandpassed)*_deltapeak[0], label='Impulse Antenna') ax.plot(_time, _bandpassed, label='Filtered Antenna') ax.legend(loc='upper right') fig.tight_layout() fig.savefig('figures/11_interpolation_deltapeak+antenna.pdf') if True: plt.close(fig) if True: # show hilbert interpolation method _ = hilbert_envelope_max_amplitude_time(interp_template.signal, interp_template.t, do_plot=True, fname_distinguish="_interpolation_template") # # Find time accuracies as a function of signal strength # time_residuals_data = [] for a, template_dt in tqdm(enumerate(template_dts)): time_residuals_data.append(np.zeros( (len(snr_factors), 4, N_residuals)))# res, snr, masked, hilber_res # Create the template # This is sampled at a lower samplerate than the interpolation template template, _ = create_template(dt=template_dt, timelength=template_length, bp_freq=bp_freq, name='Template', normalise=True) for k, snr_sigma_factor in tqdm(enumerate(snr_factors)): # get the time residuals time_residuals, snrs, hilbert_time_residuals = get_time_residuals_for_template( N_residuals, template, interpolation_template=interp_template, antenna_dt=antenna_dt, antenna_timelength=antenna_timelength, snr_sigma_factor=snr_sigma_factor, bp_freq=bp_freq, normalise_noise=normalise_noise, h5_cache_fname=h5_cache_fname, rng=rng, tqdm=tqdm, read_cache=use_cache, write_cache=write_cache) print()# separating tqdm print()# separating tqdm mask = wrong_peak_condition(time_residuals) # Save directly to large data array time_residuals_data[a][k] = time_residuals, snrs, ~mask, hilbert_time_residuals # Make a plot of the time residuals <<< if True and N_residuals > 1: for i in range(1 + cut_wrong_peak_matches): mask_count = 0 if i==1: # if cut_wrong_peak_matches: mask_count = np.count_nonzero(mask) time_residuals = time_residuals[~mask] # None masked if not mask_count: continue # All masked if not len(time_residuals): continue hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step') fig, ax = plt.subplots() #ax.set_title( # "Template Correlation Lag finding" # + f"\n template dt: {template_dt: .1e}ns" # + f"; antenna dt: {antenna_dt: .1e}ns" # + ";" if not mask_count else "\n" # + f"snr_factor: {snr_sigma_factor: .1e}" # + "" if not mask_count else f"; N_masked: {mask_count}" # ) ax.set_xlabel("Time Residual [ns]") ax.set_ylabel("#") ax.grid() if not True: # indicate boxcar accuracy limits for sign in [-1, 1]: ax.axvline( sign*template_dt/np.sqrt(12), ls='--', alpha=0.5, color='green') counts, bins, _patches = ax.hist(time_residuals, **hist_kwargs) if True: # fit gaussian to histogram min_x = min(time_residuals) max_x = max(time_residuals) dx = bins[1] - bins[0] scale = len(time_residuals) * dx xs = np.linspace(min_x, max_x) # do the fit name = "Norm" param_names = [ "$\\mu$", "$\\sigma$" ] distr_func = stats.norm label = name +"(" + ','.join(param_names) + ')' # plot fit_params = distr_func.fit(time_residuals) fit_ys = scale * distr_func.pdf(xs, *fit_params) ax.plot(xs, fit_ys, label=label) # chisq ct = np.diff(distr_func.cdf(bins, *fit_params))*np.sum(counts) if True: ct *= np.sum(counts)/np.sum(ct) c2t = stats.chisquare(counts, ct, ddof=len(fit_params)) chisq_strs = [ f"$\\chi^2$/dof = {c2t[0]: .2g}/{len(fit_params)}" ] # text on plot text_str = "\n".join( [label] + [ f"{param} = {value: .2e}" for param, value in zip_longest(param_names, fit_params, fillvalue='?') ] + chisq_strs ) ax.text( *(0.02, 0.95), text_str, ha='left', va='top', transform=ax.transAxes) if True: ax.legend(title=f"$\\langle SNR \\rangle$ = {snr_sigma_factor:.2g}", loc='upper right') if True: this_lim = 55 if ax.get_ylim()[1] <= this_lim: ax.set_ylim([None, this_lim]) fig.tight_layout() if mask_count: fig.savefig(f"figures/11_time_residual_hist_tdt{template_dt:0.1e}_n{snr_sigma_factor:.1e}_masked.pdf") else: fig.savefig(f"figures/11_time_residual_hist_tdt{template_dt:0.1e}_n{snr_sigma_factor:.1e}.pdf") if True: plt.close(fig) # >>> End of plot # # SNR time accuracy plot # if True: enable_threshold_markers = [False, False, True, True] threshold_markers = ['^', 'v', '8', 'o'] # make sure to have filled markers here mask_thresholds = np.array([np.inf, N_residuals*0.5, N_residuals*0.1, 1, 0]) fig, ax = plt.subplots(figsize=figsize) ax.set_title(f"Template matching SNR vs time accuracy") ax.set_xlabel("Signal to Noise") ax.set_ylabel("Time Accuracy [ns]") ax.grid() ax.legend(title=", ".join([ f"N={N_residuals}", #f"template_dt={template_dt:0.1e}ns", f"$1/f_s$ ={antenna_dt}ns", ]), loc='lower left') if not True: ax.set_title(f"Template matching, $N={N_residuals}$, $dt={antenna_dt}\\mathrm{{ns}}$") if False: pass # add wrong_peak_condition_multiple into plot # plot the values per template_dt slice template_dt_colors = [None]*len(template_dts) for a, template_dt in enumerate(template_dts): for k, snr_sigma_factor in enumerate(snr_factors): time_residuals, snrs, valid_mask, hilbert_time_residuals = time_residuals_data[a][k] valid_mask = np.array(valid_mask, dtype=bool) mean_residual = np.mean(time_residuals[valid_mask]) time_accuracy = np.std(time_residuals[valid_mask]) # calculate absolute deviation from the mean residual_mean_deviation = np.sqrt( (time_residuals - mean_residual)**2 ) snr_std = np.std(snrs[valid_mask]) time_accuracy_std = np.std(residual_mean_deviation[valid_mask]) scatter_kwargs = dict( ls='none', marker='o', alpha=0.2, ms=1, zorder=1.8, ) y_values = residual_mean_deviation # snr_sigma_factor is a factor 2 too low snr_sigma_factor *= 2 # plot all invalid datapoints if False: ax.plot(snrs[~valid_mask], y_values[~valid_mask], color='grey', **scatter_kwargs) # plot valid datapoints if False: if template_dt_colors[a] is not None: scatter_kwargs['color'] = template_dt_colors[a] l = ax.plot(snrs[valid_mask], y_values[valid_mask], **scatter_kwargs) template_dt_colors[a] = l[0].get_color() masked_count = np.count_nonzero(~valid_mask) threshold_index = np.argmin(masked_count <= mask_thresholds) -1 if not enable_threshold_markers[threshold_index]: continue # plot accuracy indicating masking counts kwargs = dict( ls='none', color= None if template_dt_colors[a] is None else template_dt_colors[a], marker=threshold_markers[threshold_index], ms=10, markeredgecolor='white', markeredgewidth=1, alpha=0.8 ) #l = ax.plot(snr_sigma_factor, np.sqrt(np.mean(y_values[valid_mask])**2), **{**kwargs, **dict(ms=50)}) if False: l = ax.errorbar(snr_sigma_factor, time_accuracy, yerr=time_accuracy_std, xerr=snr_std, **kwargs, capsize=5) else: l = ax.plot(snr_sigma_factor, time_accuracy, **kwargs) # set color if not yet set template_dt_colors[a] = l[0].get_color() # indicate boxcar threshold if True: ax.axhline(template_dt/np.sqrt(12), ls='--', alpha=0.7, color=template_dt_colors[a], label=f'{template_dt}ns') text_coord = (0.03, template_dt/np.sqrt(12)) ax.text( *text_coord, f'${template_dt}\mathrm{{\,ns}} / \sqrt{{12}}$', va='bottom', ha='left', color=template_dt_colors[a], fontsize=fontsize-1, transform=ax.get_yaxis_transform()) # Set horizontal line at 1 ns if not True: ax.axhline(1, ls='--', alpha=0.8, color='g') if not True: ax.legend(title="Template dt", loc='lower left') elif True: ax.legend().remove() fig.tight_layout() fig.savefig(f"figures/11_time_res_vs_snr_full_linear.pdf") # logscaling if True: ax.set_xscale('log') ax.set_yscale('log') # limit y-axis upper limit to 1e1 if True: this_lim = 1e1 if ax.get_ylim()[1] >= this_lim: ax.set_ylim([None, this_lim]) # but keep it above 1 if True: this_lim = 1e0 if ax.get_ylim()[1] <= this_lim: ax.set_ylim([None, this_lim]) # require y-axis lower limit to be at least 1e-1 if True: this_lim = 1e-1 low_ylims = ax.get_ylim()[0] if low_ylims >= this_lim: ax.set_ylim([this_lim, None]) # .. but keep it above 1e-3 if True: this_lim = 1e-3 low_ylims = ax.get_ylim()[0] if low_ylims <= this_lim: ax.set_ylim([this_lim, None]) # require x-axis lower limit to be under 1e0 if True: this_lim = 1e0 if ax.get_xlim()[0] >= this_lim: ax.set_xlim([this_lim, None]) fig.tight_layout() if len(template_dts) == 1: fig.savefig(f"figures/11_time_res_vs_snr_tdt{template_dt:0.1e}.pdf") else: fig.savefig(f"figures/11_time_res_vs_snr.pdf") plt.show()