mirror of
				https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
				synced 2025-10-31 03:46:44 +01:00 
			
		
		
		
	Note that this is a single noise realisation that is added to the three traces. It will be ~3 times stronger for E_AxB
		
			
				
	
	
		
			362 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			362 lines
		
	
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# vim: fdm=marker ts=4
 | 
						|
 | 
						|
"""
 | 
						|
Add a beacon measurement on top of the
 | 
						|
simulated airshower.
 | 
						|
"""
 | 
						|
 | 
						|
import numpy as np
 | 
						|
import json
 | 
						|
import h5py
 | 
						|
import os.path as path
 | 
						|
from copy import deepcopy
 | 
						|
 | 
						|
from earsim import REvent, Antenna, block_filter
 | 
						|
import lib
 | 
						|
 | 
						|
# {{{ vim marker
 | 
						|
tx_fname = 'tx.json'
 | 
						|
antennas_fname = 'antennas.hdf5'
 | 
						|
c_light = lib.c_light
 | 
						|
 | 
						|
def write_tx_file(fname, tx, f_beacon, **kwargs):
 | 
						|
    with open(fname, 'w') as fp:
 | 
						|
        return json.dump(
 | 
						|
                {
 | 
						|
                    **kwargs,
 | 
						|
                    **dict(
 | 
						|
                        f_beacon=f_beacon,
 | 
						|
                        tx=dict(
 | 
						|
                            x=tx.x,
 | 
						|
                            y=tx.y,
 | 
						|
                            z=tx.z,
 | 
						|
                            name=tx.name
 | 
						|
                        )
 | 
						|
                    )
 | 
						|
                },
 | 
						|
                fp
 | 
						|
               )
 | 
						|
 | 
						|
def read_tx_file(fname):
 | 
						|
    with open(fname, 'r') as fp:
 | 
						|
        data = json.load(fp)
 | 
						|
 | 
						|
    f_beacon = data['f_beacon']
 | 
						|
    tx = Antenna(**data['tx'])
 | 
						|
 | 
						|
    del data['f_beacon']
 | 
						|
    del data['tx']
 | 
						|
 | 
						|
    return tx, f_beacon, data
 | 
						|
 | 
						|
def read_beacon_hdf5(fname, **h5ant_kwargs):
 | 
						|
    with h5py.File(fname, 'r') as h5:
 | 
						|
        tx = Antenna_from_h5ant(h5['tx'], traces_key=None)
 | 
						|
        f_beacon = tx.attrs['f_beacon']
 | 
						|
 | 
						|
        antennas = []
 | 
						|
        for k, h5ant in h5['antennas'].items():
 | 
						|
            ant = Antenna_from_h5ant(h5ant, **h5ant_kwargs)
 | 
						|
 | 
						|
            antennas.append(ant)
 | 
						|
 | 
						|
    return f_beacon, tx, antennas
 | 
						|
 | 
						|
def Antenna_from_h5ant(h5ant, traces_key='traces', raise_exception=True, read_AxB=True, read_beacon_info=True):
 | 
						|
    mydict = { k:h5ant.attrs.get(k) for k in ['x', 'y', 'z', 'name'] }
 | 
						|
    ant = Antenna(**mydict)
 | 
						|
    if h5ant.attrs:
 | 
						|
        ant.attrs = {**h5ant.attrs}
 | 
						|
 | 
						|
    # Traces
 | 
						|
    if traces_key is None:
 | 
						|
        pass
 | 
						|
    elif traces_key not in h5ant:
 | 
						|
        if raise_exception:
 | 
						|
            raise ValueError("Traces_key not in file")
 | 
						|
    else:
 | 
						|
        ant.t  = deepcopy(h5ant[traces_key][0])
 | 
						|
        ant.Ex = deepcopy(h5ant[traces_key][1])
 | 
						|
        ant.Ey = deepcopy(h5ant[traces_key][2])
 | 
						|
        ant.Ez = deepcopy(h5ant[traces_key][3])
 | 
						|
        if len(h5ant[traces_key]) > 4:
 | 
						|
            ant.beacon = deepcopy(h5ant[traces_key][4])
 | 
						|
        if len(h5ant[traces_key]) > 5:
 | 
						|
            ant.noise = deepcopy(h5ant[traces_key][5])
 | 
						|
 | 
						|
    # E_AxB
 | 
						|
    if read_AxB and 'E_AxB' in h5ant:
 | 
						|
        ant.t_AxB = deepcopy(h5ant['E_AxB'][0])
 | 
						|
        ant.E_AxB = deepcopy(h5ant['E_AxB'][1])
 | 
						|
 | 
						|
    # Beacons
 | 
						|
    if read_beacon_info and 'beacon_info' in h5ant:
 | 
						|
        h5beacon = h5ant['beacon_info']
 | 
						|
 | 
						|
        beacon_info = {}
 | 
						|
        for name in h5beacon.keys():
 | 
						|
            beacon_info[name] = dict(h5beacon[name].attrs)
 | 
						|
 | 
						|
        ant.beacon_info = beacon_info
 | 
						|
 | 
						|
    return ant
 | 
						|
 | 
						|
 | 
						|
