Why is clk-phase not zero is unresolved

Merge branch 'why-is-clk-phase-not-zero-centered' into main
This commit is contained in:
Eric-Teunis de Boone 2023-03-27 17:08:16 +02:00
commit 5ea4a0df17
39 changed files with 1027 additions and 308 deletions

View file

@ -0,0 +1,3 @@
from .lib import *
from . import rit
from .snr import *

@ -0,0 +1 @@
Subproject commit cd903a30f99e7f640ed068c4393cd38c23c316e2

@ -0,0 +1 @@
Subproject commit 6ef809020477cf78415b9fa733d4fbb4a74102a1

View file

@ -0,0 +1,219 @@
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as stats
from itertools import zip_longest
def phase_comparison_figure(
measured_phases,
true_phases,
plot_residuals=True,
f_beacon=None,
hist_kwargs={},
sc_kwargs={},
text_kwargs={},
colors=['blue', 'orange'],
legend_on_scatter=True,
secondary_axis='time',
fit_gaussian=False,
mean_snr=None,
return_fit_info=False,
**fig_kwargs
):
"""
Create a figure comparing measured_phase against true_phase
by both plotting the values, and the residuals.
"""
default_fig_kwargs = dict(sharex=True)
default_hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step')
default_text_kwargs = dict(fontsize=14, verticalalignment='top')
default_sc_kwargs = dict(alpha=0.6, ls='none')
fig_kwargs = {**default_fig_kwargs, **fig_kwargs}
hist_kwargs = {**default_hist_kwargs, **hist_kwargs}
text_kwargs = {**default_text_kwargs, **text_kwargs}
sc_kwargs = {**default_sc_kwargs, **sc_kwargs}
do_hist_plot = hist_kwargs is not False
do_scatter_plot = sc_kwargs is not False
fig, axs = plt.subplots(0+do_hist_plot+do_scatter_plot, 1, **fig_kwargs)
if not hasattr(axs, '__len__'):
axs = [axs]
if f_beacon and secondary_axis in ['phase', 'time']:
phase2time = lambda x: x/(2*np.pi*f_beacon)
time2phase = lambda x: 2*np.pi*x*f_beacon
if secondary_axis == 'time':
functions = (phase2time, time2phase)
label = 'Time $\\varphi/(2\\pi f_{beac})$ [ns]'
else:
functions = (time2phase, phase2time)
label = 'Phase $2\\pi t f_{beac}$ [rad]'
secax = axs[0].secondary_xaxis('top', functions=functions)
# Histogram
fit_info = {}
if do_hist_plot:
i=0
axs[i].set_ylabel("#")
this_kwargs = dict(
ax = axs[i],
text_kwargs=text_kwargs,
hist_kwargs={**hist_kwargs, **dict(label='Measured', color=colors[0], ls='solid')},
mean_snr=mean_snr,
)
if fit_gaussian:
this_kwargs['fit_distr'] = 'gaussian'
_, fit_info = fitted_histogram_figure(
measured_phases,
**this_kwargs
)
if not plot_residuals: # also plot the true clock phases
_bins = fit_info['bins']
axs[i].hist(true_phases, color=colors[1], label='Actual', ls='dashed', **{**hist_kwargs, **dict(bins=_bins)})
# Scatter plot
if do_scatter_plot:
i=1
axs[i].set_ylabel("Antenna no.")
axs[i].plot(measured_phases, np.arange(len(measured_phases)), marker='x' if plot_residuals else '3', color=colors[0], label='Measured', **sc_kwargs)
if not plot_residuals: # also plot the true clock phases
axs[i].plot(true_phases, np.arange(len(true_phases)), marker='4', color=colors[1], label='Actual', **sc_kwargs)
if not plot_residuals and legend_on_scatter:
axs[i].legend()
fig.tight_layout()
if return_fit_info:
return fig, fit_info
return fig
def fitted_histogram_figure(
amplitudes,
fit_distr = None,
calc_chisq = True,
text_kwargs={},
hist_kwargs={},
mean_snr = None,
ax = None,
**fig_kwargs
):
"""
Create a figure showing $amplitudes$ as a histogram.
If fit_distr is a (list of) string, also fit the respective
distribution function and show the parameters on the figure.
"""
default_hist_kwargs = dict(bins='sqrt', density=False, alpha=0.8, histtype='step', label='hist')
default_text_kwargs = dict(fontsize=14, verticalalignment='top')
if isinstance(fit_distr, str):
fit_distr = [fit_distr]
hist_kwargs = {**default_hist_kwargs, **hist_kwargs}
text_kwargs = {**default_text_kwargs, **text_kwargs}
if ax is None:
fig, ax = plt.subplots(1,1, **fig_kwargs)
else:
fig = ax.get_figure()
text_kwargs['transform'] = ax.transAxes
counts, bins, _patches = ax.hist(amplitudes, **hist_kwargs)
fit_info = []
if fit_distr:
min_x = min(amplitudes)
max_x = max(amplitudes)
dx = bins[1] - bins[0]
scale = len(amplitudes) * dx
xs = np.linspace(min_x, max_x)
for distr in fit_distr:
fit_params2text_params = lambda x: x
if 'rice' == distr:
name = "Rice"
param_names = [ "$\\nu$", "$\\sigma$" ]
distr_func = stats.rice
fit_params2text_params = lambda x: (x[0]*x[1], x[1])
elif 'gaussian' == distr:
name = "Norm"
param_names = [ "$\\mu$", "$\\sigma$" ]
distr_func = stats.norm
elif 'rayleigh' == distr:
name = "Rayleigh"
param_names = [ "$\\sigma$" ]
distr_func = stats.rayleigh
fit_params2text_params = lambda x: (x[0]+x[1]/2,)
else:
raise ValueError('Unknown distribution function '+distr)
label = name +"(" + ','.join(param_names) + ')'
fit_params = distr_func.fit(amplitudes)
fit_ys = distr_func.pdf(xs, *fit_params)
ax.plot(xs, fit_ys*scale, label=label)
chisq_strs = []
if calc_chisq:
ct = np.diff(distr_func.cdf(bins, *fit_params))*np.sum(counts)
c2t = stats.chisquare(counts, ct, ddof=len(fit_params))
chisq_strs = [
f"$\\chi^2$/dof = {c2t[0]: .2g}/{len(fit_params)}"
]
# change parameters if needed
text_fit_params = fit_params2text_params(fit_params)
text_str = "\n".join(
[label]
+
[ f"{param} = {value: .2e}" for param, value in zip_longest(param_names, text_fit_params, fillvalue='?') ]
+
chisq_strs
)
this_info = {
'name': name,
'param_names': param_names,
'param_values': text_fit_params,
'text_str': text_str,
}
if chisq_strs:
this_info['chisq'] = c2t[0]
this_info['dof'] = len(fit_params)
fit_info.append(this_info)
loc = (0.02, 0.95)
ax.text(*loc, "\n\n".join([info['text_str'] for info in fit_info]), **{**text_kwargs, **dict(ha='left')})
if mean_snr:
text_str = f"$\\langle SNR \\rangle$ = {mean_snr: .1e}"
loc = (0.98, 0.95)
ax.text(*loc, text_str, **{**text_kwargs, **dict(ha='right')})
return fig, dict(fit_info=fit_info, counts=counts, bins=bins)

