diff --git a/fourier/06_direct_fourier.py b/fourier/06_direct_fourier.py new file mode 100644 index 0000000..d29c758 --- /dev/null +++ b/fourier/06_direct_fourier.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +if __name__ == "__main__": + import numpy as np + import matplotlib.pyplot as plt + + from mylib.fft import direct_fourier_transform + + n_samples = 1000 + nphi = 100 + f_beacon = 2.3434 + noise_A = 5 + + t = np.linspace(0, 100, n_samples+1) + + phi_in = np.linspace(0, 2*np.pi, nphi) + test_freqs = f_beacon + np.linspace(-0.1, 0.1, 300+1) + + + phi_out = np.zeros( (len(phi_in), len(test_freqs)) ) + amp_out = np.zeros( (len(phi_in), len(test_freqs)) ) + + if not True: + # Same length samples + # means we can precalculate the c_k and s_k terms + c_k, s_k = ft_corr_vectors(test_freqs, t) + + for i, phi in enumerate(phi_in): + s = np.sin(2*np.pi*t*f_beacon + phi) + noise_A * np.random.normal(size=len(t)) + + real = np.dot(c_k, s) + imag = np.dot(s_k, s) + + phi_out[i] = (np.arctan2(real, imag)) + amp_out[i] = (2/len(t) * (real**2 + imag**2)**0.5) + else: + sampleset_gen = ( np.sin(2*np.pi*t*f_beacon + phi) + noise_A*np.random.normal(size=len(t)) for phi in phi_in ) + ft_amp_gen = direct_fourier_transform(test_freqs, t, sampleset_gen) + + for i, ft_amp in enumerate(ft_amp_gen): + real = ft_amp[0] + imag = ft_amp[1] + + phi_out[i] = np.arctan2(real, imag) + amp_out[i] = 2/len(t) * (real**2 + imag**2)**0.5 + + ###### + # Figures + ###### + import matplotlib.colors as colors + cmap = plt.cm.plasma + freq_norm = colors.Normalize(vmin=np.amin(test_freqs), vmax=np.amax(test_freqs)) + freq_cmap = cmap(freq_norm(test_freqs)) + try: + # Amplitudes Histogram + if False: + fig = plt.figure() + ax = fig.add_subplot(projection='3d') + ax.set_xlabel("Amplitude") + ax.set_ylabel("Frequency") + ax.set_zlabel("Count") + + for j, amp in enumerate(amp_out.T): + # per test_freq + counts, edges = np.histogram(amp, bins='auto', ) + l = ax.plot(edges[:-1], counts, zs=test_freqs[j], zdir='y', color=freq_cmap[j]) + + ax.add_collection3d(plt.fill_between(edges[:-1], 0, counts, color=l[0].get_color(), alpha=0.3), zs=test_freqs[j], zdir='y') + + + ax.view_init(elev=20., azim=-35) + elif False: + fig, ax = plt.subplots() + ax.set_xlabel("Amplitude") + ax.set_ylabel("Count") + # + for j, amp in enumerate(amp_out.T): + # per test_freq + ax.hist(amp, histtype='step', bins='auto', color=freq_cmap[j]) + + # Single Amplitude / Frequency plot showing frequency fitting + freq_out = None + if True: + from numpy.polynomial import Polynomial + + freq_out = np.zeros(len(phi_in)) + amp_cut = 0.8 + + fig, ax = plt.subplots() + ax.set_title("Frequency estimation by parabola fitting.") + ax.set_xlabel("Frequency") + ax.set_ylabel("Amplitude") + + for j, amp in enumerate(amp_out): + + if j > 2: + continue + + max_amp_idx = np.argmax(amp) + max_amp = amp[max_amp_idx] + # filter amplitudes below amp_cut*max_amp + valid_idx = amp >= amp_cut*max_amp + + p_fit = Polynomial.fit(test_freqs[valid_idx], amp[valid_idx], 2) + func = p_fit.convert() + + tmp_test_freqs = test_freqs[max_amp_idx] + 0.05*np.linspace(-1,1,101, endpoint=True) + func_amps = func(tmp_test_freqs) + freq_id = np.argmax(func_amps) + + freq_out[j] = tmp_test_freqs[freq_id] + + # plot tmp_test_freqs and freq_id + func_amps_idx = func_amps > 0 + func_amps = func_amps[func_amps_idx] + tmp_test_freqs = tmp_test_freqs[func_amps_idx] + + ax.plot(test_freqs, amp, marker='o') + l = ax.plot(tmp_test_freqs, func_amps) + ax.axvline(freq_out[j], color=l[0].get_color()) + + # Amplitudes figure + if True: + fig, ax = plt.subplots() + ax.set_ylabel("Amplitude") + ax.set_xlabel("Frequency") + if True: + for j, amp in enumerate(amp_out.T): + #per test_freq + ax.plot(np.tile(test_freqs[j], len(amp)), amp, marker='.', color=freq_cmap[j], alpha=max(0.05, 0)) + + + ax.errorbar(test_freqs, np.mean(amp_out, axis=0), yerr=np.std(amp_out, axis=0)) + ax.plot(test_freqs, np.mean(amp_out, axis=0), marker='*', color='red', zorder=6) + + # Phase in vs. Phase out figure + if True: + fig, ax = plt.subplots() + amp_cut = 0.8 + + ax.set_title(f"Measured phases passing amplitude > {amp_cut}") + ax.set_xlabel("Phase in") + ax.set_ylabel("Phase out") + + for j, phi in enumerate(phi_out.T): + if np.mean(amp_out[:,j]) < amp_cut: + # ignore when the amplitudes are not close to 1 + continue + + # per test_freq + ax.plot(phi_in, np.unwrap(phi), label='f-test_f:{:.2e}'.format(f_beacon-test_freqs[j]))#, color=freq_cmap[j]) + + ax.legend() + finally: + plt.show() diff --git a/fourier/mylib/fft.py b/fourier/mylib/fft.py index a75af8e..55165a8 100644 --- a/fourier/mylib/fft.py +++ b/fourier/mylib/fft.py @@ -42,3 +42,44 @@ def ft_spectrum( signal, sample_rate=1, ftfunc=None, freqfunc=None, mask_bias=Fa else: return spectrum[1:], freqs[1:] +def ft_corr_vectors(freqs, time): + """ + Get the cosine and sine terms for freqs at time. + + Takes the outer product of freqs and time. + """ + freqtime = np.outer(freqs, time) + + c_k = np.cos(2*np.pi*freqtime) + s_k = np.sin(2*np.pi*freqtime) + + return c_k, s_k + +def direct_fourier_transform(freqs, time, samplesets_iterable): + """ + Determine the fourier transform of each sampleset in samplesets_iterable at freqs. + + The samplesets are expected to have the same time vector. + + Returns either a generator to return the fourier transform for each sampleset + if samplesets_iterable is a generator + or a numpy array. + """ + + c_k, s_k = ft_corr_vectors(freqs, time) + + if not hasattr(samplesets_iterable, '__len__') and hasattr(samplesets_iterable, '__iter__'): + # samplesets_iterable is an iterator + # return an iterator containing (real, imag) amplitudes + return ( (np.dot(c_k, samples), np.dot(s_k, samples)) for samples in samplesets_iterable ) + + # Numpy array + return np.dot(c_k, samplesets_iterable), np.dot(s_k, samplesets_iterable) + + + +def discrete_fourier_properties(samples, samplerate): + """ + Return f_delta and f_nyquist. + """ + return (samplerate/(len(samples)), samplerate/2)