From 04858ea01a4b35a54981ecb9529d69d010f23fc0 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Wed, 19 Apr 2023 17:01:04 +0200 Subject: [PATCH] Simple Pulse finding using Template Correlation --- simulations/11_pulsed_timing.py | 368 +++++++++++++++++++------------- 1 file changed, 224 insertions(+), 144 deletions(-) diff --git a/simulations/11_pulsed_timing.py b/simulations/11_pulsed_timing.py index 32e07a4..6b8a69f 100755 --- a/simulations/11_pulsed_timing.py +++ b/simulations/11_pulsed_timing.py @@ -10,16 +10,40 @@ rng = np.random.default_rng() class Waveform: name = None - time = None signal = None + dt = None - def __init__(signal, time=None, name=None, dt=None): + _t = None + + def __init__(self,signal=None, dt=None, t=None, name=None): self.signal = signal - self.time = time self.name = name - if self.time is None and dt is not None: - self.time = dt * len(signal) + if t is not None: + assert len(t) == len(signal) + + self._t = t + self.dt = t[1] - t[0] + elif dt is not None: + self.dt = dt + + # Lazy evaluation of time + @property + def t(self): + if self._t is None: + return self.dt * np.arange(0, len(self.signal)) + return self._t + + @t.setter + def t(self, value): + self._t = value + + @t.deleter + def t(self): + del self._t + + def __len__(): + return len(self.signal) def white_noise_realisation(N_samples, noise_sigma=1, rng=rng): return rng.normal(0, noise_sigma or 0, size=N_samples) @@ -37,7 +61,7 @@ def antenna_bp(trace, low_bp, high_bp, dt, order=3): return bandpassed def my_correlation(in1, template): - # + # in1_long = np.zeros( (len(in1)+2*len(template)) ) in1_long[len(template):-len(template)] = in1 @@ -46,7 +70,7 @@ def my_correlation(in1, template): template_long[len(template):2*len(template)] = template lags = np.arange(-len(template), len(in1) ) - len(template) - + # do the correlation jig corrs = np.zeros_like(lags, dtype=float) for i, l in enumerate(lags): @@ -56,7 +80,7 @@ def my_correlation(in1, template): return corrs, (in1_long, template_long, lags) def trace_upsampler(template_signal, trace, template_t, trace_t): - template_dt = template_t[1] - template_t[0] + template_dt = template.t[1] - template.t[0] trace_dt = trace_t[1] - trace_t[0] upsample_factor = trace_dt/template_dt @@ -88,155 +112,211 @@ if __name__ == "__main__": bp_freq = (30e-3, 80e-3) # GHz template_length = 100 # ns noise_sigma_factor = 1e1 # keep between 10 and 0.1 - + + N_residuals = 50*3 + antenna_dt = 2 # ns antenna_timelength = 2048 # ns - _deltapeak = util.deltapeak(timelength=template_length, samplerate=1/template_dt, offset=10/template_dt) - template_t = util.sampled_time(1/template_dt, start=0, end=template_length) - template_signal = antenna_bp(_deltapeak[0], *bp_freq, (np.sqrt(antenna_dt/template_dt))*template_dt) # TODO: fix sqrt constant - template_peak_time = template_t[_deltapeak[1]] + # + # Create the template + # + template = Waveform(None, dt=template_dt, name='Template') + _deltapeak = util.deltapeak(timelength=template_length, samplerate=1/template.dt, offset=10/template.dt) + template.signal = antenna_bp(_deltapeak[0], *bp_freq, (np.sqrt(antenna_dt/template_dt))*template_dt) # TODO: fix sqrt constant + template.peak_sample = _deltapeak[1] + template.peak_time = template.dt * template.peak_sample - if False: # show template + + if True: # show template fig, ax = plt.subplots() ax.set_title("Deltapeak and Bandpassed Template") ax.set_xlabel("Time [ns]") ax.set_ylabel("Amplitude") - ax.plot(template_t, max(template_signal)*_deltapeak[0]) - ax.plot(template_t, template_signal) + ax.plot(template.t, max(template.signal)*_deltapeak[0]) + ax.plot(template.t, template.signal) fig.savefig('figures/11_template_deltapeak.pdf') - # receive at antenna - antenna_t = util.sampled_time(1/antenna_dt, start=0, end=antenna_timelength) - antenna_samplelength = len(antenna_t) - - ## place the deltapeak signal at a random location - antenna_true_signal, antenna_peak_location = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna_dt, offset=[0.2, 0.8], rng=rng) - antenna_peak_time = antenna_t[antenna_peak_location] - - if not True: # flip polarisation - antenna_true_signal *= -1 - - ## Add noise - noise_amplitude = max(template_signal) * noise_sigma_factor - noise_realisation = noise_amplitude * white_noise_realisation(len(antenna_true_signal)) - antenna_unfiltered_signal = antenna_true_signal + noise_realisation - antenna_signal = antenna_bp(antenna_unfiltered_signal, *bp_freq, antenna_dt) - - true_time_offset = antenna_peak_time - template_peak_time - - if True: # show signals - fig, axs = plt.subplots(2, sharex=True) - axs[0].set_title("Antenna Waveform") - axs[-1].set_xlabel("Time [ns]") - axs[0].set_ylabel("Amplitude") - axs[0].plot(antenna_t, antenna_signal, label='bandpassed w/ noise') - axs[0].plot(antenna_t, antenna_unfiltered_signal, label='true signal w/ noise') - axs[0].plot(antenna_t, antenna_true_signal, label='true signal w/o noise') - axs[0].legend() - - axs[1].set_title("Template") - axs[1].set_ylabel("Amplitude") - axs[1].plot(template_t, template_signal, label='orig') - axs[1].plot(template_t + true_time_offset, template_signal, label='moved orig') - axs[1].legend() - - fig.savefig('figures/11_antenna_signals.pdf') - - if True: # zoom - wx = 100 - x0 = true_time_offset - - old_xlims = axs[0].get_xlim() - axs[0].set_xlim( x0-wx, x0+wx) - fig.savefig('figures/11_antenna_signals_zoom.pdf') - - # restore - axs[0].set_xlim(*old_xlims) - - if True: # upsampled trace - upsampled_trace, upsampled_t = trace_upsampler(template_signal, antenna_signal, template_t, antenna_t) - - if True: # Show upsampled traces - fig2, axs2 = plt.subplots(1, sharex=True) - if not hasattr(axs2, '__len__'): - axs2 = [axs2] - - axs2[-1].set_xlabel("Time [ns]") - axs2[0].set_ylabel("Amplitude") - axs2[0].plot(antenna_t, antenna_signal, marker='o', label='orig') - axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled') - axs2[0].legend(loc='upper right') - - fig2.savefig('figures/11_upsampled.pdf') - - wx = 1e2 - x0 = upsampled_t[0] + wx - 5 - axs2[0].set_xlim(x0-wx, x0+wx) - fig2.savefig('figures/11_upsampled_zoom.pdf') - - # determine correlations with arguments - lag_dt = upsampled_t[1] - upsampled_t[0] - corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template_signal) - - else: # downsampled template - raise NotImplementedError - - corrs, (out1_signal, out2_signal, lags) = my_downsampling_correlation(template_signal, antenna_signal, template_t, antenna_t) - lag_dt = upsampled_t[1] - upsampled_t[0] - - # Determine best correlation time - idx = np.argmax(abs(corrs)) - best_sample_lag = lags[idx] - best_time_lag = best_sample_lag * lag_dt - if axs2: - axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2) - - # Show the final signals correlated - if True: - fig, axs = plt.subplots(3, sharex=True) - ylabel_kwargs = dict( - rotation=0, - ha='right', - va='center' - ) - axs[-1].set_xlabel("Time [ns]") - - # Signal - i=0 - axs[i].set_ylabel("Signal\nAmplitude", **ylabel_kwargs) - axs[i].plot(antenna_t, antenna_signal) - - # Template - i=1 - axs[i].set_ylabel("Template\nAmplitude", **ylabel_kwargs) - for offset in [0, best_time_lag]: - axs[i].axvline(offset + len(template_signal) * (template_t[1] - template_t[0]), color='g') - axs[i].axvline(offset, color='g') - axs[i].plot(offset + template_t, template_signal) - - - # Correlation - i=2 - axs[i].set_ylabel("Correlation", **ylabel_kwargs) - axs[i].plot(lags * lag_dt, corrs) - axs[i].axvline(best_time_lag, color='r', ls='--') - - if True: # zoom - wx = len(template_signal) * (template_t[1] - template_t[0])/2 - t0 = best_time_lag - - for t in [t0-wx, t0+wx]: - axs[2].axvline(t, color='g') - - fig.tight_layout() - fig.legend() - fig.savefig('figures/11_corrs.pdf') + if True: + plt.close(fig) # - time_residual = best_time_lag - true_time_offset + # Find difference between true and templated times + # + time_residuals = np.zeros(N_residuals) + for j in range(N_residuals): + do_plots = j==0 - print(time_residual, template_dt, antenna_dt) + # receive at antenna + ## place the deltapeak signal at a random location + antenna = Waveform(None, dt=antenna_dt, name='Signal') + antenna_true_signal, antenna_peak_sample = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna.dt, offset=[0.2, 0.8], rng=rng) + antenna.signal = antenna_true_signal + antenna.peak_sample = antenna_peak_sample + antenna.peak_time = antenna.dt * antenna.peak_sample + if do_plots: + print(f"Antenna Peak Time: {antenna.peak_time}") + print(f"Antenna Peak Sample: {antenna.peak_sample}") + + if False: # bandpass when emitting the signal + antenna.signal = antenna_bp(antenna.signal, *bp_freq, antenna.dt) + + if False: # flip polarisation + antenna.signal *= -1 + + ## Add noise + noise_amplitude = max(template.signal) * noise_sigma_factor + noise_realisation = noise_amplitude * white_noise_realisation(len(antenna.signal)) + + antenna_unfiltered_signal = antenna.signal + noise_realisation + antenna.signal = antenna_bp(antenna_unfiltered_signal, *bp_freq, antenna.dt) + + true_time_offset = antenna.peak_time - template.peak_time + + if do_plots: # show signals + fig, axs = plt.subplots(2, sharex=True) + axs[0].set_title("Antenna Waveform") + axs[-1].set_xlabel("Time [ns]") + axs[0].set_ylabel("Amplitude") + axs[0].plot(antenna.t, antenna.signal, label='bandpassed w/ noise', alpha=0.9) + axs[0].plot(antenna.t, antenna_unfiltered_signal, label='true signal w/ noise', alpha=0.9) + axs[0].plot(antenna.t, antenna_true_signal, label='true signal w/o noise', alpha=0.9) + axs[0].legend() + + axs[1].set_title("Template") + axs[1].set_ylabel("Amplitude") + axs[1].plot(template.t, template.signal, label='orig') + axs[1].plot(template.t + true_time_offset, template.signal, label='true moved orig') + axs[1].legend() + + fig.savefig('figures/11_antenna_signals.pdf') + + if True: # zoom + wx = 100 + x0 = true_time_offset + + old_xlims = axs[0].get_xlim() + axs[0].set_xlim( x0-wx, x0+wx) + fig.savefig('figures/11_antenna_signals_zoom.pdf') + + # restore + axs[0].set_xlim(*old_xlims) + + if False: + plt.close(fig) + + axs2 = None + if True: # upsampled trace + upsampled_trace, upsampled_t = trace_upsampler(template.signal, antenna.signal, template.t, antenna.t) + + if do_plots: # Show upsampled traces + fig2, axs2 = plt.subplots(1, sharex=True) + if not hasattr(axs2, '__len__'): + axs2 = [axs2] + + axs2[-1].set_xlabel("Time [ns]") + axs2[0].set_ylabel("Amplitude") + axs2[0].plot(antenna.t, antenna.signal, marker='o', label='orig') + axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled') + axs2[0].legend(loc='upper right') + + fig2.savefig('figures/11_upsampled.pdf') + + wx = 1e2 + x0 = upsampled_t[0] + wx - 5 + axs2[0].set_xlim(x0-wx, x0+wx) + fig2.savefig('figures/11_upsampled_zoom.pdf') + + if True: + plt.close(fig2) + + # determine correlations with arguments + lag_dt = upsampled_t[1] - upsampled_t[0] + corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template.signal) + + else: # downsampled template + raise NotImplementedError + + corrs, (out1_signal, out2_template, lags) = my_downsampling_correlation(template.signal, antenna.signal, template.t, antenna.t) + lag_dt = upsampled_t[1] - upsampled_t[0] + + # Determine best correlation time + idx = np.argmax(abs(corrs)) + best_sample_lag = lags[idx] + best_time_lag = best_sample_lag * lag_dt + time_residuals[j] = best_time_lag - true_time_offset + + if do_plots and axs2: + axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2) + + # Show the final signals correlated + if do_plots: + fig, axs = plt.subplots(2, sharex=True) + ylabel_kwargs = dict( + #rotation=0, + ha='right', + va='center' + ) + axs[-1].set_xlabel("Time [ns]") + + offset_list = [ + [best_time_lag, dict(label=template.name, color='orange')], + [true_time_offset, dict(label='True offset', color='green')], + ] + + # Signal + i=0 + axs[i].set_ylabel("Amplitude", **ylabel_kwargs) + axs[i].plot(antenna.t, antenna.signal, label=antenna.name) + + # Put template on an twinned axis (magnitudes are different) + ax2 = axs[i].twinx() + for i, offset_args in enumerate(offset_list): + this_kwargs = offset_args[1] + offset = offset_args[0] + + ax2.axvline(offset + len(template.signal) * (template.t[1] - template.t[0]), color=this_kwargs['color'], alpha=0.7) + ax2.axvline(offset, color=this_kwargs['color'], alpha=0.7) + ax2.plot(offset + template.t, template.signal, **this_kwargs) + + # Correlation + i=1 + axs[i].set_ylabel("Correlation", **ylabel_kwargs) + axs[i].plot(lags * lag_dt, corrs) + for i, offset_args in enumerate(offset_list): + this_kwargs = offset_args[1] + offset = offset_args[0] + + axs[i].axvline(offset, ls='--', **this_kwargs) + + if True: # zoom + wx = len(template.signal) * (template.t[1] - template.t[0])/2 + t0 = best_time_lag + + old_xlims = axs[0].get_xlim() + axs[i].set_xlim( x0-wx, x0+3*wx) + fig.savefig('figures/11_corrs_zoom.pdf') + + # restore + axs[i].set_xlim(*old_xlims) + + fig.tight_layout() + fig.legend() + fig.savefig('figures/11_corrs.pdf') + + if False: + plt.close(fig) + + # Make a plot of the time residuals + fig, ax = plt.subplots() + ax.set_title("Template Correlation Lag finding") + ax.set_xlabel("Time Residual [ns]") + ax.set_ylabel("#") + ax.hist(time_residuals, bins='sqrt', density=False) + + ax.legend(title=f"template dt: {template.dt: .1e}ns\nantenna dt: {antenna.dt: .1e}ns") + + fig.savefig("figures/11_time_residual_hist.pdf") plt.show()