#!/usr/bin/env python3 from lib import util from scipy import signal, interpolate, stats import matplotlib.pyplot as plt import numpy as np from itertools import zip_longest try: from tqdm import tqdm except: tqdm = lambda x: x rng = np.random.default_rng() class Waveform: name = None signal = None dt = None _t = None def __init__(self,signal=None, dt=None, t=None, name=None): self.signal = signal self.name = name if t is not None: assert len(t) == len(signal) self._t = t self.dt = t[1] - t[0] elif dt is not None: self.dt = dt # Lazy evaluation of time @property def t(self): if self._t is None: return self.dt * np.arange(0, len(self.signal)) return self._t @t.setter def t(self, value): self._t = value @t.deleter def t(self): del self._t def __len__(): return len(self.signal) def white_noise_realisation(N_samples, noise_sigma=1, rng=rng): return rng.normal(0, noise_sigma or 0, size=N_samples) def antenna_bp(trace, low_bp, high_bp, dt, order=3): fs = 1/dt bp_filter = signal.butter(order, [low_bp, high_bp], 'band', fs=fs, output='sos') bandpassed = signal.sosfilt(bp_filter, trace) return bandpassed def my_correlation(in1, template, lags=None): template_length = len(template) in1_length = len(in1) if lags is None: lags = np.arange(-template_length+1, in1_length + 1) # do the correlation jig corrs = np.zeros_like(lags, dtype=float) for i, l in enumerate(lags): if l <= 0: # shorten template at the front in1_start = 0 template_end = template_length template_start = -template_length - l in1_end = max(0, min(in1_length, -template_start)) # 0 =< l + template_length =< in1_lengt elif l > in1_length - template_length: # shorten template from the back in1_end = in1_length template_start = 0 in1_start = min(l, in1_length) template_end = max(0, in1_length - l) else: in1_start = min(l, in1_length) in1_end = min(in1_start + template_length, in1_length) # full template template_start = 0 template_end = template_length # Slice in1 and template in1_slice = in1[in1_start:in1_end] template_slice = template[template_start:template_end] corrs[i] = np.dot(in1_slice, template_slice) return corrs, (in1, template, lags) def trace_upsampler(trace, template_t, trace_t): template_dt = template.t[1] - template.t[0] trace_dt = trace_t[1] - trace_t[0] upsample_factor = trace_dt/template_dt upsampled_trace_N = np.ceil(len(trace) * upsample_factor) upsample_factor = int(upsample_factor) upsampled_trace_N = int(upsampled_trace_N) # upsample trace upsampled_trace = np.zeros(upsampled_trace_N) upsampled_trace[::upsample_factor] = trace #upsampled_t = np.arange(trace_t[0], trace_t[-1], template_dt) upsampled_t = template_dt * np.arange(len(upsampled_trace)) + trace_t[0] return upsampled_trace, upsampled_t def trace_downsampler(trace, template_t, trace_t, offset): pass if __name__ == "__main__": import os import matplotlib import sys if os.name == 'posix' and "DISPLAY" not in os.environ: matplotlib.use('Agg') bp_freq = (30e-3, 80e-3) # GHz template_dt = 5e-2 # ns template_length = 500 # ns noise_sigma_factor = 1e-1 # amplitude factor N_residuals = 50*3 if len(sys.argv) < 2 else int(sys.argv[1]) antenna_dt = 2 # ns antenna_timelength = 2048 # ns # # Create the template # template = Waveform(None, dt=template_dt, name='Template') _deltapeak = util.deltapeak(timelength=template_length, samplerate=1/template.dt, offset=0) template.signal = antenna_bp(_deltapeak[0], *bp_freq, template_dt) template.peak_sample = _deltapeak[1] template.peak_time = template.dt * template.peak_sample interp1d_template = interpolate.interp1d(template.t, template.signal, assume_sorted=True, fill_value=0, bounds_error=False) if True: # show template fig, ax = plt.subplots() ax.set_title("Deltapeak and Bandpassed Template") ax.set_xlabel("Time [ns]") ax.set_ylabel("Amplitude") ax.plot(template.t, max(template.signal)*_deltapeak[0], label='Impulse Template') ax.plot(template.t, template.signal, label='Filtered Template') fig.savefig('figures/11_template_deltapeak.pdf') if True: # show filtering equivalence samplerates _deltapeak = util.deltapeak(timelength=template_length, samplerate=1/antenna_dt, offset=0) _time = util.sampled_time(end=template_length, sample_rate=1/antenna_dt) _bandpassed = antenna_bp(_deltapeak[0], *bp_freq, antenna_dt) ax.plot(_time, max(_bandpassed)*_deltapeak[0], label='Impulse Antenna') ax.plot(_time, _bandpassed, label='Filtered Antenna') ax.legend() fig.savefig('figures/11_template_deltapeak+antenna.pdf') if True: plt.close(fig) # # Find difference between true and templated times # time_residuals = np.zeros(N_residuals) for j in tqdm(range(N_residuals)): do_plots = j==0 # receive at antenna ## place the deltapeak signal at a random location antenna = Waveform(None, dt=antenna_dt, name='Signal') if not True: # Create antenna trace without template antenna_true_signal, antenna_peak_sample = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna.dt, offset=[0.2, 0.8], rng=rng) antenna.peak_sample = antenna_peak_sample antenna.peak_time = antenna.dt * antenna.peak_sample antenna.signal = antenna_bp(antenna.signal, *bp_freq, antenna.dt) else: # Sample the template at some offset antenna.peak_time = antenna_timelength * ((0.8 - 0.2) *rng.random(1) + 0.2) sampling_offset = rng.random(1)*antenna.dt antenna.t = util.sampled_time(1/antenna.dt, start=0, end=antenna_timelength) antenna.signal = interp1d_template(antenna.t - antenna.peak_time) antenna.peak_sample = antenna.peak_time/antenna.dt antenna_true_signal = antenna.signal true_time_offset = antenna.peak_time - template.peak_time if do_plots: print(f"Antenna Peak Time: {antenna.peak_time}") print(f"Antenna Peak Sample: {antenna.peak_sample}") if False: # flip polarisation antenna.signal *= -1 ## Add noise noise_amplitude = max(template.signal) * noise_sigma_factor noise_realisation = noise_amplitude * white_noise_realisation(len(antenna.signal)) filtered_noise = antenna_bp(noise_realisation, *bp_freq, antenna.dt) antenna.signal += filtered_noise if do_plots: # show signals fig, axs = plt.subplots(2, sharex=True) axs[0].set_title("Antenna Waveform") axs[-1].set_xlabel("Time [ns]") axs[0].set_ylabel("Amplitude") axs[0].plot(antenna.t, antenna.signal, label='bandpassed w/ noise', alpha=0.9) axs[0].plot(antenna.t, antenna.signal - filtered_noise, label='bandpassed w/o noise', alpha=0.9) axs[0].legend() axs[1].set_title("Template") axs[1].set_ylabel("Amplitude") axs[1].plot(template.t, template.signal, label='orig') axs[1].plot(template.t + true_time_offset, template.signal, label='true moved orig') axs[1].legend() axs[0].grid() axs[1].grid() fig.savefig('figures/11_antenna_signals.pdf') if True: # zoom wx = 100 x0 = true_time_offset old_xlims = axs[0].get_xlim() axs[0].set_xlim( x0-wx, x0+wx) fig.savefig('figures/11_antenna_signals_zoom.pdf') # restore axs[0].set_xlim(*old_xlims) if False: plt.close(fig) axs2 = None if True: # upsampled trace upsampled_trace, upsampled_t = trace_upsampler(antenna.signal, template.t, antenna.t) if do_plots: # Show upsampled traces fig2, axs2 = plt.subplots(1, sharex=True) if not hasattr(axs2, '__len__'): axs2 = [axs2] axs2[-1].set_xlabel("Time [ns]") axs2[0].set_ylabel("Amplitude") axs2[0].plot(antenna.t, antenna.signal, marker='o', label='orig') axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled') axs2[0].legend(loc='upper right') fig2.savefig('figures/11_upsampled.pdf') wx = 1e2 x0 = upsampled_t[0] + wx - 5 axs2[0].set_xlim(x0-wx, x0+wx) fig2.savefig('figures/11_upsampled_zoom.pdf') if True: plt.close(fig2) # determine correlations with arguments lag_dt = upsampled_t[1] - upsampled_t[0] corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template.signal) else: # downsampled template raise NotImplementedError corrs, (out1_signal, out2_template, lags) = my_downsampling_correlation(template.signal, antenna.signal, template.t, antenna.t) lag_dt = upsampled_t[1] - upsampled_t[0] # Determine best correlation time idx = np.argmax(abs(corrs)) best_sample_lag = lags[idx] best_time_lag = best_sample_lag * lag_dt time_residuals[j] = best_time_lag - true_time_offset if not do_plots: continue if do_plots and axs2: axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2) axs2[-1].axvline(true_time_offset, color='g', alpha=0.5, linewidth=2) # Show the final signals correlated if do_plots: # amplitude scaling required for single axis plotting template_amp_scaler = max(abs(template.signal)) / max(abs(antenna.signal)) # start the figure fig, axs = plt.subplots(2, sharex=True) ylabel_kwargs = dict( #rotation=0, ha='right', va='center' ) axs[-1].set_xlabel("Time [ns]") offset_list = [ [best_time_lag, dict(label=template.name, color='orange')], [true_time_offset, dict(label='True offset', color='green')], ] # Signal i=0 axs[i].set_ylabel("Amplitude", **ylabel_kwargs) axs[i].plot(antenna.t, antenna.signal, label=antenna.name) # Plot the template for offset_args in offset_list: this_kwargs = offset_args[1] offset = offset_args[0] l = axs[i].plot(offset + template.t, template_amp_scaler * template.signal, **this_kwargs) axs[i].legend() # Correlation i=1 axs[i].set_ylabel("Correlation", **ylabel_kwargs) axs[i].plot(lags * lag_dt, corrs) # Lines across both axes for offset_args in offset_list: this_kwargs = offset_args[1] offset = offset_args[0] for i in [0,1]: axs[i].axvline(offset, ls='--', color=this_kwargs['color'], alpha=0.7) axs[0].axvline(offset + len(template.signal) * (template.t[1] - template.t[0]), color=this_kwargs['color'], alpha=0.7) if True: # zoom wx = len(template.signal) * (template.dt)/2 t0 = best_time_lag old_xlims = axs[0].get_xlim() axs[i].set_xlim( x0-wx, x0+3*wx) fig.savefig('figures/11_corrs_zoom.pdf') # restore axs[i].set_xlim(*old_xlims) fig.tight_layout() fig.savefig('figures/11_corrs.pdf') if False: plt.close(fig) # Make a plot of the time residuals if len(time_residuals) > 1: hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step') fig, ax = plt.subplots() ax.set_title( "Template Correlation Lag finding" + f"\n template dt: {template.dt*1e3: .1e}ps" + f"; antenna dt: {antenna.dt: .1e}ns" + f"; noise_factor: {noise_sigma_factor: .1e}" ) ax.set_xlabel("Time Residual [ns]") ax.set_ylabel("#") counts, bins, _patches = ax.hist(time_residuals, **hist_kwargs) if True: # fit gaussian to histogram min_x = min(time_residuals) max_x = max(time_residuals) dx = bins[1] - bins[0] scale = len(time_residuals) * dx xs = np.linspace(min_x, max_x) # do the fit name = "Norm" param_names = [ "$\\mu$", "$\\sigma$" ] distr_func = stats.norm label = name +"(" + ','.join(param_names) + ')' # plot fit_params = distr_func.fit(time_residuals) fit_ys = scale * distr_func.pdf(xs, *fit_params) ax.plot(xs, fit_ys, label=label) # chisq ct = np.diff(distr_func.cdf(bins, *fit_params))*np.sum(counts) if True: ct *= np.sum(counts)/np.sum(ct) c2t = stats.chisquare(counts, ct, ddof=len(fit_params)) chisq_strs = [ f"$\\chi^2$/dof = {c2t[0]: .2g}/{len(fit_params)}" ] # text on plot text_str = "\n".join( [label] + [ f"{param} = {value: .2e}" for param, value in zip_longest(param_names, fit_params, fillvalue='?') ] + chisq_strs ) ax.text( *(0.02, 0.95), text_str, fontsize=12, ha='left', va='top', transform=ax.transAxes) fig.savefig("figures/11_time_residual_hist.pdf") plt.show()