Window function and amplitude correction¶
[1]:
from radiocalibrationtoolkit import *
from scipy.signal import butter, lfilter
from scipy.signal import blackmanharris, boxcar, hann
from scipy.fft import rfft
[INFO] LFmap: Import successful.
[2]:
# some global plot settings
plt.rcParams["axes.labelweight"] = "bold"
plt.rcParams["font.weight"] = "bold"
plt.rcParams["font.size"] = 16
plt.rcParams["legend.fontsize"] = 12
plt.rcParams["xtick.major.width"] = 2
plt.rcParams["ytick.major.width"] = 2
plt.rcParams["xtick.major.size"] = 5
plt.rcParams["ytick.major.size"] = 5
plt.rcParams["xtick.labelsize"] = 14
plt.rcParams["ytick.labelsize"] = 14
Examine effects on a single trace¶
[3]:
N = 2048 # samples
fs_Hz = 250e6 # sampling frequency
signal_freq_Hz = 55.5e6 # signal
win = blackmanharris(N) # window function type
[4]:
tx = np.arange(N) / fs_Hz
def make_simple_time_trace(signal_amplitude=0.7, signal_frequency=55.5e+6, noise_amplitude=1, N=2048, apply_filter=True):
time_trace = signal_amplitude * np.sin(2*np.pi*signal_frequency*tx) + noise_amplitude * np.random.normal(size=N)
b, a = butter(5, [30e6, 80e6], fs=250e6, btype='band')
if apply_filter:
time_trace = lfilter(b, a, time_trace)
return np.round(time_trace * N) / N
[5]:
tt = make_simple_time_trace(signal_amplitude=1)
fx = np.fft.rfftfreq(N, 1/fs_Hz)*1e-6
fig, ax = plt.subplots(2,1, figsize=(6,6))
labels = ['no windows', 'hann', 'blackman-harris']
for i, w in enumerate([1, hann(N), blackmanharris(N)]):
ax[0].plot(tx*1e+6, tt*w)
ax[1].plot(fx, 10*np.log10(np.abs(np.fft.rfft(tt*w))), label=labels[i])
fig.subplots_adjust(hspace=0.5)
ax[1].legend()
ax[0].set_xlabel("$\mu$s")
ax[1].set_xlabel("frequency [MHz]")
ax[0].set_ylabel("amplitude [ADC]")
ax[1].set_ylabel("amplitude [ADC]")
[5]:
Text(0, 0.5, 'amplitude [ADC]')
[6]:
# generate time trace
time_trace = make_simple_time_trace()
# calculate two sided spectrum
spectrum = np.abs(fft(time_trace))
# calculate one sided spectrum (not corrected for the one side)
rspectrum = np.abs(np.fft.rfft(time_trace))
# calculate one sided spectrum (corrected for the one side)
r2spectrum = correct_energy_of_one_sided_spectrum(rspectrum)
# calculate amplitude window function correction
Aw = N/np.sum(win) /np.sqrt(2)
# calculate two sided spectrum using a window function (not corrected for the one side)
spectrum_w = np.abs(fft(time_trace*win))
# calculate one sided spectrum using a window function (not corrected for the one side)
rspectrum_w = np.abs(np.fft.rfft(time_trace*win))
# calculate one sided spectrum (corrected for the one side)
r2spectrum = correct_energy_of_one_sided_spectrum(rspectrum)
r2spectrum_w = correct_energy_of_one_sided_spectrum(rspectrum_w)
# define X-axis values
fx = np.fft.rfftfreq(N, 1/fs_Hz)/1e+6
[7]:
# calculate energy from time trace, one and two sided spectrum with and without window
# when the window is used, the amplitudes are corrected
print(
f"Energy calculated from:\n"
f"time trace = {get_energy_from_time_trace(time_trace):.2f},\n"
f"two sided spectrum = {get_energy_from_two_sided_spectrum(spectrum):.2f},\n"
f"one sided spectrum = {get_energy_from_one_sided_spectrum(rspectrum):.2f},\n"
f"windowed two sided spectrum = {get_energy_from_two_sided_spectrum(spectrum_w):.2f},\n"
f"one sided spectrum corrected for being the one sided spectrum = {get_energy_from_one_sided_spectrum_corrected4one_side(r2spectrum):.2f},\n"
f"windowed two sided spectrum corrected by window function loss = {get_energy_from_two_sided_spectrum(spectrum_w * Aw):.2f},\n"
f"windowed one sided spectrum corrected by window function loss = {get_energy_from_one_sided_spectrum(rspectrum_w * Aw):.2f},\n"
f"one sided spectrum corrected for being the one sided spectrum and for window function loss = {get_energy_from_one_sided_spectrum_corrected4one_side(r2spectrum_w*Aw):.2f},\n"
)
Energy calculated from:
time trace = 1332.59,
two sided spectrum = 1332.59,
one sided spectrum = 1332.59,
windowed two sided spectrum = 337.56,
one sided spectrum corrected for being the one sided spectrum = 1332.59,
windowed two sided spectrum corrected by window function loss = 1312.67,
windowed one sided spectrum corrected by window function loss = 1312.67,
one sided spectrum corrected for being the one sided spectrum and for window function loss = 1312.67,
Trace with a broad band pulse¶
[8]:
# create a broad band noise trace
bb_spec=np.zeros(65)
bb_spec[26:33] = 200
std_dev = 10 # Standard deviation
# Randomize the array values using a normal distribution
bb_spec = bb_spec + std_dev * np.random.randn(len(bb_spec))
bb_spec = np.abs(bb_spec)
bb_tt = np.real(ifft(one_sided_2_complex_two_sided(bb_spec)))
fx_bb = np.arange(0, bb_spec.size)/(2*(bb_spec.size-1)/250)
tx_bb = np.arange(bb_tt.size)/250
fig, ax = plt.subplots(2,1, figsize=(6,6))
ax[0].set_xlabel("frequency [MHz]")
ax[0].set_ylabel("amplitude [ADC]")
ax[1].set_ylabel("amplitude [ADC]")
ax[1].set_xlabel("$\mu$s")
ax[0].plot(fx_bb, bb_spec)
ax[1].plot(tx_bb, bb_tt)
fig.subplots_adjust(hspace=0.35)
fig, ax = plt.subplots()
ax.plot(np.abs(rfft(bb_tt)))
[8]:
[<matplotlib.lines.Line2D at 0x7fb9f0954190>]
[9]:
# add broad band pulse to the time trace
time_trace_e = time_trace.copy()
time_trace_e[np.arange(1024, 1024+bb_tt.size)] = time_trace_e[np.arange(1024, 1024+bb_tt.size)] + bb_tt
rspectrum_e = np.abs(np.fft.rfft(time_trace_e))
rspectrum_w_e = np.abs(np.fft.rfft(time_trace_e*win))
[10]:
def show_plots(time_trace, show_window_func=True):
rspectrum = np.abs(np.fft.rfft(time_trace))
rspectrum_w = np.abs(np.fft.rfft(time_trace * win))
Aw = N / np.sum(win) / np.sqrt(2)
e_tt = get_energy_from_time_trace(time_trace)
e_spec = get_energy_from_one_sided_spectrum(rspectrum)
e_spec_w = get_energy_from_one_sided_spectrum(rspectrum_w * Aw)
fig, ax = plt.subplots(2, 1, figsize=(6, 6))
mu = 1e6
ax[0].plot(
tx * mu, time_trace, label="no window: E={:.1f} a.u.".format(e_tt), alpha=0.7
)
ax[0].plot(tx * mu, time_trace * win, label="windowed", alpha=0.7, c='r')
if show_window_func:
ax[0].plot(tx * mu, win * 10, label="window func. x10", alpha=1, c='g', lw=2)
ax[1].plot(
fx , rspectrum, label="no window: E={:.1f} a.u.".format(e_spec), alpha=0.7
)
ax[1].plot(
fx ,
rspectrum_w * Aw,
label="windowed & A$_w$: E={:.1f} a.u.".format(e_spec_w),
alpha=0.7,
c='r'
)
display(
get_energy_from_time_trace(time_trace),
get_energy_from_one_sided_spectrum(rspectrum),
get_energy_from_one_sided_spectrum(rspectrum_w * Aw),
)
ax[0].set_xlabel("$\mu$s")
ax[1].set_xlabel("frequency [MHz]")
ax[0].set_ylabel("amplitude [ADC]")
ax[1].set_ylabel("amplitude [ADC]")
ax[0].legend()
ax[1].legend()
ylims = ax[0].get_ylim()
ax[0].set_ylim(-abs(np.max(ylims)*2), abs(np.max(ylims))*1.5)
ylims = ax[1].get_ylim()
ax[1].set_ylim(ylims[0], ylims[1]*1.4)
ax[1].set_xlim(20, 90)
fig.subplots_adjust(hspace=0.3)
[11]:
# trace without the broad band pulse
show_plots(time_trace, show_window_func=False)
1332.5858173370361
1332.5858173370361
1312.674415864144
[12]:
# trace with the broad band pulse
time_trace_e = time_trace.copy()
time_trace_e[np.arange(1024, 1024+bb_tt.size)] = time_trace_e[np.arange(1024, 1024+bb_tt.size)] + bb_tt
show_plots(time_trace_e)
5709.23198318932
5709.23198318932
16995.217867776955
[13]:
# rolled trace to shift the broad band pulse
show_plots(np.roll(time_trace_e, 500))
5709.23198318932
5709.23198318932
1735.5628301299691
The learning here is that a amplitudes of a trace with broad band pulse cannot be safely corrected after the window function because the window function is symmetric and the broad band pulse appearing in random parts of the trace will be each time differently supressed by the window function.
Note that the energy of the trace is after the rolling of the trace still the same when no window is used.
Distributions of the energy ratios of not windowed spectra and windowed spectra with amplitude correction¶
[14]:
def get_averaged_spectra_and_diffs(arr):
diffs = np.array([])
n, N = arr.shape
avr_rspectrum = 0
avr_rspectrum_w = 0
win = blackmanharris(N)
Aw = N / np.sum(win) / np.sqrt(2)
for i in range(n):
time_trace = arr[i, :]
rspectrum = np.abs(np.fft.rfft(time_trace))
rspectrum_w = np.abs(np.fft.rfft(time_trace * win))
diffs = np.append(
diffs,
get_energy_from_time_trace(time_trace)
/ get_energy_from_one_sided_spectrum(rspectrum_w * Aw),
)
avr_rspectrum += rspectrum
avr_rspectrum_w += rspectrum_w
avr_rspectrum /= n
avr_rspectrum_w /= n
return avr_rspectrum, avr_rspectrum_w, diffs
def show_results(
avr_rspectrum,
avr_rspectrum_w,
diffs,
histo_edge=2,
N=2048,
bins=None,
xax_min=None,
xax_max=None,
xlim= [None, None]
):
fig, ax = plt.subplots()
fx = np.fft.rfftfreq(N, 1 / fs_Hz) / 1e6
if bins == None:
bins = linspace_with_middle_value(
np.mean(diffs), histo_edge * np.std(diffs), 20
)
ax.hist(diffs, bins=bins)
# Calculate mean and standard deviation
mean_diff = np.mean(diffs)
std_diff = np.std(diffs)
print(mean_diff)
print(std_diff)
# Add text box with mean and standard deviation
text_box = f"$\mu$: {mean_diff:.2f}\n$\sigma$: {std_diff:.2f}"
ax.text(
0.95,
0.95,
text_box,
transform=ax.transAxes,
verticalalignment="top",
horizontalalignment="right",
bbox=dict(boxstyle="round", facecolor="white", alpha=0.5),
)
ax.set_xlim(xax_min, xax_max)
ax.set_xlabel(
r"$\frac{\mathrm{energy: \ spectrum \ with \ no\ window}}{\mathrm{energy:spectrum \ with\ window,\ amplitudes\ corrected}}$"
)
ax.set_ylabel("entries")
fig, ax = plt.subplots()
ax.plot(fx, voltageAmp2dB(avr_rspectrum), alpha=0.7, label="no window")
ax.plot(fx, voltageAmp2dB(avr_rspectrum_w), alpha=0.7, label="with window", c='g')
ax.plot(
fx,
voltageAmp2dB(avr_rspectrum_w * Aw),
alpha=0.7,
label="with window " + "\n" + "and corrected", c='r'
)
# ax.axes.axvspan(30,80, color='b', alpha=0.1)
ax.set_xlim(*xlim)
autoscale_y(ax)
ax.set_xlabel("frequency [MHz]")
ax.set_ylabel("dB")
ax.legend()
Simple traces¶
[15]:
# create a set of 1000 time traces
n = 1000
avr_rspectrum = 0
avr_rspectrum_w = 0
time_traces = make_simple_time_trace()
for i in range(n-1):
time_traces = np.vstack((time_traces, make_simple_time_trace()))
[16]:
# calculate energy ratios of unwindowed spectra and windowed with amplitude correction
# and average spectra
avr_rspectrum, avr_rspectrum_w, diffs = get_averaged_spectra_and_diffs(time_traces)
diffs1 = diffs
label4final_histo = ['simple traces']
[17]:
show_results(avr_rspectrum, avr_rspectrum_w, diffs, histo_edge=3, xlim=[45, 65])
1.0018217390344406
0.060064483377693656
[18]:
# create a set of 1000 time traces with bb pulses
n = 1000
avr_rspectrum = 0
avr_rspectrum_w = 0
time_traces = make_simple_time_trace()
for i in range(n-1):
time_trace = make_simple_time_trace()
time_trace[0:bb_tt.size] = time_trace[0:bb_tt.size] + bb_tt
time_trace = np.roll(time_trace_e, random.randint(0, 2048 - bb_tt.size) )
time_traces = np.vstack((time_traces, time_trace))
[19]:
avr_rspectrum, avr_rspectrum_w, diffs = get_averaged_spectra_and_diffs(time_traces)
[20]:
show_results(avr_rspectrum, avr_rspectrum_w, diffs, histo_edge=3, bins='auto')
# ,xax_min=-10, xax_max=80)
2.5347509214172477
1.5478178215950311
Mock traces¶
[21]:
# read HW response
hw_file_path = "./antenna_setup_files/HardwareProfileList_realistic.xml"
hw_dict = read_hw_file(hw_file_path, interp_args={"fill_value": "extrapolate"})
hw_reponse_1 = hw_dict["RResponse"]["LNA"]
hw_reponse_2 = hw_dict["RResponse"]["digitizer"]
hw_reponse_3 = hw_dict["RResponse"]["cable_fromLNA2digitizer"]
hw_reponse_4 = hw_dict["RResponse"]["impedance_matching_EW"]
# merge all hw responses to one function
def hw_response_func(x):
return dB2PowerAmp(
hw_reponse_1(x) + hw_reponse_2(x) + hw_reponse_3(x) + hw_reponse_4(x)
)
# impedance function
impedance_func = hw_dict["IImpedance"][
"antenna_EW"
]
# read sidereal voltage square spectral density
sidereal_voltage2_density_DF = pd.read_csv(
"./voltage2_density/voltage2_density_Salla_EW_GSM16.csv",
index_col=0,
)
sidereal_voltage2_density_DF.columns = sidereal_voltage2_density_DF.columns.astype(
float
)
<?xml version="1.0" encoding="iso-8859-1"?>
<Element HardwareProfileList at 0x7fb9f197eb40>
[22]:
mock_trace_generator = Mock_trace_generator(
sidereal_voltage2_density_DF=sidereal_voltage2_density_DF,
hw_response_func=hw_response_func,
impedance_func=impedance_func,
voltage2ADC=2048,
time_trace_size=2048,
sampling_frequency_MHz=250,
)
freq_MHz_bins = mock_trace_generator.get_frequency_bins()
[23]:
piko = 1e-12
additional_noise = 5e-4*piko
debug_spectra_dict = mock_trace_generator.generate_mock_trace(
1,
lst=15,
temp_celsius=30,
nbi={"67.2": 1},
nbi_err=0.2,
return_debug_dict=True,
additional_noise=additional_noise,
)[0]
100%|████████████████████████████████████████████| 1/1 [00:00<00:00, 302.14it/s]
[24]:
number_of_traces = 1000
mock_traces_DF = mock_trace_generator.generate_mock_trace(
number_of_traces,
temp_celsius=[-10,30],
additional_noise=additional_noise,
nbi={"67.25": 1},
nbi_err=0.3,
)
100%|███████████████████████████████████████| 1000/1000 [00:31<00:00, 31.53it/s]
[25]:
avr_rspectrum, avr_rspectrum_w, diffs = get_averaged_spectra_and_diffs(mock_traces_DF.iloc[:,2:].values)
diffs2 = diffs
label4final_histo.append('mock traces')
[26]:
show_results(avr_rspectrum, avr_rspectrum_w, diffs, histo_edge=3)
0.9997583861051457
0.04984380100134047
[27]:
mock_traces_with_BB_DF = mock_traces_DF.copy(deep=True)
for i in range(mock_traces_with_BB_DF.index.size):
start_bb_index = np.random.randint(0, 2048-40)
time_trace = mock_traces_with_BB_DF.iloc[i,2:].values
time_trace[0:bb_tt.size] = time_trace[0:bb_tt.size] + bb_tt
time_trace = np.roll(time_trace_e, random.randint(0, 2048 - bb_tt.size) )
mock_traces_with_BB_DF.iloc[i,2:] = time_trace
[28]:
avr_rspectrum, avr_rspectrum_w, diffs = get_averaged_spectra_and_diffs(mock_traces_with_BB_DF.iloc[:,2:].values)
diffs3 = diffs
label4final_histo.append('mock traces with BB pulse')
[29]:
show_results(avr_rspectrum, avr_rspectrum_w, diffs, histo_edge=3)
2.52938808657187
1.5461405225345628
Summary¶
[30]:
fig, ax = plt.subplots()
fx = np.fft.rfftfreq(N, 1 / fs_Hz) / 1e6
diffs_list = np.asarray([diffs1, diffs2, diffs3], dtype="object") - 1
# diffs_list = [diffs1, diffs2]
bins = np.histogram_bin_edges(np.concatenate(diffs_list), bins=20)
# if bins == None:
bins = linspace_with_middle_value(
np.mean(np.concatenate(diffs_list)), 2 * np.std(np.concatenate(diffs_list)), 20
)
bins = np.linspace(0.5, 1.5, 100) - 1
for i, diffs in enumerate(diffs_list):
mean_diff = np.mean(diffs)
std_diff = np.std(diffs)
mean_diff_trun, std_diff_trun = calculate_truncated_stats(diffs, 1, 99)
label = (
text_box
) = f"$\mu_{{trunc}}$: {round(mean_diff_trun,2)+0:.2f}\n$\sigma_{{trunc}}$: {round(std_diff_trun,2)+0:.2f}"
ax.hist(diffs, bins=bins, alpha=1, density=True, histtype="step", lw=3, label=label)
# Calculate mean and standard deviation
print(label4final_histo[i])
print('Mean and STD:{:.4f}, {:.4f}'.format(mean_diff, std_diff))
print('Truncated Mean and STD: {:.4f}, {:.4f}'.format(mean_diff, std_diff))
print("******")
# ax.set_xlim(xax_min, xax_max)
ax.set_xlabel(
r"$\frac{\mathrm{energy: \ spectrum \ with \ no\ window}}{\mathrm{energy:spectrum \ with\ window,\ amplitudes\ corrected}} -1$"
)
ax.set_ylabel("entries")
ax.legend()
simple traces
Mean and STD:0.0018, 0.0601
Truncated Mean and STD: 0.0018, 0.0601
******
mock traces
Mean and STD:-0.0002, 0.0498
Truncated Mean and STD: -0.0002, 0.0498
******
mock traces with BB pulse
Mean and STD:1.5294, 1.5461
Truncated Mean and STD: 1.5294, 1.5461
******
[30]:
<matplotlib.legend.Legend at 0x7fb9f09af250>
[31]:
print('The colors corespond to')
print(label4final_histo)
print(', respectively.')
The colors corespond to
['simple traces', 'mock traces', 'mock traces with BB pulse']
, respectively.
CONCLUSION: The broad band pulse in the trace spoils the energy recovery.