mirror of
				https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
				synced 2025-10-31 03:46:44 +01:00 
			
		
		
		
	WIP on Pulse Detection
This commit is contained in:
		
							parent
							
								
									5ea4a0df17
								
							
						
					
					
						commit
						83811bbd1a
					
				
					 3 changed files with 261 additions and 3 deletions
				
			
		
							
								
								
									
										21
									
								
								lib/util.py
									
										
									
									
									
								
							
							
						
						
									
										21
									
								
								lib/util.py
									
										
									
									
									
								
							|  | @ -3,7 +3,10 @@ Various useful utilities (duh) | |||
| """ | ||||
| 
 | ||||
| import numpy as np | ||||
| import scipy.fft as ft | ||||
| try: | ||||
|     import scipy.fft as ft | ||||
| except ImportError: | ||||
|     import numpy.fft as ft | ||||
| 
 | ||||
| def sampled_time(sample_rate=1, start=0, end=1, offset=0): | ||||
|     return offset + np.arange(start, end, 1/sample_rate) | ||||
|  | @ -82,7 +85,7 @@ def fft_bandpass(signal, band, samplerate): | |||
| 
 | ||||
|     return ft.irfft(fft, signal.size), (fft, freqs) | ||||
| 
 | ||||
| def deltapeak(timelength=1e3, samplerate=1, offset=None, peaklength=1): | ||||
| def deltapeak(timelength=1e3, samplerate=1, offset=None, peaklength=1, rng=None): | ||||
|     """ | ||||
|     Generate a series of zeroes with a deltapeak. | ||||
| 
 | ||||
|  | @ -108,7 +111,19 @@ def deltapeak(timelength=1e3, samplerate=1, offset=None, peaklength=1): | |||
|         offset_min = 0 if offset[0] is None else offset[0] | ||||
|         offset_max = N_samples if offset[-1] is None else offset[-1] | ||||
| 
 | ||||
|         offset = (np.random.random(1)*(offset_max - offset_min)+offset_min).astype(int) % N_samples | ||||
|         if 0 < offset_min < 1: | ||||
|             offset_min *= N_samples | ||||
|         if 0 < offset_max < 1: | ||||
|             offset_max *= N_samples | ||||
| 
 | ||||
|         if rng is not None: | ||||
|             rand = rng.random(1) | ||||
|         else: | ||||
|             rand = np.random.random(1) | ||||
| 
 | ||||
|         rand = np.asarray(rand) | ||||
| 
 | ||||
|         offset = (rand * (offset_max - offset_min)+offset_min).astype(int) % N_samples | ||||
| 
 | ||||
|     position = (offset + np.arange(0, peaklength)).astype(int) % N_samples | ||||
| 
 | ||||
|  |  | |||
							
								
								
									
										242
									
								
								simulations/11_pulsed_timing.py
									
										
									
									
									
										Executable file
									
								
							
							
						
						
									
										242
									
								
								simulations/11_pulsed_timing.py
									
										
									
									
									
										Executable file
									
								
							|  | @ -0,0 +1,242 @@ | |||
| #!/usr/bin/env python3 | ||||
| 
 | ||||
| 
 | ||||
| from lib import util | ||||
| from scipy import signal, interpolate | ||||
| import matplotlib.pyplot as plt | ||||
| import numpy as np | ||||
| 
 | ||||
| rng = np.random.default_rng() | ||||
| 
 | ||||
| class Waveform: | ||||
|     name = None | ||||
|     time = None | ||||
|     signal = None | ||||
| 
 | ||||
|     def __init__(signal, time=None, name=None, dt=None): | ||||
|         self.signal = signal | ||||
|         self.time = time | ||||
|         self.name = name | ||||
| 
 | ||||
|         if self.time is None and dt is not None: | ||||
|             self.time = dt * len(signal) | ||||
| 
 | ||||
| def white_noise_realisation(N_samples, noise_sigma=1, rng=rng): | ||||
|     return rng.normal(0, noise_sigma or 0, size=N_samples) | ||||
| 
 | ||||
