mirror of
https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
synced 2024-11-14 02:23:32 +01:00
Eric Teunis de Boone
daf2209c73
This is moved out from the scripts that already have to modify timing
411 lines
13 KiB
Python
Executable file
411 lines
13 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# vim: fdm=marker ts=4
|
|
|
|
"""
|
|
Add a beacon measurement on top of the
|
|
simulated airshower.
|
|
"""
|
|
|
|
import numpy as np
|
|
import json
|
|
import h5py
|
|
import os.path as path
|
|
from copy import deepcopy
|
|
|
|
from earsim import REvent, Antenna, block_filter
|
|
import lib
|
|
|
|
# {{{ vim marker
|
|
tx_fname = 'tx.json'
|
|
antennas_fname = 'antennas.hdf5'
|
|
c_light = lib.c_light
|
|
|
|
def read_antenna_clock_repair_offsets(antennas, mode='all', freq_name=None):
|
|
valid_modes = ['orig', 'ks', 'phases', 'all']
|
|
|
|
time_offsets = []
|
|
for i, ant in enumerate(antennas):
|
|
_clock_delta = 0
|
|
|
|
# original timing
|
|
if mode == 'orig':
|
|
_clock_delta = ant.attrs['clock_offset']
|
|
|
|
# phase
|
|
if mode in ['all', 'phases']:
|
|
clock_phase = ant.beacon_info[freq_name]['sigma_phase_mean']
|
|
f_beacon = ant.beacon_info[freq_name]['freq']
|
|
clock_phase_time = clock_phase/(2*np.pi*f_beacon)
|
|
|
|
_clock_delta += clock_phase_time
|
|
|
|
# ks
|
|
if mode in ['all', 'ks']:
|
|
best_k_time = ant.beacon_info[freq_name]['best_k_time']
|
|
|
|
_clock_delta += best_k_time
|
|
|
|
time_offsets.append(_clock_delta)
|
|
|
|
return time_offsets
|
|
|
|
def write_tx_file(fname, tx, f_beacon, **kwargs):
|
|
with open(fname, 'w') as fp:
|
|
return json.dump(
|
|
{
|
|
**kwargs,
|
|
**dict(
|
|
f_beacon=f_beacon,
|
|
tx=dict(
|
|
x=tx.x,
|
|
y=tx.y,
|
|
z=tx.z,
|
|
name=tx.name
|
|
)
|
|
)
|
|
},
|
|
fp
|
|
)
|
|
|
|
def read_tx_file(fname):
|
|
with open(fname, 'r') as fp:
|
|
data = json.load(fp)
|
|
|
|
f_beacon = data['f_beacon']
|
|
tx = Antenna(**data['tx'])
|
|
|
|
del data['f_beacon']
|
|
del data['tx']
|
|
|
|
return tx, f_beacon, data
|
|
|
|
def read_beacon_hdf5(fname, **h5ant_kwargs):
|
|
with h5py.File(fname, 'r') as h5:
|
|
tx = Antenna_from_h5ant(h5['tx'], traces_key=None)
|
|
f_beacon = tx.attrs['f_beacon']
|
|
|
|
antennas = []
|
|
for k, h5ant in h5['antennas'].items():
|
|
ant = Antenna_from_h5ant(h5ant, **h5ant_kwargs)
|
|
|
|
antennas.append(ant)
|
|
|
|
return f_beacon, tx, antennas
|
|
|
|
def Antenna_from_h5ant(h5ant, traces_key='traces', raise_exception=True, read_AxB=True, read_beacon_info=True):
|
|
mydict = { k:h5ant.attrs.get(k) for k in ['x', 'y', 'z', 'name'] }
|
|
ant = Antenna(**mydict)
|
|
if h5ant.attrs:
|
|
ant.attrs = {**h5ant.attrs}
|
|
|
|
# Traces
|
|
if traces_key is None:
|
|
pass
|
|
elif traces_key not in h5ant:
|
|
if raise_exception:
|
|
raise ValueError("Traces_key not in file")
|
|
else:
|
|
ant.t = deepcopy(h5ant[traces_key][0])
|
|
ant.Ex = deepcopy(h5ant[traces_key][1])
|
|
ant.Ey = deepcopy(h5ant[traces_key][2])
|
|
ant.Ez = deepcopy(h5ant[traces_key][3])
|
|
if len(h5ant[traces_key]) > 4:
|
|
ant.beacon = deepcopy(h5ant[traces_key][4])
|
|
if len(h5ant[traces_key]) > 5:
|
|
ant.noise = deepcopy(h5ant[traces_key][5])
|
|
|
|
# E_AxB
|
|
if read_AxB and 'E_AxB' in h5ant:
|
|
ant.t_AxB = deepcopy(h5ant['E_AxB'][0])
|
|
ant.E_AxB = deepcopy(h5ant['E_AxB'][1])
|
|
|
|
# Beacons
|
|
if read_beacon_info and 'beacon_info' in h5ant:
|
|
h5beacon = h5ant['beacon_info']
|
|
|
|
beacon_info = {}
|
|
for name in h5beacon.keys():
|
|
beacon_info[name] = dict(h5beacon[name].attrs)
|
|
|
|
ant.beacon_info = beacon_info
|
|
|
|
return ant
|
|
|
|
|
|
def init_antenna_hdf5(fname, tx = None, f_beacon = None):
|
|
with h5py.File(fname, 'w') as fp:
|
|
if tx is not None or f_beacon is not None:
|
|
tx_group = fp.create_group('tx')
|
|
tx_attrs = tx_group.attrs
|
|
|
|
if f_beacon is not None:
|
|
tx_attrs['f_beacon'] = f_beacon
|
|
|
|
if tx is not None:
|
|
tx_attrs['x'] = tx.x
|
|
tx_attrs['y'] = tx.y
|
|
tx_attrs['z'] = tx.z
|
|
tx_attrs['name'] = tx.name
|
|
|
|
return fname
|
|
|
|
def append_antenna_hdf5(fname, antenna, columns = [], name='traces', prepend_time=True, overwrite=True, attrs_dict={}):
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
|
|
with h5py.File(fname, 'a') as fp:
|
|
if 'antennas' in fp.keys():
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
group = fp['antennas']
|
|
else:
|
|
group = fp.create_group('antennas')
|
|
|
|
|
|
if antenna.name in group:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
h5ant = group[antenna.name]
|
|
else:
|
|
h5ant = group.create_group(antenna.name)
|
|
|
|
h5ant_attrs = h5ant.attrs
|
|
h5ant_attrs['x'] = antenna.x
|
|
h5ant_attrs['y'] = antenna.y
|
|
h5ant_attrs['z'] = antenna.z
|
|
h5ant_attrs['name'] = antenna.name
|
|
|
|
for k,v in attrs_dict.items():
|
|
h5ant_attrs[k] = v
|
|
|
|
if name in h5ant:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
del h5ant[name]
|
|
|
|
dset = h5ant.create_dataset(name, (len(columns) + 1*prepend_time, len(columns[0])), dtype='f')
|
|
|
|
if prepend_time:
|
|
dset[0] = antenna.t
|
|
|
|
for i, col in enumerate(columns, 1*prepend_time):
|
|
dset[i] = col
|
|
|
|
|
|
def read_baseline_time_diffs_hdf5(fname):
|
|
"""
|
|
Read Baseline Time Diff information from HDF5 storage.
|
|
"""
|
|
with h5py.File(fname, 'r') as fp:
|
|
group_name = 'baseline_time_diffs'
|
|
base_dset_name = 'baselines'
|
|
dset_name = 'time_diffs'
|
|
|
|
group = fp[group_name]
|
|
|
|
names = group[base_dset_name][:].astype(str)
|
|
|
|
dset = group[dset_name]
|
|
time_diffs = dset[:,0]
|
|
f_beacon = dset[:,1]
|
|
true_phase_diffs = dset[:,2]
|
|
k_periods = dset[:,3]
|
|
|
|
return names, time_diffs, f_beacon, true_phase_diffs, k_periods
|
|
|
|
|
|
def write_baseline_time_diffs_hdf5(fname, baselines, true_phase_diffs, k_periods, f_beacon, time_diffs=None, overwrite=True):
|
|
"""
|
|
Write a combination of baselines, phase_diff, k_period and f_beacon to file.
|
|
|
|
Note that f_beacon is allowed to broadcast, but the others are not.
|
|
"""
|
|
|
|
if not hasattr(baselines[0], '__len__'):
|
|
# this is a single baseline
|
|
N_baselines = 1
|
|
baselines = [baselines]
|
|
true_phase_diffs = [true_phase_diffs]
|
|
k_periods = [k_periods]
|
|
f_beacon = np.array([f_beacon])
|
|
|
|
else:
|
|
N_baselines = len(baselines)
|
|
|
|
# Expand the f_beacon list
|
|
if not hasattr(f_beacon, '__len__'):
|
|
f_beacon = np.array([f_beacon]*N_baselines)
|
|
|
|
if time_diffs is None:
|
|
time_diffs = k_periods/f_beacon + true_phase_diffs/(2*np.pi*f_beacon)
|
|
|
|
assert len(baselines) == len(true_phase_diffs) == len(k_periods) == len(f_beacon)
|
|
|
|
with h5py.File(fname, 'a') as fp:
|
|
group_name = 'baseline_time_diffs'
|
|
base_dset_name = 'baselines'
|
|
dset_name = 'time_diffs'
|
|
|
|
group = fp.require_group(group_name)
|
|
|
|
if base_dset_name in group:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
del group[base_dset_name]
|
|
|
|
if dset_name in group:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
del group[dset_name]
|
|
|
|
# save baselines list
|
|
basenames = np.array([ [b[0].name, b[1].name] for b in baselines ], dtype='S')
|
|
|
|
base_dset = group.create_dataset(base_dset_name, data=basenames)
|
|
|
|
data = np.vstack( (time_diffs, f_beacon, true_phase_diffs, k_periods) ).T
|
|
dset = group.create_dataset(dset_name, data=data)
|
|
|
|
# }}} vim marker
|
|
|
|
if __name__ == "__main__":
|
|
from os import path
|
|
from argparse import ArgumentParser
|
|
|
|
parser = ArgumentParser()
|
|
parser.add_argument('-n', '--noise-sigma', type=float, default=1e3, help='in [muV/m] (Default: %(default)d)')
|
|
parser.add_argument('-f', '--beacon-frequency', type=float, default=51.53e-3, help='The beacon\'s frequency [GHz] (Default: %(default)d)')
|
|
|
|
# Beacon Properties
|
|
parser.add_argument('-a', '--beacon-amplitudes', type=float, nargs=3, default=[1e3, 0, 0], help='in [muV/m] (Default: %(default)s)')
|
|
parser.add_argument('-d', '--beacon-rsq-decay', type=bool, default=True, help='Let the beacon amplitude fall of with distance. Uses Beacon amplitudes at 0,0,0 (Default: %(default)d)')
|
|
|
|
# Bandpass
|
|
parser.add_argument('-p', '--use-passband', type=bool, default=True, help='(Default: %(default)d)')
|
|
parser.add_argument('-l', '--passband-low', type=float, default=30e-3, help='Lower frequency [GHz] of the passband filter. (set -1 for np.inf) (Default: %(default)d)')
|
|
parser.add_argument('-u', '--passband-high', type=float, default=80e-3, help='Upper frequency [GHz] of the passband filter. (set -1 for np.inf) (Default: %(default)d)')
|
|
|
|
# Trace length modification
|
|
parser.add_argument('-N', '--new-trace-length', type=float, help='resize airshower trace (Default: %(default)d)', default=1e4)
|
|
parser.add_argument('-P', '--pre-trace-length', type=float, help='amount of trace to prepend the airshower when resizing (Default: %(default)d)', default=2e3)
|
|
|
|
args = parser.parse_args()
|
|
|
|
##
|
|
## End of ArgumentParsing
|
|
##
|
|
|
|
rng = np.random.default_rng()
|
|
|
|
fname = "ZH_airshower/mysim.sry"
|
|
|
|
# Noise properties
|
|
noise_sigma = args.noise_sigma # mu V/m set to None to ignore
|
|
|
|
# Beacon properties
|
|
beacon_amplitudes = np.array(args.beacon_amplitudes) # mu V/m
|
|
beacon_radiate_rsq = args.beacon_rsq_decay # beacon_amplitude is repaired for distance to 0,0,0
|
|
|
|
# Beacon properties
|
|
f_beacon = args.beacon_frequency # GHz
|
|
|
|
# Transmitter
|
|
remake_tx = True
|
|
|
|
tx = Antenna(x=0,y=0,z=0,name='tx') # m
|
|
if True:
|
|
# Move tx out a long way
|
|
tx.x, tx.y = -75e3, 75e3 # m
|
|
elif False:
|
|
# Move it to 0,0,0 (among the antennas)
|
|
tx.x, tx.y = 0, 0 #m
|
|
|
|
# modify beacon power to be beacon_amplitude at 0,0,0
|
|
if beacon_radiate_rsq:
|
|
dist = lib.distance(tx, Antenna(x=0, y=0, z=0))
|
|
|
|
ampl = max(1, dist**2)
|
|
|
|
beacon_amplitudes *= ampl
|
|
|
|
# Bandpass for E field blockfilter
|
|
low_bp = args.passband_low if args.passband_low >= 0 else np.inf # GHz
|
|
high_bp = args.passband_high if args.passband_high >= 0 else np.inf # GHz
|
|
|
|
# Enable/Disable block_filter
|
|
if not args.use_passband:
|
|
block_filter = lambda x, dt, low, high: x
|
|
|
|
####
|
|
fname_dir = path.dirname(fname)
|
|
tx_fname = path.join(fname_dir, tx_fname)
|
|
antennas_fname = path.join(fname_dir, antennas_fname)
|
|
|
|
# read/write tx properties
|
|
if not path.isfile(tx_fname) or remake_tx:
|
|
write_tx_file(tx_fname, tx, f_beacon, amplitudes=beacon_amplitudes.tolist(), radiate_rsq=beacon_radiate_rsq)
|
|
else:
|
|
tx, f_beacon, _ = read_tx_file(tx_fname)
|
|
|
|
|
|
print("Beacon amplitude at tx [muV/m]:", beacon_amplitudes)
|
|
print("Beacon amplitude at 0,0,0 [muV/m]:", beacon_amplitudes/ampl)
|
|
print("Tx location:", [tx.x, tx.y, tx.z])
|
|
print("Noise sigma [muV/m]:", noise_sigma)
|
|
|
|
# read in antennas
|
|
ev = REvent(fname)
|
|
N_antennas = len(ev.antennas)
|
|
|
|
# initialize hdf5 file
|
|
init_antenna_hdf5(antennas_fname, tx, f_beacon)
|
|
|
|
# make beacon per antenna
|
|
for i, antenna in enumerate(ev.antennas):
|
|
if i%10 == 0:
|
|
print(f"Beaconed antenna {i} out of", len(ev.antennas))
|
|
|
|
if args.new_trace_length: # modify trace lengths
|
|
N_samples = len(antenna.t)
|
|
new_N = int(args.new_trace_length)
|
|
pre_N = int(args.pre_trace_length)
|
|
after_N = new_N - pre_N
|
|
|
|
dt = antenna.t[1] - antenna.t[0]
|
|
new_t = np.arange(-pre_N, after_N)*dt + antenna.t[0]
|
|
|
|
antenna.t = new_t
|
|
|
|
# TODO:trace extrapolation?
|
|
antenna.Ex = np.pad(antenna.Ex, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
|
|
antenna.Ey = np.pad(antenna.Ey, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
|
|
antenna.Ez = np.pad(antenna.Ez, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
|
|
|
|
if i%10 == 0:
|
|
print(f"Modified trace lengths by {pre_N},{after_N-N_samples}")
|
|
|
|
beacon = 1e-6 * lib.beacon_from(tx, antenna, f_beacon, antenna.t, c_light=c_light, radiate_rsq=beacon_radiate_rsq) # mu V/m
|
|
|
|
# noise realisation
|
|
noise_realisation = 0
|
|
if noise_sigma is not None:
|
|
noise_realisation = 1e-6 * rng.normal(0, noise_sigma, size=len(beacon)) # mu V/m
|
|
|
|
# Collect all data to be saved (with the first 3 values the E fields)
|
|
traces = np.array([antenna.Ex, antenna.Ey, antenna.Ez, beacon, noise_realisation])
|
|
|
|
# add to relevant polarisation
|
|
# and apply block filter
|
|
dt = antenna.t[1] - antenna.t[0]
|
|
for j, amp in enumerate(beacon_amplitudes):
|
|
traces[j] = block_filter(traces[j] + amp*beacon + noise_realisation, dt, low_bp, high_bp)
|
|
|
|
append_antenna_hdf5( antennas_fname, antenna, traces, name='traces', prepend_time=True)
|
|
|
|
# Save E field in E_AxB
|
|
E_AxB = [np.dot(ev.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(traces[0], traces[1], traces[2])]
|
|
t_AxB = antenna.t
|
|
|
|
append_antenna_hdf5( antennas_fname, antenna, [t_AxB, E_AxB], name='E_AxB', prepend_time=False)
|
|
|
|
print("Antenna HDF5 file written as " + str(antennas_fname))
|