uhd/tools/gr-usrptest/python/functions.py

158 lines
5.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python2
import numpy as np
import time
import copy
import logging
def setup_phase_alignment_parser(parser):
test_group = parser.add_argument_group(
'Phase alignment specific arguments')
test_group.add_argument(
'--runs',
default=10,
type=int,
help='Number of times to retune and measure d_phi')
test_group.add_argument(
'--duration',
default=5.0,
type=float,
help='Duration of a measurement run')
test_group.add_argument(
'--measurement-setup',
type=str,
help='Comma-seperated list of channel ids. Phase difference will be calculated between consecutive channels. default=(0,1,2,..,M-1) M: num_chan'
)
test_group.add_argument(
'--log-level',
type=str,
choices=["critical", "error", "warning", "info", "debug"],
default="info")
test_group.add_argument(
'--freq-bands',
type=int,
help="Number of frequency bands in daughterboard range to randomly retune to",
default=1)
return parser
def setup_tx_phase_alignment_parser(parser):
tx_group = parser.add_argument_group(
'TX Phase alignment specific arguments.')
tx_group.add_argument(
'--tx-channels', type=str, help='which channels to use')
tx_group.add_argument(
'--tx-antenna',
type=str,
help='comma-separated list of channel antennas for tx')
tx_group.add_argument(
'--tx-offset',
type=float,
help='frequency offset in Hz which should be added to center frequency for transmission'
)
return parser
def setup_rts_phase_alignment_parser(parser):
rts_group = parser.add_argument_group(
'RTS Phase alignment specific arguments')
rts_group.add_argument(
'-pd',
'--phasedev',
type=float,
default=1.0,
help='maximum phase standard deviation of dphi in a run which is considered settled (in deg)'
)
rts_group.add_argument(
'-dp',
'--dphi',
type=float,
default=2.0,
help='maximum allowed d_phase deviation between runs (in deg)')
rts_group.add_argument(
'--freqlist',
type=str,
help='comma-separated list of frequencies to test')
rts_group.add_argument(
'--lv-host',
type=str,
help='specify this argument if running tests with vst/switch')
rts_group.add_argument('--lv-vst-name', type=str, help='vst device name')
rts_group.add_argument(
'--lv-switch-name', type=str, help='executive switch name')
rts_group.add_argument(
'--lv-basepath',
type=str,
help='basepath for LabVIEW VIs on Windows')
rts_group.add_argument(
'--tx-offset',
type=float,
help='transmitter frequency offset in VST')
rts_group.add_argument(
'--lv-switch-ports', type=str, help='comma-separated switch-port pair')
return parser
def setup_manual_phase_alignment_parser(parser):
manual_group = parser.add_argument_group(
'Manual Phase alignment specific arguments')
manual_group.add_argument(
'--plot',
dest='plot',
action='store_true',
help='Set this argument to enable plotting results with matplotlib'
)
manual_group.add_argument(
'--auto',
action='store_true',
help='Set this argument to enable automatic selection of test frequencies'
)
manual_group.add_argument(
'--start-freq',
type=float,
default=0.0,
help='Start frequency for automatic selection'
),
manual_group.add_argument(
'--stop-freq',
type=float,
default=0.0,
help='Stop frequency for automatic selection')
parser.set_defaults(plot=False,auto=False)
return parser
def process_measurement_sinks(top_block):
data = list()
curr_data = dict()
for num, chan in enumerate(top_block.measurement_channels[:-1]):
curr_data['avg'] = list(top_block.measurement_sink[num].get_avg())
curr_data['stddev'] = list(top_block.measurement_sink[num].get_stddev(
))
curr_data['first'] = top_block.measurement_channels_names[num]
curr_data['second'] = top_block.measurement_channels_names[num + 1]
data.append(copy.copy(curr_data))
return data
def run_test(top_block, ntimes):
results = dict()
num_sinks = len(top_block.measurement_sink)
for i in xrange(ntimes):
#tune frequency to random position and back to specified frequency
top_block.retune_frequency(bands=top_block.uhd_app.args.freq_bands,band_num=i+1)
time.sleep(2)
#trigger start in all measurement_sinks
for sink in top_block.measurement_sink:
sink.start_run()
#wait until every measurement_sink is ready with the current run
while (sum([ms.get_run() for ms in top_block.measurement_sink]) < (
(i + 1) * num_sinks)):
time.sleep(1)
results = process_measurement_sinks(top_block)
return results
def log_level(string):
return getattr(logging, string.upper())