mirror of
https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
synced 2025-05-17 05:19:24 +02:00
Move lib out of ./simulations
This commit is contained in:
parent
84b3a6dca9
commit
6763bbc64c
14 changed files with 10 additions and 0 deletions
7
lib/__init__.py
Normal file
7
lib/__init__.py
Normal file
|
@ -0,0 +1,7 @@
|
|||
from . import signals
|
||||
from . import location
|
||||
from . import sampling
|
||||
from .util import *
|
||||
|
||||
|
||||
TravelSignal = signals.DigitisedSignal
|
2
lib/location/__init__.py
Normal file
2
lib/location/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .location import *
|
||||
from .antenna import *
|
68
lib/location/antenna.py
Normal file
68
lib/location/antenna.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
from functools import partial
|
||||
import copy
|
||||
from typing import Union
|
||||
|
||||
from .location import Location
|
||||
from ..signals import Signal
|
||||
|
||||
class Antenna(Location):
|
||||
"""
|
||||
A location able to interact with a signal.
|
||||
|
||||
Either emitting or receiving.
|
||||
|
||||
Optionally uses digitizer to transform the signal
|
||||
when receiving.
|
||||
"""
|
||||
def __init__(self, x):
|
||||
super().__init__(x)
|
||||
|
||||
def __repr__(self):
|
||||
return "Antenna({}, {})".format(repr(self.x), repr(self.x))
|
||||
|
||||
def emit(self, signal: Union[Signal, callable]) -> Union[Signal, callable]:
|
||||
"""
|
||||
Emit signal from this antenna's location.
|
||||
|
||||
Note that this merely sets a default argument.
|
||||
"""
|
||||
if not isinstance(signal, Signal):
|
||||
return partial(signal, x_0=self.x)
|
||||
else:
|
||||
new_signal = copy.copy(signal)
|
||||
new_signal.x_0 = self.x
|
||||
|
||||
return new_signal
|
||||
|
||||
def recv(self, signal: Union[Signal, callable]) -> Union[Signal, callable]:
|
||||
"""
|
||||
Trace signal as a function of time at this antenna's
|
||||
location.
|
||||
|
||||
Note that this merely sets a default argument.
|
||||
"""
|
||||
if not isinstance(signal, Signal):
|
||||
return partial(signal, x_f=self.x)
|
||||
else:
|
||||
new_signal = copy.copy(signal)
|
||||
new_signal.x_f = self.x
|
||||
|
||||
return new_signal
|
||||
|
||||
receive = recv
|
||||
|
||||
class Receiver(Antenna):
|
||||
"""
|
||||
An antenna which main purpose is to trace a signal over time.
|
||||
|
||||
Optionally applies a transformation to the traced signal.
|
||||
"""
|
||||
def __repr__(self):
|
||||
return "Receiver({})".format(repr(self.x))
|
||||
|
||||
class Emitter(Antenna):
|
||||
"""
|
||||
An antenna which main purpose is to emit a signal.
|
||||
"""
|
||||
def __repr__(self):
|
||||
return "Emitter({})".format(repr(self.x))
|
52
lib/location/example.py
Executable file
52
lib/location/example.py
Executable file
|
@ -0,0 +1,52 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
from mpl_toolkits.mplot3d import axes3d
|
||||
|
||||
# fix package-internal importing
|
||||
if __name__ == "__main__" and __package__ is None:
|
||||
import sys
|
||||
sys.path.append("../../")
|
||||
__package__ = "lib.location"
|
||||
|
||||
from . import location as loc
|
||||
from ..location.antenna import Receiver, Emitter
|
||||
|
||||
# 2D showcase
|
||||
source = Emitter([1,1])
|
||||
|
||||
antennae = [
|
||||
Receiver([2,3]),
|
||||
Receiver([10,10]),
|
||||
Receiver([-2,-3]),
|
||||
Receiver([-10,0]),
|
||||
]
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
loc.plot_geometry(ax, [source], antennae)
|
||||
fig.show()
|
||||
|
||||
# 3D showcase
|
||||
source = Emitter([1,1,1])
|
||||
|
||||
antennae = [
|
||||
Receiver([2,3,0]),
|
||||
Receiver([10,10,-5]),
|
||||
Receiver([-2,-3,9]),
|
||||
Receiver([-10,0,-5]),
|
||||
]
|
||||
|
||||
fig = plt.figure()
|
||||
ax = fig.add_subplot(111, projection='3d')
|
||||
|
||||
ax.set_title("Geometry of Emitter(s) and Antennae")
|
||||
ax.set_xlabel("x")
|
||||
ax.set_ylabel("y")
|
||||
ax.set_zlabel("z")
|
||||
ax.plot([source.x[0]], *source.x[1:], '*', label="Emitter")
|
||||
|
||||
for j, ant in enumerate(antennae):
|
||||
ax.plot([ant.x[0]], *ant.x[1:], '+', label="Antenna {}".format(j))
|
||||
|
||||
ax.legend()
|
||||
plt.show()
|
110
lib/location/location.py
Normal file
110
lib/location/location.py
Normal file
|
@ -0,0 +1,110 @@
|
|||
import numpy as np
|
||||
from functools import partial
|
||||
|
||||
def distance(x1, x2):
|
||||
"""
|
||||
Calculate the Euclidean distance between two locations x1 and x2
|
||||
"""
|
||||
return np.sqrt( np.sum( (x1 - x2)**2, axis=-1) )
|
||||
|
||||
def plot_geometry(ax, emitters=[], antennae=[], unit='m'):
|
||||
"""
|
||||
Show the geometry of emitters and antennae in a square plot.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ax - matplotlib.Axes
|
||||
The axis object to plot the geometry on.
|
||||
emitters - list of Locations
|
||||
The Emitter objects to plot.
|
||||
antennae - list of Locations
|
||||
The Receiver objects to plot.
|
||||
|
||||
Returns
|
||||
-------
|
||||
ax - matplotlib.Axes
|
||||
The axis object containing the plotted geometry.
|
||||
annots - dict of list of matplotlib.text.Annotation
|
||||
The dictionary is split up into a list of annotations
|
||||
belonging to the emitters, and one for the antennae.
|
||||
"""
|
||||
|
||||
ax.grid()
|
||||
ax.set_title("Geometry of Emitter(s) and Antennae")
|
||||
ax.set_ylabel("y ({})".format(unit))
|
||||
ax.set_xlabel("x ({})".format(unit))
|
||||
ax.margins(0.3)
|
||||
ax.set_aspect('equal', 'datalim') # make it a square plot
|
||||
|
||||
annots = {}
|
||||
for k, locs in {"E": emitters, "A": antennae}.items():
|
||||
if k == "E":
|
||||
marker='*'
|
||||
prefix = k
|
||||
elif k == "A":
|
||||
marker="o"
|
||||
prefix = k
|
||||
|
||||
# create the list of annotations
|
||||
if k not in annots:
|
||||
annots[k] = []
|
||||
|
||||
# plot marker and create annotation
|
||||
for j, loc in enumerate(locs):
|
||||
label = "{}{}".format(prefix, j)
|
||||
ax.plot(*loc.x, marker=marker, label=label)
|
||||
annots[k].append(ax.annotate(label, loc.x))
|
||||
|
||||
return ax, annots
|
||||
|
||||
class Location:
|
||||
"""
|
||||
A location is a point designated by a spatial coordinate x.
|
||||
|
||||
Locations are wrappers around a Numpy N-dimensional array.
|
||||
"""
|
||||
|
||||
def __init__(self, x):
|
||||
self.x = np.asarray(x)
|
||||
|
||||
def __repr__(self):
|
||||
return "Location({})".format(repr(self.x))
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.x[key]
|
||||
|
||||
def __setitem__(self, key, val):
|
||||
self.x[key] = val
|
||||
|
||||
def distance(self, other):
|
||||
if isinstance(other, Location):
|
||||
other = other.x
|
||||
|
||||
return distance(self.x, other)
|
||||
|
||||
# math
|
||||
def __add__(self, other):
|
||||
if isinstance(other, Location):
|
||||
other = other.x
|
||||
|
||||
return self.__class__(self.x + other)
|
||||
|
||||
def __sub__(self, other):
|
||||
if isinstance(other, Location):
|
||||
other = other.x
|
||||
|
||||
return self.__class__(self.x - other)
|
||||
|
||||
def __mul__(self, other):
|
||||
return self.__class__(self.x * other)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Location):
|
||||
other = other.x
|
||||
|
||||
return np.all(self.x == other)
|
||||
|
||||
# math alias functions
|
||||
__radd__ = __add__
|
||||
__rsub__ = __sub__
|
||||
__rmul__ = __mul__
|
3
lib/sampling/__init__.py
Normal file
3
lib/sampling/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from .sampling import *
|
||||
from .sampler import *
|
||||
from .digitizer import *
|
66
lib/sampling/digitizer.py
Normal file
66
lib/sampling/digitizer.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
import numpy as np
|
||||
from functools import wraps, partial
|
||||
|
||||
from . import sampling as smp
|
||||
from .sampler import Sampler
|
||||
|
||||
class Digitizer(Sampler):
|
||||
"""
|
||||
Digitizer that takes in a signal and resamples and quantises the signal.
|
||||
"""
|
||||
|
||||
def __init__(self, resolution=0.1, bias=0, sampling_frequency=None):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
##########
|
||||
resolution - float
|
||||
Resolution of the digitizer
|
||||
sampling_frequency - float
|
||||
Frequency this digitizer will sample a signal
|
||||
"""
|
||||
super().__init__(sampling_frequency)
|
||||
|
||||
self.resolution = resolution
|
||||
self.bias = bias
|
||||
|
||||
def digitise(self, signal, signal_sample_frequency=None):
|
||||
"""
|
||||
Digitize signal according to the specs of this digitizer.
|
||||
|
||||
Effectively resamples signal
|
||||
"""
|
||||
|
||||
if callable(signal):
|
||||
# if signal is already a partial,
|
||||
# try to rebuild it after setting the wrapper
|
||||
if isinstance(signal, partial):
|
||||
rebuild_partial = True
|
||||
p_args = signal.args
|
||||
p_kwargs = signal.keywords
|
||||
signal = signal.func
|
||||
else:
|
||||
rebuild_partial = False
|
||||
|
||||
@wraps(signal)
|
||||
def wrapper(*args, **kwargs):
|
||||
return smp.quantise(
|
||||
self.sample(signal(*args, **kwargs), signal_sample_frequency),
|
||||
self.resolution,
|
||||
self.bias
|
||||
)
|
||||
|
||||
# rebuild the partial if applicable
|
||||
if rebuild_partial:
|
||||
wrapper = partial(wrapper, *p_args, **p_kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
else:
|
||||
signal = np.asarray(signal)
|
||||
|
||||
return smp.quantise(
|
||||
self.sample(signal, signal_sample_frequency),
|
||||
self.resolution,
|
||||
self.bias
|
||||
)
|
29
lib/sampling/sampler.py
Normal file
29
lib/sampling/sampler.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
|
||||
import numpy as np
|
||||
|
||||
from . import sampling as smp
|
||||
|
||||
class Sampler():
|
||||
"""
|
||||
A mechanism to sample signals.
|
||||
"""
|
||||
|
||||
def __init__(self, sampling_frequency=None):
|
||||
"""
|
||||
Parameters
|
||||
##########
|
||||
sampling_frequency - float
|
||||
Frequency the signals will be sampled at
|
||||
"""
|
||||
|
||||
self.sampling_frequency = sampling_frequency
|
||||
|
||||
def sample(self, signal, signal_fs=None):
|
||||
"""
|
||||
Sample signal
|
||||
"""
|
||||
# Null operation
|
||||
if signal_fs is None or self.sampling_frequency is None:
|
||||
return signal
|
||||
|
||||
return smp.resample(signal, signal_fs, self.sampling_frequency)
|
55
lib/sampling/sampling.py
Normal file
55
lib/sampling/sampling.py
Normal file
|
@ -0,0 +1,55 @@
|
|||
"""
|
||||
Sampling related stuff
|
||||
|
||||
Such as a Sampler and Digitizer
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
|
||||
def quantise(signal, resolution, bias=0):
|
||||
"""
|
||||
Quantise the signal with resolution
|
||||
|
||||
Parameters
|
||||
##########
|
||||
signal - arraylike
|
||||
The signal to be quantised
|
||||
resolution - float
|
||||
Resolution for quantising the signal
|
||||
bias - optional,float
|
||||
Optional bias applied before quantising
|
||||
"""
|
||||
|
||||
return np.round(signal / resolution - bias) * resolution
|
||||
|
||||
def resample(signal, signal_fs, sample_frequency = 1):
|
||||
"""
|
||||
Resample signal (sampled at signal_fs) to sample_frequency
|
||||
|
||||
Parameters
|
||||
##########
|
||||
signal - arraylike
|
||||
The signal to be resampled
|
||||
signal_fs - float
|
||||
Sampling frequency of signal
|
||||
sample_frequency - float
|
||||
Wanted sampling frequency for the resampled signal
|
||||
"""
|
||||
|
||||
scale = sample_frequency / signal_fs
|
||||
|
||||
return _resample(signal, scale)
|
||||
|
||||
def _resample(signal, scale):
|
||||
"""
|
||||
Quick resampling algorithm
|
||||
|
||||
From: https://github.com/nwhitehead/swmixer/blob/master/swmixer.py
|
||||
"""
|
||||
n = round( len(signal) * scale )
|
||||
|
||||
return np.interp(
|
||||
np.linspace(0, 1, n, endpoint=False), # where to interpret
|
||||
np.linspace(0, 1, len(signal), endpoint=False), # known positions
|
||||
signal, # known data points
|
||||
)
|
2
lib/signals/__init__.py
Normal file
2
lib/signals/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .signal import *
|
||||
from .digitisedsignal import *
|
176
lib/signals/digitisedsignal.py
Executable file
176
lib/signals/digitisedsignal.py
Executable file
|
@ -0,0 +1,176 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import numpy as np
|
||||
import scipy.interpolate as interp
|
||||
|
||||
if __name__ == "__main__" and __package__ is None:
|
||||
import sys
|
||||
sys.path.append("../../")
|
||||
__package__ = "lib.signals"
|
||||
|
||||
from .signal import *
|
||||
|
||||
class DigitisedSignal(Signal):
|
||||
"""
|
||||
Model an arbitrary digitised signal that can be translated to another position and time.
|
||||
"""
|
||||
|
||||
def __init__(self, signal, sample_rate, t_0 = 0, x_0 = 0, periodic=True, interp1d_kw = None, velocity=None, t_f = None, x_f = None):
|
||||
"""
|
||||
Initialise by saving the raw signal
|
||||
|
||||
Parameters
|
||||
----------
|
||||
signal : arraylike
|
||||
The raw signal to wrap.
|
||||
sample_rate : float
|
||||
Sample rate of the raw signal.
|
||||
t_0 : float, optional
|
||||
Time that this signal is sent out.
|
||||
x_0 : float, optional
|
||||
Location that this signal is sent out from.
|
||||
periodic : bool, optional
|
||||
Translated signal is 0 if it is not periodic
|
||||
and the time/distance is outside the samples.
|
||||
interp1d_kw : bool or dict, optional
|
||||
Use scipy.interpolate's interp1d_kw for interpolation.
|
||||
Set to True, or a dictionary to enable.
|
||||
Dictionary will be entered in as **kwargs.
|
||||
velocity : float, optional
|
||||
Defaults to the speed of light in m/s.
|
||||
t_f : float, optional
|
||||
Default time that this signal is received.
|
||||
x_f : float, optional
|
||||
Default Location that this signal is received.
|
||||
"""
|
||||
super().__init__(t_0=t_0, x_0=x_0, velocity=velocity, t_f=t_f, x_f=x_f)
|
||||
|
||||
self.raw = np.asarray(signal)
|
||||
self.periodic = periodic
|
||||
|
||||
self.sample_rate = sample_rate # Hz
|
||||
self.sample_length = len(self.raw)
|
||||
self.time_length = self.sample_length/sample_rate # s
|
||||
|
||||
# choose interpolation method
|
||||
if not interp1d_kw:
|
||||
self.interp_f = None
|
||||
|
||||
# offload interpolation to scipy.interpolate
|
||||
else:
|
||||
interp1d_kw_defaults = {
|
||||
"copy": False,
|
||||
"kind": 'linear',
|
||||
"assume_sorted": True,
|
||||
"bounds_error": True
|
||||
}
|
||||
|
||||
if self.periodic:
|
||||
interp1d_kw_defaults['bounds_error'] = False
|
||||
interp1d_kw_defaults['fill_value'] = (self.raw[-1], self.raw[0])
|
||||
|
||||
# merge kwargs
|
||||
if interp1d_kw is not True:
|
||||
interp1d_kw = { **interp1d_kw_defaults, **interp1d_kw }
|
||||
|
||||
self.interp_f = interp.interp1d(
|
||||
np.arange(0, self.sample_length),
|
||||
self.raw,
|
||||
**interp1d_kw
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
return self.sample_length
|
||||
|
||||
def raw_time(self):
|
||||
return np.arange(0, self.time_length, 1/self.sample_rate)
|
||||
|
||||
def _translate(self, t_f = None, x_f = None, t_0 = None, x_0 = None, velocity = None):
|
||||
"""
|
||||
Translate the signal from (t_0, x_0) to (t_f, x_f) with optional velocity.
|
||||
|
||||
Returns the signal at (t_f, x_f) and the total time offset
|
||||
"""
|
||||
|
||||
total_time_offset = self.total_time_offset(t_f=t_f, x_f=x_f, t_0=t_0, x_0=x_0, velocity=velocity)
|
||||
n_offset = (total_time_offset * self.sample_rate )
|
||||
|
||||
# periodic signal
|
||||
if self.periodic:
|
||||
n_offset = n_offset % self.sample_length
|
||||
|
||||
# non-periodic and possibly outside the bounds
|
||||
else:
|
||||
# this is a numpy array
|
||||
if hasattr(n_offset, 'ndim') and n_offset.ndim > 0:
|
||||
mask_idx = np.nonzero( (0 > n_offset) | (n_offset >= self.sample_length) )
|
||||
|
||||
n_offset[mask_idx] = 0
|
||||
|
||||
# not a numpy array
|
||||
else:
|
||||
# outside the bounds
|
||||
if 0 > n_offset or n_offset > self.sample_length:
|
||||
n_offset = np.nan
|
||||
|
||||
# n_offset is invalid
|
||||
# set amplitude to zero
|
||||
if n_offset is np.nan:
|
||||
amplitude = 0
|
||||
|
||||
# n_offset is valid, interpolate the amplitude
|
||||
else:
|
||||
# offload to scipy interpolation
|
||||
if self.interp_f:
|
||||
amplitude = self.interp_f(n_offset)
|
||||
|
||||
# self written linear interpolation
|
||||
else:
|
||||
n_offset = np.asarray(n_offset)
|
||||
|
||||
n_offset_eps, n_offset_int = np.modf(n_offset)
|
||||
n_offset_int = n_offset.astype(int)
|
||||
|
||||
if True:
|
||||
amplitude = (1-n_offset_eps) * self.raw[n_offset_int] \
|
||||
+ n_offset_eps * self.raw[(n_offset_int + 1) % self.sample_length]
|
||||
|
||||
# use nearest value instead of interpolation
|
||||
else:
|
||||
amplitude = self.raw[n_offset_int]
|
||||
|
||||
if not self.periodic:
|
||||
if hasattr(amplitude, 'ndim') and amplitude.ndim > 0:
|
||||
amplitude[mask_idx] = 0
|
||||
|
||||
return amplitude, total_time_offset
|
||||
|
||||
if __name__ == "__main__":
|
||||
import matplotlib.pyplot as plt
|
||||
from scipy.stats import norm
|
||||
|
||||
sample_rate = 3e2 # Hz
|
||||
|
||||
t_offset = 8
|
||||
periodic = False
|
||||
|
||||
time = t_offset + np.arange(0, 1, 1/sample_rate) #s
|
||||
time2 = t_offset + np.arange(-1.5, 1, 1/sample_rate) #s
|
||||
|
||||
signal = norm.pdf(time, time[len(time)//2], (time[-1] - time[0])/10)
|
||||
|
||||
mysignal = DigitisedSignal(signal, sample_rate, t_0 = t_offset, periodic=True)
|
||||
mysignal2 = DigitisedSignal(signal, sample_rate, t_0 = t_offset, periodic=False)
|
||||
|
||||
fig, ax = plt.subplots(1, 1, figsize=(16,4))
|
||||
ax.set_title("Raw and DigitisedSignal")
|
||||
ax.set_ylabel("Amplitude")
|
||||
ax.set_xlabel("Time")
|
||||
|
||||
ax.plot(time, signal, label='Raw signal')
|
||||
ax.plot(time2, mysignal(time2) +0.5, '.-', label='DigitisedSignal(periodic)+0.5')
|
||||
ax.plot(time2, mysignal2(time2)-0.5, '.-', label='DigitisedSignal-0.5')
|
||||
|
||||
ax.legend()
|
||||
|
||||
plt.show();
|
137
lib/signals/signal.py
Normal file
137
lib/signals/signal.py
Normal file
|
@ -0,0 +1,137 @@
|
|||
"""
|
||||
Define the super Signal class
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
|
||||
class Signal():
|
||||
"""
|
||||
An arbitrary signal that can be translated to another position and time.
|
||||
Note that position can be of any length.
|
||||
|
||||
Super object, cannot be used directly.
|
||||
"""
|
||||
def __init__(self, t_0 = 0, x_0 = 0, velocity=None, t_f = None, x_f = None):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
t_0 : float, optional
|
||||
Time that this signal is sent out.
|
||||
x_0 : float, optional
|
||||
Location that this signal is sent out from.
|
||||
velocity : float, optional
|
||||
Defaults to the speed of light in m/s.
|
||||
t_f : float, optional
|
||||
Default time that this signal is received.
|
||||
x_f : float, optional
|
||||
Default Location that this signal is received.
|
||||
"""
|
||||
if t_0 is None:
|
||||
raise ValueError("t_0 cannot be None")
|
||||
if x_0 is None:
|
||||
raise ValueError("x_0 cannot be None")
|
||||
|
||||
self.x_0 = np.asarray(x_0) # m
|
||||
self.t_0 = np.asarray(t_0) # s
|
||||
|
||||
self.velocity = 299792458 if velocity is None else velocity # m / s
|
||||
|
||||
# Default final positions
|
||||
t_f = np.asarray(t_f) if t_f is not None else None
|
||||
x_f = np.asarray(x_f) if x_f is not None else None
|
||||
|
||||
self.x_f = x_f
|
||||
self.t_f = t_f
|
||||
|
||||
def __call__(self, t_f = None, x_f = None, **kwargs):
|
||||
"""
|
||||
Allow this class to be used as a function.
|
||||
"""
|
||||
return self._translate(t_f, x_f, **kwargs)[0]
|
||||
|
||||
def _translate(self, t_f = None, x_f = None, t_0 = None, x_0 = None, velocity = None):
|
||||
"""
|
||||
Translate the signal from (t_0, x_0) to (t_f, x_f) with optional velocity.
|
||||
|
||||
Returns the signal at (t_f, x_f)
|
||||
"""
|
||||
|
||||
raise NotImplementedError
|
||||
|
||||
def spatial_time_offset(self, x_f=None, x_0=None, velocity=None):
|
||||
"""
|
||||
Calculate the time offset caused by a spatial distance.
|
||||
"""
|
||||
if velocity is None:
|
||||
velocity = self.velocity
|
||||
|
||||
if x_0 is None:
|
||||
x_0 = self.x_0
|
||||
if x_f is None:
|
||||
x_f = self.x_f
|
||||
|
||||
## make sure they are arrays
|
||||
x_0 = np.asarray(x_0) if x_0 is not None else None
|
||||
x_f = np.asarray(x_f) if x_f is not None else None
|
||||
|
||||
return np.sqrt( np.sum((x_f - x_0)**2, axis=-1) )/velocity
|
||||
|
||||
def temporal_time_offset(self, t_f=None, t_0=None):
|
||||
"""
|
||||
Calculate the time offset caused by a temporal distance.
|
||||
"""
|
||||
if t_0 is None:
|
||||
t_0 = self.t_0
|
||||
if t_f is None:
|
||||
t_f = self.t_f
|
||||
|
||||
## make sure they are arrays
|
||||
t_0 = np.asarray(t_0) if t_0 is not None else None
|
||||
t_f = np.asarray(t_f) if t_f is not None else None
|
||||
|
||||
return t_f - t_0
|
||||
|
||||
|
||||
def total_time_offset(self, t_f = None, x_f = None, t_0 = None, x_0 = None, velocity = None):
|
||||
"""
|
||||
Calculate how much time shifting is needed to go from (t_0, x_0) to (t_f, x_f).
|
||||
|
||||
Convention:
|
||||
(t_0, x_0) < (t_f, x_0) gives a positive time shift,
|
||||
(t_0, x_0) != (t_0, x_f) gives a negative time shift
|
||||
|
||||
Returns:
|
||||
the time shift
|
||||
"""
|
||||
# Get default values
|
||||
## starting point
|
||||
if t_0 is None:
|
||||
t_0 = self.t_0
|
||||
if x_0 is None:
|
||||
x_0 = self.x_0
|
||||
|
||||
## final point
|
||||
if x_f is None:
|
||||
x_f = self.x_f
|
||||
if t_f is None:
|
||||
t_f = self.t_f
|
||||
|
||||
## make sure they are arrays
|
||||
t_0 = np.asarray(t_0) if t_0 is not None else None
|
||||
x_0 = np.asarray(x_0) if x_0 is not None else None
|
||||
t_f = np.asarray(t_f) if t_f is not None else None
|
||||
x_f = np.asarray(x_f) if x_f is not None else None
|
||||
|
||||
# spatial offset
|
||||
if x_f is None:
|
||||
spatial_time_offset = 0
|
||||
else:
|
||||
spatial_time_offset = self.spatial_time_offset(x_f, x_0=x_0, velocity=velocity)
|
||||
|
||||
# temporal offset
|
||||
if t_f is None:
|
||||
temporal_time_offset = 0
|
||||
else:
|
||||
temporal_time_offset = self.temporal_time_offset(t_f, t_0=t_0)
|
||||
|
||||
return temporal_time_offset - spatial_time_offset
|
38
lib/util.py
Normal file
38
lib/util.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
"""
|
||||
Various useful utilities (duh)
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
|
||||
def sampled_time(sample_rate=1, start=0, end=1, offset=0):
|
||||
return offset + np.arange(start, end, 1/sample_rate)
|
||||
|
||||
def rot_vector(phi1=0.12345):
|
||||
"""
|
||||
Return a unit vector rotated by phi radians.
|
||||
"""
|
||||
|
||||
unit = np.array([
|
||||
phi1,
|
||||
phi1 - np.pi/2
|
||||
])
|
||||
|
||||
return np.cos(unit)
|
||||
|
||||
def detect_edges(threshold, data, rising=True, falling=False):
|
||||
"""
|
||||
Detect rising/falling edges in data, returning the indices
|
||||
of the detected edges.
|
||||
|
||||
https://stackoverflow.com/a/50365462
|
||||
"""
|
||||
|
||||
mask = np.full(len(data)-1, False)
|
||||
|
||||
if rising:
|
||||
mask |= (data[:-1] < threshold) & (data[1:] > threshold)
|
||||
|
||||
if falling:
|
||||
mask |= (data[:-1] > threshold) & (data[1:] < threshold)
|
||||
|
||||
return np.flatnonzero(mask)+1
|
Loading…
Add table
Add a link
Reference in a new issue