View file

@ -0,0 +1,258 @@
# vim: fdm=indent ts=4
"""
Library for this simulation
"""
import numpy as np
from numpy.polynomial import Polynomial
from earsim import Antenna
c_light = 3e8*1e-9 # m/ns
""" Beacon utils """
def sine_beacon(f, t, t0=0, amplitude=1, baseline=0, phase=0, peak_at_t0=0):
"""
Return a sine appropriate as a beacon
"""
baseline = baseline*np.ones_like(t)
if peak_at_t0: # add peak near t0
idx = (np.abs(t-t0)).argmin()
baseline[ max(0, idx-1):min(len(t), idx+1) ] += peak_at_t0 + amplitude
return amplitude * np.cos(2*np.pi*f*(t+t0) + phase) + baseline
def phase_mod(phase, low=np.pi):
"""
Modulo phase such that it falls within the
interval $[-low, 2\pi - low)$.
"""
return (phase + low) % (2*np.pi) - low
def distance(x1, x2):
"""
Calculate the Euclidean distance between two locations x1 and x2
"""
assert type(x1) in [Antenna]
x1 = np.array([x1.x, x1.y, x1.z])
assert type(x2) in [Antenna]
x2 = np.array([x2.x, x2.y, x2.z])
return np.sqrt( np.sum( (x1-x2)**2 ) )
def geometry_time(dist, x2=None, c_light=c_light):
if x2 is not None:
dist = distance(dist, x2)
return dist/c_light
def beacon_from(tx, rx, f, t=0, t0=0, c_light=c_light, radiate_rsq=True, amplitude=1,**kwargs):
dist = distance(tx,rx)
# suppress extra time delay from distance
if c_light is not None and np.isfinite(c_light):
t0 = t0 + dist/c_light
if radiate_rsq:
if np.isclose(dist, 0):
dist = 1
amplitude *= 1/(dist**2)
return sine_beacon(f, t, t0=t0, amplitude=amplitude,**kwargs)
def remove_antenna_geometry_phase(tx, antennas, f_beacon, measured_phases=None, c_light=c_light):
"""
Remove the geometrical phase from the measured antenna phase.
"""
if not hasattr(antennas, '__len__'):
antennas = [antennas]
if not hasattr(measured_phases, '__len__'):
measured_phases = [measured_phases]
clock_phases = np.empty( (len(antennas)) )
for i, ant in enumerate(antennas):
measured_phase = measured_phases[i]
geom_time = geometry_time(tx, ant, c_light=c_light)
geom_phase = geom_time * 2*np.pi*f_beacon
clock_phases[i] = phase_mod(measured_phase - geom_phase)
return clock_phases
""" Fourier """
def ft_corr_vectors(freqs, time):
"""
Get the cosine and sine terms for freqs at time.
Takes the outer product of freqs and time.
"""
freqtime = np.outer(freqs, time)
c_k = np.cos(2*np.pi*freqtime)
s_k = -1*np.sin(2*np.pi*freqtime)
return c_k, s_k
def direct_fourier_transform(freqs, time, samplesets_iterable):
"""
Determine the fourier transform of each sampleset in samplesets_iterable at freqs.
The samplesets are expected to have the same time vector.
Returns either a generator to return the fourier transform for each sampleset
if samplesets_iterable is a generator
or a numpy array.
"""
c_k, s_k = ft_corr_vectors(freqs, time)
if not hasattr(samplesets_iterable, '__len__') and hasattr(samplesets_iterable, '__iter__'):
# samplesets_iterable is an iterator
# return a generator containing (real, imag) amplitudes
return ( (np.dot(c_k, samples), np.dot(s_k, samples)) for samples in samplesets_iterable )
# Numpy array
return np.dot(c_k, samplesets_iterable), np.dot(s_k, samplesets_iterable)
def phase_field_from_tx(x, y, tx, f_beacon, c_light=c_light, t0=0, wrap_phase=True, return_meshgrid=True):
"""
"""
assert type(tx) in [Antenna]
xs, ys = np.meshgrid(x, y, sparse=True)
x_distances = (tx.x - xs)**2
y_distances = (tx.y - ys)**2
dist = np.sqrt( x_distances + y_distances )
phase = (dist/c_light + t0) * f_beacon*2*np.pi
if wrap_phase:
phase = phase_mod(phase)
if return_meshgrid:
return phase, (xs, ys)
else:
return phase, (np.repeat(xs, len(ys), axis=0), np.repeat(ys, len(xs[0]), axis=1))
def find_beacon_in_traces(
traces,
t_trace,
f_beacon_estimate = 50e6,
frequency_fit = False,
N_test_freqs = 5e2,
f_beacon_estimate_band = 0.01,
amp_cut = 0.8
):
"""
f_beacon_band is inclusive
traces is [trace, trace, trace, .. ]
"""
amplitudes = np.zeros(len(traces))
phases = np.zeros(len(traces))
frequencies = np.zeros(len(traces))
if frequency_fit: # fit frequency
test_freqs = f_beacon_estimate + f_beacon_estimate_band * np.linspace(-1, 1, int(N_test_freqs)+1)
ft_amp_gen = direct_fourier_transform(test_freqs, t_trace, (x for x in traces))
n_samples = len(t_trace)
for i, ft_amp in enumerate(ft_amp_gen):
real, imag = ft_amp
amps = 1/n_samples * ( real**2 + imag**2)**0.5
# find frequency peak and surrounding bins
# that are valid for the parabola fit
max_amp_idx = np.argmax(amps)
max_amp = amps[max_amp_idx]
if True:
frequencies[i] = test_freqs[max_amp_idx]
continue
valid_mask = amps >= amp_cut*max_amp
if True: # make sure not to use other peaks
lower_mask = valid_mask[0:max_amp_idx]
upper_mask = valid_mask[max_amp_idx:]
if any(lower_mask):
lower_end = np.argmin(lower_mask[::-1])
else:
lower_end = max_amp_idx
if any(upper_mask):
upper_end = np.argmin(upper_mask)
else:
upper_end = 0
valid_mask[0:(max_amp_idx - lower_end)] = False
valid_mask[(max_amp_idx + upper_end):] = False
if all(~valid_mask):
frequencies[i] = np.nan
continue
# fit Parabola
parafit = Polynomial.fit(test_freqs[valid_mask], amps[valid_mask], 2)
func = parafit.convert()
# find frequency where derivative is 0
deriv = func.deriv(1)
freq = deriv.roots()[0]
frequencies[i] = freq
else: # no frequency finding
frequencies[:] = f_beacon_estimate
n_samples = len(t_trace)
# evaluate fourier transform at freq for each trace
for i, freq in enumerate(frequencies):
if freq is np.nan:
phases[i] = np.nan
amplitudes[i] = np.nan
continue
real, imag = direct_fourier_transform(freq, t_trace, traces[i])
phases[i] = np.arctan2(imag, real)
amplitudes[i] = 2/n_samples * (real**2 + imag**2)**0.5
return frequencies, phases, amplitudes
def coherence_sum_maxima(ref_x, y, k_step=1, k_start=0, k_end=None, periodic=True):
"""
Use the maximum of a coherent sum to determine
the best number of samples to move
"""
N_samples = int( len(ref_x) )
k_end = N_samples if k_end is None or k_end > max_k else k_end
ks = np.arange(k_start, k_end, step=k_step)
maxima = np.empty(len(ks))
if periodic is False:
# prepend zeros
N_zeros = N_samples
preshift = 0 # only required for testing purposes
ref_x = np.pad(ref_x, (N_zeros-0,0), 'constant')
y = np.pad(y, (N_zeros-preshift,preshift), 'constant')
for i,k in enumerate(ks, 0):
augmented_y = np.roll(y, k)
maxima[i] = max(ref_x + augmented_y)
return ks, maxima

