2022-03-10 14:06:17 +01:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
"""
|
|
|
|
Define the TravelSignal class.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
import scipy.interpolate as interp
|
|
|
|
|
2022-03-11 16:46:18 +01:00
|
|
|
try:
|
|
|
|
from _signal import *
|
|
|
|
except ImportError:
|
|
|
|
from ._signal import *
|
|
|
|
|
2022-03-10 14:06:17 +01:00
|
|
|
class TravelSignal:
|
|
|
|
"""
|
|
|
|
Model an arbitrary digitised signal that can be translated to another position and time.
|
|
|
|
"""
|
|
|
|
|
2022-03-10 17:07:22 +01:00
|
|
|
def __init__(self, signal, sample_rate, t_0 = 0, x_0 = 0, periodic=True, interp1d_kw = None, velocity=None):
|
2022-03-10 14:06:17 +01:00
|
|
|
"""
|
|
|
|
Initialise by saving the raw signal
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
signal : arraylike
|
|
|
|
The raw signal to wrap.
|
|
|
|
sample_rate : float
|
|
|
|
Sample rate of the raw signal.
|
|
|
|
t_0 : float, optional
|
|
|
|
Time that this signal is sent out.
|
|
|
|
x_0 : float, optional
|
|
|
|
Location that this signal is sent out from.
|
|
|
|
periodic : bool, optional
|
|
|
|
Translated signal is 0 if it is not periodic
|
|
|
|
and the time/distance is outside the samples.
|
|
|
|
interp1d_kw : bool or dict, optional
|
|
|
|
Use scipy.interpolate's interp1d_kw for interpolation.
|
|
|
|
Set to True, or a dictionary to enable.
|
|
|
|
Dictionary will be entered in as **kwargs.
|
2022-03-10 17:07:22 +01:00
|
|
|
velocity : float, optional
|
|
|
|
Defaults to the speed of light in m/s.
|
2022-03-10 14:06:17 +01:00
|
|
|
"""
|
|
|
|
|
2022-03-10 17:07:22 +01:00
|
|
|
self.raw = np.asarray(signal)
|
2022-03-10 14:06:17 +01:00
|
|
|
self.periodic = periodic
|
|
|
|
|
|
|
|
self.sample_rate = sample_rate # Hz
|
|
|
|
self.sample_length = len(self.raw)
|
|
|
|
self.time_length = self.sample_length*sample_rate # s
|
|
|
|
|
2022-03-10 17:07:22 +01:00
|
|
|
self.velocity = 299792458 if velocity is None else velocity # m / s
|
|
|
|
|
|
|
|
self.x_0 = x_0 # m
|
|
|
|
self.t_0 = t_0 # s
|
2022-03-10 14:06:17 +01:00
|
|
|
|
|
|
|
# choose interpolation method
|
|
|
|
if not interp1d_kw:
|
|
|
|
self.interp_f = None
|
|
|
|
|
|
|
|
# offload interpolation to scipy.interpolate
|
|
|
|
else:
|
|
|
|
interp1d_kw_defaults = {
|
|
|
|
"copy": False,
|
|
|
|
"kind": 'linear',
|
|
|
|
"assume_sorted": True,
|
|
|
|
"bounds_error": True
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.periodic:
|
|
|
|
interp1d_kw_defaults['bounds_error'] = False
|
|
|
|
interp1d_kw_defaults['fill_value'] = (self.raw[-1], self.raw[0])
|
|
|
|
|
|
|
|
# merge kwargs
|
|
|
|
if interp1d_kw is not True:
|
|
|
|
interp1d_kw = { **interp1d_kw_defaults, **interp1d_kw }
|
|
|
|
|
|
|
|
self.interp_f = interp.interp1d(
|
|
|
|
np.arange(0, self.sample_length),
|
|
|
|
self.raw,
|
|
|
|
**interp1d_kw
|
|
|
|
)
|
|
|
|
|
|
|
|
def __len__(self):
|
|
|
|
return self.sample_length
|
|
|
|
|
|
|
|
def __call__(self, t_f = None, x_f = None, **kwargs):
|
|
|
|
"""
|
|
|
|
Allow this class to be used as a function.
|
|
|
|
"""
|
|
|
|
return self._translate(t_f, x_f, **kwargs)[0]
|
|
|
|
|
|
|
|
def _translate(self, t_f = None, x_f = None, t_0 = None, x_0 = None, velocity = None):
|
|
|
|
"""
|
|
|
|
Translate the signal from (t_0, x_0) to (t_f, x_f) with optional velocity.
|
|
|
|
|
|
|
|
Returns the signal at (t_f, x_f)
|
|
|
|
"""
|
|
|
|
|
2022-03-10 17:07:22 +01:00
|
|
|
total_time_offset = self.total_time_offset(t_f=t_f, x_f=x_f, t_0=t_0, x_0=x_0, velocity=velocity)
|
2022-03-10 14:06:17 +01:00
|
|
|
n_offset = (total_time_offset * self.sample_rate )
|
|
|
|
|
|
|
|
# periodic signal
|
|
|
|
if self.periodic:
|
|
|
|
n_offset = n_offset % self.sample_length
|
|
|
|
|
|
|
|
# non-periodic and possibly outside the bounds
|
|
|
|
else:
|
|
|
|
# this is a numpy array
|
|
|
|
if hasattr(n_offset, 'ndim') and n_offset.ndim > 0:
|
|
|
|
mask_idx = np.nonzero( (0 > n_offset) | (n_offset >= self.sample_length) )
|
|
|
|
|
|
|
|
n_offset[mask_idx] = 0
|
|
|
|
|
|
|
|
# not a numpy array
|
|
|
|
else:
|
|
|
|
# outside the bounds
|
|
|
|
if 0 > n_offset or n_offset > self.sample_length:
|
|
|
|
n_offset = np.nan
|
|
|
|
|
|
|
|
# n_offset is invalid
|
|
|
|
# set amplitude to zero
|
|
|
|
if n_offset is np.nan:
|
|
|
|
amplitude = 0
|
|
|
|
|
|
|
|
# n_offset is valid, interpolate the amplitude
|
|
|
|
else:
|
|
|
|
# offload to scipy interpolation
|
|
|
|
if self.interp_f:
|
|
|
|
amplitude = self.interp_f(n_offset)
|
|
|
|
|
|
|
|
# self written linear interpolation
|
|
|
|
else:
|
|
|
|
n_offset = np.asarray(n_offset)
|
|
|
|
|
|
|
|
n_offset_eps, n_offset_int = np.modf(n_offset)
|
|
|
|
n_offset_int = n_offset.astype(int)
|
|
|
|
|
|
|
|
if True:
|
|
|
|
amplitude = (1-n_offset_eps) * self.raw[n_offset_int] \
|
|
|
|
+ n_offset_eps * self.raw[(n_offset_int + 1) % self.sample_length]
|
|
|
|
|
|
|
|
# use nearest value instead of interpolation
|
|
|
|
else:
|
|
|
|
amplitude = self.raw[n_offset_int]
|
|
|
|
|
|
|
|
if not self.periodic:
|
|
|
|
if hasattr(amplitude, 'ndim'):
|
|
|
|
amplitude[mask_idx] = 0
|
|
|
|
|
|
|
|
return amplitude, total_time_offset
|
|
|
|
|
2022-03-10 17:07:22 +01:00
|
|
|
def spatial_time(self, x_f, velocity=None, x_0=None):
|
|
|
|
"""
|
|
|
|
Calculate the time offset caused by a spatial difference.
|
|
|
|
"""
|
|
|
|
if velocity is None:
|
|
|
|
velocity = self.velocity
|
|
|
|
|
|
|
|
if x_0 is None:
|
|
|
|
x_0 = self.x_0
|
|
|
|
|
|
|
|
return np.sum(np.sqrt( (x_f - x_0)**2 )/velocity)
|
|
|
|
|
|
|
|
def total_time_offset(self, t_f = None, x_f = None, t_0 = None, x_0 = None, velocity = None):
|
|
|
|
"""
|
|
|
|
Calculate how much time shifting is needed to go from (t_0, x_0) to (t_f, x_f).
|
|
|
|
|
|
|
|
Convention:
|
|
|
|
(t_0, x_0) < (t_f, x_0) gives a positive time shift,
|
|
|
|
(t_0, x_0) != (t_0, x_f) gives a negative time shift
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
the time shift
|
|
|
|
"""
|
|
|
|
|
|
|
|
## spatial offset
|
|
|
|
if x_f is None:
|
|
|
|
spatial_time_offset = 0
|
|
|
|
else:
|
|
|
|
x_f = np.asarray(x_f)
|
|
|
|
if x_0 is None:
|
|
|
|
x_0 = self.x_0
|
|
|
|
|
|
|
|
spatial_time_offset = self.spatial_time(x_f, x_0=x_0, velocity=velocity)
|
|
|
|
|
|
|
|
## temporal offset
|
|
|
|
if t_f is None:
|
|
|
|
temporal_time_offset = 0
|
|
|
|
else:
|
|
|
|
t_f = np.asarray(t_f)
|
|
|
|
|
|
|
|
if t_0 is None:
|
|
|
|
t_0 = self.t_0
|
|
|
|
|
|
|
|
temporal_time_offset = t_f - t_0
|
|
|
|
|
|
|
|
return temporal_time_offset - spatial_time_offset
|
2022-03-10 14:06:17 +01:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
from scipy.stats import norm
|
|
|
|
|
|
|
|
sample_rate = 3e2 # Hz
|
|
|
|
|
|
|
|
t_offset = 8
|
|
|
|
periodic = False
|
|
|
|
|
|
|
|
time = t_offset + np.arange(0, 1, 1/sample_rate) #s
|
|
|
|
time2 = t_offset + np.arange(-1.5, 1, 1/sample_rate) #s
|
|
|
|
|
|
|
|
signal = norm.pdf(time, time[len(time)//2], (time[-1] - time[0])/10)
|
|
|
|
|
|
|
|
mysignal = TravelSignal(signal, sample_rate, t_0 = t_offset, periodic=True)
|
|
|
|
mysignal2 = TravelSignal(signal, sample_rate, t_0 = t_offset, periodic=False)
|
|
|
|
|
|
|
|
fig, ax = plt.subplots(1, 1, figsize=(16,4))
|
|
|
|
ax.set_title("Raw and TravelSignal")
|
|
|
|
ax.set_ylabel("Amplitude")
|
|
|
|
ax.set_xlabel("Time")
|
|
|
|
|
2022-03-10 17:07:22 +01:00
|
|
|
ax.plot(time, signal, label='Raw signal')
|
|
|
|
ax.plot(time2, mysignal(time2) +0.5, '.-', label='TravelSignal(periodic)+0.5')
|
|
|
|
ax.plot(time2, mysignal2(time2)-0.5, '.-', label='TravelSignal-0.5')
|
2022-03-10 14:06:17 +01:00
|
|
|
|
|
|
|
ax.legend()
|
|
|
|
|
|
|
|
plt.show();
|