From 83811bbd1a2cc8db81526c1fceecd79034fe2f76 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Tue, 11 Apr 2023 14:51:08 +0200 Subject: [PATCH] WIP on Pulse Detection --- lib/util.py | 21 ++- simulations/11_pulsed_timing.py | 242 ++++++++++++++++++++++++++++++++ simulations/lib | 1 + 3 files changed, 261 insertions(+), 3 deletions(-) create mode 100755 simulations/11_pulsed_timing.py create mode 120000 simulations/lib diff --git a/lib/util.py b/lib/util.py index 6f9c7f3..8ab3276 100644 --- a/lib/util.py +++ b/lib/util.py @@ -3,7 +3,10 @@ Various useful utilities (duh) """ import numpy as np -import scipy.fft as ft +try: + import scipy.fft as ft +except ImportError: + import numpy.fft as ft def sampled_time(sample_rate=1, start=0, end=1, offset=0): return offset + np.arange(start, end, 1/sample_rate) @@ -82,7 +85,7 @@ def fft_bandpass(signal, band, samplerate): return ft.irfft(fft, signal.size), (fft, freqs) -def deltapeak(timelength=1e3, samplerate=1, offset=None, peaklength=1): +def deltapeak(timelength=1e3, samplerate=1, offset=None, peaklength=1, rng=None): """ Generate a series of zeroes with a deltapeak. @@ -108,7 +111,19 @@ def deltapeak(timelength=1e3, samplerate=1, offset=None, peaklength=1): offset_min = 0 if offset[0] is None else offset[0] offset_max = N_samples if offset[-1] is None else offset[-1] - offset = (np.random.random(1)*(offset_max - offset_min)+offset_min).astype(int) % N_samples + if 0 < offset_min < 1: + offset_min *= N_samples + if 0 < offset_max < 1: + offset_max *= N_samples + + if rng is not None: + rand = rng.random(1) + else: + rand = np.random.random(1) + + rand = np.asarray(rand) + + offset = (rand * (offset_max - offset_min)+offset_min).astype(int) % N_samples position = (offset + np.arange(0, peaklength)).astype(int) % N_samples diff --git a/simulations/11_pulsed_timing.py b/simulations/11_pulsed_timing.py new file mode 100755 index 0000000..32e07a4 --- /dev/null +++ b/simulations/11_pulsed_timing.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 + + +from lib import util +from scipy import signal, interpolate +import matplotlib.pyplot as plt +import numpy as np + +rng = np.random.default_rng() + +class Waveform: + name = None + time = None + signal = None + + def __init__(signal, time=None, name=None, dt=None): + self.signal = signal + self.time = time + self.name = name + + if self.time is None and dt is not None: + self.time = dt * len(signal) + +def white_noise_realisation(N_samples, noise_sigma=1, rng=rng): + return rng.normal(0, noise_sigma or 0, size=N_samples) + +def antenna_bp(trace, low_bp, high_bp, dt, order=3): + # order < 6 for stability + fs = 1/dt + nyq = 1* fs + low_bp = low_bp / nyq + high_bp = high_bp / nyq + + bp_filter = signal.butter(order, [low_bp, high_bp], 'band', fs=fs) + bandpassed = signal.lfilter(*bp_filter, trace) + + return bandpassed + +def my_correlation(in1, template): + # + in1_long = np.zeros( (len(in1)+2*len(template)) ) + in1_long[len(template):-len(template)] = in1 + + # fill the template with zeros and copy template + template_long = np.zeros_like(in1_long) + template_long[len(template):2*len(template)] = template + + lags = np.arange(-len(template), len(in1) ) - len(template) + + # do the correlation jig + corrs = np.zeros_like(lags, dtype=float) + for i, l in enumerate(lags): + lagged_template = np.roll(template_long, l) + corrs[i] = np.dot(lagged_template, in1_long) + + return corrs, (in1_long, template_long, lags) + +def trace_upsampler(template_signal, trace, template_t, trace_t): + template_dt = template_t[1] - template_t[0] + trace_dt = trace_t[1] - trace_t[0] + + upsample_factor = trace_dt/template_dt + upsampled_trace_N = np.ceil(len(trace) * upsample_factor) + + upsample_factor = int(upsample_factor) + upsampled_trace_N = int(upsampled_trace_N) + + # upsample trace + upsampled_trace = np.zeros(upsampled_trace_N) + upsampled_trace[::upsample_factor] = trace + + #upsampled_t = np.arange(trace_t[0], trace_t[-1], template_dt) + upsampled_t = template_dt * np.arange(len(upsampled_trace)) + trace_t[0] + + return upsampled_trace, upsampled_t + +def template_downsampler(template_signal, trace, template_t, trace_t, offset): + pass + + +if __name__ == "__main__": + import os + import matplotlib + if os.name == 'posix' and "DISPLAY" not in os.environ: + matplotlib.use('Agg') + + template_dt = 5e-2 # ns + bp_freq = (30e-3, 80e-3) # GHz + template_length = 100 # ns + noise_sigma_factor = 1e1 # keep between 10 and 0.1 + + antenna_dt = 2 # ns + antenna_timelength = 2048 # ns + + _deltapeak = util.deltapeak(timelength=template_length, samplerate=1/template_dt, offset=10/template_dt) + template_t = util.sampled_time(1/template_dt, start=0, end=template_length) + template_signal = antenna_bp(_deltapeak[0], *bp_freq, (np.sqrt(antenna_dt/template_dt))*template_dt) # TODO: fix sqrt constant + template_peak_time = template_t[_deltapeak[1]] + + if False: # show template + fig, ax = plt.subplots() + ax.set_title("Deltapeak and Bandpassed Template") + ax.set_xlabel("Time [ns]") + ax.set_ylabel("Amplitude") + ax.plot(template_t, max(template_signal)*_deltapeak[0]) + ax.plot(template_t, template_signal) + fig.savefig('figures/11_template_deltapeak.pdf') + + # receive at antenna + antenna_t = util.sampled_time(1/antenna_dt, start=0, end=antenna_timelength) + antenna_samplelength = len(antenna_t) + + ## place the deltapeak signal at a random location + antenna_true_signal, antenna_peak_location = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna_dt, offset=[0.2, 0.8], rng=rng) + antenna_peak_time = antenna_t[antenna_peak_location] + + if not True: # flip polarisation + antenna_true_signal *= -1 + + ## Add noise + noise_amplitude = max(template_signal) * noise_sigma_factor + noise_realisation = noise_amplitude * white_noise_realisation(len(antenna_true_signal)) + antenna_unfiltered_signal = antenna_true_signal + noise_realisation + antenna_signal = antenna_bp(antenna_unfiltered_signal, *bp_freq, antenna_dt) + + true_time_offset = antenna_peak_time - template_peak_time + + if True: # show signals + fig, axs = plt.subplots(2, sharex=True) + axs[0].set_title("Antenna Waveform") + axs[-1].set_xlabel("Time [ns]") + axs[0].set_ylabel("Amplitude") + axs[0].plot(antenna_t, antenna_signal, label='bandpassed w/ noise') + axs[0].plot(antenna_t, antenna_unfiltered_signal, label='true signal w/ noise') + axs[0].plot(antenna_t, antenna_true_signal, label='true signal w/o noise') + axs[0].legend() + + axs[1].set_title("Template") + axs[1].set_ylabel("Amplitude") + axs[1].plot(template_t, template_signal, label='orig') + axs[1].plot(template_t + true_time_offset, template_signal, label='moved orig') + axs[1].legend() + + fig.savefig('figures/11_antenna_signals.pdf') + + if True: # zoom + wx = 100 + x0 = true_time_offset + + old_xlims = axs[0].get_xlim() + axs[0].set_xlim( x0-wx, x0+wx) + fig.savefig('figures/11_antenna_signals_zoom.pdf') + + # restore + axs[0].set_xlim(*old_xlims) + + if True: # upsampled trace + upsampled_trace, upsampled_t = trace_upsampler(template_signal, antenna_signal, template_t, antenna_t) + + if True: # Show upsampled traces + fig2, axs2 = plt.subplots(1, sharex=True) + if not hasattr(axs2, '__len__'): + axs2 = [axs2] + + axs2[-1].set_xlabel("Time [ns]") + axs2[0].set_ylabel("Amplitude") + axs2[0].plot(antenna_t, antenna_signal, marker='o', label='orig') + axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled') + axs2[0].legend(loc='upper right') + + fig2.savefig('figures/11_upsampled.pdf') + + wx = 1e2 + x0 = upsampled_t[0] + wx - 5 + axs2[0].set_xlim(x0-wx, x0+wx) + fig2.savefig('figures/11_upsampled_zoom.pdf') + + # determine correlations with arguments + lag_dt = upsampled_t[1] - upsampled_t[0] + corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template_signal) + + else: # downsampled template + raise NotImplementedError + + corrs, (out1_signal, out2_signal, lags) = my_downsampling_correlation(template_signal, antenna_signal, template_t, antenna_t) + lag_dt = upsampled_t[1] - upsampled_t[0] + + # Determine best correlation time + idx = np.argmax(abs(corrs)) + best_sample_lag = lags[idx] + best_time_lag = best_sample_lag * lag_dt + if axs2: + axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2) + + # Show the final signals correlated + if True: + fig, axs = plt.subplots(3, sharex=True) + ylabel_kwargs = dict( + rotation=0, + ha='right', + va='center' + ) + axs[-1].set_xlabel("Time [ns]") + + # Signal + i=0 + axs[i].set_ylabel("Signal\nAmplitude", **ylabel_kwargs) + axs[i].plot(antenna_t, antenna_signal) + + # Template + i=1 + axs[i].set_ylabel("Template\nAmplitude", **ylabel_kwargs) + for offset in [0, best_time_lag]: + axs[i].axvline(offset + len(template_signal) * (template_t[1] - template_t[0]), color='g') + axs[i].axvline(offset, color='g') + axs[i].plot(offset + template_t, template_signal) + + + # Correlation + i=2 + axs[i].set_ylabel("Correlation", **ylabel_kwargs) + axs[i].plot(lags * lag_dt, corrs) + axs[i].axvline(best_time_lag, color='r', ls='--') + + if True: # zoom + wx = len(template_signal) * (template_t[1] - template_t[0])/2 + t0 = best_time_lag + + for t in [t0-wx, t0+wx]: + axs[2].axvline(t, color='g') + + fig.tight_layout() + fig.legend() + fig.savefig('figures/11_corrs.pdf') + + # + time_residual = best_time_lag - true_time_offset + + print(time_residual, template_dt, antenna_dt) + + + plt.show() diff --git a/simulations/lib b/simulations/lib new file mode 120000 index 0000000..5bf80bf --- /dev/null +++ b/simulations/lib @@ -0,0 +1 @@ +../lib/ \ No newline at end of file