| def antenna_bp(trace, low_bp, high_bp, dt, order=3): | ||||
|     # order < 6 for stability | ||||
|     fs = 1/dt | ||||
|     nyq = 1* fs | ||||
|     low_bp = low_bp / nyq | ||||
|     high_bp = high_bp / nyq | ||||
| 
 | ||||
|     bp_filter = signal.butter(order, [low_bp, high_bp], 'band', fs=fs) | ||||
|     bandpassed = signal.lfilter(*bp_filter, trace) | ||||
| 
 | ||||
|     return bandpassed | ||||
| 
 | ||||
| def my_correlation(in1, template): | ||||
|     #  | ||||
|     in1_long = np.zeros( (len(in1)+2*len(template)) ) | ||||
|     in1_long[len(template):-len(template)] = in1 | ||||
| 
 | ||||
|     # fill the template with zeros and copy template | ||||
|     template_long = np.zeros_like(in1_long) | ||||
|     template_long[len(template):2*len(template)] = template | ||||
| 
 | ||||
|     lags = np.arange(-len(template), len(in1) ) - len(template) | ||||
|     | ||||
|     # do the correlation jig | ||||
|     corrs = np.zeros_like(lags, dtype=float) | ||||
|     for i, l in enumerate(lags): | ||||
|         lagged_template = np.roll(template_long, l) | ||||
|         corrs[i] = np.dot(lagged_template, in1_long) | ||||
| 
 | ||||
|     return corrs, (in1_long, template_long, lags) | ||||
| 
 | ||||
| def trace_upsampler(template_signal, trace, template_t, trace_t): | ||||
|     template_dt = template_t[1] - template_t[0] | ||||
|     trace_dt = trace_t[1] - trace_t[0] | ||||
| 
 | ||||
|     upsample_factor = trace_dt/template_dt | ||||
|     upsampled_trace_N = np.ceil(len(trace) * upsample_factor) | ||||
| 
 | ||||
|     upsample_factor = int(upsample_factor) | ||||
|     upsampled_trace_N = int(upsampled_trace_N) | ||||
| 
 | ||||
|     # upsample trace | ||||
|     upsampled_trace = np.zeros(upsampled_trace_N) | ||||
|     upsampled_trace[::upsample_factor] = trace | ||||
| 
 | ||||
|     #upsampled_t = np.arange(trace_t[0], trace_t[-1], template_dt) | ||||
|     upsampled_t = template_dt * np.arange(len(upsampled_trace)) + trace_t[0] | ||||
| 
 | ||||
|     return upsampled_trace, upsampled_t | ||||
| 
 | ||||
| def template_downsampler(template_signal, trace, template_t, trace_t, offset): | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     import os | ||||
|     import matplotlib | ||||
|     if os.name == 'posix' and "DISPLAY" not in os.environ: | ||||
|         matplotlib.use('Agg') | ||||
| 
 | ||||
|     template_dt = 5e-2 # ns | ||||
|     bp_freq = (30e-3, 80e-3) # GHz | ||||
|     template_length = 100 # ns | ||||
|     noise_sigma_factor = 1e1 # keep between 10 and 0.1 | ||||
|      | ||||
|     antenna_dt = 2 # ns | ||||
|     antenna_timelength = 2048 # ns | ||||
| 
 | ||||
|     _deltapeak = util.deltapeak(timelength=template_length, samplerate=1/template_dt, offset=10/template_dt) | ||||
|     template_t = util.sampled_time(1/template_dt, start=0, end=template_length) | ||||
|     template_signal = antenna_bp(_deltapeak[0], *bp_freq, (np.sqrt(antenna_dt/template_dt))*template_dt) # TODO: fix sqrt constant | ||||
|     template_peak_time = template_t[_deltapeak[1]] | ||||
| 
 | ||||