View file

@ -0,0 +1,394 @@
# from wappy import *
from earsim import *
from atmocal import *
import matplotlib.pyplot as plt
from scipy.signal import hilbert
from scipy import signal
from scipy.interpolate import interp1d
from scipy.optimize import curve_fit,minimize
import pandas as pd
import os
try:
from joblib import Parallel, delayed
except:
Parallel = None
delayed = lambda x: x
plt.rcParams.update({'font.size': 16})
atm = AtmoCal()
from matplotlib import cm
def set_pol_and_bp(e,low=0.03,high=0.08):
for ant in e.antennas:
E = [np.dot(e.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(ant.Ex,ant.Ey,ant.Ez)]
dt = ant.t[1] -ant.t[0]
E = block_filter(E,dt,low,high)
ant.E_AxB = E
ant.t_AxB = ant.t
def pow_and_time(test_loc,ev,dt=1.0):
t_ = []
a_ = []
t_min = 1e9
t_max = -1e9
for ant in ev.antennas:
#propagate to test location
aloc = [ant.x,ant.y,ant.z]
delta,dist = atm.light_travel_time(test_loc,aloc)
delta = delta*1e9
t__ = np.subtract(ant.t_AxB,delta)
t_.append(t__)
a_.append(ant.E_AxB)
if t__[0] < t_min:
t_min = t__[0]
if t__[-1] > t_max:
t_max = t__[-1]
t_sum = np.arange(t_min+1,t_max-1,dt)
a_sum = np.zeros(len(t_sum))
#interpolation
for t_r,E_ in zip (t_,a_):
f = interp1d(t_r,E_,assume_sorted=True,bounds_error=False,fill_value=0.)
a_int = f(t_sum)
a_sum = np.add(a_sum,a_int)
if len(a_sum) != 0:
P = np.sum(np.square(np.absolute(np.fft.fft(a_sum))))
# normalise P with the length of the traces
P = P/( t_sum[-1] - t_sum[0])
else:
print("ERROR, a_sum lenght = 0",
"tmin ",t_min,
"t_max ",t_max,
"dt",dt)
P = 0
return P,t_,a_,a_sum,t_sum
def shower_axis_slice(e,Xb=200,Xe=1200,dX=2,zgr=0):
zgr = zgr + e.core[2]
N = int((Xe-Xb)/dX)
Xs = np.array(np.linspace(Xb,Xe,N+1))
ds = np.array([atm.distance_to_slant_depth(np.deg2rad(e.zenith),X,zgr) for X in Xs])
locs = []
for d_ in ds:
xc = np.sin(np.deg2rad(e.zenith))*np.cos(np.deg2rad(e.azimuth))* d_
yc = np.sin(np.deg2rad(e.zenith))*np.sin(np.deg2rad(e.azimuth))* d_
zc = np.cos(np.deg2rad(e.zenith))* d_
locs.append([xc,yc,zc])
p = []
for loc in locs:
P,t_,pulses_,wav,twav = pow_and_time(loc,e)
p.append(P)
p = np.asanyarray(p)
return ds,Xs,locs,p
def shower_plane_slice(e,X=750.,Nx=10,Ny=10,wx=1e3,wy=1e3,xoff=0,yoff=0,zgr=0,n_jobs=None):
zgr = zgr + e.core[2]
dX = atm.distance_to_slant_depth(np.deg2rad(e.zenith),X,zgr)
x = np.linspace(-wx,wx,Nx)
y = np.linspace(-wy,wy,Ny)
def loop_func(x_, y_, xoff=xoff, yoff=yoff):
loc = (x_+xoff)* e.uAxB + (y_+yoff)*e.uAxAxB + dX *e.uA
P,t_,pulses_,wav,twav = pow_and_time(loc,e)
return x_+xoff, y_+yoff, P, loc
res = ( delayed(loop_func)(x_, y_) for x_ in x for y_ in y)
if Parallel:
#if n_jobs is None change with `with parallel_backend`
res = Parallel(n_jobs=n_jobs)(res)
# unpack loop results
xx, yy, p, locs = zip(*res)
xx = np.asarray(xx)
yy = np.asarray(yy)
p = np.asanyarray(p)
return xx,yy,p,locs[np.argmax(p)]
def slice_figure(e,X,xx,yy,p,mode='horizontal', scatter_kwargs={}, colorbar_kwargs={'label':'Power'}):
scatter_kwargs = {
**dict(
cmap='Spectral_r',
alpha=0.9,
s=30
),
**scatter_kwargs
}
fig, axs = plt.subplots(1,figsize=(10,8))
fig.suptitle(r'E = %.1f EeV, $\theta$ = %.1f$^\circ$, $\phi$ = %.1f$^\circ$ X = %.f'%(e.energy,e.zenith,e.azimuth,X))
sc = axs.scatter(xx/1e3,yy/1e3,c=p,**scatter_kwargs)
fig.colorbar(sc,ax=axs, **colorbar_kwargs)
zgr = 0 + e.core[2]
dX = atm.distance_to_slant_depth(np.deg2rad(e.zenith),X,zgr)
xc = np.sin(np.deg2rad(e.zenith))*np.cos(np.deg2rad(e.azimuth))* dX
yc = np.sin(np.deg2rad(e.zenith))*np.sin(np.deg2rad(e.azimuth))* dX
if mode == 'horizontal':
axs.plot(xc/1e3,yc/1e3,'r+',ms=30)
axs.set_xlabel('x (km)')
axs.set_ylabel('y (km)')
elif mode == "sp":
axs.plot(0,0,'r+',ms=30)
axs.set_xlabel('-v x B (km)')
axs.set_ylabel(' vxvxB (km)')
im = np.argmax(p)
axs.plot(xx[im]/1e3,yy[im]/1e3,'bx',ms=30)
fig.tight_layout()
return fig,axs
def dist_to_line(xp,core,u):
xp = np.array(xp)
xp_core = xp-core
c2 = np.dot(xp_core,xp_core)
a2 = np.dot((np.dot(xp_core,u)*u),(np.dot(xp_core,u)*u))
d = (np.abs(c2 - a2))**0.5
return d
def dist_to_line_sum(param,data,weights):
#distance line point: a = xp-core is D= | (a)^2-(a dot n)n |
#where ux is direction of line and x0 is a point in the line (like t = 0)
x0 = param[0]
y0 = param[1]
theta = param[2]
phi = param[3]
core = np.array([x0, y0, 0.])
u = np.array([np.cos(phi)*np.sin(theta),np.sin(phi)*np.sin(theta),np.cos(theta)])
dsum = 0
for xp,w in zip(data,weights):
dsum += dist_to_line(xp,core,u)*w**2
# print('%.2e %.2e %.2e %.2e %.2e'%(x0,y0,theta,phi,dsum))
return dsum/len(data)
def get_axis_points(e,savefig=True,path="",zgr=0,Xlow=300, Xhigh=1000, N_X=15, n_jobs=None):
Xsteps = np.linspace(Xlow, Xhigh, N_X)
zgr=zgr+e.core[2] #not exact
dXref = atm.distance_to_slant_depth(np.deg2rad(e.zenith),750,zgr)
scale2d = dXref*np.tan(np.deg2rad(2.))
scale4d = dXref*np.tan(np.deg2rad(4.))
scale0_2d=dXref*np.tan(np.deg2rad(0.2))
def loop_func(X):
print("Starting", X)
x,y,p,loc_max = shower_plane_slice(e,X,21,21,scale2d,scale2d)
if savefig:
fig,axs = slice_figure(e,X,x,y,p,'sp')
fig.savefig(path+'X%d_a.pdf'%(X))
plt.close(fig)
im = np.argmax(p)
if np.abs(x[im]) == np.max(x) or np.abs(y[im]) == (np.max(y)):
x,y,p,loc_max = shower_plane_slice(e,X,21,21,scale4d,scale4d)
if savefig:
fig,axs = slice_figure(e,X,x,y,p,'sp')
fig.savefig(path+'X%d_c.pdf'%(X))
plt.close(fig)
im = np.argmax(p)
x,y,p,loc_max = shower_plane_slice(e,X,21,21,scale0_2d,scale0_2d,x[im],y[im])
if savefig:
fig,axs = slice_figure(e,X,x,y,p,'sp')
fig.savefig(path+'X%d_b.pdf'%(X))
plt.close(fig)
print("Finished", X)
return np.max(p), loc_max
res = (delayed(loop_func)(X) for X in Xsteps)
if Parallel:
#if n_jobs is None change with `with parallel_backend`
res = Parallel(n_jobs=n_jobs)(res)
# unpack loop results
max_vals, axis_points = zip(*res)
return Xsteps,axis_points,max_vals
def fit_track(e,axis_points,vals,nscale=1e0):
weights = vals/np.max(vals)
data=axis_points[:]
data = [d/nscale for d in data] #km, to have more comparable step sizes
x0=[0,0,np.deg2rad(e.zenith),np.deg2rad(e.azimuth)]
res = minimize(dist_to_line_sum,args=(data,weights),x0=x0)
zen_r = np.rad2deg(res.x[2])
azi_r = np.rad2deg(res.x[3])
print(res,zen_r,e.zenith,azi_r,e.azimuth)
return zen_r,azi_r,[res.x[0]*nscale,res.x[1]*nscale,0]
def update_event(e,core,theta,phi,axp=None):
#recalculate
e.zenith = theta
e.azimuth = phi
theta = np.deg2rad(theta)
phi = np.deg2rad(phi)
e.core = e.core+core
e.uA = np.array([np.cos(phi)*np.sin(theta),np.sin(phi)*np.sin(theta),np.cos(theta)])
e.uAxB = np.cross(e.uA,e.uB)
e.uAxB = e.uAxB/(np.dot(e.uAxB,e.uAxB))**0.5
e.uAxAxB = np.cross(e.uA,e.uAxB)
#antenna position
for a in e.antennas:
a.x -= core[0]
a.y -= core[1]
a.z -= core[2]
if axp != None:
for ap in axp:
ap[0] -= core[0]
ap[1] -= core[1]
ap[2] -= core[2]
def longitudinal_figure(dist,Xs,p,mode='grammage'):
fig, axs = plt.subplots(1,figsize=(6,5))
if mode=='grammage':
axs.plot(Xs,p/np.max(p),'o-')
axs.set_xlabel('X (g/cm$^2$)')
if mode=='distance':
axs.plot(dist/1e3,p/np.max(p),'o-')
axs.set_xlabel('distance from ground (km)')
axs.grid()
fig.tight_layout()
return fig
def time_residuals(e,tlable=True):
ds,tp,nsum,ssum,swidth,azi,x,y,sid = lateral_parameters(e,True,[0,0,0])
fig, axs = plt.subplots(1,figsize=(6,5),sharex=True)
tp = tp-np.min(tp)
cut_outlier = ~((ds<200)&(tp > 10))
axs.plot(ds,tp,'o')
if tlable:
for d,t,s in zip(ds,tp,sid):
plt.text(d,t,s)
# axs.text(ds,tp,sid)
axs.set_xlabel('distance (m)')
axs.set_ylabel('$\Delta t (ns)$')
axs.grid()
z = np.polyfit(ds[cut_outlier],tp[cut_outlier],3)
pfit = np.poly1d(z)
xfit = np.linspace(np.min(ds),np.max(ds),100)
yfit = pfit(xfit)
tres = tp - pfit(ds)
sigma =np.std(tres[cut_outlier])
axs.plot(xfit,yfit,label=r'pol3 fit, $\sigma=%.2f$ (ns)'%(sigma))
axs.legend()
fig.tight_layout()
return fig,tres
def figure_3D(axis_points,max_vals,zen,azi,core,res = 0):
fig = plt.figure(figsize=(5,9))
# fig, axs = plt.subplots(1,2,figsize=(12,8))
ax = fig.add_subplot(2,1,1,projection='3d')
xp = [ap[0]/1e3 for ap in axis_points]
yp = [ap[1]/1e3 for ap in axis_points]
zp = [ap[2]/1e3 for ap in axis_points]
max_vals = np.asarray(max_vals)
ax.scatter(xp, yp, zp,c=max_vals,s=150*(max_vals/np.max(max_vals))**2,cmap='Spectral_r')
ax = fig.add_subplot(2,1,2)
core = np.array(core)
theta = np.deg2rad(zen)
phi = np.deg2rad(azi)
u = np.array([np.cos(phi)*np.sin(theta),np.sin(phi)*np.sin(theta),np.cos(theta)])
residuals = [dist_to_line(ap,core,u) for ap in axis_points]
dist = [np.sum((ap-core)**2)**0.5 for ap in axis_points]
ax.scatter(dist,residuals,c=max_vals,cmap='Spectral_r')
ax.grid()
# ax.plot(xl,yl,zl,'-')
# ax.set_zlim(0,18)
# ax.view_init(15, 10)
fig.tight_layout()
if res != 0:
res.track_dis.append(dist)
res.track_res.append(residuals)
res.track_val.append(max_vals)
return fig
class RITResult():
"""docstring for RITResult."""
def __init__(self):
super(RITResult, self).__init__()
self.xmax_rit = []
self.xmax = []
self.profile_rit = []
self.dX = []
self.dl = []
self.zenith_ini = []
self.azimuth_ini = []
self.core_ini = []
self.dcore_rec = []
self.zenith_rec = []
self.azimuth_rec = []
self.index = []
self.isMC = []
self.track_dis = []
self.track_res =[]
self.track_val =[]
self.station_ids =[]
self.station_x =[]
self.station_y =[]
self.station_z =[]
self.station_maxE = []
self.has_pulse = []
def fill_stations_propeties(e,res):
x = np.array([a.x for a in e.antennas])
y = np.array([a.y for a in e.antennas])
z = np.array([a.z for a in e.antennas])
ids = [a.name for a in e.antennas]
maxE = np.array([np.max(a.E_AxB) for a in e.antennas])
#has_pulse = np.array([np.max(a.has_pulse) for a in e.antennas])
res.station_x.append(x)
res.station_y.append(y)
res.station_z.append(z)
res.station_ids.append(ids)
#res.has_pulse.append(has_pulse)
def reconstruction(e,outfile='', slice_outdir=None, Xlow=300, Xhigh=1000, N_X=15, disable_pol_and_bp=False):
res = RITResult()
res.isMC.append(True)
res.zenith_ini.append(e.zenith)
res.azimuth_ini.append(e.azimuth)
res.core_ini.append(e.core)
if not disable_pol_and_bp:
set_pol_and_bp(e, 0.03, 0.08)
#only use signal that have a signal in data
fill_stations_propeties(e,res)
Xs,axis_points,max_vals = get_axis_points(e,savefig=(slice_outdir is not None), path=slice_outdir, Xlow=Xlow, Xhigh=Xhigh, N_X=N_X)
zen,azi,core = fit_track(e,axis_points,max_vals,1e2)
fig = figure_3D(axis_points,max_vals,zen,azi,core,res)
fig.savefig(outfile)
update_event(e,core,zen,azi)
ds,Xs,locs,p = shower_axis_slice(e)
#result
res.dX.append(Xs)
res.dl.append(ds)
res.profile_rit.append(p)
res.xmax_rit.append(Xs[np.argmax(p)])
res.azimuth_rec.append(e.azimuth)
res.zenith_rec.append(e.zenith)
res.dcore_rec.append(core)
return res
if __name__ == "__main__":
file = '../ZH_airshower/mysim.sry'
ev = REvent(file)
set_pol_and_bp(ev)
X = 750
dXref = atm.distance_to_slant_depth(np.deg2rad(ev.zenith),X,0)
scale2d = dXref*np.tan(np.deg2rad(2.))
xx,yy,p,km= shower_plane_slice(ev,X,21,21,scale2d,scale2d)
slice_figure(ev,X,xx,yy,p,mode='sp')
#plt.scatter(xx,yy,c=p)
#plt.colorbar()
plt.show()

View file

@ -0,0 +1,97 @@
import numpy as np
from collections import namedtuple
from lib import direct_fourier_transform as dtft
import matplotlib.pyplot as plt # for debug plotting
passband = namedtuple("passband", ['low', 'high'], defaults=[0, np.inf])
def get_freq_spec(val,dt):
"""From earsim/tools.py"""
fval = np.fft.fft(val)[:len(val)//2]
freq = np.fft.fftfreq(len(val),dt)[:len(val)//2]
return fval, freq
def bandpass_samples(samples, samplerate, band=passband()):
"""
Bandpass the samples with this passband.
This is a hard filter.
"""
fft, freqs = get_freq_spec(samples, samplerate)
fft[ ~ self.freq_mask(freqs) ] = 0
return np.fft.irfft(fft)
def bandpass_mask(freqs, band=passband()):
low_pass = abs(freqs) <= band[1]
high_pass = abs(freqs) >= band[0]
return low_pass & high_pass
def bandpower(samples, samplerate=1, band=passband(), normalise_bandsize=True, debug_ax=False):
bins = 0
fft, freqs = get_freq_spec(samples, 1/samplerate)
bandmask = [False]*len(freqs)
if band[1] is None:
# Only a single frequency given
# use a DTFT for finding the power
time = np.arange(0, len(samples), 1/samplerate)
real, imag = dtft(band[0], time, samples)
power = np.sum(np.abs(real**2 + imag**2))
else:
bandmask = bandpass_mask(freqs, band=band)
if normalise_bandsize:
bins = np.count_nonzero(bandmask, axis=-1)
else:
bins = 1
bins = max(1, bins)
power = 1/bins * np.sum(np.abs(fft[bandmask])**2)
# Prepare plotting variables if an Axes is supplied
if debug_ax:
if any(bandmask):
min_f, max_f = min(freqs[bandmask]), max(freqs[bandmask])
else:
min_f, max_f = 0, 0
if band[1] is None:
min_f, max_f = band[0], band[0]
if debug_ax is True:
debug_ax = plt.gca()
l = debug_ax.plot(freqs, np.abs(fft), alpha=0.9)
amp = np.sqrt(power)
if min_f != max_f:
debug_ax.plot( [min_f, max_f], [amp, amp], alpha=0.7, color=l[0].get_color(), ls='dashed')
debug_ax.axvspan(min_f, max_f, color=l[0].get_color(), alpha=0.2)
else:
debug_ax.plot( min_f, amp, '4', alpha=0.7, color=l[0].get_color(), ms=10)
return power
def signal_to_noise(samples, noise, samplerate=1, signal_band=passband(), noise_band=None, debug_ax=False):
if noise_band is None:
noise_band = signal_band
if noise is None:
noise = samples
if debug_ax is True:
debug_ax = plt.gca()
noise_power = bandpower(noise, samplerate, noise_band, debug_ax=debug_ax)
signal_power = bandpower(samples, samplerate, signal_band, debug_ax=debug_ax)
return (signal_power/noise_power)**0.5

View file

@ -0,0 +1 @@
../

View file

@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""
Test the functions in lib concerning
beacon generation and phase measuring
work correctly together.
"""
import matplotlib.pyplot as plt
import numpy as np
import lib
seed = 12345
dt = 1 # ns
frequency = 52.12345e-3 # GHz
N = 5e2
t_clock = 0
beacon_amplitude = 1
t = np.arange(0, 10*int(1e3), dt, dtype=float)
rng = np.random.default_rng(seed)
# Vary both the base time and the phase
t_extra = 0
phase_res = np.zeros(int(N))
amp_res = np.zeros(int(N))
for i in range(int(N)):
# Change timebase
t -= t_extra
t_extra = (2*rng.uniform(size=1) - 1) *1e3
t += t_extra
# Randomly phased beacon
phase = lib.phase_mod(np.pi*(2*rng.uniform(size=1) -1)) # rad
beacon = beacon_amplitude * lib.sine_beacon(frequency, t, t0=0, phase=phase, peak_at_t0=False)
if False: # blank part of the beacon
blank_low, blank_high = int(0.2*len(t)), int(0.4*len(t))
t_mask = np.ones(len(t), dtype=bool)
t_mask[blank_low:blank_high] = False
t = t[t_mask]
beacon = beacon[t_mask]
# Introduce clock errors
t += t_clock
_, measured_phases, measured_amplitudes = lib.find_beacon_in_traces([beacon], t, frequency, frequency_fit=False)
t -= t_clock
# Save residuals
phase_res[i] = lib.phase_mod(measured_phases[0] - phase)
amp_res[i] = measured_amplitudes[0] - beacon_amplitude
###
## Present figures
###
phase2time = lambda x: x/(2*np.pi*frequency)
time2phase = lambda x: 2*np.pi*x*frequency
fig, ax = plt.subplots()
ax.set_title("Sine beacon phase determination\nwith random time shifts")
ax.set_xlabel("$\\varphi_{meas} - \\varphi_{true}$ [rad]")
ax.set_ylabel("#")
ax.hist(phase_res, bins='sqrt')
if True:
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
if True:
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
secax.set_xlabel('Time [ns]')
ax.legend(title='N={:.1e}'.format(len(t)))
phase_xlims = ax.get_xlim()
fig.tight_layout()
amp_xlims = None
if True:
fig, ax = plt.subplots()
ax.set_title("Amplitude")
ax.set_xlabel("$A_{meas} - 1$")
ax.set_ylabel("#")
ax.legend(title='N={:.1e}'.format(len(t)))
ax.hist(amp_res, bins='sqrt')
fig.tight_layout()
amp_xlims = ax.get_xlim()
if True:
fig, ax = plt.subplots()
ax.grid()
ax.set_xlabel("Phase Res [rad]")
ax.set_ylabel("Amplitude Res")
sc = ax.scatter( phase_res, amp_res )
#fig.colorbar(sc, ax=ax)
ax.set_xlim(phase_xlims)
if amp_xlims is not None:
ax.set_ylim(amp_xlims)
if True:
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
if True:
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
secax.set_xlabel('Time [ns]')
ax.legend(title='N={:.1e}'.format(len(t)))
fig.tight_layout()
plt.show()

View file

@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
Test the functions in lib concerning
beacon generation and removing the geometrical phase
work correctly together.
"""
import matplotlib.pyplot as plt
import numpy as np
from earsim import Antenna
import lib
seed = 12345
dt = 1 # ns
frequency = 52.12345e-3 # GHz
N = 5e2
t_clock = 0
beacon_amplitude = 1
c_light = lib.c_light
t = np.arange(0, 10*int(1e3), dt, dtype=float)
rng = np.random.default_rng(seed)
tx = Antenna(x=0,y=0,z=0,name='tx')
rx = Antenna(x=tx.x,y=tx.y,z=tx.z,name='rx')
# Vary both the base time and the phase
t_extra = 0
phase_res = np.zeros(int(N))
amp_res = np.zeros(int(N))
for i in range(int(N)):
# Change timebase
t -= t_extra
t_extra = (2*rng.uniform(size=1) - 1) *1e3
t += t_extra
# Randomise Antenna Location
if True:
rx.x, rx.y, rx.z = (2*rng.uniform(size=3) -1) * 1e4
# Randomly phased beacon
# at Antenna
phase = lib.phase_mod(np.pi*(2*rng.uniform(size=1) -1)) # rad
beacon = beacon_amplitude * lib.beacon_from(tx, rx, frequency, t, t0=0, phase=phase, peak_at_t0=False, c_light=c_light, radiate_rsq=False)
if False: # blank part of the beacon
blank_low, blank_high = int(0.2*len(t)), int(0.4*len(t))
t_mask = np.ones(len(t), dtype=bool)
t_mask[blank_low:blank_high] = False
t = t[t_mask]
beacon = beacon[t_mask]
# Introduce clock errors
t += t_clock
_, measured_phases, measured_amplitudes = lib.find_beacon_in_traces([beacon], t, frequency, frequency_fit=False)
t -= t_clock
calculated_phases = lib.remove_antenna_geometry_phase(tx, rx, frequency, measured_phases, c_light=c_light)
# Save residuals
phase_res[i] = lib.phase_mod(calculated_phases[0] - phase)
amp_res[i] = measured_amplitudes[0] - beacon_amplitude
###
## Present figures
###
phase2time = lambda x: x/(2*np.pi*frequency)
time2phase = lambda x: 2*np.pi*x*frequency
fig, ax = plt.subplots()
ax.set_title("Measured phase at Antenna - geometrical phase")
ax.set_xlabel("$\\varphi_{meas} - \\varphi_{true}$ [rad]")
ax.set_ylabel("#")
ax.hist(phase_res, bins='sqrt')
if True:
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
if True:
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
secax.set_xlabel('Time [ns]')
ax.legend(title='N={:.1e}'.format(len(t)))
phase_xlims = ax.get_xlim()
fig.tight_layout()
amp_xlims = None
if True:
fig, ax = plt.subplots()
ax.set_title("Amplitude")
ax.set_xlabel("$A_{meas} - 1$")
ax.set_ylabel("#")
ax.legend(title='N={:.1e}'.format(len(t)))
ax.hist(amp_res, bins='sqrt')
fig.tight_layout()
amp_xlims = ax.get_xlim()
if True:
fig, ax = plt.subplots()
ax.grid()
ax.set_xlabel("Phase Res [rad]")
ax.set_ylabel("Amplitude Res")
sc = ax.scatter( phase_res, amp_res )
#fig.colorbar(sc, ax=ax)
ax.set_xlim(phase_xlims)
if amp_xlims is not None:
ax.set_ylim(amp_xlims)
if True:
ax.axvline( -1*lib.phase_mod(t_clock*frequency*2*np.pi), color='red', lw=5, alpha=0.8, label='true t_clock')
if True:
secax = ax.secondary_xaxis(1.0, functions=(phase2time, time2phase))
secax.set_xlabel('Time [ns]')
ax.legend(title='N={:.1e}'.format(len(t)))
fig.tight_layout()
plt.show()