mirror of
https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
synced 2025-01-03 00:33:33 +01:00
426 lines
14 KiB
Python
Executable file
426 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# vim: fdm=marker ts=4
|
|
|
|
"""
|
|
Add a beacon measurement on top of the
|
|
simulated airshower.
|
|
"""
|
|
|
|
import numpy as np
|
|
import json
|
|
import h5py
|
|
import os.path as path
|
|
from copy import deepcopy
|
|
|
|
from earsim import REvent, Antenna, block_filter
|
|
import lib
|
|
|
|
# {{{ vim marker
|
|
tx_fname = 'tx.json'
|
|
antennas_fname = 'antennas.hdf5'
|
|
c_light = lib.c_light
|
|
|
|
def read_antenna_clock_repair_offsets(antennas, mode='all', freq_name=None):
|
|
valid_modes = ['orig', 'ks', 'phases', 'all']
|
|
|
|
time_offsets = []
|
|
for i, ant in enumerate(antennas):
|
|
_clock_delta = 0
|
|
|
|
# original timing
|
|
if mode == 'orig':
|
|
_clock_delta = -1*ant.attrs['clock_offset']
|
|
|
|
# phase
|
|
if mode in ['all', 'phases']:
|
|
clock_phase = ant.beacon_info[freq_name]['clock_phase_mean']
|
|
f_beacon = ant.beacon_info[freq_name]['freq']
|
|
clock_phase_time = clock_phase/(2*np.pi*f_beacon)
|
|
|
|
_clock_delta += clock_phase_time
|
|
|
|
# ks
|
|
if mode in ['all', 'ks']:
|
|
best_k_time = ant.beacon_info[freq_name]['best_k_time']
|
|
|
|
_clock_delta += best_k_time
|
|
|
|
time_offsets.append(_clock_delta)
|
|
|
|
return time_offsets
|
|
|
|
def write_tx_file(fname, tx, f_beacon, **kwargs):
|
|
with open(fname, 'w') as fp:
|
|
return json.dump(
|
|
{
|
|
**kwargs,
|
|
**dict(
|
|
f_beacon=f_beacon,
|
|
tx=dict(
|
|
x=tx.x,
|
|
y=tx.y,
|
|
z=tx.z,
|
|
name=tx.name
|
|
)
|
|
)
|
|
},
|
|
fp
|
|
)
|
|
|
|
def read_tx_file(fname):
|
|
with open(fname, 'r') as fp:
|
|
data = json.load(fp)
|
|
|
|
f_beacon = data['f_beacon']
|
|
tx = Antenna(**data['tx'])
|
|
|
|
del data['f_beacon']
|
|
del data['tx']
|
|
|
|
return tx, f_beacon, data
|
|
|
|
def read_beacon_hdf5(fname, **h5ant_kwargs):
|
|
with h5py.File(fname, 'r') as h5:
|
|
tx = Antenna_from_h5ant(h5['tx'], traces_key=None)
|
|
f_beacon = tx.attrs['f_beacon']
|
|
|
|
antennas = []
|
|
for k, h5ant in h5['antennas'].items():
|
|
ant = Antenna_from_h5ant(h5ant, **h5ant_kwargs)
|
|
|
|
antennas.append(ant)
|
|
|
|
return f_beacon, tx, antennas
|
|
|
|
def Antenna_from_h5ant(h5ant, traces_key='filtered_traces', raise_exception=True, read_AxB=True, read_beacon_info=True):
|
|
mydict = { k:h5ant.attrs.get(k) for k in ['x', 'y', 'z', 'name'] }
|
|
ant = Antenna(**mydict)
|
|
if h5ant.attrs:
|
|
ant.attrs = {**h5ant.attrs}
|
|
|
|
# Traces
|
|
if traces_key is None:
|
|
pass
|
|
elif traces_key not in h5ant:
|
|
if raise_exception:
|
|
raise ValueError("Traces_key not in file")
|
|
else:
|
|
ant.t = deepcopy(h5ant[traces_key][0])
|
|
ant.Ex = deepcopy(h5ant[traces_key][1])
|
|
ant.Ey = deepcopy(h5ant[traces_key][2])
|
|
ant.Ez = deepcopy(h5ant[traces_key][3])
|
|
if len(h5ant[traces_key]) > 4:
|
|
ant.beacon = deepcopy(h5ant[traces_key][4])
|
|
if len(h5ant[traces_key]) > 5:
|
|
ant.noise = deepcopy(h5ant[traces_key][5])
|
|
|
|
# E_AxB
|
|
if read_AxB and 'E_AxB' in h5ant:
|
|
ant.t_AxB = deepcopy(h5ant['E_AxB'][0])
|
|
ant.E_AxB = deepcopy(h5ant['E_AxB'][1])
|
|
|
|
# Beacons
|
|
if read_beacon_info and 'beacon_info' in h5ant:
|
|
h5beacon = h5ant['beacon_info']
|
|
|
|
beacon_info = {}
|
|
for name in h5beacon.keys():
|
|
beacon_info[name] = dict(h5beacon[name].attrs)
|
|
|
|
ant.beacon_info = beacon_info
|
|
|
|
return ant
|
|
|
|
|
|
def init_antenna_hdf5(fname, tx = None, f_beacon = None):
|
|
with h5py.File(fname, 'w') as fp:
|
|
if tx is not None or f_beacon is not None:
|
|
tx_group = fp.create_group('tx')
|
|
tx_attrs = tx_group.attrs
|
|
|
|
if f_beacon is not None:
|
|
tx_attrs['f_beacon'] = f_beacon
|
|
|
|
if tx is not None:
|
|
tx_attrs['x'] = tx.x
|
|
tx_attrs['y'] = tx.y
|
|
tx_attrs['z'] = tx.z
|
|
tx_attrs['name'] = tx.name
|
|
|
|
return fname
|
|
|
|
def append_antenna_hdf5(fname, antenna, columns = [], name='traces', prepend_time=True, overwrite=True, attrs_dict={}):
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
|
|
with h5py.File(fname, 'a') as fp:
|
|
if 'antennas' in fp.keys():
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
group = fp['antennas']
|
|
else:
|
|
group = fp.create_group('antennas')
|
|
|
|
|
|
if antenna.name in group:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
h5ant = group[antenna.name]
|
|
else:
|
|
h5ant = group.create_group(antenna.name)
|
|
|
|
h5ant_attrs = h5ant.attrs
|
|
h5ant_attrs['x'] = antenna.x
|
|
h5ant_attrs['y'] = antenna.y
|
|
h5ant_attrs['z'] = antenna.z
|
|
h5ant_attrs['name'] = antenna.name
|
|
|
|
for k,v in attrs_dict.items():
|
|
h5ant_attrs[k] = v
|
|
|
|
if name in h5ant:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
del h5ant[name]
|
|
|
|
dset = h5ant.create_dataset(name, (len(columns) + 1*prepend_time, len(columns[0])), dtype='f')
|
|
|
|
if prepend_time:
|
|
dset[0] = antenna.t
|
|
|
|
for i, col in enumerate(columns, 1*prepend_time):
|
|
dset[i] = col
|
|
|
|
|
|
def read_baseline_time_diffs_hdf5(fname):
|
|
"""
|
|
Read Baseline Time Diff information from HDF5 storage.
|
|
"""
|
|
with h5py.File(fname, 'r') as fp:
|
|
group_name = 'baseline_time_diffs'
|
|
base_dset_name = 'baselines'
|
|
dset_name = 'time_diffs'
|
|
|
|
group = fp[group_name]
|
|
|
|
names = group[base_dset_name][:].astype(str)
|
|
|
|
dset = group[dset_name]
|
|
time_diffs = dset[:,0]
|
|
f_beacon = dset[:,1]
|
|
clock_phase_diffs = dset[:,2]
|
|
k_periods = dset[:,3]
|
|
|
|
return names, time_diffs, f_beacon, clock_phase_diffs, k_periods
|
|
|
|
|
|
def write_baseline_time_diffs_hdf5(fname, baselines, clock_phase_diffs, k_periods, f_beacon, time_diffs=None, overwrite=True):
|
|
"""
|
|
Write a combination of baselines, phase_diff, k_period and f_beacon to file.
|
|
|
|
Note that f_beacon is allowed to broadcast, but the others are not.
|
|
"""
|
|
|
|
if not hasattr(baselines[0], '__len__'):
|
|
# this is a single baseline
|
|
N_baselines = 1
|
|
baselines = [baselines]
|
|
clock_phase_diffs = [clock_phase_diffs]
|
|
k_periods = [k_periods]
|
|
f_beacon = np.array([f_beacon])
|
|
|
|
else:
|
|
N_baselines = len(baselines)
|
|
|
|
# Expand the f_beacon list
|
|
if not hasattr(f_beacon, '__len__'):
|
|
f_beacon = np.array([f_beacon]*N_baselines)
|
|
|
|
if time_diffs is None:
|
|
time_diffs = k_periods/f_beacon + clock_phase_diffs/(2*np.pi*f_beacon)
|
|
|
|
assert len(baselines) == len(clock_phase_diffs) == len(k_periods) == len(f_beacon)
|
|
|
|
with h5py.File(fname, 'a') as fp:
|
|
group_name = 'baseline_time_diffs'
|
|
base_dset_name = 'baselines'
|
|
dset_name = 'time_diffs'
|
|
|
|
group = fp.require_group(group_name)
|
|
|
|
if base_dset_name in group:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
del group[base_dset_name]
|
|
|
|
if dset_name in group:
|
|
if not overwrite:
|
|
raise NotImplementedError
|
|
del group[dset_name]
|
|
|
|
# save baselines list
|
|
basenames = np.array([ [b[0].name, b[1].name] for b in baselines ], dtype='S')
|
|
|
|
base_dset = group.create_dataset(base_dset_name, data=basenames)
|
|
|
|
data = np.vstack( (time_diffs, f_beacon, clock_phase_diffs, k_periods) ).T
|
|
dset = group.create_dataset(dset_name, data=data)
|
|
|
|
# }}} vim marker
|
|
|
|
if __name__ == "__main__":
|
|
from os import path
|
|
from argparse import ArgumentParser
|
|
|
|
parser = ArgumentParser()
|
|
parser.add_argument('-n', '--noise-sigma', type=float, default=1e3, help='in [muV/m] (Default: %(default)d)')
|
|
parser.add_argument('-f', '--beacon-frequency', type=float, default=51.53e-3, help='The beacon\'s frequency [GHz] (Default: %(default)d)')
|
|
|
|
# Beacon Properties
|
|
parser.add_argument('-a', '--beacon-amplitudes', type=float, nargs=3, default=[1e3, 0, 0], help='in [muV/m] (Default: %(default)s)')
|
|
parser.add_argument('-d', '--beacon-rsq-decay', type=bool, default=True, help='Let the beacon amplitude fall of with distance. Uses Beacon amplitudes at 0,0,0 (Default: %(default)d)')
|
|
|
|
# Bandpass
|
|
parser.add_argument('-p', '--use-passband', type=bool, default=True, help='(Default: %(default)d)')
|
|
parser.add_argument('-l', '--passband-low', type=float, default=30e-3, help='Lower frequency [GHz] of the passband filter. (set -1 for np.inf) (Default: %(default)g)')
|
|
parser.add_argument('-u', '--passband-high', type=float, default=80e-3, help='Upper frequency [GHz] of the passband filter. (set -1 for np.inf) (Default: %(default)g)')
|
|
|
|
# Trace length modification
|
|
parser.add_argument('-N', '--new-trace-length', type=float, help='resize airshower trace (Default: %(default)d)', default=1e4)
|
|
parser.add_argument('-P', '--pre-trace-length', type=float, help='amount of trace to prepend the airshower when resizing (Default: %(default)d)', default=2e3)
|
|
|
|
# Input directory
|
|
parser.add_argument('--input-fname', type=str, default=None, help='Path to mysim.sry, either directory or path. If empty it takes DATA_DIR and appends mysim.sry. (Default: %(default)s)')
|
|
parser.add_argument('--data-dir', type=str, default="./ZH_airshower", help='Path to Data Directory. (Default: %(default)s)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not args.input_fname:
|
|
args.input_fname = args.data_dir
|
|
|
|
if path.isdir(args.input_fname):
|
|
args.input_fname = path.join(args.input_fname, "mysim.sry")
|
|
|
|
##
|
|
## End of ArgumentParsing
|
|
##
|
|
|
|
rng = np.random.default_rng()
|
|
|
|
# Noise properties
|
|
noise_sigma = args.noise_sigma # mu V/m set to None to ignore
|
|
|
|
# Beacon properties
|
|
beacon_amplitudes = np.array(args.beacon_amplitudes) # mu V/m
|
|
beacon_radiate_rsq = args.beacon_rsq_decay # beacon_amplitude is repaired for distance to 0,0,0
|
|
|
|
# Beacon properties
|
|
f_beacon = args.beacon_frequency # GHz
|
|
|
|
# Transmitter
|
|
remake_tx = True
|
|
|
|
tx = Antenna(x=0,y=0,z=0,name='tx') # m
|
|
if True:
|
|
# Move tx out a long way
|
|
tx.x, tx.y = -75e3, 75e3 # m
|
|
elif False:
|
|
# Move it to 0,0,0 (among the antennas)
|
|
tx.x, tx.y = 0, 0 #m
|
|
|
|
# modify beacon power to be beacon_amplitude at 0,0,0
|
|
if beacon_radiate_rsq:
|
|
dist = lib.distance(tx, Antenna(x=0, y=0, z=0))
|
|
|
|
ampl = max(1, dist**2)
|
|
|
|
beacon_amplitudes *= ampl
|
|
|
|
# Bandpass for E field blockfilter
|
|
low_bp = args.passband_low if args.passband_low >= 0 else np.inf # GHz
|
|
high_bp = args.passband_high if args.passband_high >= 0 else np.inf # GHz
|
|
|
|
# Enable/Disable block_filter
|
|
if not args.use_passband:
|
|
block_filter = lambda x, dt, low, high: x
|
|
|
|
####
|
|
fname_dir = args.data_dir
|
|
tx_fname = path.join(fname_dir, tx_fname)
|
|
antennas_fname = path.join(fname_dir, antennas_fname)
|
|
|
|
# read/write tx properties
|
|
if not path.isfile(tx_fname) or remake_tx:
|
|
write_tx_file(tx_fname, tx, f_beacon, amplitudes=beacon_amplitudes.tolist(), radiate_rsq=beacon_radiate_rsq)
|
|
else:
|
|
tx, f_beacon, _ = read_tx_file(tx_fname)
|
|
|
|
|
|
print("Beacon amplitude at tx [muV/m]:", beacon_amplitudes)
|
|
print("Beacon amplitude at 0,0,0 [muV/m]:", beacon_amplitudes/ampl)
|
|
print("Tx location:", [tx.x, tx.y, tx.z])
|
|
print("Noise sigma [muV/m]:", noise_sigma)
|
|
|
|
# read in antennas
|
|
ev = REvent(args.input_fname)
|
|
N_antennas = len(ev.antennas)
|
|
|
|
# initialize hdf5 file
|
|
init_antenna_hdf5(antennas_fname, tx, f_beacon)
|
|
|
|
# make beacon per antenna
|
|
for i, antenna in enumerate(ev.antennas):
|
|
#TODO: allow to change the samplerate (2, 4, 8 ns)
|
|
|
|
if i%10 == 0:
|
|
print(f"Beaconed antenna {i} out of", len(ev.antennas))
|
|
|
|
if args.new_trace_length: # modify trace lengths
|
|
N_samples = len(antenna.t)
|
|
new_N = int(args.new_trace_length)
|
|
pre_N = int(args.pre_trace_length)
|
|
after_N = new_N - pre_N
|
|
|
|
dt = antenna.t[1] - antenna.t[0]
|
|
new_t = np.arange(-pre_N, after_N)*dt + antenna.t[0]
|
|
|
|
antenna.t = new_t
|
|
|
|
# TODO:trace extrapolation?
|
|
antenna.Ex = np.pad(antenna.Ex, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
|
|
antenna.Ey = np.pad(antenna.Ey, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
|
|
antenna.Ez = np.pad(antenna.Ez, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
|
|
|
|
if i%10 == 0:
|
|
print(f"Modified trace lengths by {pre_N},{after_N-N_samples}")
|
|
|
|
beacon = 1e-6 * lib.beacon_from(tx, antenna, f_beacon, antenna.t, c_light=c_light, radiate_rsq=beacon_radiate_rsq) # mu V/m
|
|
|
|
# noise realisation
|
|
noise_realisation = 0
|
|
if noise_sigma is not None:
|
|
noise_realisation = 1e-6 * rng.normal(0, noise_sigma, size=len(beacon)) # mu V/m
|
|
|
|
# Collect all data to be saved (with the first 3 values the E fields)
|
|
traces = np.array([antenna.Ex, antenna.Ey, antenna.Ez, beacon, noise_realisation])
|
|
|
|
# add beacon and noise to relevant polarisations
|
|
for j, amp in enumerate(beacon_amplitudes):
|
|
traces[j] = traces[j] + amp*beacon + noise_realisation
|
|
|
|
append_antenna_hdf5( antennas_fname, antenna, traces, name='prefiltered_traces', prepend_time=True)
|
|
|
|
# .. and apply block_filter to every trace
|
|
dt = antenna.t[1] - antenna.t[0]
|
|
for j in range(len(traces)):
|
|
traces[j] = block_filter(traces[j], dt, low_bp, high_bp)
|
|
|
|
append_antenna_hdf5( antennas_fname, antenna, traces, name='filtered_traces', prepend_time=True)
|
|
|
|
# Save filtered E field in E_AxB
|
|
E_AxB = [np.dot(ev.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(traces[0], traces[1], traces[2])]
|
|
t_AxB = antenna.t
|
|
|
|
append_antenna_hdf5( antennas_fname, antenna, [t_AxB, E_AxB], name='E_AxB', prepend_time=False)
|
|
|
|
print("Antenna HDF5 file written as " + str(antennas_fname))
|