m-thesis-introduction/airshower_beacon_simulation/lib/lib.py

259 lines
7.3 KiB
Python
Raw Normal View History

2022-11-18 11:02:41 +01:00
# vim: fdm=indent ts=4
2022-09-26 17:18:07 +02:00
"""
Library for this simulation
"""
import numpy as np
from numpy.polynomial import Polynomial
2022-09-26 17:18:07 +02:00
from earsim import Antenna
2022-12-15 14:40:20 +01:00
c_light = 3e8*1e-9 # m/ns
2022-09-26 17:18:07 +02:00
""" Beacon utils """
2022-12-19 19:02:35 +01:00
def sine_beacon(f, t, t0=0, amplitude=1, baseline=0, phase=0, peak_at_t0=0):
2022-11-18 11:02:41 +01:00
"""
Return a sine appropriate as a beacon
"""
2022-12-19 19:02:35 +01:00
baseline = baseline*np.ones_like(t)
if peak_at_t0: # add peak near t0
idx = (np.abs(t-t0)).argmin()
baseline[ max(0, idx-1):min(len(t), idx+1) ] += peak_at_t0 + amplitude
return amplitude * np.cos(2*np.pi*f*(t+t0) + phase) + baseline
2022-09-26 17:18:07 +02:00
def phase_mod(phase, low=np.pi):
"""
Modulo phase such that it falls within the
interval $[-low, 2\pi - low)$.
"""
return (phase + low) % (2*np.pi) - low
2022-09-26 17:18:07 +02:00
def distance(x1, x2):
"""
Calculate the Euclidean distance between two locations x1 and x2
"""
assert type(x1) in [Antenna]
x1 = np.array([x1.x, x1.y, x1.z])
assert type(x2) in [Antenna]
x2 = np.array([x2.x, x2.y, x2.z])
return np.sqrt( np.sum( (x1-x2)**2 ) )
2022-12-15 14:40:20 +01:00
def geometry_time(dist, x2=None, c_light=c_light):
if x2 is not None:
dist = distance(dist, x2)
return dist/c_light
2022-12-15 14:40:20 +01:00
def beacon_from(tx, rx, f, t=0, t0=0, c_light=c_light, radiate_rsq=True, amplitude=1,**kwargs):
dist = distance(tx,rx)
2022-12-05 14:35:35 +01:00
# suppress extra time delay from distance
if c_light is not None and np.isfinite(c_light):
t0 = t0 + dist/c_light
2022-09-26 17:18:07 +02:00
if radiate_rsq:
2022-11-18 11:02:41 +01:00
if np.isclose(dist, 0):
dist = 1
amplitude *= 1/(dist**2)
return sine_beacon(f, t, t0=t0, amplitude=amplitude,**kwargs)
2022-12-15 14:40:20 +01:00
def remove_antenna_geometry_phase(tx, antennas, f_beacon, measured_phases=None, c_light=c_light):
"""
2022-12-15 12:56:20 +01:00
Remove the geometrical phase from the measured antenna phase.
"""
if not hasattr(antennas, '__len__'):
antennas = [antennas]
2022-12-15 12:56:20 +01:00
if not hasattr(measured_phases, '__len__'):
measured_phases = [measured_phases]
clock_phases = np.empty( (len(antennas)) )
for i, ant in enumerate(antennas):
2022-12-15 12:56:20 +01:00
measured_phase = measured_phases[i]
geom_time = geometry_time(tx, ant, c_light=c_light)
geom_phase = geom_time * 2*np.pi*f_beacon
clock_phases[i] = phase_mod(measured_phase - geom_phase)
return clock_phases
""" Fourier """
def ft_corr_vectors(freqs, time):
"""
Get the cosine and sine terms for freqs at time.
Takes the outer product of freqs and time.
"""
freqtime = np.outer(freqs, time)
c_k = np.cos(2*np.pi*freqtime)
s_k = -1*np.sin(2*np.pi*freqtime)
return c_k, s_k
def direct_fourier_transform(freqs, time, samplesets_iterable):
"""
Determine the fourier transform of each sampleset in samplesets_iterable at freqs.
The samplesets are expected to have the same time vector.
Returns either a generator to return the fourier transform for each sampleset
if samplesets_iterable is a generator
or a numpy array.
"""
c_k, s_k = ft_corr_vectors(freqs, time)
if not hasattr(samplesets_iterable, '__len__') and hasattr(samplesets_iterable, '__iter__'):
# samplesets_iterable is an iterator
# return a generator containing (real, imag) amplitudes
return ( (np.dot(c_k, samples), np.dot(s_k, samples)) for samples in samplesets_iterable )
# Numpy array
return np.dot(c_k, samplesets_iterable), np.dot(s_k, samplesets_iterable)
2022-12-15 14:40:20 +01:00
def phase_field_from_tx(x, y, tx, f_beacon, c_light=c_light, t0=0, wrap_phase=True, return_meshgrid=True):
"""
"""
2022-11-18 19:31:24 +01:00
assert type(tx) in [Antenna]
xs, ys = np.meshgrid(x, y, sparse=True)
x_distances = (tx.x - xs)**2
y_distances = (tx.y - ys)**2
dist = np.sqrt( x_distances + y_distances )
phase = (dist/c_light + t0) * f_beacon*2*np.pi
if wrap_phase:
phase = phase_mod(phase)
if return_meshgrid:
return phase, (xs, ys)
else:
return phase, (np.repeat(xs, len(ys), axis=0), np.repeat(ys, len(xs[0]), axis=1))
def find_beacon_in_traces(
traces,
t_trace,
f_beacon_estimate = 50e6,
frequency_fit = False,
N_test_freqs = 5e2,
f_beacon_estimate_band = 0.01,
amp_cut = 0.8
):
"""
f_beacon_band is inclusive
traces is [trace, trace, trace, .. ]
"""
amplitudes = np.zeros(len(traces))
phases = np.zeros(len(traces))
frequencies = np.zeros(len(traces))
if frequency_fit: # fit frequency
test_freqs = f_beacon_estimate + f_beacon_estimate_band * np.linspace(-1, 1, int(N_test_freqs)+1)
ft_amp_gen = direct_fourier_transform(test_freqs, t_trace, (x for x in traces))
n_samples = len(t_trace)
for i, ft_amp in enumerate(ft_amp_gen):
real, imag = ft_amp
amps = 1/n_samples * ( real**2 + imag**2)**0.5
2022-11-18 19:31:24 +01:00
# find frequency peak and surrounding bins
# that are valid for the parabola fit
max_amp_idx = np.argmax(amps)
max_amp = amps[max_amp_idx]
if True:
frequencies[i] = test_freqs[max_amp_idx]
continue
valid_mask = amps >= amp_cut*max_amp
if True: # make sure not to use other peaks
lower_mask = valid_mask[0:max_amp_idx]
upper_mask = valid_mask[max_amp_idx:]
if any(lower_mask):
lower_end = np.argmin(lower_mask[::-1])
else:
lower_end = max_amp_idx
if any(upper_mask):
upper_end = np.argmin(upper_mask)
else:
upper_end = 0
valid_mask[0:(max_amp_idx - lower_end)] = False
valid_mask[(max_amp_idx + upper_end):] = False
if all(~valid_mask):
frequencies[i] = np.nan
continue
# fit Parabola
parafit = Polynomial.fit(test_freqs[valid_mask], amps[valid_mask], 2)
func = parafit.convert()
# find frequency where derivative is 0
deriv = func.deriv(1)
freq = deriv.roots()[0]
frequencies[i] = freq
2022-11-18 19:31:24 +01:00
else: # no frequency finding
frequencies[:] = f_beacon_estimate
2022-11-18 19:31:24 +01:00
n_samples = len(t_trace)
# evaluate fourier transform at freq for each trace
for i, freq in enumerate(frequencies):
if freq is np.nan:
phases[i] = np.nan
amplitudes[i] = np.nan
continue
real, imag = direct_fourier_transform(freq, t_trace, traces[i])
phases[i] = np.arctan2(imag, real)
2022-11-18 19:31:24 +01:00
amplitudes[i] = 2/n_samples * (real**2 + imag**2)**0.5
return frequencies, phases, amplitudes
2022-11-24 14:47:29 +01:00
def coherence_sum_maxima(ref_x, y, k_step=1, k_start=0, k_end=None, periodic=True):
"""
Use the maximum of a coherent sum to determine
the best number of samples to move
"""
2022-11-24 14:47:29 +01:00
N_samples = int( len(ref_x) )
k_end = N_samples if k_end is None or k_end > max_k else k_end
ks = np.arange(k_start, k_end, step=k_step)
maxima = np.empty(len(ks))
2022-11-24 14:47:29 +01:00
if periodic is False:
# prepend zeros
N_zeros = N_samples
preshift = 0 # only required for testing purposes
ref_x = np.pad(ref_x, (N_zeros-0,0), 'constant')
y = np.pad(y, (N_zeros-preshift,preshift), 'constant')
for i,k in enumerate(ks, 0):
augmented_y = np.roll(y, k)
maxima[i] = max(ref_x + augmented_y)
return ks, maxima