def init_antenna_hdf5(fname, tx = None, f_beacon = None):
 | 
						|
    with h5py.File(fname, 'w') as fp:
 | 
						|
        if tx is not None or f_beacon is not None:
 | 
						|
            tx_group = fp.create_group('tx')
 | 
						|
            tx_attrs = tx_group.attrs
 | 
						|
 | 
						|
            if f_beacon is not None:
 | 
						|
                tx_attrs['f_beacon'] = f_beacon
 | 
						|
 | 
						|
            if tx is not None:
 | 
						|
                tx_attrs['x'] = tx.x
 | 
						|
                tx_attrs['y'] = tx.y
 | 
						|
                tx_attrs['z'] = tx.z
 | 
						|
                tx_attrs['name'] = tx.name
 | 
						|
 | 
						|
    return fname
 | 
						|
 | 
						|
def append_antenna_hdf5(fname, antenna, columns = [], name='traces', prepend_time=True, overwrite=True, attrs_dict={}):
 | 
						|
    if not overwrite:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    with h5py.File(fname, 'a') as fp:
 | 
						|
        if 'antennas' in fp.keys():
 | 
						|
            if not overwrite:
 | 
						|
                raise NotImplementedError
 | 
						|
            group = fp['antennas']
 | 
						|
        else:
 | 
						|
            group = fp.create_group('antennas')
 | 
						|
 | 
						|
 | 
						|
        if antenna.name in group:
 | 
						|
            if not overwrite:
 | 
						|
                raise NotImplementedError
 | 
						|
            h5ant = group[antenna.name]
 | 
						|
        else:
 | 
						|
            h5ant = group.create_group(antenna.name)
 | 
						|
 | 
						|
        h5ant_attrs = h5ant.attrs
 | 
						|
        h5ant_attrs['x'] = antenna.x
 | 
						|
        h5ant_attrs['y'] = antenna.y
 | 
						|
        h5ant_attrs['z'] = antenna.z
 | 
						|
        h5ant_attrs['name'] = antenna.name
 | 
						|
 | 
						|
        for k,v in attrs_dict.items():
 | 
						|
            h5ant_attrs[k] = v
 | 
						|
 | 
						|
        if name in h5ant:
 | 
						|
            if not overwrite:
 | 
						|
                raise NotImplementedError
 | 
						|
            del h5ant[name]
 | 
						|
 | 
						|
        dset = h5ant.create_dataset(name, (len(columns) + 1*prepend_time, len(columns[0])), dtype='f')
 | 
						|
 | 
						|
        if prepend_time:
 | 
						|
            dset[0] = antenna.t
 | 
						|
 | 
						|
        for i, col in enumerate(columns, 1*prepend_time):
 | 
						|
            dset[i] = col
 | 
						|
 | 
						|
 | 
						|
def read_baseline_time_diffs_hdf5(fname):
 | 
						|
    """
 | 
						|
    Read Baseline Time Diff information from HDF5 storage.
 | 
						|
    """
 | 
						|
    with h5py.File(fname, 'r') as fp:
 | 
						|
        group_name = 'baseline_time_diffs'
 | 
						|
        base_dset_name = 'baselines'
 | 
						|
        dset_name = 'time_diffs'
 | 
						|
 | 
						|
        group = fp[group_name]
 | 
						|
 | 
						|
        names = group[base_dset_name][:].astype(str)
 | 
						|
 | 
						|
        dset = group[dset_name]
 | 
						|
        time_diffs = dset[:,0]
 | 
						|
        f_beacon = dset[:,1]
 | 
						|
        true_phase_diffs = dset[:,2]
 | 
						|
        k_periods = dset[:,3]
 | 
						|
 | 
						|
        return names, time_diffs, f_beacon, true_phase_diffs, k_periods
 | 
						|
 | 
						|
 | 
						|
