mirror of
https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
synced 2025-01-22 09:13:32 +01:00
Simple Pulse finding using Template Correlation
This commit is contained in:
parent
465b78c535
commit
04858ea01a
1 changed files with 224 additions and 144 deletions
|
@ -10,16 +10,40 @@ rng = np.random.default_rng()
|
|||
|
||||
class Waveform:
|
||||
name = None
|
||||
time = None
|
||||
signal = None
|
||||
dt = None
|
||||
|
||||
def __init__(signal, time=None, name=None, dt=None):
|
||||
_t = None
|
||||
|
||||
def __init__(self,signal=None, dt=None, t=None, name=None):
|
||||
self.signal = signal
|
||||
self.time = time
|
||||
self.name = name
|
||||
|
||||
if self.time is None and dt is not None:
|
||||
self.time = dt * len(signal)
|
||||
if t is not None:
|
||||
assert len(t) == len(signal)
|
||||
|
||||
self._t = t
|
||||
self.dt = t[1] - t[0]
|
||||
elif dt is not None:
|
||||
self.dt = dt
|
||||
|
||||
# Lazy evaluation of time
|
||||
@property
|
||||
def t(self):
|
||||
if self._t is None:
|
||||
return self.dt * np.arange(0, len(self.signal))
|
||||
return self._t
|
||||
|
||||
@t.setter
|
||||
def t(self, value):
|
||||
self._t = value
|
||||
|
||||
@t.deleter
|
||||
def t(self):
|
||||
del self._t
|
||||
|
||||
def __len__():
|
||||
return len(self.signal)
|
||||
|
||||
def white_noise_realisation(N_samples, noise_sigma=1, rng=rng):
|
||||
return rng.normal(0, noise_sigma or 0, size=N_samples)
|
||||
|
@ -37,7 +61,7 @@ def antenna_bp(trace, low_bp, high_bp, dt, order=3):
|
|||
return bandpassed
|
||||
|
||||
def my_correlation(in1, template):
|
||||
#
|
||||
#
|
||||
in1_long = np.zeros( (len(in1)+2*len(template)) )
|
||||
in1_long[len(template):-len(template)] = in1
|
||||
|
||||
|
@ -46,7 +70,7 @@ def my_correlation(in1, template):
|
|||
template_long[len(template):2*len(template)] = template
|
||||
|
||||
lags = np.arange(-len(template), len(in1) ) - len(template)
|
||||
|
||||
|
||||
# do the correlation jig
|
||||
corrs = np.zeros_like(lags, dtype=float)
|
||||
for i, l in enumerate(lags):
|
||||
|
@ -56,7 +80,7 @@ def my_correlation(in1, template):
|
|||
return corrs, (in1_long, template_long, lags)
|
||||
|
||||
def trace_upsampler(template_signal, trace, template_t, trace_t):
|
||||
template_dt = template_t[1] - template_t[0]
|
||||
template_dt = template.t[1] - template.t[0]
|
||||
trace_dt = trace_t[1] - trace_t[0]
|
||||
|
||||
upsample_factor = trace_dt/template_dt
|
||||
|
@ -88,155 +112,211 @@ if __name__ == "__main__":
|
|||
bp_freq = (30e-3, 80e-3) # GHz
|
||||
template_length = 100 # ns
|
||||
noise_sigma_factor = 1e1 # keep between 10 and 0.1
|
||||
|
||||
|
||||
N_residuals = 50*3
|
||||
|
||||
antenna_dt = 2 # ns
|
||||
antenna_timelength = 2048 # ns
|
||||
|
||||
_deltapeak = util.deltapeak(timelength=template_length, samplerate=1/template_dt, offset=10/template_dt)
|
||||
template_t = util.sampled_time(1/template_dt, start=0, end=template_length)
|
||||
template_signal = antenna_bp(_deltapeak[0], *bp_freq, (np.sqrt(antenna_dt/template_dt))*template_dt) # TODO: fix sqrt constant
|
||||
template_peak_time = template_t[_deltapeak[1]]
|
||||
#
|
||||
# Create the template
|
||||
#
|
||||
template = Waveform(None, dt=template_dt, name='Template')
|
||||
_deltapeak = util.deltapeak(timelength=template_length, samplerate=1/template.dt, offset=10/template.dt)
|
||||
template.signal = antenna_bp(_deltapeak[0], *bp_freq, (np.sqrt(antenna_dt/template_dt))*template_dt) # TODO: fix sqrt constant
|
||||
template.peak_sample = _deltapeak[1]
|
||||
template.peak_time = template.dt * template.peak_sample
|
||||
|
||||
if False: # show template
|
||||
|
||||
if True: # show template
|
||||
fig, ax = plt.subplots()
|
||||
ax.set_title("Deltapeak and Bandpassed Template")
|
||||
ax.set_xlabel("Time [ns]")
|
||||
ax.set_ylabel("Amplitude")
|
||||
ax.plot(template_t, max(template_signal)*_deltapeak[0])
|
||||
ax.plot(template_t, template_signal)
|
||||
ax.plot(template.t, max(template.signal)*_deltapeak[0])
|
||||
ax.plot(template.t, template.signal)
|
||||
fig.savefig('figures/11_template_deltapeak.pdf')
|
||||
|
||||
# receive at antenna
|
||||
antenna_t = util.sampled_time(1/antenna_dt, start=0, end=antenna_timelength)
|
||||
antenna_samplelength = len(antenna_t)
|
||||
|
||||
## place the deltapeak signal at a random location
|
||||
antenna_true_signal, antenna_peak_location = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna_dt, offset=[0.2, 0.8], rng=rng)
|
||||
antenna_peak_time = antenna_t[antenna_peak_location]
|
||||
|
||||
if not True: # flip polarisation
|
||||
antenna_true_signal *= -1
|
||||
|
||||
## Add noise
|
||||
noise_amplitude = max(template_signal) * noise_sigma_factor
|
||||
noise_realisation = noise_amplitude * white_noise_realisation(len(antenna_true_signal))
|
||||
antenna_unfiltered_signal = antenna_true_signal + noise_realisation
|
||||
antenna_signal = antenna_bp(antenna_unfiltered_signal, *bp_freq, antenna_dt)
|
||||
|
||||
true_time_offset = antenna_peak_time - template_peak_time
|
||||
|
||||
if True: # show signals
|
||||
fig, axs = plt.subplots(2, sharex=True)
|
||||
axs[0].set_title("Antenna Waveform")
|
||||
axs[-1].set_xlabel("Time [ns]")
|
||||
axs[0].set_ylabel("Amplitude")
|
||||
axs[0].plot(antenna_t, antenna_signal, label='bandpassed w/ noise')
|
||||
axs[0].plot(antenna_t, antenna_unfiltered_signal, label='true signal w/ noise')
|
||||
axs[0].plot(antenna_t, antenna_true_signal, label='true signal w/o noise')
|
||||
axs[0].legend()
|
||||
|
||||
axs[1].set_title("Template")
|
||||
axs[1].set_ylabel("Amplitude")
|
||||
axs[1].plot(template_t, template_signal, label='orig')
|
||||
axs[1].plot(template_t + true_time_offset, template_signal, label='moved orig')
|
||||
axs[1].legend()
|
||||
|
||||
fig.savefig('figures/11_antenna_signals.pdf')
|
||||
|
||||
if True: # zoom
|
||||
wx = 100
|
||||
x0 = true_time_offset
|
||||
|
||||
old_xlims = axs[0].get_xlim()
|
||||
axs[0].set_xlim( x0-wx, x0+wx)
|
||||
fig.savefig('figures/11_antenna_signals_zoom.pdf')
|
||||
|
||||
# restore
|
||||
axs[0].set_xlim(*old_xlims)
|
||||
|
||||
if True: # upsampled trace
|
||||
upsampled_trace, upsampled_t = trace_upsampler(template_signal, antenna_signal, template_t, antenna_t)
|
||||
|
||||
if True: # Show upsampled traces
|
||||
fig2, axs2 = plt.subplots(1, sharex=True)
|
||||
if not hasattr(axs2, '__len__'):
|
||||
axs2 = [axs2]
|
||||
|
||||
axs2[-1].set_xlabel("Time [ns]")
|
||||
axs2[0].set_ylabel("Amplitude")
|
||||
axs2[0].plot(antenna_t, antenna_signal, marker='o', label='orig')
|
||||
axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled')
|
||||
axs2[0].legend(loc='upper right')
|
||||
|
||||
fig2.savefig('figures/11_upsampled.pdf')
|
||||
|
||||
wx = 1e2
|
||||
x0 = upsampled_t[0] + wx - 5
|
||||
axs2[0].set_xlim(x0-wx, x0+wx)
|
||||
fig2.savefig('figures/11_upsampled_zoom.pdf')
|
||||
|
||||
# determine correlations with arguments
|
||||
lag_dt = upsampled_t[1] - upsampled_t[0]
|
||||
corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template_signal)
|
||||
|
||||
else: # downsampled template
|
||||
raise NotImplementedError
|
||||
|
||||
corrs, (out1_signal, out2_signal, lags) = my_downsampling_correlation(template_signal, antenna_signal, template_t, antenna_t)
|
||||
lag_dt = upsampled_t[1] - upsampled_t[0]
|
||||
|
||||
# Determine best correlation time
|
||||
idx = np.argmax(abs(corrs))
|
||||
best_sample_lag = lags[idx]
|
||||
best_time_lag = best_sample_lag * lag_dt
|
||||
if axs2:
|
||||
axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2)
|
||||
|
||||
# Show the final signals correlated
|
||||
if True:
|
||||
fig, axs = plt.subplots(3, sharex=True)
|
||||
ylabel_kwargs = dict(
|
||||
rotation=0,
|
||||
ha='right',
|
||||
va='center'
|
||||
)
|
||||
axs[-1].set_xlabel("Time [ns]")
|
||||
|
||||
# Signal
|
||||
i=0
|
||||
axs[i].set_ylabel("Signal\nAmplitude", **ylabel_kwargs)
|
||||
axs[i].plot(antenna_t, antenna_signal)
|
||||
|
||||
# Template
|
||||
i=1
|
||||
axs[i].set_ylabel("Template\nAmplitude", **ylabel_kwargs)
|
||||
for offset in [0, best_time_lag]:
|
||||
axs[i].axvline(offset + len(template_signal) * (template_t[1] - template_t[0]), color='g')
|
||||
axs[i].axvline(offset, color='g')
|
||||
axs[i].plot(offset + template_t, template_signal)
|
||||
|
||||
|
||||
# Correlation
|
||||
i=2
|
||||
axs[i].set_ylabel("Correlation", **ylabel_kwargs)
|
||||
axs[i].plot(lags * lag_dt, corrs)
|
||||
axs[i].axvline(best_time_lag, color='r', ls='--')
|
||||
|
||||
if True: # zoom
|
||||
wx = len(template_signal) * (template_t[1] - template_t[0])/2
|
||||
t0 = best_time_lag
|
||||
|
||||
for t in [t0-wx, t0+wx]:
|
||||
axs[2].axvline(t, color='g')
|
||||
|
||||
fig.tight_layout()
|
||||
fig.legend()
|
||||
fig.savefig('figures/11_corrs.pdf')
|
||||
if True:
|
||||
plt.close(fig)
|
||||
|
||||
#
|
||||
time_residual = best_time_lag - true_time_offset
|
||||
# Find difference between true and templated times
|
||||
#
|
||||
time_residuals = np.zeros(N_residuals)
|
||||
for j in range(N_residuals):
|
||||
do_plots = j==0
|
||||
|
||||
print(time_residual, template_dt, antenna_dt)
|
||||
# receive at antenna
|
||||
## place the deltapeak signal at a random location
|
||||
antenna = Waveform(None, dt=antenna_dt, name='Signal')
|
||||
antenna_true_signal, antenna_peak_sample = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna.dt, offset=[0.2, 0.8], rng=rng)
|
||||
antenna.signal = antenna_true_signal
|
||||
antenna.peak_sample = antenna_peak_sample
|
||||
antenna.peak_time = antenna.dt * antenna.peak_sample
|
||||
|
||||
if do_plots:
|
||||
print(f"Antenna Peak Time: {antenna.peak_time}")
|
||||
print(f"Antenna Peak Sample: {antenna.peak_sample}")
|
||||
|
||||
if False: # bandpass when emitting the signal
|
||||
antenna.signal = antenna_bp(antenna.signal, *bp_freq, antenna.dt)
|
||||
|
||||
if False: # flip polarisation
|
||||
antenna.signal *= -1
|
||||
|
||||
## Add noise
|
||||
noise_amplitude = max(template.signal) * noise_sigma_factor
|
||||
noise_realisation = noise_amplitude * white_noise_realisation(len(antenna.signal))
|
||||
|
||||
antenna_unfiltered_signal = antenna.signal + noise_realisation
|
||||
antenna.signal = antenna_bp(antenna_unfiltered_signal, *bp_freq, antenna.dt)
|
||||
|
||||
true_time_offset = antenna.peak_time - template.peak_time
|
||||
|
||||
if do_plots: # show signals
|
||||
fig, axs = plt.subplots(2, sharex=True)
|
||||
axs[0].set_title("Antenna Waveform")
|
||||
axs[-1].set_xlabel("Time [ns]")
|
||||
axs[0].set_ylabel("Amplitude")
|
||||
axs[0].plot(antenna.t, antenna.signal, label='bandpassed w/ noise', alpha=0.9)
|
||||
axs[0].plot(antenna.t, antenna_unfiltered_signal, label='true signal w/ noise', alpha=0.9)
|
||||
axs[0].plot(antenna.t, antenna_true_signal, label='true signal w/o noise', alpha=0.9)
|
||||
axs[0].legend()
|
||||
|
||||
axs[1].set_title("Template")
|
||||
axs[1].set_ylabel("Amplitude")
|
||||
axs[1].plot(template.t, template.signal, label='orig')
|
||||
axs[1].plot(template.t + true_time_offset, template.signal, label='true moved orig')
|
||||
axs[1].legend()
|
||||
|
||||
fig.savefig('figures/11_antenna_signals.pdf')
|
||||
|
||||
if True: # zoom
|
||||
wx = 100
|
||||
x0 = true_time_offset
|
||||
|
||||
old_xlims = axs[0].get_xlim()
|
||||
axs[0].set_xlim( x0-wx, x0+wx)
|
||||
fig.savefig('figures/11_antenna_signals_zoom.pdf')
|
||||
|
||||
# restore
|
||||
axs[0].set_xlim(*old_xlims)
|
||||
|
||||
if False:
|
||||
plt.close(fig)
|
||||
|
||||
axs2 = None
|
||||
if True: # upsampled trace
|
||||
upsampled_trace, upsampled_t = trace_upsampler(template.signal, antenna.signal, template.t, antenna.t)
|
||||
|
||||
if do_plots: # Show upsampled traces
|
||||
fig2, axs2 = plt.subplots(1, sharex=True)
|
||||
if not hasattr(axs2, '__len__'):
|
||||
axs2 = [axs2]
|
||||
|
||||
axs2[-1].set_xlabel("Time [ns]")
|
||||
axs2[0].set_ylabel("Amplitude")
|
||||
axs2[0].plot(antenna.t, antenna.signal, marker='o', label='orig')
|
||||
axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled')
|
||||
axs2[0].legend(loc='upper right')
|
||||
|
||||
fig2.savefig('figures/11_upsampled.pdf')
|
||||
|
||||
wx = 1e2
|
||||
x0 = upsampled_t[0] + wx - 5
|
||||
axs2[0].set_xlim(x0-wx, x0+wx)
|
||||
fig2.savefig('figures/11_upsampled_zoom.pdf')
|
||||
|
||||
if True:
|
||||
plt.close(fig2)
|
||||
|
||||
# determine correlations with arguments
|
||||
lag_dt = upsampled_t[1] - upsampled_t[0]
|
||||
corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template.signal)
|
||||
|
||||
else: # downsampled template
|
||||
raise NotImplementedError
|
||||
|
||||
corrs, (out1_signal, out2_template, lags) = my_downsampling_correlation(template.signal, antenna.signal, template.t, antenna.t)
|
||||
lag_dt = upsampled_t[1] - upsampled_t[0]
|
||||
|
||||
# Determine best correlation time
|
||||
idx = np.argmax(abs(corrs))
|
||||
best_sample_lag = lags[idx]
|
||||
best_time_lag = best_sample_lag * lag_dt
|
||||
time_residuals[j] = best_time_lag - true_time_offset
|
||||
|
||||
if do_plots and axs2:
|
||||
axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2)
|
||||
|
||||
# Show the final signals correlated
|
||||
if do_plots:
|
||||
fig, axs = plt.subplots(2, sharex=True)
|
||||
ylabel_kwargs = dict(
|
||||
#rotation=0,
|
||||
ha='right',
|
||||
va='center'
|
||||
)
|
||||
axs[-1].set_xlabel("Time [ns]")
|
||||
|
||||
offset_list = [
|
||||
[best_time_lag, dict(label=template.name, color='orange')],
|
||||
[true_time_offset, dict(label='True offset', color='green')],
|
||||
]
|
||||
|
||||
# Signal
|
||||
i=0
|
||||
axs[i].set_ylabel("Amplitude", **ylabel_kwargs)
|
||||
axs[i].plot(antenna.t, antenna.signal, label=antenna.name)
|
||||
|
||||
# Put template on an twinned axis (magnitudes are different)
|
||||
ax2 = axs[i].twinx()
|
||||
for i, offset_args in enumerate(offset_list):
|
||||
this_kwargs = offset_args[1]
|
||||
offset = offset_args[0]
|
||||
|
||||
ax2.axvline(offset + len(template.signal) * (template.t[1] - template.t[0]), color=this_kwargs['color'], alpha=0.7)
|
||||
ax2.axvline(offset, color=this_kwargs['color'], alpha=0.7)
|
||||
ax2.plot(offset + template.t, template.signal, **this_kwargs)
|
||||
|
||||
# Correlation
|
||||
i=1
|
||||
axs[i].set_ylabel("Correlation", **ylabel_kwargs)
|
||||
axs[i].plot(lags * lag_dt, corrs)
|
||||
for i, offset_args in enumerate(offset_list):
|
||||
this_kwargs = offset_args[1]
|
||||
offset = offset_args[0]
|
||||
|
||||
axs[i].axvline(offset, ls='--', **this_kwargs)
|
||||
|
||||
if True: # zoom
|
||||
wx = len(template.signal) * (template.t[1] - template.t[0])/2
|
||||
t0 = best_time_lag
|
||||
|
||||
old_xlims = axs[0].get_xlim()
|
||||
axs[i].set_xlim( x0-wx, x0+3*wx)
|
||||
fig.savefig('figures/11_corrs_zoom.pdf')
|
||||
|
||||
# restore
|
||||
axs[i].set_xlim(*old_xlims)
|
||||
|
||||
fig.tight_layout()
|
||||
fig.legend()
|
||||
fig.savefig('figures/11_corrs.pdf')
|
||||
|
||||
if False:
|
||||
plt.close(fig)
|
||||
|
||||
# Make a plot of the time residuals
|
||||
fig, ax = plt.subplots()
|
||||
ax.set_title("Template Correlation Lag finding")
|
||||
ax.set_xlabel("Time Residual [ns]")
|
||||
ax.set_ylabel("#")
|
||||
ax.hist(time_residuals, bins='sqrt', density=False)
|
||||
|
||||
ax.legend(title=f"template dt: {template.dt: .1e}ns\nantenna dt: {antenna.dt: .1e}ns")
|
||||
|
||||
fig.savefig("figures/11_time_residual_hist.pdf")
|
||||
|
||||
plt.show()
|
||||
|
|
Loading…
Reference in a new issue