mirror of
https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
synced 2025-05-17 05:19:24 +02:00
ZH: move airshower beacon simulation to top folder
This commit is contained in:
parent
7a7647f231
commit
dbb0352229
37 changed files with 2 additions and 2 deletions
3
airshower_beacon_simulation/lib/__init__.py
Normal file
3
airshower_beacon_simulation/lib/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from .lib import *
|
||||
from . import rit
|
||||
from .snr import *
|
1
airshower_beacon_simulation/lib/atmocal
Submodule
1
airshower_beacon_simulation/lib/atmocal
Submodule
|
@ -0,0 +1 @@
|
|||
Subproject commit cd903a30f99e7f640ed068c4393cd38c23c316e2
|
1
airshower_beacon_simulation/lib/earsim
Submodule
1
airshower_beacon_simulation/lib/earsim
Submodule
|
@ -0,0 +1 @@
|
|||
Subproject commit a5abe360fa5210be6ab423ecf2b5f783ae45b675
|
219
airshower_beacon_simulation/lib/figlib.py
Normal file
219
airshower_beacon_simulation/lib/figlib.py
Normal file
|
@ -0,0 +1,219 @@
|
|||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
import scipy.stats as stats
|
||||
from itertools import zip_longest
|
||||
|
||||
def phase_comparison_figure(
|
||||
measured_phases,
|
||||
true_phases,
|
||||
plot_residuals=True,
|
||||
f_beacon=None,
|
||||
hist_kwargs={},
|
||||
sc_kwargs={},
|
||||
text_kwargs={},
|
||||
colors=['blue', 'orange'],
|
||||
legend_on_scatter=True,
|
||||
secondary_axis='time',
|
||||
fit_gaussian=False,
|
||||
mean_snr=None,
|
||||
return_fit_info=False,
|
||||
**fig_kwargs
|
||||
):
|
||||
"""
|
||||
Create a figure comparing measured_phase against true_phase
|
||||
by both plotting the values, and the residuals.
|
||||
"""
|
||||
default_fig_kwargs = dict(sharex=True)
|
||||
default_hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step')
|
||||
default_text_kwargs = dict(fontsize=14, verticalalignment='top')
|
||||
default_sc_kwargs = dict(alpha=0.6, ls='none')
|
||||
|
||||
fig_kwargs = {**default_fig_kwargs, **fig_kwargs}
|
||||
hist_kwargs = {**default_hist_kwargs, **hist_kwargs}
|
||||
text_kwargs = {**default_text_kwargs, **text_kwargs}
|
||||
sc_kwargs = {**default_sc_kwargs, **sc_kwargs}
|
||||
|
||||
do_hist_plot = hist_kwargs is not False
|
||||
do_scatter_plot = sc_kwargs is not False
|
||||
|
||||
fig, axs = plt.subplots(0+do_hist_plot+do_scatter_plot, 1, **fig_kwargs)
|
||||
|
||||
if not hasattr(axs, '__len__'):
|
||||
axs = [axs]
|
||||
|
||||
if f_beacon and secondary_axis in ['phase', 'time']:
|
||||
phase2time = lambda x: x/(2*np.pi*f_beacon)
|
||||
time2phase = lambda x: 2*np.pi*x*f_beacon
|
||||
|
||||
if secondary_axis == 'time':
|
||||
functions = (phase2time, time2phase)
|
||||
label = 'Time $\\varphi/(2\\pi f_{beac})$ [ns]'
|
||||
else:
|
||||
functions = (time2phase, phase2time)
|
||||
label = 'Phase $2\\pi t f_{beac}$ [rad]'
|
||||
|
||||
secax = axs[0].secondary_xaxis('top', functions=functions)
|
||||
|
||||
# Histogram
|
||||
fit_info = {}
|
||||
if do_hist_plot:
|
||||
i=0
|
||||
|
||||
axs[i].set_ylabel("#")
|
||||
|
||||
this_kwargs = dict(
|
||||
ax = axs[i],
|
||||
text_kwargs=text_kwargs,
|
||||
hist_kwargs={**hist_kwargs, **dict(label='Measured', color=colors[0], ls='solid')},
|
||||
mean_snr=mean_snr,
|
||||
)
|
||||
|
||||
if fit_gaussian:
|
||||
this_kwargs['fit_distr'] = 'gaussian'
|
||||
|
||||
_, fit_info = fitted_histogram_figure(
|
||||
measured_phases,
|
||||
**this_kwargs
|
||||
)
|
||||
|
||||
if not plot_residuals: # also plot the true clock phases
|
||||
_bins = fit_info['bins']
|
||||
axs[i].hist(true_phases, color=colors[1], label='Actual', ls='dashed', **{**hist_kwargs, **dict(bins=_bins)})
|
||||
|
||||
# Scatter plot
|
||||
if do_scatter_plot:
|
||||
i=1
|
||||
axs[i].set_ylabel("Antenna no.")
|
||||
axs[i].plot(measured_phases, np.arange(len(measured_phases)), marker='x' if plot_residuals else '3', color=colors[0], label='Measured', **sc_kwargs)
|
||||
|
||||
if not plot_residuals: # also plot the true clock phases
|
||||
axs[i].plot(true_phases, np.arange(len(true_phases)), marker='4', color=colors[1], label='Actual', **sc_kwargs)
|
||||
|
||||
if not plot_residuals and legend_on_scatter:
|
||||
axs[i].legend()
|
||||
|
||||
fig.tight_layout()
|
||||
|
||||
if return_fit_info:
|
||||
return fig, fit_info
|
||||
|
||||
return fig
|
||||
|
||||
|
||||
def fitted_histogram_figure(
|
||||
amplitudes,
|
||||
fit_distr = None,
|
||||
calc_chisq = True,
|
||||
text_kwargs={},
|
||||
hist_kwargs={},
|
||||
mean_snr = None,
|
||||
ax = None,
|
||||
**fig_kwargs
|
||||
):
|
||||
"""
|
||||
Create a figure showing $amplitudes$ as a histogram.
|
||||
If fit_distr is a (list of) string, also fit the respective
|
||||
distribution function and show the parameters on the figure.
|
||||
"""
|
||||
default_hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step', label='hist')
|
||||
default_text_kwargs = dict(fontsize=14, verticalalignment='top')
|
||||
|
||||
if isinstance(fit_distr, str):
|
||||
fit_distr = [fit_distr]
|
||||
|
||||
hist_kwargs = {**default_hist_kwargs, **hist_kwargs}
|
||||
text_kwargs = {**default_text_kwargs, **text_kwargs}
|
||||
|
||||
if ax is None:
|
||||
fig, ax = plt.subplots(1,1, **fig_kwargs)
|
||||
else:
|
||||
fig = ax.get_figure()
|
||||
|
||||
text_kwargs['transform'] = ax.transAxes
|
||||
|
||||
counts, bins, _patches = ax.hist(amplitudes, **hist_kwargs)
|
||||
|
||||
fit_info = []
|
||||
if fit_distr:
|
||||
min_x = min(amplitudes)
|
||||
max_x = max(amplitudes)
|
||||
|
||||
dx = bins[1] - bins[0]
|
||||
scale = len(amplitudes) * dx
|
||||
|
||||
xs = np.linspace(min_x, max_x)
|
||||
|
||||
for distr in fit_distr:
|
||||
fit_params2text_params = lambda x: x
|
||||
|
||||
if 'rice' == distr:
|
||||
name = "Rice"
|
||||
param_names = [ "$\\nu$", "$\\sigma$" ]
|
||||
distr_func = stats.rice
|
||||
|
||||
fit_params2text_params = lambda x: (x[0]*x[1], x[1])
|
||||
|
||||
elif 'gaussian' == distr:
|
||||
name = "Norm"
|
||||
param_names = [ "$\\mu$", "$\\sigma$" ]
|
||||
distr_func = stats.norm
|
||||
|
||||
elif 'rayleigh' == distr:
|
||||
name = "Rayleigh"
|
||||
param_names = [ "$\\sigma$" ]
|
||||
distr_func = stats.rayleigh
|
||||
|
||||
fit_params2text_params = lambda x: (x[0]+x[1]/2,)
|
||||
|
||||
else:
|
||||
raise ValueError('Unknown distribution function '+distr)
|
||||
|
||||
label = name +"(" + ','.join(param_names) + ')'
|
||||
|
||||
fit_params = distr_func.fit(amplitudes)
|
||||
fit_ys = distr_func.pdf(xs, *fit_params)
|
||||
|
||||
ax.plot(xs, fit_ys*scale, label=label)
|
||||
|
||||
chisq_strs = []
|
||||
if calc_chisq:
|
||||
ct = np.diff(distr_func.cdf(bins, *fit_params))*np.sum(counts)
|
||||
c2t = stats.chisquare(counts, ct, ddof=len(fit_params))
|
||||
chisq_strs = [
|
||||
f"$\\chi^2$/dof = {c2t[0]: .2g}/{len(fit_params)}"
|
||||
]
|
||||
|
||||
# change parameters if needed
|
||||
text_fit_params = fit_params2text_params(fit_params)
|
||||
|
||||
text_str = "\n".join(
|
||||
[label]
|
||||
+
|
||||
[ f"{param} = {value: .2e}" for param, value in zip_longest(param_names, text_fit_params, fillvalue='?') ]
|
||||
+
|
||||
chisq_strs
|
||||
)
|
||||
|
||||
this_info = {
|
||||
'name': name,
|
||||
'param_names': param_names,
|
||||
'param_values': text_fit_params,
|
||||
'text_str': text_str,
|
||||
}
|
||||
|
||||
if chisq_strs:
|
||||
this_info['chisq'] = c2t[0]
|
||||
this_info['dof'] = len(fit_params)
|
||||
|
||||
fit_info.append(this_info)
|
||||
|
||||
loc = (0.02, 0.95)
|
||||
ax.text(*loc, "\n\n".join([info['text_str'] for info in fit_info]), **{**text_kwargs, **dict(ha='left')})
|
||||
|
||||
if mean_snr:
|
||||
text_str = f"$\\langle SNR \\rangle$ = {mean_snr: .1e}"
|
||||
loc = (0.98, 0.95)
|
||||
ax.text(*loc, text_str, **{**text_kwargs, **dict(ha='right')})
|
||||
|
||||
return fig, dict(fit_info=fit_info, counts=counts, bins=bins)
|
258
airshower_beacon_simulation/lib/lib.py
Normal file
258
airshower_beacon_simulation/lib/lib.py
Normal file
|
@ -0,0 +1,258 @@
|
|||
# vim: fdm=indent ts=4
|
||||
"""
|
||||
Library for this simulation
|
||||
"""
|
||||
import numpy as np
|
||||
from numpy.polynomial import Polynomial
|
||||
|
||||
from earsim import Antenna
|
||||
c_light = 3e8*1e-9 # m/ns
|
||||
|
||||
""" Beacon utils """
|
||||
def sine_beacon(f, t, t0=0, amplitude=1, baseline=0, phase=0, peak_at_t0=0):
|
||||
"""
|
||||
Return a sine appropriate as a beacon
|
||||
"""
|
||||
baseline = baseline*np.ones_like(t)
|
||||
|
||||
if peak_at_t0: # add peak near t0
|
||||
idx = (np.abs(t-t0)).argmin()
|
||||
baseline[ max(0, idx-1):min(len(t), idx+1) ] += peak_at_t0 + amplitude
|
||||
|
||||
return amplitude * np.cos(2*np.pi*f*(t+t0) + phase) + baseline
|
||||
|
||||
def phase_mod(phase, low=np.pi):
|
||||
"""
|
||||
Modulo phase such that it falls within the
|
||||
interval $[-low, 2\pi - low)$.
|
||||
"""
|
||||
return (phase + low) % (2*np.pi) - low
|
||||
|
||||
def distance(x1, x2):
|
||||
"""
|
||||
Calculate the Euclidean distance between two locations x1 and x2
|
||||
"""
|
||||
|
||||
assert type(x1) in [Antenna]
|
||||
x1 = np.array([x1.x, x1.y, x1.z])
|
||||
|
||||
assert type(x2) in [Antenna]
|
||||
x2 = np.array([x2.x, x2.y, x2.z])
|
||||
|
||||
return np.sqrt( np.sum( (x1-x2)**2 ) )
|
||||
|
||||
def geometry_time(dist, x2=None, c_light=c_light):
|
||||
if x2 is not None:
|
||||
dist = distance(dist, x2)
|
||||
|
||||
return dist/c_light
|
||||
|
||||
def beacon_from(tx, rx, f, t=0, t0=0, c_light=c_light, radiate_rsq=True, amplitude=1,**kwargs):
|
||||
dist = distance(tx,rx)
|
||||
# suppress extra time delay from distance
|
||||
if c_light is not None and np.isfinite(c_light):
|
||||
t0 = t0 + dist/c_light
|
||||
|
||||
if radiate_rsq:
|
||||
if np.isclose(dist, 0):
|
||||
dist = 1
|
||||
amplitude *= 1/(dist**2)
|
||||
|
||||
return sine_beacon(f, t, t0=t0, amplitude=amplitude,**kwargs)
|
||||
|
||||
def remove_antenna_geometry_phase(tx, antennas, f_beacon, measured_phases=None, c_light=c_light):
|
||||
"""
|
||||
Remove the geometrical phase from the measured antenna phase.
|
||||
"""
|
||||
if not hasattr(antennas, '__len__'):
|
||||
antennas = [antennas]
|
||||
|
||||
if not hasattr(measured_phases, '__len__'):
|
||||
measured_phases = [measured_phases]
|
||||
|
||||
clock_phases = np.empty( (len(antennas)) )
|
||||
for i, ant in enumerate(antennas):
|
||||
measured_phase = measured_phases[i]
|
||||
|
||||
geom_time = geometry_time(tx, ant, c_light=c_light)
|
||||
geom_phase = geom_time * 2*np.pi*f_beacon
|
||||
|
||||
clock_phases[i] = phase_mod(measured_phase - geom_phase)
|
||||
|
||||
return clock_phases
|
||||
|
||||
|
||||
""" Fourier """
|
||||
def ft_corr_vectors(freqs, time):
|
||||
"""
|
||||
Get the cosine and sine terms for freqs at time.
|
||||
|
||||
Takes the outer product of freqs and time.
|
||||
"""
|
||||
freqtime = np.outer(freqs, time)
|
||||
|
||||
c_k = np.cos(2*np.pi*freqtime)
|
||||
s_k = -1*np.sin(2*np.pi*freqtime)
|
||||
|
||||
return c_k, s_k
|
||||
|
||||
def direct_fourier_transform(freqs, time, samplesets_iterable):
|
||||
"""
|
||||
Determine the fourier transform of each sampleset in samplesets_iterable at freqs.
|
||||
|
||||
The samplesets are expected to have the same time vector.
|
||||
|
||||
Returns either a generator to return the fourier transform for each sampleset
|
||||
if samplesets_iterable is a generator
|
||||
or a numpy array.
|
||||
"""
|
||||
|
||||
c_k, s_k = ft_corr_vectors(freqs, time)
|
||||
|
||||
if not hasattr(samplesets_iterable, '__len__') and hasattr(samplesets_iterable, '__iter__'):
|
||||
# samplesets_iterable is an iterator
|
||||
# return a generator containing (real, imag) amplitudes
|
||||
return ( (np.dot(c_k, samples), np.dot(s_k, samples)) for samples in samplesets_iterable )
|
||||
|
||||
# Numpy array
|
||||
return np.dot(c_k, samplesets_iterable), np.dot(s_k, samplesets_iterable)
|
||||
|
||||
def phase_field_from_tx(x, y, tx, f_beacon, c_light=c_light, t0=0, wrap_phase=True, return_meshgrid=True):
|
||||
"""
|
||||
"""
|
||||
|
||||
assert type(tx) in [Antenna]
|
||||
|
||||
xs, ys = np.meshgrid(x, y, sparse=True)
|
||||
|
||||
x_distances = (tx.x - xs)**2
|
||||
y_distances = (tx.y - ys)**2
|
||||
|
||||
dist = np.sqrt( x_distances + y_distances )
|
||||
|
||||
phase = (dist/c_light + t0) * f_beacon*2*np.pi
|
||||
if wrap_phase:
|
||||
phase = phase_mod(phase)
|
||||
|
||||
if return_meshgrid:
|
||||
return phase, (xs, ys)
|
||||
else:
|
||||
return phase, (np.repeat(xs, len(ys), axis=0), np.repeat(ys, len(xs[0]), axis=1))
|
||||
|
||||
|
||||
def find_beacon_in_traces(
|
||||
traces,
|
||||
t_trace,
|
||||
f_beacon_estimate = 50e6,
|
||||
frequency_fit = False,
|
||||
N_test_freqs = 5e2,
|
||||
f_beacon_estimate_band = 0.01,
|
||||
amp_cut = 0.8
|
||||
):
|
||||
"""
|
||||
f_beacon_band is inclusive
|
||||
|
||||
traces is [trace, trace, trace, .. ]
|
||||
"""
|
||||
|
||||
amplitudes = np.zeros(len(traces))
|
||||
phases = np.zeros(len(traces))
|
||||
frequencies = np.zeros(len(traces))
|
||||
|
||||
if frequency_fit: # fit frequency
|
||||
test_freqs = f_beacon_estimate + f_beacon_estimate_band * np.linspace(-1, 1, int(N_test_freqs)+1)
|
||||
ft_amp_gen = direct_fourier_transform(test_freqs, t_trace, (x for x in traces))
|
||||
|
||||
n_samples = len(t_trace)
|
||||
|
||||
for i, ft_amp in enumerate(ft_amp_gen):
|
||||
real, imag = ft_amp
|
||||
amps = 1/n_samples * ( real**2 + imag**2)**0.5
|
||||
|
||||
# find frequency peak and surrounding bins
|
||||
# that are valid for the parabola fit
|
||||
max_amp_idx = np.argmax(amps)
|
||||
max_amp = amps[max_amp_idx]
|
||||
|
||||
if True:
|
||||
frequencies[i] = test_freqs[max_amp_idx]
|
||||
continue
|
||||
|
||||
valid_mask = amps >= amp_cut*max_amp
|
||||
|
||||
if True: # make sure not to use other peaks
|
||||
lower_mask = valid_mask[0:max_amp_idx]
|
||||
upper_mask = valid_mask[max_amp_idx:]
|
||||
|
||||
if any(lower_mask):
|
||||
lower_end = np.argmin(lower_mask[::-1])
|
||||
else:
|
||||
lower_end = max_amp_idx
|
||||
|
||||
if any(upper_mask):
|
||||
upper_end = np.argmin(upper_mask)
|
||||
else:
|
||||
upper_end = 0
|
||||
|
||||
|
||||
valid_mask[0:(max_amp_idx - lower_end)] = False
|
||||
valid_mask[(max_amp_idx + upper_end):] = False
|
||||
|
||||
if all(~valid_mask):
|
||||
frequencies[i] = np.nan
|
||||
continue
|
||||
|
||||
# fit Parabola
|
||||
parafit = Polynomial.fit(test_freqs[valid_mask], amps[valid_mask], 2)
|
||||
func = parafit.convert()
|
||||
|
||||
# find frequency where derivative is 0
|
||||
deriv = func.deriv(1)
|
||||
freq = deriv.roots()[0]
|
||||
|
||||
frequencies[i] = freq
|
||||
|
||||
else: # no frequency finding
|
||||
frequencies[:] = f_beacon_estimate
|
||||
|
||||
|
||||
n_samples = len(t_trace)
|
||||
# evaluate fourier transform at freq for each trace
|
||||
for i, freq in enumerate(frequencies):
|
||||
if freq is np.nan:
|
||||
phases[i] = np.nan
|
||||
amplitudes[i] = np.nan
|
||||
continue
|
||||
|
||||
real, imag = direct_fourier_transform(freq, t_trace, traces[i])
|
||||
|
||||
phases[i] = np.arctan2(imag, real)
|
||||
amplitudes[i] = 2/n_samples * (real**2 + imag**2)**0.5
|
||||
|
||||
return frequencies, phases, amplitudes
|
||||
|
||||
def coherence_sum_maxima(ref_x, y, k_step=1, k_start=0, k_end=None, periodic=True):
|
||||
"""
|
||||
Use the maximum of a coherent sum to determine
|
||||
the best number of samples to move
|
||||
"""
|
||||
N_samples = int( len(ref_x) )
|
||||
k_end = N_samples if k_end is None or k_end > max_k else k_end
|
||||
|
||||
ks = np.arange(k_start, k_end, step=k_step)
|
||||
|
||||
maxima = np.empty(len(ks))
|
||||
|
||||
if periodic is False:
|
||||
# prepend zeros
|
||||
N_zeros = N_samples
|
||||
preshift = 0 # only required for testing purposes
|
||||
|
||||
ref_x = np.pad(ref_x, (N_zeros-0,0), 'constant')
|
||||
y = np.pad(y, (N_zeros-preshift,preshift), 'constant')
|
||||
|
||||
for i,k in enumerate(ks, 0):
|
||||
augmented_y = np.roll(y, k)
|
||||
maxima[i] = max(ref_x + augmented_y)
|
||||
|
||||
return ks, maxima
|
394
airshower_beacon_simulation/lib/rit.py
Normal file
394
airshower_beacon_simulation/lib/rit.py
Normal file
|
@ -0,0 +1,394 @@
|
|||
# from wappy import *
|
||||
from earsim import *
|
||||
from atmocal import *
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
from scipy.signal import hilbert
|
||||
from scipy import signal
|
||||
from scipy.interpolate import interp1d
|
||||
from scipy.optimize import curve_fit,minimize
|
||||
import pandas as pd
|
||||
import os
|
||||
|
||||
try:
|
||||
from joblib import Parallel, delayed
|
||||
except:
|
||||
Parallel = None
|
||||
delayed = lambda x: x
|
||||
|
||||
plt.rcParams.update({'font.size': 16})
|
||||
|
||||
atm = AtmoCal()
|
||||
|
||||
from matplotlib import cm
|
||||
|
||||
def set_pol_and_bp(e,low=0.03,high=0.08):
|
||||
for ant in e.antennas:
|
||||
E = [np.dot(e.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(ant.Ex,ant.Ey,ant.Ez)]
|
||||
dt = ant.t[1] -ant.t[0]
|
||||
E = block_filter(E,dt,low,high)
|
||||
ant.E_AxB = E
|
||||
ant.t_AxB = ant.t
|
||||
|
||||
|
||||
def pow_and_time(test_loc,ev,dt=1.0):
|
||||
t_ = []
|
||||
a_ = []
|
||||
t_min = 1e9
|
||||
t_max = -1e9
|
||||
for ant in ev.antennas:
|
||||
#propagate to test location
|
||||
aloc = [ant.x,ant.y,ant.z]
|
||||
delta,dist = atm.light_travel_time(test_loc,aloc)
|
||||
delta = delta*1e9
|
||||
t__ = np.subtract(ant.t_AxB,delta)
|
||||
t_.append(t__)
|
||||
a_.append(ant.E_AxB)
|
||||
if t__[0] < t_min:
|
||||
t_min = t__[0]
|
||||
if t__[-1] > t_max:
|
||||
t_max = t__[-1]
|
||||
|
||||
t_sum = np.arange(t_min+1,t_max-1,dt)
|
||||
a_sum = np.zeros(len(t_sum))
|
||||
#interpolation
|
||||
for t_r,E_ in zip (t_,a_):
|
||||
f = interp1d(t_r,E_,assume_sorted=True,bounds_error=False,fill_value=0.)
|
||||
a_int = f(t_sum)
|
||||
a_sum = np.add(a_sum,a_int)
|
||||
if len(a_sum) != 0:
|
||||
P = np.sum(np.square(np.absolute(np.fft.fft(a_sum))))
|
||||
# normalise P with the length of the traces
|
||||
P = P/( t_sum[-1] - t_sum[0])
|
||||
else:
|
||||
print("ERROR, a_sum lenght = 0",
|
||||
"tmin ",t_min,
|
||||
"t_max ",t_max,
|
||||
"dt",dt)
|
||||
P = 0
|
||||
return P,t_,a_,a_sum,t_sum
|
||||
|
||||
def shower_axis_slice(e,Xb=200,Xe=1200,dX=2,zgr=0):
|
||||
zgr = zgr + e.core[2]
|
||||
N = int((Xe-Xb)/dX)
|
||||
Xs = np.array(np.linspace(Xb,Xe,N+1))
|
||||
ds = np.array([atm.distance_to_slant_depth(np.deg2rad(e.zenith),X,zgr) for X in Xs])
|
||||
|
||||
locs = []
|
||||
for d_ in ds:
|
||||
xc = np.sin(np.deg2rad(e.zenith))*np.cos(np.deg2rad(e.azimuth))* d_
|
||||
yc = np.sin(np.deg2rad(e.zenith))*np.sin(np.deg2rad(e.azimuth))* d_
|
||||
zc = np.cos(np.deg2rad(e.zenith))* d_
|
||||
locs.append([xc,yc,zc])
|
||||
p = []
|
||||
for loc in locs:
|
||||
P,t_,pulses_,wav,twav = pow_and_time(loc,e)
|
||||
p.append(P)
|
||||
p = np.asanyarray(p)
|
||||
return ds,Xs,locs,p
|
||||
|
||||
def shower_plane_slice(e,X=750.,Nx=10,Ny=10,wx=1e3,wy=1e3,xoff=0,yoff=0,zgr=0,n_jobs=None):
|
||||
zgr = zgr + e.core[2]
|
||||
dX = atm.distance_to_slant_depth(np.deg2rad(e.zenith),X,zgr)
|
||||
x = np.linspace(-wx,wx,Nx)
|
||||
y = np.linspace(-wy,wy,Ny)
|
||||
|
||||
def loop_func(x_, y_, xoff=xoff, yoff=yoff):
|
||||
loc = (x_+xoff)* e.uAxB + (y_+yoff)*e.uAxAxB + dX *e.uA
|
||||
P,t_,pulses_,wav,twav = pow_and_time(loc,e)
|
||||
|
||||
return x_+xoff, y_+yoff, P, loc
|
||||
|
||||
res = ( delayed(loop_func)(x_, y_) for x_ in x for y_ in y)
|
||||
|
||||
if Parallel:
|
||||
#if n_jobs is None change with `with parallel_backend`
|
||||
res = Parallel(n_jobs=n_jobs)(res)
|
||||
|
||||
# unpack loop results
|
||||
xx, yy, p, locs = zip(*res)
|
||||
|
||||
xx = np.asarray(xx)
|
||||
yy = np.asarray(yy)
|
||||
p = np.asanyarray(p)
|
||||
|
||||
return xx,yy,p,locs[np.argmax(p)]
|
||||
|
||||
def slice_figure(e,X,xx,yy,p,mode='horizontal', scatter_kwargs={}, colorbar_kwargs={'label':'Power'}):
|
||||
scatter_kwargs = {
|
||||
**dict(
|
||||
cmap='Spectral_r',
|
||||
alpha=0.9,
|
||||
s=30
|
||||
),
|
||||
**scatter_kwargs
|
||||
}
|
||||
|
||||
|
||||
fig, axs = plt.subplots(1,figsize=(10,8))
|
||||
fig.suptitle(r'E = %.1f EeV, $\theta$ = %.1f$^\circ$, $\phi$ = %.1f$^\circ$ X = %.f'%(e.energy,e.zenith,e.azimuth,X))
|
||||
sc = axs.scatter(xx/1e3,yy/1e3,c=p,**scatter_kwargs)
|
||||
fig.colorbar(sc,ax=axs, **colorbar_kwargs)
|
||||
zgr = 0 + e.core[2]
|
||||
dX = atm.distance_to_slant_depth(np.deg2rad(e.zenith),X,zgr)
|
||||
xc = np.sin(np.deg2rad(e.zenith))*np.cos(np.deg2rad(e.azimuth))* dX
|
||||
yc = np.sin(np.deg2rad(e.zenith))*np.sin(np.deg2rad(e.azimuth))* dX
|
||||
if mode == 'horizontal':
|
||||
axs.plot(xc/1e3,yc/1e3,'r+',ms=30)
|
||||
axs.set_xlabel('x (km)')
|
||||
axs.set_ylabel('y (km)')
|
||||
elif mode == "sp":
|
||||
axs.plot(0,0,'r+',ms=30)
|
||||
axs.set_xlabel('-v x B (km)')
|
||||
axs.set_ylabel(' vxvxB (km)')
|
||||
im = np.argmax(p)
|
||||
axs.plot(xx[im]/1e3,yy[im]/1e3,'bx',ms=30)
|
||||
fig.tight_layout()
|
||||
return fig,axs
|
||||
|
||||
def dist_to_line(xp,core,u):
|
||||
xp = np.array(xp)
|
||||
xp_core = xp-core
|
||||
c2 = np.dot(xp_core,xp_core)
|
||||
a2 = np.dot((np.dot(xp_core,u)*u),(np.dot(xp_core,u)*u))
|
||||
d = (np.abs(c2 - a2))**0.5
|
||||
return d
|
||||
|
||||
def dist_to_line_sum(param,data,weights):
|
||||
#distance line point: a = xp-core is D= | (a)^2-(a dot n)n |
|
||||
#where ux is direction of line and x0 is a point in the line (like t = 0)
|
||||
x0 = param[0]
|
||||
y0 = param[1]
|
||||
theta = param[2]
|
||||
phi = param[3]
|
||||
core = np.array([x0, y0, 0.])
|
||||
u = np.array([np.cos(phi)*np.sin(theta),np.sin(phi)*np.sin(theta),np.cos(theta)])
|
||||
dsum = 0
|
||||
for xp,w in zip(data,weights):
|
||||
dsum += dist_to_line(xp,core,u)*w**2
|
||||
# print('%.2e %.2e %.2e %.2e %.2e'%(x0,y0,theta,phi,dsum))
|
||||
return dsum/len(data)
|
||||
|
||||
def get_axis_points(e,savefig=True,path="",zgr=0,Xlow=300, Xhigh=1000, N_X=15, n_jobs=None):
|
||||
Xsteps = np.linspace(Xlow, Xhigh, N_X)
|
||||
zgr=zgr+e.core[2] #not exact
|
||||
dXref = atm.distance_to_slant_depth(np.deg2rad(e.zenith),750,zgr)
|
||||
scale2d = dXref*np.tan(np.deg2rad(2.))
|
||||
scale4d = dXref*np.tan(np.deg2rad(4.))
|
||||
scale0_2d=dXref*np.tan(np.deg2rad(0.2))
|
||||
|
||||
def loop_func(X):
|
||||
print("Starting", X)
|
||||
x,y,p,loc_max = shower_plane_slice(e,X,21,21,scale2d,scale2d)
|
||||
if savefig:
|
||||
fig,axs = slice_figure(e,X,x,y,p,'sp')
|
||||
fig.savefig(path+'X%d_a.pdf'%(X))
|
||||
plt.close(fig)
|
||||
im = np.argmax(p)
|
||||
if np.abs(x[im]) == np.max(x) or np.abs(y[im]) == (np.max(y)):
|
||||
x,y,p,loc_max = shower_plane_slice(e,X,21,21,scale4d,scale4d)
|
||||
if savefig:
|
||||
fig,axs = slice_figure(e,X,x,y,p,'sp')
|
||||
fig.savefig(path+'X%d_c.pdf'%(X))
|
||||
plt.close(fig)
|
||||
im = np.argmax(p)
|
||||
x,y,p,loc_max = shower_plane_slice(e,X,21,21,scale0_2d,scale0_2d,x[im],y[im])
|
||||
if savefig:
|
||||
fig,axs = slice_figure(e,X,x,y,p,'sp')
|
||||
fig.savefig(path+'X%d_b.pdf'%(X))
|
||||
plt.close(fig)
|
||||
|
||||
print("Finished", X)
|
||||
return np.max(p), loc_max
|
||||
|
||||
res = (delayed(loop_func)(X) for X in Xsteps)
|
||||
|
||||
if Parallel:
|
||||
#if n_jobs is None change with `with parallel_backend`
|
||||
res = Parallel(n_jobs=n_jobs)(res)
|
||||
|
||||
# unpack loop results
|
||||
max_vals, axis_points = zip(*res)
|
||||
|
||||
return Xsteps,axis_points,max_vals
|
||||
|
||||
def fit_track(e,axis_points,vals,nscale=1e0):
|
||||
weights = vals/np.max(vals)
|
||||
data=axis_points[:]
|
||||
data = [d/nscale for d in data] #km, to have more comparable step sizes
|
||||
x0=[0,0,np.deg2rad(e.zenith),np.deg2rad(e.azimuth)]
|
||||
res = minimize(dist_to_line_sum,args=(data,weights),x0=x0)
|
||||
zen_r = np.rad2deg(res.x[2])
|
||||
azi_r = np.rad2deg(res.x[3])
|
||||
print(res,zen_r,e.zenith,azi_r,e.azimuth)
|
||||
return zen_r,azi_r,[res.x[0]*nscale,res.x[1]*nscale,0]
|
||||
|
||||
|
||||
def update_event(e,core,theta,phi,axp=None):
|
||||
#recalculate
|
||||
e.zenith = theta
|
||||
e.azimuth = phi
|
||||
theta = np.deg2rad(theta)
|
||||
phi = np.deg2rad(phi)
|
||||
e.core = e.core+core
|
||||
e.uA = np.array([np.cos(phi)*np.sin(theta),np.sin(phi)*np.sin(theta),np.cos(theta)])
|
||||
e.uAxB = np.cross(e.uA,e.uB)
|
||||
e.uAxB = e.uAxB/(np.dot(e.uAxB,e.uAxB))**0.5
|
||||
e.uAxAxB = np.cross(e.uA,e.uAxB)
|
||||
#antenna position
|
||||
for a in e.antennas:
|
||||
a.x -= core[0]
|
||||
a.y -= core[1]
|
||||
a.z -= core[2]
|
||||
if axp != None:
|
||||
for ap in axp:
|
||||
ap[0] -= core[0]
|
||||
ap[1] -= core[1]
|
||||
ap[2] -= core[2]
|
||||
|
||||
def longitudinal_figure(dist,Xs,p,mode='grammage'):
|
||||
fig, axs = plt.subplots(1,figsize=(6,5))
|
||||
if mode=='grammage':
|
||||
axs.plot(Xs,p/np.max(p),'o-')
|
||||
axs.set_xlabel('X (g/cm$^2$)')
|
||||
if mode=='distance':
|
||||
axs.plot(dist/1e3,p/np.max(p),'o-')
|
||||
axs.set_xlabel('distance from ground (km)')
|
||||
axs.grid()
|
||||
fig.tight_layout()
|
||||
return fig
|
||||
|
||||
def time_residuals(e,tlable=True):
|
||||
ds,tp,nsum,ssum,swidth,azi,x,y,sid = lateral_parameters(e,True,[0,0,0])
|
||||
fig, axs = plt.subplots(1,figsize=(6,5),sharex=True)
|
||||
tp = tp-np.min(tp)
|
||||
cut_outlier = ~((ds<200)&(tp > 10))
|
||||
axs.plot(ds,tp,'o')
|
||||
if tlable:
|
||||
for d,t,s in zip(ds,tp,sid):
|
||||
plt.text(d,t,s)
|
||||
# axs.text(ds,tp,sid)
|
||||
axs.set_xlabel('distance (m)')
|
||||
axs.set_ylabel('$\Delta t (ns)$')
|
||||
axs.grid()
|
||||
z = np.polyfit(ds[cut_outlier],tp[cut_outlier],3)
|
||||
pfit = np.poly1d(z)
|
||||
xfit = np.linspace(np.min(ds),np.max(ds),100)
|
||||
yfit = pfit(xfit)
|
||||
tres = tp - pfit(ds)
|
||||
sigma =np.std(tres[cut_outlier])
|
||||
axs.plot(xfit,yfit,label=r'pol3 fit, $\sigma=%.2f$ (ns)'%(sigma))
|
||||
axs.legend()
|
||||
fig.tight_layout()
|
||||
return fig,tres
|
||||
|
||||
def figure_3D(axis_points,max_vals,zen,azi,core,res = 0):
|
||||
fig = plt.figure(figsize=(5,9))
|
||||
# fig, axs = plt.subplots(1,2,figsize=(12,8))
|
||||
ax = fig.add_subplot(2,1,1,projection='3d')
|
||||
xp = [ap[0]/1e3 for ap in axis_points]
|
||||
yp = [ap[1]/1e3 for ap in axis_points]
|
||||
zp = [ap[2]/1e3 for ap in axis_points]
|
||||
max_vals = np.asarray(max_vals)
|
||||
ax.scatter(xp, yp, zp,c=max_vals,s=150*(max_vals/np.max(max_vals))**2,cmap='Spectral_r')
|
||||
ax = fig.add_subplot(2,1,2)
|
||||
core = np.array(core)
|
||||
theta = np.deg2rad(zen)
|
||||
phi = np.deg2rad(azi)
|
||||
u = np.array([np.cos(phi)*np.sin(theta),np.sin(phi)*np.sin(theta),np.cos(theta)])
|
||||
residuals = [dist_to_line(ap,core,u) for ap in axis_points]
|
||||
dist = [np.sum((ap-core)**2)**0.5 for ap in axis_points]
|
||||
ax.scatter(dist,residuals,c=max_vals,cmap='Spectral_r')
|
||||
ax.grid()
|
||||
# ax.plot(xl,yl,zl,'-')
|
||||
# ax.set_zlim(0,18)
|
||||
# ax.view_init(15, 10)
|
||||
fig.tight_layout()
|
||||
if res != 0:
|
||||
res.track_dis.append(dist)
|
||||
res.track_res.append(residuals)
|
||||
res.track_val.append(max_vals)
|
||||
return fig
|
||||
|
||||
class RITResult():
|
||||
"""docstring for RITResult."""
|
||||
def __init__(self):
|
||||
super(RITResult, self).__init__()
|
||||
self.xmax_rit = []
|
||||
self.xmax = []
|
||||
self.profile_rit = []
|
||||
self.dX = []
|
||||
self.dl = []
|
||||
self.zenith_ini = []
|
||||
self.azimuth_ini = []
|
||||
self.core_ini = []
|
||||
self.dcore_rec = []
|
||||
self.zenith_rec = []
|
||||
self.azimuth_rec = []
|
||||
self.index = []
|
||||
self.isMC = []
|
||||
self.track_dis = []
|
||||
self.track_res =[]
|
||||
self.track_val =[]
|
||||
self.station_ids =[]
|
||||
self.station_x =[]
|
||||
self.station_y =[]
|
||||
self.station_z =[]
|
||||
self.station_maxE = []
|
||||
self.has_pulse = []
|
||||
|
||||
def fill_stations_propeties(e,res):
|
||||
x = np.array([a.x for a in e.antennas])
|
||||
y = np.array([a.y for a in e.antennas])
|
||||
z = np.array([a.z for a in e.antennas])
|
||||
ids = [a.name for a in e.antennas]
|
||||
maxE = np.array([np.max(a.E_AxB) for a in e.antennas])
|
||||
#has_pulse = np.array([np.max(a.has_pulse) for a in e.antennas])
|
||||
res.station_x.append(x)
|
||||
res.station_y.append(y)
|
||||
res.station_z.append(z)
|
||||
res.station_ids.append(ids)
|
||||
#res.has_pulse.append(has_pulse)
|
||||
|
||||
def reconstruction(e,outfile='', slice_outdir=None, Xlow=300, Xhigh=1000, N_X=15, disable_pol_and_bp=False):
|
||||
res = RITResult()
|
||||
res.isMC.append(True)
|
||||
res.zenith_ini.append(e.zenith)
|
||||
res.azimuth_ini.append(e.azimuth)
|
||||
res.core_ini.append(e.core)
|
||||
|
||||
if not disable_pol_and_bp:
|
||||
set_pol_and_bp(e, 0.03, 0.08)
|
||||
|
||||
#only use signal that have a signal in data
|
||||
fill_stations_propeties(e,res)
|
||||
Xs,axis_points,max_vals = get_axis_points(e,savefig=(slice_outdir is not None), path=slice_outdir, Xlow=Xlow, Xhigh=Xhigh, N_X=N_X)
|
||||
zen,azi,core = fit_track(e,axis_points,max_vals,1e2)
|
||||
fig = figure_3D(axis_points,max_vals,zen,azi,core,res)
|
||||
fig.savefig(outfile)
|
||||
update_event(e,core,zen,azi)
|
||||
ds,Xs,locs,p = shower_axis_slice(e)
|
||||
#result
|
||||
res.dX.append(Xs)
|
||||
res.dl.append(ds)
|
||||
res.profile_rit.append(p)
|
||||
res.xmax_rit.append(Xs[np.argmax(p)])
|
||||
res.azimuth_rec.append(e.azimuth)
|
||||
res.zenith_rec.append(e.zenith)
|
||||
res.dcore_rec.append(core)
|
||||
return res
|
||||
|
||||
if __name__ == "__main__":
|
||||
file = '../ZH_airshower/mysim.sry'
|
||||
ev = REvent(file)
|
||||
set_pol_and_bp(ev)
|
||||
X = 750
|
||||
|
||||
dXref = atm.distance_to_slant_depth(np.deg2rad(ev.zenith),X,0)
|
||||
scale2d = dXref*np.tan(np.deg2rad(2.))
|
||||
xx,yy,p,km= shower_plane_slice(ev,X,21,21,scale2d,scale2d)
|
||||
|
||||
slice_figure(ev,X,xx,yy,p,mode='sp')
|
||||
#plt.scatter(xx,yy,c=p)
|
||||
#plt.colorbar()
|
||||
plt.show()
|
97
airshower_beacon_simulation/lib/snr.py
Normal file
97
airshower_beacon_simulation/lib/snr.py
Normal file
|
@ -0,0 +1,97 @@
|
|||
import numpy as np
|
||||
from collections import namedtuple
|
||||
|
||||
from lib import direct_fourier_transform as dtft
|
||||
|
||||
import matplotlib.pyplot as plt # for debug plotting
|
||||
|
||||
passband = namedtuple("passband", ['low', 'high'], defaults=[0, np.inf])
|
||||
|
||||
def get_freq_spec(val,dt):
|
||||
"""From earsim/tools.py"""
|
||||
fval = np.fft.fft(val)[:len(val)//2]
|
||||
freq = np.fft.fftfreq(len(val),dt)[:len(val)//2]
|
||||
return fval, freq
|
||||
|
||||
|
||||
def bandpass_samples(samples, samplerate, band=passband()):
|
||||
"""
|
||||
Bandpass the samples with this passband.
|
||||
This is a hard filter.
|
||||
"""
|
||||
fft, freqs = get_freq_spec(samples, samplerate)
|
||||
|
||||
fft[ ~ self.freq_mask(freqs) ] = 0
|
||||
|
||||
return np.fft.irfft(fft)
|
||||
|
||||
def bandpass_mask(freqs, band=passband()):
|
||||
low_pass = abs(freqs) <= band[1]
|
||||
high_pass = abs(freqs) >= band[0]
|
||||
|
||||
return low_pass & high_pass
|
||||
|
||||
def bandpower(samples, samplerate=1, band=passband(), normalise_bandsize=True, debug_ax=False):
|
||||
bins = 0
|
||||
fft, freqs = get_freq_spec(samples, 1/samplerate)
|
||||
bandmask = [False]*len(freqs)
|
||||
|
||||
if band[1] is None:
|
||||
# Only a single frequency given
|
||||
# use a DTFT for finding the power
|
||||
time = np.arange(0, len(samples), 1/samplerate)
|
||||
|
||||
real, imag = dtft(band[0], time, samples)
|
||||
power = np.sum(np.abs(real**2 + imag**2))
|
||||
else:
|
||||
bandmask = bandpass_mask(freqs, band=band)
|
||||
|
||||
if normalise_bandsize:
|
||||
bins = np.count_nonzero(bandmask, axis=-1)
|
||||
else:
|
||||
bins = 1
|
||||
|
||||
bins = max(1, bins)
|
||||
|
||||
power = 1/bins * np.sum(np.abs(fft[bandmask])**2)
|
||||
|
||||
# Prepare plotting variables if an Axes is supplied
|
||||
if debug_ax:
|
||||
if any(bandmask):
|
||||
min_f, max_f = min(freqs[bandmask]), max(freqs[bandmask])
|
||||
else:
|
||||
min_f, max_f = 0, 0
|
||||
|
||||
if band[1] is None:
|
||||
min_f, max_f = band[0], band[0]
|
||||
|
||||
if debug_ax is True:
|
||||
debug_ax = plt.gca()
|
||||
|
||||
l = debug_ax.plot(freqs, np.abs(fft), alpha=0.9)
|
||||
|
||||
amp = np.sqrt(power)
|
||||
|
||||
if min_f != max_f:
|
||||
debug_ax.plot( [min_f, max_f], [amp, amp], alpha=0.7, color=l[0].get_color(), ls='dashed')
|
||||
debug_ax.axvspan(min_f, max_f, color=l[0].get_color(), alpha=0.2)
|
||||
else:
|
||||
debug_ax.plot( min_f, amp, '4', alpha=0.7, color=l[0].get_color(), ms=10)
|
||||
|
||||
return power
|
||||
|
||||
def signal_to_noise(samples, noise, samplerate=1, signal_band=passband(), noise_band=None, debug_ax=False):
|
||||
if noise_band is None:
|
||||
noise_band = signal_band
|
||||
|
||||
if noise is None:
|
||||
noise = samples
|
||||
|
||||
if debug_ax is True:
|
||||
debug_ax = plt.gca()
|
||||
|
||||
noise_power = bandpower(noise, samplerate, noise_band, debug_ax=debug_ax)
|
||||
|
||||
signal_power = bandpower(samples, samplerate, signal_band, debug_ax=debug_ax)
|
||||
|
||||
return (signal_power/noise_power)**0.5
|
1
airshower_beacon_simulation/lib/tests/lib
Symbolic link
1
airshower_beacon_simulation/lib/tests/lib
Symbolic link
|
@ -0,0 +1 @@
|
|||
../
|
108
airshower_beacon_simulation/lib/tests/test_beacon_fourier.py
Executable file
108
airshower_beacon_simulation/lib/tests/test_beacon_fourier.py
Executable file
|
@ -0,0 +1,108 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Test the functions in lib concerning
|
||||
beacon generation and phase measuring
|
||||
work correctly together.
|
||||
"""
|
||||
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
import lib
|
||||
|
||||
seed = 12345
|
||||
dt = 1 # ns
|
||||
frequency = 52.12345e-3 # GHz
|
||||
N = 5e2
|
||||
t_clock = 0
|
||||
beacon_amplitude = 1
|
||||
|
||||
t = np.arange(0, 10*int(1e3), dt, dtype=float)
|
||||
rng = np.random.default_rng(seed)
|
||||
|
||||
|
||||
# Vary both the base time and the phase
|
||||
t_extra = 0
|
||||
phase_res = np.zeros(int(N))
|
||||
amp_res = np.zeros(int(N))
|
||||
for i in range(int(N)):
|
||||
# Change timebase
|
||||
t -= t_extra
|
||||
t_extra = (2*rng.uniform(size=1) - 1) *1e3
|
||||
t += t_extra
|
||||
|
||||
# Randomly phased beacon
|
||||
phase = lib.phase_mod(np.pi*(2*rng.uniform(size=1) -1)) # rad
|
||||
beacon = beacon_amplitude * lib.sine_beacon(frequency, t, t0=0, phase=phase, peak_at_t0=False)
|
||||
|
||||
if False: # blank part of the beacon
|
||||
blank_low, blank_high = int(0.2*len(t)), int(0.4*len(t))
|
||||
t_mask = np.ones(len(t), dtype=bool)
|
||||
t_mask[blank_low:blank_high] = False
|
||||
|
||||
t = t[t_mask]
|
||||
beacon = beacon[t_mask]
|
||||
|
||||
# Introduce clock errors
|
||||
t += t_clock
|
||||
_, measured_phases, measured_amplitudes = lib.find_beacon_in_traces([beacon], t, frequency, frequency_fit=False)
|
||||
t -= t_clock
|
||||
|
||||
# Save residuals
|
||||
phase_res[i] = lib.phase_mod(measured_phases[0] - phase)
|
||||
amp_res[i] = measured_amplitudes[0] - beacon_amplitude
|
||||
|
||||
###
|
||||
## Present figures
|
||||
###
|
||||
phase2time = lambda x: x/(2*np.pi*frequency)
|
||||
time2phase = lambda x: 2*np.pi*x*frequency
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
ax.set_title("Sine beacon phase determination\nwith random time shifts")
|
||||
ax.set_xlabel("$\\varphi_{meas} - \\varphi_{true}$ [rad]")
|
||||
ax.set_ylabel("#")
|
||||
ax.hist(phase_res, bins='sqrt')
|
||||
if True:
|
||||
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
|
||||
if True:
|
||||
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
|
||||
secax.set_xlabel('Time [ns]')
|
||||
ax.legend(title='N={:.1e}'.format(len(t)))
|
||||
phase_xlims = ax.get_xlim()
|
||||
fig.tight_layout()
|
||||
|
||||
amp_xlims = None
|
||||
if True:
|
||||
fig, ax = plt.subplots()
|
||||
ax.set_title("Amplitude")
|
||||
ax.set_xlabel("$A_{meas} - 1$")
|
||||
ax.set_ylabel("#")
|
||||
ax.legend(title='N={:.1e}'.format(len(t)))
|
||||
ax.hist(amp_res, bins='sqrt')
|
||||
fig.tight_layout()
|
||||
|
||||
amp_xlims = ax.get_xlim()
|
||||
|
||||
if True:
|
||||
fig, ax = plt.subplots()
|
||||
ax.grid()
|
||||
ax.set_xlabel("Phase Res [rad]")
|
||||
ax.set_ylabel("Amplitude Res")
|
||||
sc = ax.scatter( phase_res, amp_res )
|
||||
#fig.colorbar(sc, ax=ax)
|
||||
|
||||
ax.set_xlim(phase_xlims)
|
||||
if amp_xlims is not None:
|
||||
ax.set_ylim(amp_xlims)
|
||||
if True:
|
||||
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
|
||||
if True:
|
||||
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
|
||||
secax.set_xlabel('Time [ns]')
|
||||
ax.legend(title='N={:.1e}'.format(len(t)))
|
||||
fig.tight_layout()
|
||||
|
||||
plt.show()
|
119
airshower_beacon_simulation/lib/tests/test_calculated_phase_measurement.py
Executable file
119
airshower_beacon_simulation/lib/tests/test_calculated_phase_measurement.py
Executable file
|
@ -0,0 +1,119 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Test the functions in lib concerning
|
||||
beacon generation and removing the geometrical phase
|
||||
work correctly together.
|
||||
"""
|
||||
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
from earsim import Antenna
|
||||
import lib
|
||||
|
||||
seed = 12345
|
||||
dt = 1 # ns
|
||||
frequency = 52.12345e-3 # GHz
|
||||
N = 5e2
|
||||
t_clock = 0
|
||||
beacon_amplitude = 1
|
||||
c_light = lib.c_light
|
||||
|
||||
t = np.arange(0, 10*int(1e3), dt, dtype=float)
|
||||
rng = np.random.default_rng(seed)
|
||||
|
||||
tx = Antenna(x=0,y=0,z=0,name='tx')
|
||||
rx = Antenna(x=tx.x,y=tx.y,z=tx.z,name='rx')
|
||||
|
||||
# Vary both the base time and the phase
|
||||
t_extra = 0
|
||||
phase_res = np.zeros(int(N))
|
||||
amp_res = np.zeros(int(N))
|
||||
for i in range(int(N)):
|
||||
# Change timebase
|
||||
t -= t_extra
|
||||
t_extra = (2*rng.uniform(size=1) - 1) *1e3
|
||||
t += t_extra
|
||||
|
||||
# Randomise Antenna Location
|
||||
if True:
|
||||
rx.x, rx.y, rx.z = (2*rng.uniform(size=3) -1) * 1e4
|
||||
|
||||
# Randomly phased beacon
|
||||
# at Antenna
|
||||
phase = lib.phase_mod(np.pi*(2*rng.uniform(size=1) -1)) # rad
|
||||
beacon = beacon_amplitude * lib.beacon_from(tx, rx, frequency, t, t0=0, phase=phase, peak_at_t0=False, c_light=c_light, radiate_rsq=False)
|
||||
|
||||
if False: # blank part of the beacon
|
||||
blank_low, blank_high = int(0.2*len(t)), int(0.4*len(t))
|
||||
t_mask = np.ones(len(t), dtype=bool)
|
||||
t_mask[blank_low:blank_high] = False
|
||||
|
||||
t = t[t_mask]
|
||||
beacon = beacon[t_mask]
|
||||
|
||||
# Introduce clock errors
|
||||
t += t_clock
|
||||
_, measured_phases, measured_amplitudes = lib.find_beacon_in_traces([beacon], t, frequency, frequency_fit=False)
|
||||
t -= t_clock
|
||||
|
||||
calculated_phases = lib.remove_antenna_geometry_phase(tx, rx, frequency, measured_phases, c_light=c_light)
|
||||
|
||||
# Save residuals
|
||||
phase_res[i] = lib.phase_mod(calculated_phases[0] - phase)
|
||||
amp_res[i] = measured_amplitudes[0] - beacon_amplitude
|
||||
|
||||
###
|
||||
## Present figures
|
||||
###
|
||||
phase2time = lambda x: x/(2*np.pi*frequency)
|
||||
time2phase = lambda x: 2*np.pi*x*frequency
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
ax.set_title("Measured phase at Antenna - geometrical phase")
|
||||
ax.set_xlabel("$\\varphi_{meas} - \\varphi_{true}$ [rad]")
|
||||
ax.set_ylabel("#")
|
||||
ax.hist(phase_res, bins='sqrt')
|
||||
if True:
|
||||
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
|
||||
if True:
|
||||
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
|
||||
secax.set_xlabel('Time [ns]')
|
||||
ax.legend(title='N={:.1e}'.format(len(t)))
|
||||
phase_xlims = ax.get_xlim()
|
||||
fig.tight_layout()
|
||||
|
||||
amp_xlims = None
|
||||
if True:
|
||||
fig, ax = plt.subplots()
|
||||
ax.set_title("Amplitude")
|
||||
ax.set_xlabel("$A_{meas} - 1$")
|
||||
ax.set_ylabel("#")
|
||||
ax.legend(title='N={:.1e}'.format(len(t)))
|
||||
ax.hist(amp_res, bins='sqrt')
|
||||
fig.tight_layout()
|
||||
|
||||
amp_xlims = ax.get_xlim()
|
||||
|
||||
if True:
|
||||
fig, ax = plt.subplots()
|
||||
ax.grid()
|
||||
ax.set_xlabel("Phase Res [rad]")
|
||||
ax.set_ylabel("Amplitude Res")
|
||||
sc = ax.scatter( phase_res, amp_res )
|
||||
#fig.colorbar(sc, ax=ax)
|
||||
|
||||
ax.set_xlim(phase_xlims)
|
||||
if amp_xlims is not None:
|
||||
ax.set_ylim(amp_xlims)
|
||||
if True:
|
||||
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
|
||||
if True:
|
||||
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
|
||||
secax.set_xlabel('Time [ns]')
|
||||
ax.legend(title='N={:.1e}'.format(len(t)))
|
||||
fig.tight_layout()
|
||||
|
||||
plt.show()
|
Loading…
Add table
Add a link
Reference in a new issue