mirror of
https://gitlab.science.ru.nl/mthesis-edeboone/m-thesis-introduction.git
synced 2024-12-21 19:13:32 +01:00
Sine fitting figure showing SNR vs residuals
This commit is contained in:
parent
87d12748e4
commit
68f029f1f7
4 changed files with 394 additions and 9 deletions
314
fourier/05_sine_fitting.py
Executable file
314
fourier/05_sine_fitting.py
Executable file
|
@ -0,0 +1,314 @@
|
|||
#!/usr/bin/env python3
|
||||
# vim: fdm=indent ts=4
|
||||
|
||||
__doc__ = \
|
||||
"""
|
||||
Sample sine wave + noise
|
||||
Filter it
|
||||
Then fit in t-domain to resolve \\varphi_0
|
||||
"""
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.gridspec as gridspec
|
||||
import numpy as np
|
||||
if not True:
|
||||
import numpy.fft as ft
|
||||
else:
|
||||
import scipy.fftpack as ft
|
||||
import scipy.optimize as opt
|
||||
|
||||
from mylib import *
|
||||
|
||||
rng = np.random.default_rng()
|
||||
|
||||
def guess_sine_parameters(samples, fft=None, fft_freqs=None, guess=[None,None,None,None]):
|
||||
"""Use crude methods to guess the parameters to a sine wave
|
||||
from properties of both samples and their fourier transform.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
samples - arraylike
|
||||
|
||||
guess - arraylike or float or None
|
||||
If float, this is interpreted as a frequency
|
||||
Order of parameters: [amplitude, frequency, phase, offset]
|
||||
If one parameter is None, it is filled with an approximate value if available.
|
||||
|
||||
Returns:
|
||||
-----------
|
||||
guess - arraylike
|
||||
An updated version of init_guess: [amplitude, frequency, phase, offset]
|
||||
"""
|
||||
|
||||
if not hasattr(guess, '__len__'):
|
||||
# interpret as a frequency (might still be None)
|
||||
guess = [None, guess, None, None]
|
||||
|
||||
assert len(guess) == 4, "Wrong length for initial guess (should be 4)"
|
||||
|
||||
nearest_f, nearest_phase = None, None
|
||||
if fft is not None and (guess[1] is None or guess[2] is None):
|
||||
nearest_idx = None
|
||||
if guess[1] is not None:
|
||||
if fft_freqs is not None:
|
||||
nearest_idx = find_nearest(guess[1], fft_freqs)
|
||||
else:
|
||||
# We'll take the strongest peak by default
|
||||
if fft is not None:
|
||||
nearest_idx = np.argmax(fft*2)
|
||||
|
||||
if nearest_idx is not None:
|
||||
if fft_freqs is not None:
|
||||
nearest_f = fft_freqs[nearest_idx]
|
||||
|
||||
nearest_phase = np.angle(fft[nearest_idx])
|
||||
|
||||
for i in range(4):
|
||||
if guess[i] is not None:
|
||||
continue
|
||||
|
||||
if i == 0: # amplitude
|
||||
guess[i] = np.std(samples) * (2 ** 1/2)
|
||||
elif i == 1: # frequency
|
||||
guess[i] = nearest_f
|
||||
elif i == 2: # phase
|
||||
guess[i] = nearest_phase
|
||||
elif i == 3: # offset samples
|
||||
guess[i] = np.mean(samples)
|
||||
|
||||
return guess
|
||||
|
||||
def curvefit_sine(time, samples, init_guess, fitfunc=sine_fitfunc, bounds=(-np.inf, np.inf), **curve_kwargs):
|
||||
"""Fit a sine to samples with a supplied initial guess.
|
||||
|
||||
This function sets bounds for the phase.
|
||||
"""
|
||||
if bounds is None or bounds == (-np.inf, np.inf):
|
||||
high_bounds = np.array([np.inf, np.inf, +1*np.pi, np.inf])
|
||||
low_bounds = -1*high_bounds
|
||||
|
||||
bounds = (low_bounds, high_bounds)
|
||||
|
||||
return opt.curve_fit(fitfunc, time, samples, p0=init_guess, bounds=bounds, **curve_kwargs)
|
||||
|
||||
def fit_sine_to_samples(time, samples, samplerate=1, bandpass=None, guess=[None,None,None,None], fitfunc=sine_fitfunc, fft=None, freqs=None,**curve_kwargs):
|
||||
if bandpass is not None or guess[1] is None or guess[2] is None:
|
||||
if fft is None:
|
||||
fft = ft.rfft(samples)
|
||||
if freqs is None:
|
||||
freqs = ft.rfftfreq(samples.size, 1/samplerate)
|
||||
|
||||
if bandpass:
|
||||
fft[(freqs < bandpass[0]) | (freqs > bandpass[1])] = 0
|
||||
samples = ft.irfft(fft, samples.size)
|
||||
|
||||
old_guess = guess.copy()
|
||||
guess = guess_sine_parameters(samples, fft=fft, fft_freqs=freqs, guess=guess)
|
||||
|
||||
try:
|
||||
fit = curvefit_sine(time, samples, guess, fitfunc=fitfunc, **curve_kwargs)
|
||||
except RuntimeError:
|
||||
fit = None
|
||||
|
||||
return fit, guess, (fft, freqs, samples)
|
||||
|
||||
def chi_sq(observed, expected):
|
||||
"""
|
||||
Simple \Chi^2 test
|
||||
"""
|
||||
return np.sum( (observed-expected)**2 / expected)
|
||||
|
||||
def dof(observed, n_parameters=1):
|
||||
return len(observed) - n_parameters
|
||||
|
||||
def simulate_noisy_sine_fitting_SNR_and_residuals(
|
||||
N=1, snr_band=passband(), noise_band=passband(),
|
||||
t_length=1e-6, f_sample=250e6,
|
||||
noise_sigma=1, init_params=[1, 50e6, None, 0],
|
||||
show_original_signal_figure=True, show_bandpassed_signal_figure=True
|
||||
):
|
||||
residuals = np.empty( (int(N), len(init_params)) )
|
||||
real_snrs = np.empty( (int(N)) )
|
||||
|
||||
axs1, axs2 = None, None
|
||||
for j, _ in enumerate(residuals):
|
||||
|
||||
if j % 500 == 0:
|
||||
print("Iteration {} running".format(j))
|
||||
|
||||
# set random phase
|
||||
init_params[2] = 2*np.pi*rng.random()
|
||||
|
||||
samples = sine_fitfunc(time, *init_params)
|
||||
noise = None
|
||||
if noise_sigma: # noise
|
||||
noise = rng.normal(0,noise_sigma, size=(len(samples)))
|
||||
|
||||
real_snrs[j] = signal_to_noise(samples, noise, signal_band=snr_band, samplerate=f_sample, noise_band=noise_band)
|
||||
|
||||
# plot original
|
||||
if show_original_signal_figure and (j==0 or N == 1):
|
||||
fig, axs1 = plot_signal_and_spectrum(
|
||||
samples+noise, f_sample, "Original",
|
||||
freq_unit='MHz', freq_scaler=freq_scaler
|
||||
)
|
||||
for ax in axs1[[1,2]]:
|
||||
ax.axvline(f_sine/freq_scaler, color='r', alpha=0.4) # f_beacon
|
||||
ax.axvspan(snr_band[0]/freq_scaler,snr_band[1]/freq_scaler, color='purple', alpha=0.3, label='signalband') # snr
|
||||
ax.axvspan(noise_band[0]/freq_scaler, noise_band[1]/freq_scaler, color='orange', alpha=0.3, label='noiseband') # noise_band
|
||||
|
||||
# indicate initial phase
|
||||
axs1[2].axhline(init_params[2], color='r', alpha=0.4)
|
||||
|
||||
axs1[1].legend()
|
||||
|
||||
fit, guess, (fft, freqs, bandpassed) = fit_sine_to_samples(time, samples+noise, f_sample, guess=[None,f_sine,None,None], bandpass=snr_band)
|
||||
|
||||
if fit is None:
|
||||
residuals[j] = np.nan
|
||||
continue
|
||||
|
||||
residuals[j] = normalise_sine_params(init_params - fit[0])
|
||||
|
||||
if show_bandpassed_signal_figure and (j==0 or N == 1):
|
||||
fitted_sine = sine_fitfunc(time, *fit[0])
|
||||
|
||||
fig2, axs2 = plot_signal_and_spectrum(
|
||||
bandpassed, f_sample, "Bandpassed samples\nS/N:{:.2e}".format(real_snrs[j]),
|
||||
freq_unit='MHz', freq_scaler=freq_scaler
|
||||
)
|
||||
for ax in axs2[[1,2]]:
|
||||
ax.axvline(f_sine/freq_scaler, color='r', alpha=0.4) # f_beacon
|
||||
ax.axvspan(snr_band[0]/freq_scaler,snr_band[1]/freq_scaler, color='purple', alpha=0.3, label='signalband') # snr
|
||||
ax.axvspan(noise_band[0]/freq_scaler, noise_band[1]/freq_scaler, color='orange', alpha=0.3, label='noiseband') # noise_band
|
||||
|
||||
l = axs2[0].plot(time, fitted_sine, label='fit')
|
||||
axs2[0].text(1, 1, '$\chi/d.o.f. = {:.2e}/{:.2e}$'.format(chi_sq(fitted_sine, samples), dof(samples,4)), transform=axs2[0].transAxes, ha='right', va='top')
|
||||
|
||||
# indicate initial phase
|
||||
axs2[2].axhline(init_params[2], color='r', alpha=0.4)
|
||||
axs2[2].axhline(init_params[2], color=l[0].get_color(), alpha=0.4)
|
||||
|
||||
axs2[0].legend(loc='upper left')
|
||||
axs2[1].legend()
|
||||
|
||||
print("init:", init_params)
|
||||
print("fit :", fit[0])
|
||||
print("res :", residuals[j])
|
||||
|
||||
return residuals, real_snrs, (axs1, axs2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from argparse import ArgumentParser
|
||||
from myscriptlib import save_all_figs_to_path_or_show
|
||||
|
||||
rng = np.random.default_rng(1)
|
||||
|
||||
parser = ArgumentParser(description=__doc__)
|
||||
parser.add_argument("fname", metavar="path/to/figure[/]", nargs="?", help="Location for generated figure, will append __file__ if a directory. If not supplied, figure is shown.")
|
||||
parser.add_argument("-n", "--n-rand", dest='N', default=1, type=int, nargs='?', help='Number of random sines to fit')
|
||||
args = parser.parse_args()
|
||||
default_extensions = ['.pdf', '.png']
|
||||
|
||||
if args.fname == 'none':
|
||||
args.fname = None
|
||||
|
||||
report_N_nan = True
|
||||
|
||||
f_sine = 53.123456 # MHz
|
||||
sine_amplitude = 0.2
|
||||
sine_offset = 0
|
||||
init_params = np.array([sine_amplitude, f_sine, None, sine_offset])
|
||||
|
||||
N = int(args.N)
|
||||
f_sample = 250 # MHz
|
||||
t_length = 10 # us
|
||||
noise_sigma = 1
|
||||
|
||||
f_delta = 1/t_length
|
||||
noise_band = (30,80) # MHz
|
||||
snr_band = (f_sine -2*f_delta, f_sine + 2*f_delta)
|
||||
|
||||
time = sampled_time(f_sample, end=t_length)
|
||||
|
||||
freq_scaler = 1
|
||||
|
||||
###### End of inputs
|
||||
|
||||
residuals, real_snrs, _ = simulate_noisy_sine_fitting_SNR_and_residuals(N=N, snr_band=snr_band, noise_band=noise_band, t_length=t_length, f_sample=f_sample, noise_sigma=noise_sigma, init_params=init_params)
|
||||
|
||||
# Filter NaNs from fit attempts that failed
|
||||
nan_mask = ~np.isnan(residuals).any(axis=1)
|
||||
if report_N_nan:
|
||||
## report how many NaNs were found
|
||||
print("NaNs: {}/{}".format(np.count_nonzero(~nan_mask), len(real_snrs)))
|
||||
|
||||
residuals = residuals[ nan_mask ]
|
||||
real_snrs = real_snrs [ nan_mask ]
|
||||
|
||||
## Plot Signal-to-Noise vs Residuals of the fit paramters
|
||||
fig, axs = plt.subplots(1,4, sharey=True)
|
||||
fig.suptitle("S/N vs Residuals, S/N Band ({:.2e},{:.2e})MHz".format(snr_band[0]/freq_scaler, snr_band[-1]/freq_scaler))
|
||||
axs[0].set_ylabel("S/N")
|
||||
for i in range(len(init_params)):
|
||||
unit_scaler = [1, 1][i==1]
|
||||
unit_string = ['', '[MHz]'][i==1]
|
||||
xlabel = ["Amplitude", "Frequency", "Phase", "Offset"][i]
|
||||
|
||||
axs[i].set_xlabel(xlabel + unit_string)
|
||||
axs[i].plot(residuals[:,i]/unit_scaler, real_snrs, ls='none', marker='o')
|
||||
|
||||
## Plot Histograms of the Residuals
|
||||
if True and N > 1:
|
||||
for j in range(len(init_params)):
|
||||
if j == 0 or j == 3:
|
||||
continue
|
||||
|
||||
unit_scaler = [1, freq_scaler][j==1]
|
||||
unit_string = ['', '[MHz]'][j==1]
|
||||
xlabel = ["Amplitude", "Frequency", "Phase", "Offset"][j]
|
||||
|
||||
title = xlabel + " residuals"
|
||||
title += "\n"
|
||||
title += "f: {:.2e}MHz, amp/sigma: {:.2e}".format(f_sine/freq_scaler, sine_amplitude/noise_sigma)
|
||||
if noise_band:
|
||||
title += " Band ({:.2e},{:.2e})MHz".format(noise_band[0]/freq_scaler, noise_band[1]/freq_scaler)
|
||||
|
||||
fig, ax = plt.subplots()
|
||||
ax.set_title(title)
|
||||
ax.hist(residuals[:,j]/unit_scaler, density=False, histtype='step', bins='sqrt')
|
||||
ax.set_xlabel(xlabel + unit_string)
|
||||
ax.set_ylabel("Counts")
|
||||
|
||||
# make it symmetric around 0
|
||||
xmax = max(*ax.get_xlim())
|
||||
ax.set_xlim(-xmax, xmax)
|
||||
|
||||
if j == 2: # Phase
|
||||
xmin, xmax = ax.get_xlim()
|
||||
maj_div = max(1, 2**np.ceil(np.log2(np.pi/(xmax-xmin)) + 1 ))
|
||||
min_div = maj_div*12
|
||||
|
||||
axis_pi_ticker(ax.xaxis, major_divider=maj_div, minor_divider=min_div)
|
||||
|
||||
# Plot histogram between phase and frequency
|
||||
if True and N > 10:
|
||||
fig, ax = plt.subplots()
|
||||
title = "Residuals\n"
|
||||
title += "f: {:.2e}MHz, amp/sigma: {:.2e}".format(f_sine/freq_scaler, sine_amplitude/noise_sigma)
|
||||
if noise_band:
|
||||
title += "\n Band ({},{})MHz".format(noise_band[0]/freq_scaler, noise_band[1]/freq_scaler)
|
||||
title += ", N={:.1e}".format(N)
|
||||
ax.set_title(title)
|
||||
ax.set_xlabel('Frequency [MHz]')
|
||||
ax.set_ylabel('Phase')
|
||||
_, _, _, sc = ax.hist2d(residuals[:,1]/freq_scaler, residuals[:,2], bins=np.sqrt(len(residuals)))
|
||||
fig.colorbar(sc, ax=ax, label='Counts')
|
||||
|
||||
#ax.set_xlim(-np.pi, np.pi)
|
||||
axis_pi_ticker(ax.yaxis)
|
||||
ax.set_ylim(-np.pi, np.pi)
|
||||
|
||||
## Save or show figures
|
||||
save_all_figs_to_path_or_show(args.fname, default_basename=__file__, default_extensions=default_extensions)
|
|
@ -66,9 +66,9 @@ def bandpower(samples, samplerate=1, band=passband(), normalise_bandsize=True, *
|
|||
|
||||
return power/bins
|
||||
|
||||
def signal_to_noise( samplerate, samples, noise, signal_band, noise_band=None):
|
||||
def signal_to_noise(samples, noise, samplerate=1, signal_band=passband(), noise_band=None):
|
||||
if noise_band is None:
|
||||
noise_band = sample_band
|
||||
noise_band = signal_band
|
||||
|
||||
if noise is None:
|
||||
noise = samples
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
"""
|
||||
Functions to simplify plotting of fourier spectra
|
||||
"""
|
||||
# vim: fdm=indent ts=4
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.gridspec as gridspec
|
||||
import numpy as np
|
||||
|
||||
from .fft import ft_spectrum
|
||||
|
||||
def plot_spectrum(
|
||||
spectrum, freqs,
|
||||
plot_complex=False, plot_power=False, plot_amplitude=None,
|
||||
|
@ -49,6 +53,7 @@ def plot_phase(
|
|||
spectrum, freqs,
|
||||
ylim_epsilon=0.5, ax=None, grid=True,
|
||||
freq_unit="Hz", freq_scaler=1, xlabel='Frequency',
|
||||
major_divider=2, minor_divider=12,
|
||||
**plot_kwargs
|
||||
):
|
||||
"""
|
||||
|
@ -65,14 +70,21 @@ def plot_phase(
|
|||
ax.plot(freqs/freq_scaler, np.angle(spectrum), '.-', **plot_kwargs)
|
||||
ax.set_ylim(-1*np.pi - ylim_epsilon, np.pi + ylim_epsilon)
|
||||
|
||||
major_tick = Multiple(2, np.pi, '\pi')
|
||||
minor_tick = Multiple(12, np.pi, '\pi')
|
||||
ax.yaxis.set_major_locator(major_tick.locator())
|
||||
ax.yaxis.set_major_formatter(major_tick.formatter())
|
||||
ax.yaxis.set_minor_locator(minor_tick.locator())
|
||||
axis_pi_ticker(ax.yaxis, major_divider=major_divider, minor_divider=minor_divider)
|
||||
|
||||
return ax
|
||||
|
||||
def axis_pi_ticker(axis, major_divider=2, minor_divider=12):
|
||||
|
||||
major_tick = Multiple(major_divider, np.pi, '\pi')
|
||||
minor_tick = Multiple(minor_divider, np.pi, '\pi')
|
||||
|
||||
axis.set_major_locator(major_tick.locator())
|
||||
axis.set_major_formatter(major_tick.formatter())
|
||||
axis.set_minor_locator(minor_tick.locator())
|
||||
|
||||
return axis
|
||||
|
||||
def plot_signal(
|
||||
signal, sample_rate = 1,
|
||||
time=None, ax=None,
|
||||
|
@ -139,6 +151,43 @@ def plot_combined_spectrum(
|
|||
|
||||
return fig, axes
|
||||
|
||||
def plot_signal_and_spectrum(
|
||||
signal, sample_rate=1, title=None,
|
||||
signal_kwargs={}, ft_kwargs={},
|
||||
spectrum_kwargs={}, phase_kwargs={'major_divider':1, 'minor_divider': 6},
|
||||
**phase_spectrum_kwargs
|
||||
):
|
||||
"""
|
||||
Create a figure showing both the signal and the combined spectrum.
|
||||
"""
|
||||
fig = plt.figure(figsize=(16, 4))
|
||||
|
||||
if title:
|
||||
fig.suptitle(title)
|
||||
|
||||
# setup plot layout
|
||||
gs0 = gridspec.GridSpec(1, 2, figure=fig)
|
||||
gs00 = gs0[0].subgridspec(1, 1)
|
||||
gs01 = gs0[1].subgridspec(2, 1, height_ratios=[3,1], hspace=0)
|
||||
|
||||
# plot the signal
|
||||
ax1 = fig.add_subplot(gs00[0, 0])
|
||||
plot_signal(signal, sample_rate, ax=ax1, **signal_kwargs)
|
||||
|
||||
# plot spectrum
|
||||
signal_fft, freqs = ft_spectrum(signal, sample_rate, **ft_kwargs)
|
||||
_, (ax2, ax3) = plot_combined_spectrum(
|
||||
signal_fft, freqs,
|
||||
fig=fig, gs=gs01,
|
||||
spectrum_kwargs=spectrum_kwargs, phase_kwargs=phase_kwargs,
|
||||
**phase_spectrum_kwargs
|
||||
)
|
||||
|
||||
# return the axes
|
||||
axes = np.array([ax1, ax2, ax3])
|
||||
|
||||
return fig, axes
|
||||
|
||||
def multiple_formatter(denominator=2, number=np.pi, latex='\pi'):
|
||||
"""
|
||||
From https://stackoverflow.com/a/53586826
|
||||
|
|
|
@ -13,13 +13,23 @@ def phasemod(phase, low=np.pi):
|
|||
"""
|
||||
return (phase + low) % (2*np.pi) - low
|
||||
|
||||
def sine_fitfunc(t, amp=1, freq=1, phase=0, off=0):
|
||||
# Alias phase_mod to phasemod
|
||||
phase_mod = phasemod
|
||||
|
||||
def sine_fitfunc(t, amp=1, freq=1, phase=0, off=0, t_delay=0):
|
||||
"""Simple sine wave for fitting purposes"""
|
||||
return amp*np.cos( 2*np.pi*freq*t + phase) + off
|
||||
return amp*np.cos( 2*np.pi*freq*(t-t_delay) + phase) + off
|
||||
|
||||
def sin_delay(f, t, phase=0):
|
||||
return sine_fitfunc(t, amp=1, freq=f, phase=phase, off=1, t_delay=0)
|
||||
|
||||
def sampled_time(sample_rate=1, start=0, end=1, offset=0):
|
||||
return offset + np.arange(start, end, 1/sample_rate)
|
||||
|
||||
def normalise_sine_params(params):
|
||||
params[2] = phase_mod(params[2])
|
||||
return params
|
||||
|
||||
def noisy_sine_sampling(time, init_params, noise_sigma=1, rng=rng):
|
||||
if init_params[2] is None:
|
||||
init_params[2] = phasemod(2*np.pi*rng.random())
|
||||
|
@ -29,3 +39,15 @@ def noisy_sine_sampling(time, init_params, noise_sigma=1, rng=rng):
|
|||
|
||||
return samples, noise
|
||||
|
||||
# Alias noisy_sine
|
||||
noisy_sine = noisy_sine_sampling
|
||||
|
||||
def find_nearest(value, array, return_idx=True):
|
||||
array = np.asarray(array)
|
||||
idx = (np.abs(array - value)).argmin()
|
||||
|
||||
if return_idx:
|
||||
return idx
|
||||
else:
|
||||
return array[idx]
|
||||
|
||||
|
|
Loading…
Reference in a new issue