From f7a68ef68a4cbe889729f50e26a1ada6a9c190d9 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Fri, 19 May 2023 00:15:48 +0200 Subject: [PATCH] Pulse: Hilbert Envelope Maximum: WIP --- simulations/11_pulsed_timing.py | 78 +++++++++++++++++++++++++++------ 1 file changed, 65 insertions(+), 13 deletions(-) diff --git a/simulations/11_pulsed_timing.py b/simulations/11_pulsed_timing.py index 0876428..a7aedde 100755 --- a/simulations/11_pulsed_timing.py +++ b/simulations/11_pulsed_timing.py @@ -140,12 +140,48 @@ def trace_upsampler(trace, template_t, trace_t): #upsampled_t = np.arange(trace_t[0], trace_t[-1], template_dt) upsampled_t = template_dt * np.arange(len(upsampled_trace)) + trace_t[0] + if False: + upsampled_t = np.linspace(0, template_dt*len(upsampled_trace), len(upsampled_trace), endpoint=False) + trace_t[0] return upsampled_trace, upsampled_t def trace_downsampler(trace, template_t, trace_t, offset): pass + +def hilbert_envelope_max_amplitude_time(trace, trace_t, do_plot=False, fname_distinguish=None): + analytic_signal = signal.hilbert(trace) + envelope = abs(analytic_signal) + + max_idx = np.argmax(envelope) + + t_max = trace_t[max_idx] + + # make a plot + if do_plot: + fig, ax = plt.subplots() + + ax.set_xlabel("Time [ns]") + ax.set_ylabel("Amplitude") + + ax.plot(trace_t, trace, label='Waveform') + ax.plot(trace_t, envelope, ls='dashed', label='Envelope') + + # indicate maximum and t_max + ax.axhline(envelope[max_idx], ls='dotted', color='g') + ax.axvline(t_max, ls='dotted', color='g') + + if True: + ax.legend() + + fig.tight_layout() + fig.savefig(f'figures/11_hilbert_timing{fname_distinguish}.pdf') + + plt.close(fig) + + return t_max, (analytic_signal, max_idx) + + def read_time_residuals_cache(cache_fname, template_dt, antenna_dt, snr_sigma_factor, N=None): try: with h5py.File(cache_fname, 'r') as fp: @@ -160,14 +196,16 @@ def read_time_residuals_cache(cache_fname, template_dt, antenna_dt, snr_sigma_fa else: ret = deepcopy(ds[:min(N, len(ds))]) - if len(ret.shape) > 1: - return ret[0,:], ret[1,:] + if len(ret.shape) > 2: + return ret[0,:], ret[1,:], ret[2,:] + elif len(ret.shape) > 1: + return ret[0,:], ret[1,:], np.array([np.nan]*len(ret)) else: - return ret[:], np.array([np.nan]*len(ret)) + return ret[:], np.array([np.nan]*len(ret)), np.array([np.nan]*len(ret)) except (KeyError, FileNotFoundError): - return np.array([]), np.array([]) + return np.array([]), np.array([]), np.array([]) -def write_time_residuals_cache(cache_fname, time_residuals, snrs, template_dt, antenna_dt, noise_sigma_factor): +def write_time_residuals_cache(cache_fname, data, template_dt, antenna_dt, noise_sigma_factor): with h5py.File(cache_fname, 'a') as fp: pgroup = fp.require_group('time_residuals') pgroup2 = pgroup.require_group(f'{template_dt}_{antenna_dt}') @@ -177,9 +215,10 @@ def write_time_residuals_cache(cache_fname, time_residuals, snrs, template_dt, a if ds_name in pgroup2.keys(): del pgroup2[ds_name] - ds = pgroup2.create_dataset(ds_name, (2, len(time_residuals)), dtype='f', maxshape=(None)) - ds[0] = time_residuals - ds[1] = snrs + ds = pgroup2.create_dataset(ds_name, (3, len(time_residuals)), dtype='f', maxshape=(None)) + ds[0] = data[0] + ds[1] = data[1] + ds[2] = data[2] def create_template(dt=1, timelength=1, bp_freq=(0, np.inf), name=None, normalise=False): template = Waveform(None, dt=dt, name=name) @@ -203,16 +242,20 @@ def get_time_residuals_for_template( ): # Read in cached time residuals if read_cache: - cached_time_residuals, cached_snrs = read_time_residuals_cache(h5_cache_fname, template.dt, antenna_dt, snr_sigma_factor) + cached_time_residuals, cached_snrs, cached_hilbert_time_residuals = read_time_residuals_cache(h5_cache_fname, template.dt, antenna_dt, snr_sigma_factor) else: - cached_time_residuals, cached_snrs = np.array([]), np.array([]) + cached_time_residuals, cached_snrs, cached_hilbert_time_residuals = np.array([]), np.array([]), np.array([]) # # Find difference between true and templated times # + + hilbert_interp_t_max, _ = hilbert_envelope_max_amplitude_time(interp_template.signal, interp_template.t) + time_residuals = np.zeros(max(0, (N_residuals - len(cached_time_residuals)))) snrs = np.zeros_like(time_residuals) + hilbert_time_residuals = np.zeros_like(time_residuals) for j in tqdm(range(len(time_residuals))): do_plots = j==0 @@ -340,6 +383,9 @@ def get_time_residuals_for_template( best_sample_lag = lags[idx] best_time_lag = best_sample_lag * lag_dt + # Find Hilbert Envelope t0 + hilbert_best_time_lag, _ = hilbert_envelope_max_amplitude_time(upsampled_trace, upsampled_t, do_plot=do_plots) + else: # downsampled template raise NotImplementedError @@ -349,6 +395,7 @@ def get_time_residuals_for_template( # Calculate the time residual time_residuals[j] = best_time_lag - true_time_offset snrs[j] = antenna.signal_to_noise + hilbert_time_residuals[j] = hilbert_best_time_lag - hilbert_interp_t_max if not do_plots: continue @@ -429,15 +476,17 @@ def get_time_residuals_for_template( # merge cached and calculated time residuals time_residuals = np.concatenate((cached_time_residuals, time_residuals), axis=None) snrs = np.concatenate( (cached_snrs, snrs), axis=None) + hilbert_time_residuals = np.concatenate((cached_hilbert_time_residuals, hilbert_time_residuals), axis=None) if write_cache or read_cache and write_cache is None: # write the cache - write_time_residuals_cache(h5_cache_fname, time_residuals, snrs, template_dt, antenna_dt, snr_sigma_factor) + write_time_residuals_cache(h5_cache_fname, (time_residuals, snrs, hilbert_time_residuals), template_dt, antenna_dt, snr_sigma_factor) else: time_residuals = cached_time_residuals snrs = cached_snrs + hilbert_time_residuals = cached_hilbert_time_residuals # Only return N_residuals (even if more have been cached) - return time_residuals[:N_residuals], snrs[:N_residuals] + return time_residuals[:N_residuals], snrs[:N_residuals], hilbert_time_residuals[:N_residuals] if __name__ == "__main__": import os @@ -503,6 +552,9 @@ if __name__ == "__main__": if True: plt.close(fig) + if True: # show hilbert interpolation method + _ = hilbert_envelope_max_amplitude_time(interp_template.signal, interp_template.t, do_plot=True, fname_distinguish="_interpolation_template") + # # Find time accuracies as a function of signal strength # @@ -519,7 +571,7 @@ if __name__ == "__main__": for k, snr_sigma_factor in tqdm(enumerate(snr_factors)): # get the time residuals - time_residuals, snrs = get_time_residuals_for_template( + time_residuals, snrs, hilbert_time_residuals = get_time_residuals_for_template( N_residuals, template, interpolation_template=interp_template, antenna_dt=antenna_dt, antenna_timelength=antenna_timelength, snr_sigma_factor=snr_sigma_factor, bp_freq=bp_freq, normalise_noise=normalise_noise,