def write_baseline_time_diffs_hdf5(fname, baselines, true_phase_diffs, k_periods, f_beacon, time_diffs=None, overwrite=True):
 | 
						|
    """
 | 
						|
    Write a combination of baselines, phase_diff, k_period and f_beacon to file.
 | 
						|
 | 
						|
    Note that f_beacon is allowed to broadcast, but the others are not.
 | 
						|
    """
 | 
						|
 | 
						|
    if not hasattr(baselines[0], '__len__'):
 | 
						|
        # this is a single baseline
 | 
						|
        N_baselines = 1
 | 
						|
        baselines = [baselines]
 | 
						|
        true_phase_diffs = [true_phase_diffs]
 | 
						|
        k_periods = [k_periods]
 | 
						|
        f_beacon = np.array([f_beacon])
 | 
						|
 | 
						|
    else:
 | 
						|
        N_baselines = len(baselines)
 | 
						|
 | 
						|
        # Expand the f_beacon list
 | 
						|
        if not hasattr(f_beacon, '__len__'):
 | 
						|
            f_beacon = np.array([f_beacon]*N_baselines)
 | 
						|
 | 
						|
    if time_diffs is None:
 | 
						|
        time_diffs = k_periods/f_beacon + true_phase_diffs/(2*np.pi*f_beacon)
 | 
						|
 | 
						|
    assert len(baselines) == len(true_phase_diffs) == len(k_periods) == len(f_beacon)
 | 
						|
 | 
						|
    with h5py.File(fname, 'a') as fp:
 | 
						|
        group_name = 'baseline_time_diffs'
 | 
						|
        base_dset_name = 'baselines'
 | 
						|
        dset_name = 'time_diffs'
 | 
						|
 | 
						|
        group = fp.require_group(group_name)
 | 
						|
 | 
						|
        if base_dset_name in group:
 | 
						|
            if not overwrite:
 | 
						|
                raise NotImplementedError
 | 
						|
            del group[base_dset_name]
 | 
						|
 | 
						|
        if dset_name in group:
 | 
						|
            if not overwrite:
 | 
						|
                raise NotImplementedError
 | 
						|
            del group[dset_name]
 | 
						|
 | 
						|
        # save baselines list
 | 
						|
        basenames = np.array([ [b[0].name, b[1].name] for b in baselines ], dtype='S')
 | 
						|
 | 
						|
        base_dset = group.create_dataset(base_dset_name, data=basenames)
 | 
						|
 | 
						|
        data = np.vstack( (time_diffs, f_beacon, true_phase_diffs, k_periods) ).T
 | 
						|
        dset = group.create_dataset(dset_name, data=data)
 | 
						|
 | 
						|
