Utilities¶
Tabulate ULSA¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Tabulate ULSA (Ultra Longitudinal Sky Absorption) sky maps for a given frequency range and index type.
Usage:
python <script_name>.py -i <index_type> -f <from_freq> <to_freq> [-s <freq_step>] [-n <nside>] [-o <output_path>]
Arguments:
-i, --indextype Type of index used for sky map generation. Possible values are:
freq_dependent_index: frequency-dependent spectral index
constant_index: constant spectral index
direction_dependent_index: direction-dependent spectral index
-f, --frequencyrange Frequency range in MHz for which sky maps are to be generated. Should be specified as 'from' and 'to' values.
-s, --frequencystep Frequency step for generating sky maps. Default is 1 MHz.
-n, --nside Value of the NSIDE parameter for the healpy map. Default is 64.
-o, --outputpath Output folder path for saving the generated ULSA sky maps. If not specified, the maps will be saved in the current working directory.
Returns:
pandas DataFrame: DataFrame containing the ULSA sky maps for the given frequency range and index type. The sky maps are tabulated and saved as a CSV file with filename format: ULSA_<index_type>_maps_<from_freq>-<to_freq>MHz.csv.
Dependencies:
- ULSA.sky_map.produce_absorbed_sky_map: Module for generating ULSA sky maps
- pandas: Library for data manipulation and analysis
- numpy: Library for numerical computing
- tqdm: Library for progress bars
Example:
To generate ULSA sky maps for a frequency range of 100-150 MHz with constant spectral index and save the maps in the folder '/maps', run the following command:
python <script_name>.py -i constant_index -f 100 150 -o /maps
"""
from ULSA.sky_map.produce_absorbed_sky_map import absorption_JRZ
import pandas as pd
import numpy as np
from tqdm import tqdm
import sys
import os
import argparse
from pathlib import Path
import decimal
# Disable
def _blockPrint():
sys.stdout = open(os.devnull, "w")
# Restore
def _enablePrint():
sys.stdout = sys.__stdout__
def _frange(i, f, s, endpoint=True):
d = decimal.Decimal(str(s))
exp = float(d.as_tuple().exponent)
print(exp)
i = int(i * 10**-exp)
f = int(f * 10**-exp)
s = int(s * 10**-exp)
if endpoint:
f = f + s
values = range(i, f, s)
if exp == 0:
return values
else:
return [x * 10**exp for x in values]
def tabulate_ulsa(freq_range, index_type, nside=64):
ultralon_maps_dict = {}
for f in tqdm(freq_range):
_blockPrint() # supress ULSA output
cla = absorption_JRZ(
f,
nside=nside,
index_type=index_type,
distance=50,
using_raw_diffuse=False,
using_default_params=True,
critical_dis=False,
output_absorp_free_skymap=False,
)
ultralon_maps_dict[f] = np.asarray(cla.mpi())[:, 1]
return pd.DataFrame(ultralon_maps_dict)
def main():
ultralon_maps_DF = tabulate_ulsa(freq_range, index_type, nside=nside)
ultralon_maps_DF.to_csv(
os.path.join(output_path, "ulsa_{}_{:.0f}-{:.0f}MHz.csv").format(
index_type, freq_range[0], freq_range[-1]
),
index=False,
)
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap._action_groups.pop()
required = ap.add_argument_group("required arguments")
optional = ap.add_argument_group("optional arguments")
required.add_argument(
"-i",
"--indextype",
nargs="?",
required=True,
help="freq_dependent_index constant_index direction_dependent_index ",
)
required.add_argument(
"-f",
"--frequencyrange",
nargs="*",
required=True,
help="Frequency range in MHz, 'from' 'to'",
)
optional.add_argument(
"-s",
"--frequencystep",
nargs="?",
default=1,
help="Frequency step. Default is 1 MHz",
)
optional.add_argument(
"-n",
"--nside",
nargs="?",
default=64,
help="Value of the NSIDE parameter for the healpy map. Default is 64.",
)
optional.add_argument(
"-o",
"--outputpath",
nargs="?",
default="./",
help="Output folder. If it does not exists, it will be created.",
)
args = vars(ap.parse_args())
index_types = [
"freq_dependent_index",
"constant_index",
"direction_dependent_index",
]
index_type = args["indextype"]
if index_type not in index_types:
print("[ERROR] Index type not recognized, use one of these: ", index_types)
sys.exit()
freq_range = _frange(
float(args["frequencyrange"][0]),
float(args["frequencyrange"][1]),
float(args["frequencystep"]),
)
nside = int(args["nside"])
output_path = args["outputpath"]
sys.exit(main())
Get power dataset¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import argparse
# This uses the relative path to the package, one directory up.
# Add path to the radiocalibrationtoolkit to your PYTHONPATH to avoid this.
sys.path.append(os.path.join(os.path.abspath(""), "../.."))
from radiocalibrationtoolkit import *
def save_figure(df, ylabel):
# save figure
fig = px.imshow(
df.T,
width=600,
aspect="cube",
color_continuous_scale="jet",
title="<b>{}</b>".format(gal_model),
)
fig.update_layout(
xaxis=dict(title="<b>LST</b>", tickprefix="<b>", ticksuffix="</b>", dtick=2),
yaxis=dict(
title="<b>frequency [MHz]</b>",
tickprefix="<b>",
ticksuffix="</b>",
range=(30, 80),
tick0=0,
dtick=10,
autorange=False,
),
)
fig.update_layout(
coloraxis=dict(
colorbar=dict(
title=dict(
text="<b>{}</b>".format(ylabel),
side="right",
),
tickprefix="<b>",
ticksuffix="</b>",
),
cmin=0,
cmax=20,
),
font=dict(
# family=font,
size=20,
color="black",
),
)
fig.write_image(
os.path.join(
args["outputpath"]
+ "{}_{}_{}.png".format(
args["datasettype"], args["savefilenameprefix"], gal_model
)
)
)
def main():
antenna_inst = AntennaPattern(args["antennamodel"])
freq_Mhz_range = range(args["frequencyrange"][0], args["frequencyrange"][1], 1)
if args["datasettype"] == "power":
lst_range = np.asarray(list(range(lst_range_arg))) + shift_LST_bins
# read hw file
# read impedance
hw_file_path = args["hwmodel"]
hw_dict = read_hw_file(hw_file_path)
impedance_func = hw_dict["IImpedance"]["antenna_{}".format(orientation)]
print('[INFO] HW file contains: ')
print([[(main_key, j) for j in hw_dict[main_key]] for main_key in hw_dict])
# calculate the power dataset
power_density_DF = calculate_power_spectral_density(
antenna_inst=antenna_inst,
galactic_map_inst=galactic_map_inst,
lst_range=lst_range,
freq_Mhz_range=freq_Mhz_range,
latitude=args["latitude"],
update_antenna_conventions={
"shift_phi": shift_phi,
"flip_theta": True,
"flip_phi": False,
"in_degrees": True,
"add_invisible_sky": True,
},
impedance_func=impedance_func,
)
# integrate the power spectrum density to power
power_DF = integrate_spectral_density(
power_density_DF, integrated_MHz_bands=power_density_DF.columns
)
# read HW response
hw_reponse_1 = dB2PowerAmp(hw_dict["RResponse"]["LNA"](power_DF.columns))
hw_reponse_2 = dB2PowerAmp(hw_dict["RResponse"]["digitizer"](power_DF.columns))
hw_reponse_3 = dB2PowerAmp(
hw_dict["RResponse"]["cable_fromLNA2digitizer"](power_DF.columns)
)
hw_reponse_4 = dB2PowerAmp(
hw_dict["RResponse"]["impedance_matching_{}".format(orientation)](
power_DF.columns
)
)
# apply response (i.e. propagate through HW signal chain)s
power_DF = power_DF.multiply(
hw_reponse_1 * hw_reponse_2 * hw_reponse_3 * hw_reponse_4
)
# to piko
power_DF = power_DF * 1e12
# save path
save_path = os.path.join(
args["outputpath"], "{}_{}".format(args["savefilenameprefix"], gal_model)
)
(power_DF.round(3)).to_csv(save_path + ".csv")
save_figure(power_DF, "Power [pW]")
elif args["datasettype"] == "voltage2":
lst_range = range(25)
voltage2_density_DF = calculate_power_spectral_density(
antenna_inst=antenna_inst,
galactic_map_inst=galactic_map_inst,
lst_range=lst_range,
freq_Mhz_range=freq_Mhz_range,
latitude=args["latitude"],
update_antenna_conventions={
"shift_phi": shift_phi,
"flip_theta": True,
"flip_phi": False,
"in_degrees": True,
"add_invisible_sky": True,
},
)
voltage2_density_DF.index.name = "LST"
voltage2_density_DF.columns.name = "freq_MHz"
voltage2_density_DF.to_csv(
os.path.join(
args["outputpath"],
"voltage2_density_{}_{}.csv".format(
args["savefilenameprefix"], gal_model
),
)
)
if __name__ == "__main__":
# select galactic radio emission model
galactic_models = [
"LFSS",
"GSM08",
"GSM16",
"Haslam",
"LFmap",
"SSM",
"GMOSS",
"ULSA",
]
ap = argparse.ArgumentParser()
ap._action_groups.pop()
required = ap.add_argument_group("required arguments")
optional = ap.add_argument_group("optional arguments")
required.add_argument(
"-g",
"--galacticmodel",
nargs="?",
required=True,
choices=galactic_models,
help="Galactic radio emission model, choose from these: {}".format(
galactic_models
),
)
required.add_argument(
"-a",
"--antennamodel",
nargs="?",
required=True,
help="Path to the antenna model file.",
),
optional.add_argument(
"-w",
"--hwmodel",
nargs="?",
default="",
help="Path to the hardware response file.",
),
optional.add_argument(
"-f",
"--frequencyrange",
nargs="+",
default=[30, 81],
type=int,
help="Frequency range in MHz, 'from' 'to', must be an integer! Default is [30 81)",
)
optional.add_argument(
"-l",
"--latitude",
nargs="?",
default=-35.206667,
type=float,
help="Latitude of the local observer, default it -35.206667 (Malargue, Argentina).",
)
optional.add_argument(
"-o",
"--outputpath",
nargs="?",
default="./simulated_power_datasets/",
help="Output folder. If it does not exists, it will be created.",
)
optional.add_argument(
"-s",
"--savefilenameprefix",
nargs="?",
default="power_",
help="Saved power dataset filename prefix.",
)
optional.add_argument(
"-t",
"--datasettype",
nargs="?",
default="power",
help="Calculate 'power' or 'voltage2'.",
)
optional.add_argument(
"-r",
"--orientation",
nargs="?",
default="EW",
help="This will read HW response with suffix either EW or NS. Default: EW",
)
optional.add_argument(
"-z",
"--shiftazimuth",
nargs="?",
default=-90,
type=float,
help="Modify antenna response conventions by shifting the azimuth. Default:-90 degrees.",
)
optional.add_argument(
"-ls",
"--shiftlst",
nargs="?",
default=0.5,
type=float,
help="Shift LST bins.",
)
optional.add_argument(
"-lr",
"--lstrange",
nargs="?",
default=24,
type=int,
help="Usually one need just 0-23 (or 0.5-23.5) LST. But if the dataset is to be interpolated later, better is to do 0-24 (set the arg to 25 for this)",
)
args = vars(ap.parse_args())
if (args["datasettype"] == "voltage2") and (
args["outputpath"] == "./simulated_power_datasets/"
):
args["outputpath"] = "./voltage2_density/"
orientation = args["orientation"]
gal_model = args["galacticmodel"]
shift_phi = args["shiftazimuth"]
shift_LST_bins = args["shiftlst"]
lst_range_arg = args["lstrange"]
if gal_model == "LFSS":
galactic_map_inst = LowFrequencySkyModel(freq_unit="MHz")
elif gal_model == "GSM08":
galactic_map_inst = GlobalSkyModel(freq_unit="MHz")
elif gal_model == "GSM16":
galactic_map_inst = GlobalSkyModel2016(freq_unit="MHz")
elif gal_model == "Haslam":
galactic_map_inst = HaslamSkyModel(freq_unit="MHz", spectral_index=-2.53)
elif gal_model == "LFmap":
galactic_map_inst = LFmap()
elif gal_model == "SSM":
galactic_map_inst = SSM()
elif gal_model == "GMOSS":
galactic_map_inst = GMOSS()
elif gal_model == "ULSA":
galactic_map_inst = ULSA(index_type="freq_dependent_index")
else:
print("[ERROR] Model not recognized, use some from these:", galactic_models)
sys.exit()
mkdir(args["outputpath"])
print('[INFO] ARGS: ', args)
sys.exit(main())
Get mock dataset¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
from radiocalibrationtoolkit import *
import argparse
from pathlib import Path
piko = 1e-12
def to_string(input_val):
if isinstance(input_val, (float, int)):
return str(input_val)
elif isinstance(input_val, Union[tuple, list]):
return "_".join(str(x) for x in input_val)
else:
raise TypeError("Input must be a float, int, or tuple")
def main():
# read HW response
hw_dict = read_hw_file(
args["hwprofilefile"], interp_args={"fill_value": "extrapolate"}
)
print()
hw_reponse_1 = hw_dict["RResponse"]["LNA"]
hw_reponse_2 = hw_dict["RResponse"]["digitizer"]
hw_reponse_3 = hw_dict["RResponse"]["cable_fromLNA2digitizer"]
hw_reponse_4 = hw_dict["RResponse"]["impedance_matching_EW"]
# merge all hw responses to one function
def hw_response_func(x):
return dB2PowerAmp(
hw_reponse_1(x) + hw_reponse_2(x) + hw_reponse_3(x) + hw_reponse_4(x)
)
# impedance function
impedance_func = hw_dict["IImpedance"]["antenna_EW"]
# read sidereal voltage square spectral density
sidereal_voltage2_density_DF = pd.read_csv(
args["voltagedensity2file"],
index_col=0,
)
sidereal_voltage2_density_DF.columns = sidereal_voltage2_density_DF.columns.astype(
float
)
def hw_response_func(x):
return dB2PowerAmp(
hw_reponse_1(x) + hw_reponse_2(x) + hw_reponse_3(x) + hw_reponse_4(x)
)
mock_trace_generator = Mock_trace_generator(
sidereal_voltage2_density_DF=sidereal_voltage2_density_DF,
hw_response_func=hw_response_func,
impedance_func=impedance_func,
voltage2ADC=2048,
time_trace_size=2048,
sampling_frequency_MHz=250,
)
freq_MHz_bins = mock_trace_generator.get_frequency_bins()
mock_traces_DF = mock_trace_generator.generate_mock_trace(
args["numberoftraces"],
temp_celsius=args["temperature"],
additional_noise=args["additionalnoise"],
# nbi={"67.25": 1},
# nbi_err=0.3,
rounding=not args["turn_off_rounding"],
)
# system parameters
sampling_frequency_MHz = 250
N = mock_traces_DF.columns[2:].size
ADC2Volts = 1 / 2048
trace_time_length_sec = N / (sampling_frequency_MHz * 1e6)
# FFT to spectra
spectra_df = time_trace_df_2_spectra_df(
mock_traces_DF, DSC=2, sampling_frequency_MHz=sampling_frequency_MHz
)
# use the formula, create the integrand first
integrand_df = (
(spectra_df * ADC2Volts / (sampling_frequency_MHz * 1e6)) ** 2
).divide(impedance_func(spectra_df.columns.values))
fr = args["frequencyrange"]
# integrate
mock_power_unbinned_DF = (2 / trace_time_length_sec) * integrate_spectral_density(
integrand_df,
integrated_MHz_bands=np.linspace(fr[0], fr[1], fr[1] - fr[0] + 1),
integrating_method="on_discontinuous_function",
)
mock_power_unbinned_DF = pd.concat(
(mock_traces_DF.iloc[:, :2], mock_power_unbinned_DF), axis=1
)
mock_power_DF = bin_df_rows(
mock_power_unbinned_DF, binning_column="lst", bins=list(range(25))
)
mock_power_DF.index.name = "lst"
mock_power_DF = mock_power_DF.drop(["temp_c", "lst"], axis=1)
if args["overridesavefilename"] is not None:
filename = args["savefilename"]
else:
filename = "mock_power_dataset-{}_N{}_temp{}C_{}additionalnoise_rounding-{}.csv".format(
Path(args["voltagedensity2file"]).stem.replace("voltage2_density_", ""),
args["numberoftraces"],
to_string(args["temperature"]),
args["additionalnoise"],
not args["turn_off_rounding"],
)
((mock_power_DF / piko).round(3)).to_csv(os.path.join(args["outputpath"], filename))
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap._action_groups.pop()
required = ap.add_argument_group("required arguments")
optional = ap.add_argument_group("optional arguments")
required.add_argument(
"-v",
"--voltagedensity2file",
nargs="?",
required=True,
help="Path to the antenna model file.",
),
optional.add_argument(
"-w",
"--hwprofilefile",
nargs="?",
default="",
help="Path to the hardware response file.",
),
optional.add_argument(
"-f",
"--frequencyrange",
nargs="*",
default=(30, 81),
help="Frequency range in MHz, 'from' 'to', must be an integer! Default is [30 81)",
)
optional.add_argument(
"-t",
"--temperature",
nargs="+",
default=[-10, 50],
action=AppendOnlyIfMore,
type=float,
help="Temperature range. Random temperature from this range will be used for the temperature noise.\
If only one values is specified,\
the temperature will be fixed to this temperature.",
)
optional.add_argument(
"-n",
"--numberoftraces",
nargs="?",
default=5000,
type=int,
help="Number of generated time traces",
)
optional.add_argument(
"-a",
"--additionalnoise",
nargs="?",
default=0,
type=float,
help="Mean of the additional noise added to final spectrum. Default is zero (no extra noise).\
Reasonable value for the extra noise is 5e-16",
)
optional.add_argument(
"-tor",
"--turn-off-rounding",
# nargs="?",
action="store_true",
help="Whether the generated time traces in ADC units should be rounded.",
)
optional.add_argument(
"-o",
"--outputpath",
nargs="?",
default="./mock_power_datasets/",
help="Output folder. If it does not exists, it will be created.",
)
optional.add_argument(
"-s",
"--overridesavefilename",
nargs="?",
default=None,
help="Overrides filename.",
)
args = vars(ap.parse_args())
mkdir(args["outputpath"])
print(args)
sys.exit(main())
Generate voltage squared spectral density¶
import os
import sys
from pylfmap import LFmap
from radiocalibrationtoolkit import *
LATITUDE = -35.206667
antenna_inst = AntennaPattern(
"./antenna_setup_files/SALLA_32S_4NEC2_WCD_8S_PERMITTIVITY5.5_CONDUCTIVITY0.0014_extended_EW.xml",
)
# Prepare objects
lfmap = LFmap()
lfss = LowFrequencySkyModel(freq_unit="MHz")
gsm2008 = GlobalSkyModel(freq_unit="MHz")
gsm2016 = GlobalSkyModel2016(freq_unit="MHz")
haslam = HaslamSkyModel(freq_unit="MHz", spectral_index=-2.53)
ssm = SSM()
gmoss = GMOSS()
ulsa_fdi = ULSA(index_type="freq_dependent_index")
map_instances_dict = {
"LFSS": lfss,
"GSM08": gsm2008,
"GSM16": gsm2016,
"Haslam": haslam,
"LFmap": lfmap,
"SSM": ssm,
"GMOSS": gmoss,
"ULSA": ulsa_fdi,
}
# for just the voltage square spectral density
# do not use impedance and do not integrate!
for map_label in tqdm(map_instances_dict.keys()):
galactic_map_inst = map_instances_dict[map_label]
lst_range = range(25)
freq_Mhz_range = range(10, 125, 1)
voltage2_density_DF = calculate_power_spectral_density(
antenna_inst=antenna_inst,
galactic_map_inst=galactic_map_inst,
lst_range=lst_range,
freq_Mhz_range=freq_Mhz_range,
latitude=LATITUDE,
update_antenna_conventions={
"shift_phi": -90,
"flip_theta": True,
"flip_phi": False,
"in_degrees": True,
"add_invisible_sky": True,
},
)
voltage2_density_DF.index.name = "LST"
voltage2_density_DF.columns.name = "freq_MHz"
voltage2_density_DF.to_csv(
"./voltage2_density/voltage2_density_{}.csv".format(map_label)
)
Create isotropic antenna pattern¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import sys
import argparse
def nparray2string(arr):
string = ""
for i in arr:
# string+='{:.2f} '.format(i)
string += "{} ".format(round(i, 2))
return string
def main():
thetaList, phiList = np.meshgrid(
np.asarray(list(range(0, 91, 5))), np.asarray(list(range(0, 361, 5)))
)
thetaList = thetaList.flatten()
phiList = phiList.flatten()
freqList = np.asarray(list(range(0, 126, 1)))
vel_shifted_mean = np.ones(freqList.size)
vel_theta = {}
vel_phi = {}
vel_ephi_phase = {}
vel_etheta_phase = {}
for f in freqList:
vel_theta[f] = np.ones(thetaList.size)
vel_phi[f] = np.ones(phiList.size)
vel_ephi_phase[f] = np.zeros(phiList.size)
vel_etheta_phase[f] = np.zeros(thetaList.size)
# antennaPatternXMLfile = "../antenna_setup_files/Isotropic_antenna_pattern.xml"
with open(antennaPatternXMLfile, "w") as xml_file:
xml_file.write('<frequency unit="MHz"> <!-- Simulated Frequencies -->\n')
xml_file.write(nparray2string(freqList) + "\n")
xml_file.write(
'</frequency>\n<theta unit="deg"> <!-- zenith angle theta -->\n'
)
xml_file.write(nparray2string(thetaList) + "\n")
xml_file.write('</theta>\n<phi unit="deg"> <!-- azimuth angle phi -->\n')
xml_file.write(nparray2string(phiList) + "\n")
xml_file.write(
'</phi>\n<MeanTransfer unit="m"> <!-- Mean Transfer of the antenna to electric field - not directional -->\n'
)
xml_file.write(nparray2string(vel_shifted_mean) + "\n")
xml_file.write("</MeanTransfer>\n")
#
xml_file.write(
"<!-- EAHTheta_amp: Projection of the effective antenna height in e_theta direction -->\n"
)
for f in freqList:
xml_file.write('<EAHTheta_amp unit="m" idfreq="%.1f">' % f)
xml_file.write(nparray2string(vel_theta[f]) + "</EAHTheta_amp>\n")
xml_file.write("<!-- end of EAHTheta_amp -->\n")
#
xml_file.write(
"<!-- EAHTheta_phase: Phase of the effective antenna height in e_theta direction -->\n"
)
for f in freqList:
xml_file.write('<EAHTheta_phase unit="deg" idfreq="%.1f">' % f)
xml_file.write(nparray2string(vel_etheta_phase[f]) + "</EAHTheta_phase>\n")
xml_file.write("<!-- end of EAHTheta_phase -->\n")
#
xml_file.write(
"<!-- EAHPhi_amp: Projection of the effective antenna height in e_phi direction -->\n"
)
for f in freqList:
xml_file.write('<EAHPhi_amp unit="m" idfreq="%.1f">' % f)
xml_file.write(nparray2string(vel_phi[f]) + "</EAHPhi_amp>\n")
xml_file.write("<!-- end of EAHPhi_amp -->\n")
#
xml_file.write(
"<!-- EAHPhi_phase: Phase of the effective antenna height in e_phi direction -->\n"
)
for f in freqList:
xml_file.write('<EAHPhi_phase unit="deg" idfreq="%.1f">' % f)
xml_file.write(nparray2string(vel_ephi_phase[f]) + "</EAHPhi_phase>\n")
xml_file.write("<!-- end of EAHPhi_phase -->\n")
print(
'[INFO] XML file with Isotropic antenna pattern at "'
+ antennaPatternXMLfile
+ '" should have been created.'
)
# You should have the XML with the antenna pattern created by now! Check the folder.
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap._action_groups.pop()
optional = ap.add_argument_group("optional arguments")
optional.add_argument(
"-o",
"--outputpath",
nargs="?",
default="../antenna_setup_files/Isotropic_antenna_pattern.xml",
help="Save path",
)
args = vars(ap.parse_args())
antennaPatternXMLfile = args["outputpath"]
sys.exit(main())
Create flat HW profile response¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import sys
import argparse
def nparray2string(arr):
string = ""
for i in arr:
# string+='{:.2f} '.format(i)
string += "{} ".format(round(i, 2))
return string
def get_hw_profile_xml_string(
responseId,
freqString,
magString,
phaseString,
magUnit="dB",
phaseUnit="degree",
type_="Response",
):
freqString = nparray2string(freqString)
magString = nparray2string(magString)
header = "<!-- {} -->".format(responseId)
freqUnit = "MHz"
magUnit = "dB"
phaseUnit = "degree"
type__ = type_[0] + type_
if phaseString is not None:
phase_XML_string = (
' <Phase unit ="' + phaseUnit + '">\n'
" " + phaseString + " </Phase>\n"
)
else:
phase_XML_string = ""
xml_string = (
header
+ "\n\n\n"
+ " <"
+ type__
+ ' id="'
+ responseId
+ '"> \n'
+ " <"
+ type_
+ "> \n "
+ ' <x unit="'
+ freqUnit
+ '"> \n'
+ " "
+ freqString
+ " </x>\n"
+ " <"
+ magUnit
+ ">\n"
+ " "
+ magString
+ " </"
+ magUnit
+ ">\n"
+ phase_XML_string
+ " </"
+ type_
+ ">\n"
+ " </"
+ type__
+ ">"
)
return xml_string
def main():
flat_hw_0 = -6.35
flat_hw_1 = 13.12
flat_hw_2 = -1.05
flat_hw_3 = 17.64
flat_impedance = 400
frequency = np.asarray(list(range(1, 126)))
custom_hw_0 = np.ones(len(frequency)) * flat_hw_0
custom_hw_1 = np.ones(len(frequency)) * flat_hw_1
custom_hw_2 = np.ones(len(frequency)) * flat_hw_2
custom_hw_3 = np.ones(len(frequency)) * flat_hw_3
# Supression until 30 MHz and from 80 MHz
custom_hw_3[(frequency < 30) | (frequency > 80)] = -20
hw_xml_string_0 = get_hw_profile_xml_string(
"impedance_matching_EW",
frequency,
custom_hw_0,
None,
magUnit="dB",
phaseUnit="degree",
)
hw_xml_string_1 = get_hw_profile_xml_string(
"LNA",
frequency,
custom_hw_1,
None,
magUnit="dB",
phaseUnit="degree",
)
hw_xml_string_2 = get_hw_profile_xml_string(
"cable_fromLNA2digitizer",
frequency,
custom_hw_2,
None,
magUnit="dB",
phaseUnit="degree",
)
hw_xml_string_3 = get_hw_profile_xml_string(
"digitizer",
frequency,
custom_hw_3,
None,
magUnit="dB",
phaseUnit="degree",
)
imp_string = get_hw_profile_xml_string(
"antenna_EW",
frequency,
np.ones(frequency.size) * flat_impedance,
None,
magUnit="y",
type_="Impedance",
)
hw_xml_string = (
hw_xml_string_0
+ "\n\n\n"
+ hw_xml_string_1
+ "\n\n\n"
+ hw_xml_string_2
+ "\n\n\n"
+ hw_xml_string_3
+ "\n\n\n"
+ imp_string
+ "\n\n\n"
)
closing_tags = "</HardwareProfileList>"
opening_tags = "<HardwareProfileList>\n\n"
with open(output_path, "w") as file:
file.write(opening_tags + hw_xml_string + closing_tags)
print(
'[INFO] XML file with flat HW response at "'
+ output_path
+ '" should have been created.'
)
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap._action_groups.pop()
optional = ap.add_argument_group("optional arguments")
optional.add_argument(
"-o",
"--outputpath",
nargs="?",
default="../antenna_setup_files/HardwareProfileList_flat.xml",
help="Save path",
)
args = vars(ap.parse_args())
output_path = args["outputpath"]
sys.exit(main())
Convert 2-port network SKRF file to XML¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# convertSParameterFile2XML.py
"""
convertSParameterFile2XML.py
This script converts S-parameter data from a file into an XML format. It calculates complex voltage amplification, magnitude in dB, and phase in degrees or radians and saves the results in an XML file.
Parameters
----------
- sfile (str): Path to the S-parameter file.
- output_name (str, optional): File name of the produced XML file and the name of the HW response. By default, the filename of the S-parameter file will be used.
- output_dir (str, optional): Output directory. Default is the current directory.
- unwrap (bool, optional): If True, phase values are unwrapped; otherwise, they are left wrapped. Default is True.
- method (str, optional): The amplification method, either 'withS11' (S21/(S11+1)) or 'withoutS11' (S21). Default is 'withS11'.
- rounding (int, optional): Rounding of saved numbers. Default is 4. Use None for no rounding.
Returns
-------
int: Return code, 0 on success.
Note
----
This script requires the `pandas`, `numpy`, `matplotlib`, `scipy`, and `scikit-rf` (skrf) libraries.
Example Usage
-------------
$ python convertSParameterFile2XML.py -s input.s2p -o output.xml -d /output/directory -u True -m withS11 -r 3
"""
import argparse
import re
import io
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import skrf as rf
from scipy.interpolate import interp1d
from scipy.optimize import curve_fit
def main(args):
np.set_printoptions(suppress=True)
skrf = rf.Network(sparameterFilePath)
MHz = 1e-6
frequencyMHz = skrf.f * MHz
if method == 'withS11':
complex_voltage_amplification = (skrf.s21.s/(1+skrf.s11.s)).flatten()
elif method == 'withoutS11':
complex_voltage_amplification = (skrf.s21.s/(1)).flatten()
dBMagnitude = 20*np.log10(np.abs(complex_voltage_amplification))
phaseDegreeWrapped = np.rad2deg(np.angle(complex_voltage_amplification))
# unwrape phase if wraped
phaseDegreeUnwrapped = np.unwrap(phaseDegreeWrapped, 180)
if useUnwrappedPhase == True:
phaseDeg = phaseDegreeUnwrapped
else:
phaseDeg = phaseDegreeWrapped
#if rounding != None:
# dBMagnitude = np.asarray([round(i, rounding) for i in dBMagnitude])
# phaseDeg = np.asarray([round(i, rounding) for i in phaseDeg])
# save in the Offline XML format
freqString = re.sub(
" +",
" ",
np.array2string(
frequencyMHz, separator=" ", max_line_width=np.inf, suppress_small=False, formatter={'float_kind': lambda x: f'{x:.{rounding}f}'}
).strip("[,]"),
)
magString = re.sub(
" +",
" ",
np.array2string(
dBMagnitude, separator=" ", max_line_width=np.inf, suppress_small=False, formatter={'float_kind': lambda x: f'{x:.{rounding}f}'}
).strip("[,]"),
)
phaseString = re.sub(
" +",
" ",
np.array2string(
phaseDeg, separator=" ", max_line_width=np.inf, suppress_small=False, formatter={'float_kind': lambda x: f'{x:.{rounding}f}'}
).strip("[,]"),
)
# header = "<!-- {} -->".format(responseName)
header = ''
responseId = responseName
freqUnit = "MHz" # 'MHz'
magUnit = (
"dB"
) # 'LgAmp' dB::: <!-- amplification in dB --> LgAmp::: <!-- amplification in Log10(Amplitude) -->
phaseUnit = "degree" #'degree' # 'radiands' ?
string = (
header
+ "\n\n\n"
+ ' <RResponse id="'
+ responseId
+ '"> \n'
+ " <Response> \n "
+ ' <x unit="'
+ freqUnit
+ '"> \n'
+ " "
+ freqString
+ " </x>\n"
+ " <"
+ magUnit
+ ">\n"
+ " "
+ magString
+ " </"
+ magUnit
+ ">\n"
+ ' <Phase unit ="'
+ phaseUnit
+ '">\n'
" "
+ phaseString
+ " </Phase>\n"
+ " </Response>\n"
+ " </RResponse>"
)
with open(os.path.join(outdir, responseName + ".xml"), "w") as file:
file.write(string)
# control plots
'''
plt.figure(0)
skrf.plot_s_db()
plt.figure(1)
skrf.plot_s_deg()
fig, ax = plt.subplots(1, 2, figsize=(10, 5))
fig.suptitle("what goes to the converted XML file")
ax[0].plot(frequencyMHz, dBMagnitude)
ax[1].plot(frequencyMHz, phaseDeg)
[a.set_xlabel("frequency [MHz]") for a in ax]
ax[0].set_ylabel("dB")
ax[1].set_ylabel("phase [deg]")
plt.subplots_adjust(wspace=0.4)
plt.show()
'''
return 0
if __name__ == "__main__":
import sys
import os
# Construct the argument parser
ap = argparse.ArgumentParser()
ap._action_groups.pop()
required = ap.add_argument_group("required arguments")
optional = ap.add_argument_group("optional arguments")
# Add the arguments to the parser
required.add_argument(
"-s", "--sfile", required=True, help="Path to the s-parameter file."
)
optional.add_argument(
"-o",
"--output_name",
nargs="?",
required=False,
default=None,
help="File name of the produced XML file. Also it is the name of the HW response. By default filename of the s-parameter file will be used.",
)
optional.add_argument(
"-d",
"--output_dir",
nargs="?",
required=False,
default='./',
help="Output directory",
)
optional.add_argument(
"-u",
"--unwrap",
nargs="?",
required=False,
default=True,
help="Unwrap phase. Default is True.",
)
optional.add_argument(
"-m",
"--method",
nargs="?",
required=False,
default='withS11',
help="The amplification is either given as S21/(S11+1) or as just S21. Thus our options are 'withS11' or 'withoutS11. Default: 'withS11",
)
optional.add_argument(
"-r",
"--rounding",
nargs="?",
required=False,
default=4,
help="Rouding of the save numbers. Default is 4. For no rouding type None",
)
args = vars(ap.parse_args())
method = args['method']
sparameterFilePath = args["sfile"]
useUnwrappedPhase = args["unwrap"]
rounding = int(args["rounding"])
if args["output_name"] == None:
responseName = os.path.splitext(os.path.basename(sparameterFilePath))[0]
else:
responseName = args["output_name"]
outdir = args["output_dir"]
# workDir ='/home/tomas/Documents/scripts/pyUtilScripts/'
# inputFileName = 'AugerRD_LNA_GRF4002_NS_39mA_ReIm.s2p'
# sparameterFilePath = workDir+inputFileName
# useUnwrappedPhase = True
# rounding = 3
sys.exit(main(sys.argv))