|     if False: # show template | ||||
|         fig, ax = plt.subplots() | ||||
|         ax.set_title("Deltapeak and Bandpassed Template") | ||||
|         ax.set_xlabel("Time [ns]") | ||||
|         ax.set_ylabel("Amplitude") | ||||
|         ax.plot(template_t, max(template_signal)*_deltapeak[0]) | ||||
|         ax.plot(template_t, template_signal) | ||||
|         fig.savefig('figures/11_template_deltapeak.pdf') | ||||
| 
 | ||||
|     # receive at antenna | ||||
|     antenna_t = util.sampled_time(1/antenna_dt, start=0, end=antenna_timelength) | ||||
|     antenna_samplelength = len(antenna_t) | ||||
| 
 | ||||
|     ## place the deltapeak signal at a random location | ||||
|     antenna_true_signal, antenna_peak_location = util.deltapeak(timelength=antenna_timelength, samplerate=1/antenna_dt, offset=[0.2, 0.8], rng=rng) | ||||
|     antenna_peak_time = antenna_t[antenna_peak_location] | ||||
| 
 | ||||
|     if not True: # flip polarisation | ||||
|         antenna_true_signal *= -1 | ||||
| 
 | ||||
|     ## Add noise | ||||
|     noise_amplitude = max(template_signal) * noise_sigma_factor | ||||
|     noise_realisation = noise_amplitude *  white_noise_realisation(len(antenna_true_signal)) | ||||
|     antenna_unfiltered_signal = antenna_true_signal + noise_realisation | ||||
|     antenna_signal = antenna_bp(antenna_unfiltered_signal, *bp_freq, antenna_dt) | ||||
| 
 | ||||
|     true_time_offset = antenna_peak_time - template_peak_time | ||||
| 
 | ||||
|     if True: # show signals  | ||||
|         fig, axs = plt.subplots(2, sharex=True) | ||||
|         axs[0].set_title("Antenna Waveform") | ||||
|         axs[-1].set_xlabel("Time [ns]") | ||||
|         axs[0].set_ylabel("Amplitude") | ||||
|         axs[0].plot(antenna_t, antenna_signal, label='bandpassed w/ noise') | ||||
|         axs[0].plot(antenna_t, antenna_unfiltered_signal, label='true signal w/ noise') | ||||
|         axs[0].plot(antenna_t, antenna_true_signal, label='true signal w/o noise') | ||||
|         axs[0].legend() | ||||
| 
 | ||||
|         axs[1].set_title("Template") | ||||
|         axs[1].set_ylabel("Amplitude") | ||||
|         axs[1].plot(template_t, template_signal, label='orig') | ||||
|         axs[1].plot(template_t + true_time_offset, template_signal, label='moved orig') | ||||
|         axs[1].legend() | ||||
| 
 | ||||
|         fig.savefig('figures/11_antenna_signals.pdf') | ||||
| 
 | ||||
|         if True: # zoom | ||||
|             wx = 100 | ||||
|             x0 = true_time_offset | ||||
| 
 | ||||
|             old_xlims = axs[0].get_xlim() | ||||
|             axs[0].set_xlim( x0-wx, x0+wx) | ||||
|             fig.savefig('figures/11_antenna_signals_zoom.pdf') | ||||
| 
 | ||||
|             # restore | ||||
|             axs[0].set_xlim(*old_xlims) | ||||
| 
 | ||||
|     if True: # upsampled trace | ||||
|         upsampled_trace, upsampled_t = trace_upsampler(template_signal, antenna_signal, template_t, antenna_t) | ||||
| 
 | ||||
|         if True: # Show upsampled traces | ||||
|             fig2, axs2 = plt.subplots(1, sharex=True) | ||||
|             if not hasattr(axs2, '__len__'): | ||||
|                 axs2 = [axs2] | ||||
| 
 | ||||
|             axs2[-1].set_xlabel("Time [ns]") | ||||
|             axs2[0].set_ylabel("Amplitude") | ||||
|             axs2[0].plot(antenna_t, antenna_signal, marker='o', label='orig') | ||||
|             axs2[0].plot(upsampled_t, upsampled_trace, label='upsampled') | ||||
|             axs2[0].legend(loc='upper right') | ||||
|              | ||||
|             fig2.savefig('figures/11_upsampled.pdf') | ||||
| 
 | ||||