# }}} vim marker
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    from os import path
 | 
						|
 | 
						|
    rng = np.random.default_rng()
 | 
						|
 | 
						|
    fname = "ZH_airshower/mysim.sry"
 | 
						|
 | 
						|
    # Transmitter
 | 
						|
    remake_tx = True
 | 
						|
 | 
						|
    tx = Antenna(x=-2e3,y=0,z=0,name='tx') # m
 | 
						|
    if False:
 | 
						|
        # Move tx out a long way
 | 
						|
        tx.x, tx.y = -75e3, 75e3 # m
 | 
						|
    elif False:
 | 
						|
        # Move it to 0,0,0 (among the antennas)
 | 
						|
        tx.x, tx.y = 0, 0 #m
 | 
						|
 | 
						|
    # Beacon properties
 | 
						|
    if False: # slowest beacon to be found:
 | 
						|
        f_beacon =  10e-3 # GHz
 | 
						|
        low_bp = 5e-3 # GHz
 | 
						|
        high_bp = 80e-3 # GHz
 | 
						|
    else: # original wanted beacon
 | 
						|
        f_beacon = 51.53e-3 # GHz
 | 
						|
 | 
						|
        # Bandpass for E field blockfilter
 | 
						|
        low_bp = 30e-3 # GHz
 | 
						|
        high_bp = 80e-3 # GHz
 | 
						|
 | 
						|
    beacon_amplitudes = 1e-6*np.array([1e3, 0, 0]) # mu V/m
 | 
						|
    beacon_radiate_rsq = True # beacon_amplitude is repaired for distance to 0,0,0
 | 
						|
 | 
						|
    # modify beacon power to be beacon_amplitude at 0,0,0
 | 
						|
    if beacon_radiate_rsq:
 | 
						|
        dist = lib.distance(tx, Antenna(x=0, y=0, z=0))
 | 
						|
 | 
						|
        ampl = max(1, dist**2)
 | 
						|
 | 
						|
        beacon_amplitudes *= ampl
 | 
						|
 | 
						|
    # Noise properties
 | 
						|
    noise_sigma = 1e-4 # set to None to ignore
 | 
						|
 | 
						|
    # Disable block_filter
 | 
						|
    if False:
 | 
						|
        block_filter = lambda x, dt, low, high: x
 | 
						|
 | 
						|
    ####
 | 
						|
    fname_dir = path.dirname(fname)
 | 
						|
    tx_fname = path.join(fname_dir, tx_fname)
 | 
						|
    antennas_fname = path.join(fname_dir, antennas_fname)
 | 
						|
 | 
						|
    # read/write tx properties
 | 
						|
    if not path.isfile(tx_fname) or remake_tx:
 | 
						|
        write_tx_file(tx_fname, tx, f_beacon, amplitudes=beacon_amplitudes.tolist(), radiate_rsq=beacon_radiate_rsq)
 | 
						|
    else:
 | 
						|
        tx, f_beacon, _ = read_tx_file(tx_fname)
 | 
						|
 | 
						|
 | 
						|
    print("Beacon amplitude at tx [muV/m]:", beacon_amplitudes)
 | 
						|
    print("Tx location:", [tx.x, tx.y, tx.z])
 | 
						|
    print("Noise sigma:", noise_sigma)
 | 
						|
 | 
						|
    # read in antennas
 | 
						|
    ev = REvent(fname)
 | 
						|
    N_antennas = len(ev.antennas)
 | 
						|
 | 
						|
    # initialize hdf5 file
 | 
						|
    init_antenna_hdf5(antennas_fname, tx, f_beacon)
 | 
						|
 | 
						|
    # make beacon per antenna
 | 
						|
    for i, antenna in enumerate(ev.antennas):
 | 
						|
        if i%10 == 0:
 | 
						|
            print(f"Beaconed antenna {i} out of", len(ev.antennas))
 | 
						|
 | 
						|
        if not False: # modify trace lengths
 | 
						|
            N_samples = len(antenna.t)
 | 
						|
            #new_N = 2*N_samples
 | 
						|
            new_N = 10000 # ns = 10us
 | 
						|
            pre_N = 2000 # ns = 2us
 | 
						|
            after_N = new_N - pre_N
 | 
						|
 | 
						|
            dt = antenna.t[1] - antenna.t[0]
 | 
						|
            new_t = np.arange(-pre_N, after_N)*dt + antenna.t[0]
 | 
						|
 | 
						|
            antenna.t = new_t
 | 
						|
 | 
						|
            # TODO:trace extrapolation?
 | 
						|
            antenna.Ex = np.pad(antenna.Ex, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
 | 
						|
            antenna.Ey = np.pad(antenna.Ey, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
 | 
						|
            antenna.Ez = np.pad(antenna.Ez, (pre_N, after_N-N_samples), mode='constant', constant_values=0)
 | 
						|
 | 
						|
            if i%10 == 0:
 | 
						|
                print(f"Modified trace lengths by {pre_N},{after_N-N_samples}")
 | 
						|
 | 
						|
        beacon = lib.beacon_from(tx, antenna, f_beacon, antenna.t, c_light=c_light, radiate_rsq=beacon_radiate_rsq)
 | 
						|
 | 
						|
        # noise realisation
 | 
						|
        noise_realisation = 0
 | 
						|
        if noise_sigma is not None:
 | 
						|
            noise_realisation = rng.normal(0, noise_sigma, size=len(beacon))
 | 
						|
 | 
						|
        # Collect all data to be saved (with the first 3 values the E fields)
 | 
						|
        traces = np.array([antenna.Ex, antenna.Ey, antenna.Ez, beacon, noise_realisation])
 | 
						|
 | 
						|
        # add to relevant polarisation
 | 
						|
        # and apply block filter
 | 
						|
        dt = antenna.t[1] - antenna.t[0]
 | 
						|
        for j, amp in enumerate(beacon_amplitudes):
 | 
						|
            traces[j] = block_filter(traces[j] + amp*beacon + noise_realisation, dt, low_bp, high_bp)
 | 
						|
 | 
						|
        append_antenna_hdf5( antennas_fname, antenna, traces, name='traces', prepend_time=True)
 | 
						|
 | 
						|
        # Save E field in E_AxB
 | 
						|
        E_AxB = [np.dot(ev.uAxB,[ex,ey,ez]) for ex,ey,ez in zip(traces[0], traces[1], traces[2])]
 | 
						|
        t_AxB = antenna.t
 | 
						|
 | 
						|
        append_antenna_hdf5( antennas_fname, antenna, [t_AxB, E_AxB], name='E_AxB', prepend_time=False)
 | 
						|
 | 
						|
    print("Antenna HDF5 file written as " + str(antennas_fname))
 |