m-thesis-introduction/simulations/airshower_beacon_simulation/aa_generate_beacon.py

203 lines
5.9 KiB
Python
Executable file

#!/usr/bin/env python3
# vim: fdm=indent ts=4
"""
Add a beacon measurement on top of the
simulated airshower.
"""
import numpy as np
import json
import h5py
import os.path as path
from copy import deepcopy as copy
from earsim import REvent, Antenna, block_filter
import lib
tx_fname = 'tx.json'
antennas_fname = 'antennas.hdf5'
def write_tx_file(fname, tx, f_beacon):
with open(fname, 'w') as fp:
return json.dump(
dict(
f_beacon=f_beacon,
tx=dict(
x=tx.x,
y=tx.y,
z=tx.z,
name=tx.name
)
),
fp
)
def read_tx_file(fname):
with open(fname, 'r') as fp:
data = json.load(fp)
f_beacon = data['f_beacon']
tx = Antenna(**data['tx'])
return tx, f_beacon
def read_beacon_hdf5(fname, traces_key='traces', raise_exception=True, read_AxB=True):
with h5py.File(fname, 'r') as h5:
tx_attrs = h5['tx'].attrs
f_beacon = tx_attrs.get('f_beacon')
mydict = { k:tx_attrs.get(k) for k in ['x', 'y', 'z', 'name'] }
tx = Antenna(**mydict)
antennas = []
for k, h5ant in h5['antennas'].items():
mydict = { k:h5ant.attrs.get(k) for k in ['x', 'y', 'z', 'name'] }
ant = Antenna(**mydict)
if traces_key not in h5ant:
if raise_exception:
raise ValueError("Traces_key not in file")
else:
ant.t = h5ant[traces_key][0]
ant.Ex = h5ant[traces_key][1]
ant.Ey = h5ant[traces_key][2]
ant.Ez = h5ant[traces_key][3]
if len(h5ant[traces_key]) > 4:
ant.beacon = h5ant[traces_key][4]
if read_AxB and 'E_AxB' in h5ant:
ant.t_AxB = h5ant['E_AxB'][0]
ant.E_AxB = h5ant['E_AxB'][1]
if h5ant.attrs:
ant.attrs = {**h5ant.attrs}
antennas.append(ant)
return f_beacon, tx, antennas
def init_antenna_hdf5(fname, tx = None, f_beacon = None):
with h5py.File(fname, 'w') as fp:
if tx is not None or f_beacon is not None:
tx_group = fp.create_group('tx')
tx_attrs = tx_group.attrs
if f_beacon is not None:
tx_attrs['f_beacon'] = f_beacon
if tx is not None:
tx_attrs['x'] = tx.x
tx_attrs['y'] = tx.y
tx_attrs['z'] = tx.z
tx_attrs['name'] = tx.name
return fname
def append_antenna_hdf5(fname, antenna, columns = [], name='traces', prepend_time=True, overwrite=True, attrs_dict={}):
if not overwrite:
raise NotImplementedError
with h5py.File(fname, 'a') as fp:
if 'antennas' in fp.keys():
if not overwrite:
raise NotImplementedError
group = fp['antennas']
else:
group = fp.create_group('antennas')
if antenna.name in group:
if not overwrite:
raise NotImplementedError
ant_group = group[antenna.name]
else:
ant_group = group.create_group(antenna.name)
ant_attrs = ant_group.attrs
ant_attrs['x'] = antenna.x
ant_attrs['y'] = antenna.y
ant_attrs['z'] = antenna.z
ant_attrs['name'] = antenna.name
for k,v in attrs_dict.items():
ant_attrs[k] = v
if name in ant_group:
if not overwrite:
raise NotImplementedError
del ant_group[name]
dset = ant_group.create_dataset(name, (len(columns) + 1*prepend_time, len(columns[0])), dtype='f')
if prepend_time:
dset[0] = antenna.t
for i, col in enumerate(columns, 1*prepend_time):
dset[i] = col
if __name__ == "__main__":
from os import path
remake_tx = True
fname = "ZH_airshower/mysim.sry"
tx = Antenna(x=-500,y=0,z=0,name='tx') # m
f_beacon = 51.53e-3 # GHz
# Bandpass for E field blockfilter
low_bp = 0.03 # GHz
high_bp = 0.08 # GHz
beacon_amplitudes = 1e-6*np.array([1e2, 0, 0]) # mu V/m
beacon_radiate_rsq = True
if beacon_radiate_rsq:
# Move tx out, and magnify beacon_amplitude (at tx)
tx = Antenna(x=-20e3,y=0,z=0,name='tx') # m
dist = lib.distance(tx, Antenna(x=0, y=0, z=0))
ampl = max(1, dist**2)
beacon_amplitudes *= ampl
####
fname_dir = path.dirname(fname)
tx_fname = path.join(fname_dir, tx_fname)
antennas_fname = path.join(fname_dir, antennas_fname)
if not path.isfile(tx_fname) or remake_tx:
write_tx_file(tx_fname, tx, f_beacon)
else:
tx, f_beacon = read_tx_file(tx_fname)
# read in antennas
ev = REvent(fname)
N_antennas = len(ev.antennas)
# initialize hdf5 file
init_antenna_hdf5(antennas_fname, tx, f_beacon)
# make beacon per antenna
for i, antenna in enumerate(ev.antennas):
t0 = lib.distance(tx, antenna)/3e8 * 1e9 # ns
beacon = lib.beacon_from(tx, antenna, f_beacon, antenna.t, t0=t0, c_light=np.inf, radiate_rsq=beacon_radiate_rsq)
E = np.array([antenna.Ex, antenna.Ey, antenna.Ez, beacon])
# add to relevant polarisation
# and apply block filter
dt = antenna.t[1] - antenna.t[0]
for j, _ in enumerate(beacon_amplitudes):
E[j] += block_filter(beacon_amplitudes[j]*beacon, dt, low_bp, high_bp)
append_antenna_hdf5( antennas_fname, antenna, E, name='traces', prepend_time=True, attrs_dict=dict(t0=t0))
# Save E field in E_AxB
E = [np.dot(ev.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(E[0], E[1], E[2])]
append_antenna_hdf5( antennas_fname, antenna, [E], name='E_AxB', prepend_time=True)
print("Antenna HDF5 file written as " + str(antennas_fname))