m-thesis-introduction/airshower_beacon_simulation/aa_generate_beacon.py

447 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
# vim: fdm=marker ts=4
"""
Add a beacon measurement on top of the
simulated airshower.
"""
import numpy as np
import json
import h5py
import os.path as path
from copy import deepcopy
from earsim import REvent, Antenna, block_filter
import lib
# {{{ vim marker
tx_fname = 'tx.json'
antennas_fname = 'antennas.hdf5'
beacon_snr_fname = 'beacon_snr.json'
airshower_snr_fname = 'airshower_snr.json'
c_light = lib.c_light
def read_antenna_clock_repair_offsets(antennas, mode='full', freq_name=None):
valid_modes = ['orig', 'ks', 'phases', 'full']
time_offsets = []
for i, ant in enumerate(antennas):
_clock_delta = 0
# original timing
if mode == 'orig':
_clock_delta = -1*ant.attrs['clock_offset']
# phase
if mode in ['full', 'phases']:
clock_phase = ant.beacon_info[freq_name]['clock_phase_mean']
f_beacon = ant.beacon_info[freq_name]['freq']
clock_phase_time = clock_phase/(2*np.pi*f_beacon)
_clock_delta += clock_phase_time
# ks
if mode in ['full', 'ks']:
best_k_time = ant.beacon_info[freq_name]['best_k_time']
_clock_delta += best_k_time
time_offsets.append(_clock_delta)
return time_offsets
def write_snr_file(fname, snrs):
with open(fname, 'w') as fp:
return json.dump(
{'mean': np.mean(snrs), 'std': np.std(snrs), 'values': snrs},
fp
)
def read_snr_file(fname):
with open(fname, 'r') as fp:
return json.load(fp)
def write_tx_file(fname, tx, f_beacon, **kwargs):
with open(fname, 'w') as fp:
return json.dump(
{
**kwargs,
**dict(
f_beacon=f_beacon,
tx=dict(
x=tx.x,
y=tx.y,
z=tx.z,
name=tx.name
)
)
},
fp
)
def read_tx_file(fname):
with open(fname, 'r') as fp:
data = json.load(fp)
f_beacon = data['f_beacon']
tx = Antenna(**data['tx'])
del data['f_beacon']
del data['tx']
return tx, f_beacon, data
def read_beacon_hdf5(fname, **h5ant_kwargs):
with h5py.File(fname, 'r') as h5:
tx = Antenna_from_h5ant(h5['tx'], traces_key=None)
f_beacon = tx.attrs['f_beacon']
antennas = []
for k, h5ant in h5['antennas'].items():
ant = Antenna_from_h5ant(h5ant, **h5ant_kwargs)
antennas.append(ant)
return f_beacon, tx, antennas
def Antenna_from_h5ant(h5ant, traces_key='filtered_traces', raise_exception=True, read_AxB=True, read_beacon_info=True):
mydict = { k:h5ant.attrs.get(k) for k in ['x', 'y', 'z', 'name'] }
ant = Antenna(**mydict)
if h5ant.attrs:
ant.attrs = {**h5ant.attrs}
# Traces
if traces_key is None:
pass
elif traces_key not in h5ant:
if raise_exception:
raise ValueError("Traces_key not in file")
else:
ant.t = deepcopy(h5ant[traces_key][0])
ant.Ex = deepcopy(h5ant[traces_key][1])
ant.Ey = deepcopy(h5ant[traces_key][2])
ant.Ez = deepcopy(h5ant[traces_key][3])
if len(h5ant[traces_key]) > 4:
ant.beacon = deepcopy(h5ant[traces_key][4])
if len(h5ant[traces_key]) > 5:
ant.noise = deepcopy(h5ant[traces_key][5])
# E_AxB
if read_AxB and 'E_AxB' in h5ant:
ant.t_AxB = deepcopy(h5ant['E_AxB'][0])
ant.E_AxB = deepcopy(h5ant['E_AxB'][1])
# Beacons
if read_beacon_info and 'beacon_info' in h5ant:
h5beacon = h5ant['beacon_info']
beacon_info = {}
for name in h5beacon.keys():
beacon_info[name] = dict(h5beacon[name].attrs)
ant.beacon_info = beacon_info
return ant
def init_antenna_hdf5(fname, tx = None, f_beacon = None):
with h5py.File(fname, 'w') as fp:
if tx is not None or f_beacon is not None:
tx_group = fp.create_group('tx')
tx_attrs = tx_group.attrs
if f_beacon is not None:
tx_attrs['f_beacon'] = f_beacon
if tx is not None:
tx_attrs['x'] = tx.x
tx_attrs['y'] = tx.y
tx_attrs['z'] = tx.z
tx_attrs['name'] = tx.name
return fname
def append_antenna_hdf5(fname, antenna, columns = [], name='traces', prepend_time=True, overwrite=True, attrs_dict={}):
if not overwrite:
raise NotImplementedError
with h5py.File(fname, 'a') as fp:
if 'antennas' in fp.keys():
if not overwrite:
raise NotImplementedError
group = fp['antennas']
else:
group = fp.create_group('antennas')
if antenna.name in group:
if not overwrite:
raise NotImplementedError
h5ant = group[antenna.name]
else:
h5ant = group.create_group(antenna.name)
h5ant_attrs = h5ant.attrs
h5ant_attrs['x'] = antenna.x
h5ant_attrs['y'] = antenna.y
h5ant_attrs['z'] = antenna.z
h5ant_attrs['name'] = antenna.name
for k,v in attrs_dict.items():
h5ant_attrs[k] = v
if name in h5ant:
if not overwrite:
raise NotImplementedError
del h5ant[name]
dset = h5ant.create_dataset(name, (len(columns) + 1*prepend_time, len(columns[0])), dtype='f')
if prepend_time:
dset[0] = antenna.t
for i, col in enumerate(columns, 1*prepend_time):
dset[i] = col
def read_baseline_time_diffs_hdf5(fname):
"""
Read Baseline Time Diff information from HDF5 storage.
"""
with h5py.File(fname, 'r') as fp:
group_name = 'baseline_time_diffs'
base_dset_name = 'baselines'
dset_name = 'time_diffs'
group = fp[group_name]
names = group[base_dset_name][:].astype(str)
dset = group[dset_name]
time_diffs = dset[:,0]
f_beacon = dset[:,1]
clock_phase_diffs = dset[:,2]
k_periods = dset[:,3]
return names, time_diffs, f_beacon, clock_phase_diffs, k_periods
def write_baseline_time_diffs_hdf5(fname, baselines, clock_phase_diffs, k_periods, f_beacon, time_diffs=None, overwrite=True):
"""
Write a combination of baselines, phase_diff, k_period and f_beacon to file.
Note that f_beacon is allowed to broadcast, but the others are not.
"""
if not hasattr(baselines[0], '__len__'):
# this is a single baseline
N_baselines = 1
baselines = [baselines]
clock_phase_diffs = [clock_phase_diffs]
k_periods = [k_periods]
f_beacon = np.array([f_beacon])
else:
N_baselines = len(baselines)
# Expand the f_beacon list
if not hasattr(f_beacon, '__len__'):
f_beacon = np.array([f_beacon]*N_baselines)
if time_diffs is None:
time_diffs = k_periods/f_beacon + clock_phase_diffs/(2*np.pi*f_beacon)
assert len(baselines) == len(clock_phase_diffs) == len(k_periods) == len(f_beacon)
with h5py.File(fname, 'a') as fp:
group_name = 'baseline_time_diffs'
base_dset_name = 'baselines'
dset_name = 'time_diffs'
group = fp.require_group(group_name)
if base_dset_name in group:
if not overwrite:
raise NotImplementedError
del group[base_dset_name]
if dset_name in group:
if not overwrite:
raise NotImplementedError
del group[dset_name]
# save baselines list
basenames = np.array([ [b[0].name, b[1].name] for b in baselines ], dtype='S')
base_dset = group.create_dataset(base_dset_name, data=basenames)
data = np.vstack( (time_diffs, f_beacon, clock_phase_diffs, k_periods) ).T
dset = group.create_dataset(dset_name, data=data)
# }}} vim marker
if __name__ == "__main__":
from os import path
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument('-n', '--noise-sigma', type=float, default=1e3, help='in [muV/m] (Default: %(default)d)')
parser.add_argument('-f', '--beacon-frequency', type=float, default=51.53e-3, help='The beacon\'s frequency [GHz] (Default: %(default)d)')
# Beacon Properties
parser.add_argument('-a', '--beacon-amplitudes', type=float, nargs=3, default=[1e3, 0, 0], help='in [muV/m] (Default: %(default)s)')
parser.add_argument('-d', '--beacon-rsq-decay', type=bool, default=True, help='Let the beacon amplitude fall of with distance. Uses Beacon amplitudes at 0,0,0 (Default: %(default)d)')
# Bandpass
parser.add_argument('-p', '--use-passband', type=bool, default=True, help='(Default: %(default)d)')
parser.add_argument('-l', '--passband-low', type=float, default=30e-3, help='Lower frequency [GHz] of the passband filter. (set -1 for np.inf) (Default: %(default)g)')
parser.add_argument('-u', '--passband-high', type=float, default=80e-3, help='Upper frequency [GHz] of the passband filter. (set -1 for np.inf) (Default: %(default)g)')
# Trace length modification
parser.add_argument('-N', '--new-trace-length', type=float, help='resize airshower trace (Default: %(default)d)', default=1e4)
parser.add_argument('-P', '--pre-trace-length', type=float, help='amount of trace to prepend the airshower when resizing (Default: %(default)d)', default=2e3)
# Input directory
parser.add_argument('--input-fname', type=str, default=None, help='Path to mysim.sry, either directory or path. If empty it takes DATA_DIR and appends mysim.sry. (Default: %(default)s)')
parser.add_argument('--data-dir', type=str, default="./ZH_airshower", help='Path to Data Directory. (Default: %(default)s)')
args = parser.parse_args()
if not args.input_fname:
args.input_fname = args.data_dir
if path.isdir(args.input_fname):
args.input_fname = path.join(args.input_fname, "mysim.sry")
##
## End of ArgumentParsing
##
rng = np.random.default_rng()
# Noise properties
noise_sigma = args.noise_sigma # mu V/m set to None to ignore
unique_noise_realisations = True # a new noise realisation per antenna vs. single noise realisation shared across antennas
# Beacon properties
beacon_amplitudes = np.array(args.beacon_amplitudes) # mu V/m
beacon_radiate_rsq = args.beacon_rsq_decay # beacon_amplitude is repaired for distance to 0,0,0
# Beacon properties
f_beacon = args.beacon_frequency # GHz
# Transmitter
remake_tx = True
tx = Antenna(x=0,y=0,z=0,name='tx') # m
if True:
# Move tx out a long way
tx.x, tx.y = -75e3, 75e3 # m
elif False:
# Move it to 0,0,0 (among the antennas)
tx.x, tx.y = 0, 0 #m
# modify beacon power to be beacon_amplitude at 0,0,0
if beacon_radiate_rsq:
dist = lib.distance(tx, Antenna(x=0, y=0, z=0))
ampl = max(1, dist**2)
beacon_amplitudes *= ampl
# Bandpass for E field blockfilter
low_bp = args.passband_low if args.passband_low >= 0 else np.inf # GHz
high_bp = args.passband_high if args.passband_high >= 0 else np.inf # GHz
# Enable/Disable block_filter
if not args.use_passband:
block_filter = lambda x, dt, low, high: x
####
fname_dir = args.data_dir
tx_fname = path.join(fname_dir, tx_fname)
antennas_fname = path.join(fname_dir, antennas_fname)
# read/write tx properties
if not path.isfile(tx_fname) or remake_tx:
write_tx_file(tx_fname, tx, f_beacon, amplitudes=beacon_amplitudes.tolist(), radiate_rsq=beacon_radiate_rsq)
else:
tx, f_beacon, _ = read_tx_file(tx_fname)
print("Beacon amplitude at tx [muV/m]:", beacon_amplitudes)
print("Beacon amplitude at 0,0,0 [muV/m]:", beacon_amplitudes/ampl)
print("Tx location:", [tx.x, tx.y, tx.z])
print("Noise sigma [muV/m]:", noise_sigma)
# read in antennas
ev = REvent(args.input_fname)
N_antennas = len(ev.antennas)
# initialize hdf5 file
init_antenna_hdf5(antennas_fname, tx, f_beacon)
# make beacon per antenna
noise_realisation = np.array([0])
for i, antenna in enumerate(ev.antennas):
#TODO: allow to change the samplerate (2, 4, 8 ns)
if i%10 == 0:
print(f"Beaconed antenna {i} out of", len(ev.antennas))
if args.new_trace_length: # modify trace lengths
N_samples = len(antenna.t)
new_N = int(args.new_trace_length)
pre_N = int(args.pre_trace_length)
after_N = new_N - pre_N
dt = antenna.t[1] - antenna.t[0]
new_t = np.arange(-pre_N, after_N)*dt + antenna.t[0]
antenna.t = new_t
# TODO:trace extrapolation?
antenna.Ex = np.pad(antenna.Ex, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
antenna.Ey = np.pad(antenna.Ey, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
antenna.Ez = np.pad(antenna.Ez, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
if i%10 == 0:
print(f"Modified trace lengths by {pre_N},{after_N-N_samples}")
beacon = 1e-6 * lib.beacon_from(tx, antenna, f_beacon, antenna.t, c_light=c_light, radiate_rsq=beacon_radiate_rsq) # mu V/m
# noise realisation
if unique_noise_realisations or (noise_realisation == 0).all(): # either create one for every antenna, or generate a single one
print("Noise realisation!")
noise_realisation = 1e-6 * rng.normal(0, noise_sigma or 0, size=len(antenna.t)) # mu V/m
# Collect all data to be saved (with the first 3 values the E fields)
traces = np.array([antenna.Ex, antenna.Ey, antenna.Ez, beacon, noise_realisation])
append_antenna_hdf5( antennas_fname, antenna, traces, name='original_traces', prepend_time=True)
E_AxB = [np.dot(ev.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(traces[0], traces[1], traces[2])]
t_AxB = antenna.t
append_antenna_hdf5( antennas_fname, antenna, [t_AxB, E_AxB, t_AxB, t_AxB], name='original_E_AxB', prepend_time=False)# Note the 4 element list containing dummys at idx 2 and 3
# add beacon and noise to relevant polarisations
for j, amp in enumerate(beacon_amplitudes):
traces[j] = traces[j] + amp*beacon + noise_realisation
append_antenna_hdf5( antennas_fname, antenna, traces, name='prefiltered_traces', prepend_time=True)
# .. and apply block_filter to every trace
dt = antenna.t[1] - antenna.t[0]
for j in range(len(traces)):
traces[j] = block_filter(traces[j], dt, low_bp, high_bp)
append_antenna_hdf5( antennas_fname, antenna, traces, name='filtered_traces', prepend_time=True)
# Save filtered E field in E_AxB
E_AxB = [np.dot(ev.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(traces[0], traces[1], traces[2])]
append_antenna_hdf5( antennas_fname, antenna, [t_AxB, E_AxB], name='E_AxB', prepend_time=False)
print("Antenna HDF5 file written as " + str(antennas_fname))