|             wx = 1e2 | ||||
|             x0 = upsampled_t[0] + wx - 5 | ||||
|             axs2[0].set_xlim(x0-wx, x0+wx) | ||||
|             fig2.savefig('figures/11_upsampled_zoom.pdf') | ||||
| 
 | ||||
|         # determine correlations with arguments | ||||
|         lag_dt = upsampled_t[1] - upsampled_t[0] | ||||
|         corrs, (out1_signal, out2_template, lags) = my_correlation(upsampled_trace, template_signal) | ||||
| 
 | ||||
|     else: # downsampled template | ||||
|         raise NotImplementedError | ||||
| 
 | ||||
|         corrs, (out1_signal, out2_signal, lags) = my_downsampling_correlation(template_signal, antenna_signal, template_t, antenna_t) | ||||
|         lag_dt = upsampled_t[1] - upsampled_t[0] | ||||
| 
 | ||||
|     # Determine best correlation time | ||||
|     idx = np.argmax(abs(corrs)) | ||||
|     best_sample_lag = lags[idx] | ||||
|     best_time_lag = best_sample_lag * lag_dt | ||||
|     if axs2: | ||||
|         axs2[-1].axvline(best_time_lag, color='r', alpha=0.5, linewidth=2) | ||||
| 
 | ||||
|     # Show the final signals correlated | ||||
|     if True: | ||||
|         fig, axs = plt.subplots(3, sharex=True) | ||||
|         ylabel_kwargs = dict( | ||||
|                             rotation=0, | ||||
|                             ha='right', | ||||
|                             va='center' | ||||
|                         ) | ||||
|         axs[-1].set_xlabel("Time [ns]") | ||||
| 
 | ||||
|         # Signal | ||||
|         i=0 | ||||
|         axs[i].set_ylabel("Signal\nAmplitude", **ylabel_kwargs) | ||||
|         axs[i].plot(antenna_t, antenna_signal) | ||||
| 
 | ||||
|         # Template | ||||
|         i=1 | ||||
|         axs[i].set_ylabel("Template\nAmplitude", **ylabel_kwargs) | ||||
|         for offset in [0, best_time_lag]: | ||||
|             axs[i].axvline(offset + len(template_signal) * (template_t[1] - template_t[0]), color='g') | ||||
|             axs[i].axvline(offset, color='g') | ||||
|             axs[i].plot(offset + template_t, template_signal) | ||||
|          | ||||
| 
 | ||||
|         # Correlation | ||||
|         i=2 | ||||
|         axs[i].set_ylabel("Correlation", **ylabel_kwargs) | ||||
|         axs[i].plot(lags * lag_dt, corrs) | ||||
|         axs[i].axvline(best_time_lag, color='r', ls='--') | ||||
| 
 | ||||
|         if True: # zoom | ||||
|             wx = len(template_signal) * (template_t[1] - template_t[0])/2 | ||||
|             t0 = best_time_lag | ||||
| 
 | ||||
|             for t in [t0-wx, t0+wx]: | ||||
|                 axs[2].axvline(t, color='g') | ||||
| 
 | ||||
|         fig.tight_layout() | ||||
|         fig.legend() | ||||
|         fig.savefig('figures/11_corrs.pdf') | ||||
| 
 | ||||
|     # | ||||
|     time_residual = best_time_lag - true_time_offset | ||||
| 
 | ||||
|     print(time_residual, template_dt, antenna_dt) | ||||
| 
 | ||||
| 
 | ||||
|     plt.show() | ||||
							
								
								
									
										1
									
								
								simulations/lib
									
										
									
									
									
										Symbolic link
									
								
							
							
						
						
									
										1
									
								
								simulations/lib
									
										
									
									
									
										Symbolic link
									
								
							|  | @ -0,0 +1 @@ | |||
| ../lib/ | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue