uhd/host/examples/python/benchmark_rate.py
Martin Braun b34e2f0593 python: Fix dropped-sample calculation in benchmark_rate.py
This fixes a subtle bug, where a variable to cache the timestamp of an
error gets bound to the metadata instead of creating a copy thereof.
Without this fix, the calculation of dropped samples would always be 0,
because the difference in timestamps would incorrectly be always zero.
This fix will now make a copy of the timestamp.

Shoutout to GitHub user bhorsfield for finding this issue.
2021-09-28 06:25:25 -07:00

490 lines
21 KiB
Python
Executable file

#!/usr/bin/env python3
#
# Copyright 2018 Ettus Research, a National Instruments Company
# Copyright 2019 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Benchmark rate using Python API
"""
import argparse
from datetime import datetime, timedelta
import sys
import time
import threading
import logging
import numpy as np
import uhd
CLOCK_TIMEOUT = 1000  # ms to wait for the external reference clock to lock
INIT_DELAY = 0.05  # s between "now" and the timed start of streaming (RX and TX)
# Module-level logger so the functions in this file also work when the module
# is imported instead of run as a script (the __main__ section configures it).
logger = logging.getLogger(__name__)
def parse_args():
    """Define the command-line interface and parse ``sys.argv``.

    Most options come in RX/TX pairs (subdev, rate, over-the-wire format,
    host format, stream args); passing --rx_rate and/or --tx_rate selects
    which directions are benchmarked.

    :returns: the argparse.Namespace produced by the parser
    """
    description = """UHD Benchmark Rate (Python API)
Utility to stress test a USRP device.
Specify --rx_rate for a receive-only test.
Specify --tx_rate for a transmit-only test.
Specify both options for a full-duplex test.
"""
    parser = argparse.ArgumentParser(description=description,
                                     formatter_class=argparse.RawTextHelpFormatter)
    add = parser.add_argument
    add("-a", "--args", type=str, default="", help="single uhd device address args")
    add("-d", "--duration", type=float, default=10.0,
        help="duration for the test in seconds")
    # (option suffix, extra add_argument kwargs, help template). Each kind is
    # registered for RX then TX so the --help listing keeps its usual order.
    paired_options = (
        ("subdev", {"type": str}, "specify the device subdev for {}"),
        ("rate", {"type": float}, "specify to perform a {} rate test (sps)"),
        ("otw", {"type": str, "default": "sc16"},
         "specify the over-the-wire sample mode for {}"),
        ("cpu", {"type": str, "default": "fc32"},
         "specify the host/cpu sample mode for {}"),
        ("stream_args", {"default": ""}, "stream args for {} streamer"),
    )
    for suffix, extra_kwargs, help_template in paired_options:
        for direction in ("RX", "TX"):
            add("--{}_{}".format(direction.lower(), suffix),
                help=help_template.format(direction), **extra_kwargs)
    add("--ref", type=str, help="clock reference (internal, external, mimo, gpsdo)")
    add("--pps", type=str, help="PPS source (internal, external, mimo, gpsdo)")
    add("--random", action="store_true", default=False,
        help="Run with random values of samples in send() and recv() to stress-test"
        " the I/O.")
    channel_help = "which {}channel(s) to use (specify \"0\", \"1\", \"0 1\", etc)"
    add("-c", "--channels", nargs="+", type=int, default=[0],
        help=channel_help.format(""))
    add("--rx_channels", nargs="+", type=int, help=channel_help.format("RX "))
    add("--tx_channels", nargs="+", type=int, help=channel_help.format("TX "))
    return parser.parse_args()
class LogFormatter(logging.Formatter):
    """Log formatter which prints the timestamp with fractional seconds.

    Without an explicit ``datefmt`` the time of day is rendered as
    ``HH:MM:SS.ss`` (local time, hundredths of a second).
    """

    @staticmethod
    def _pp_datetime(when):
        """Format the datetime *when* as ``HH:MM:SS.ss``."""
        # Truncate (rather than round) the sub-second part so that e.g.
        # 59.999 s cannot be printed as the invalid "60.00".
        return "{:%H:%M:%S}.{:02d}".format(when, when.microsecond // 10000)

    @staticmethod
    def pp_now():
        """Return the current local time of day formatted as ``HH:MM:SS.ss``."""
        return LogFormatter._pp_datetime(datetime.now())

    def formatTime(self, record, datefmt=None):
        """Return the creation time of *record* as a string.

        :param record: the logging.LogRecord being formatted
        :param datefmt: optional ``time.strftime`` format string; when omitted
            the ``HH:MM:SS.ss`` layout is used
        :returns: the formatted timestamp
        """
        if datefmt:
            # self.converter (time.localtime by default) returns a struct_time,
            # which has no strftime() method; use time.strftime() instead.
            return time.strftime(datefmt, self.converter(record.created))
        # Use the record's own timestamp (not "now") so the printed time is
        # when the message was logged, not when it was formatted.
        return self._pp_datetime(datetime.fromtimestamp(record.created))
def _ref_locked(usrp, mboard):
    """Return the "ref_locked" sensor of motherboard *mboard* as a bool."""
    sensor = usrp.get_mboard_sensor("ref_locked", mboard)
    # get_mboard_sensor() returns a sensor-value object; truth-testing the
    # object itself is not a reliable lock indicator, so use its to_bool()
    # accessor when it provides one.
    to_bool = getattr(sensor, "to_bool", None)
    return bool(to_bool()) if callable(to_bool) else bool(sensor)


def setup_ref(usrp, ref, num_mboards):
    """Select the reference clock source and confirm that it locks.

    :param usrp: the MultiUSRP device
    :param ref: clock source name; "mimo" is applied to motherboard 1 only
        and requires exactly two motherboards
    :param num_mboards: number of motherboards in the device
    :returns: True on success, False if the configuration is invalid or a
        board did not report lock within CLOCK_TIMEOUT milliseconds
    """
    if ref == "mimo":
        if num_mboards != 2:
            logger.error("ref = \"mimo\" implies 2 motherboards; "
                         "your system has %d boards", num_mboards)
            return False
        usrp.set_clock_source("mimo", 1)
    else:
        usrp.set_clock_source(ref)
    if ref == "internal":
        return True
    # Lock onto clock signals for all mboards
    logger.debug("Now confirming lock on clock signals...")
    # One deadline shared by all boards; a monotonic clock keeps wall-clock
    # adjustments from stretching or cutting the wait.
    deadline = time.monotonic() + CLOCK_TIMEOUT / 1000.0
    for i in range(num_mboards):
        # In MIMO mode only board 1 was switched to the MIMO clock above.
        if ref == "mimo" and i == 0:
            continue
        is_locked = _ref_locked(usrp, i)
        while not is_locked and time.monotonic() < deadline:
            time.sleep(1e-3)
            is_locked = _ref_locked(usrp, i)
        if not is_locked:
            logger.error("Unable to confirm clock signal locked on board %d", i)
            return False
    return True
def setup_pps(usrp, pps, num_mboards):
    """Select the PPS (time) source.

    :param usrp: the MultiUSRP device
    :param pps: time source name; "mimo" is applied to motherboard 1 only and
        requires exactly two motherboards
    :param num_mboards: number of motherboards in the device
    :returns: True on success, False if "mimo" was requested without exactly
        two motherboards
    """
    if pps == "mimo":
        if num_mboards != 2:
            logger.error("pps = \"mimo\" implies 2 motherboards; "
                         "your system has %d boards", num_mboards)
            return False
        # make mboard 1 a slave over the MIMO Cable
        usrp.set_time_source("mimo", 1)
    else:
        usrp.set_time_source(pps)
    return True
def _select_channels(requested, default, num_available, direction):
    """Pick and validate the channel list for one direction.

    :param requested: per-direction list (--rx_channels / --tx_channels), may be None
    :param default: shared --channels list, used when *requested* is empty
    :param num_available: number of channels the device offers in this direction
    :param direction: "RX" or "TX", used in the error message
    :returns: the selected channel list, or None if any entry is out of range
    """
    channels = requested if requested else default
    # Every index must lie in [0, num_available); argparse accepts negatives.
    if not all(0 <= chan < num_available for chan in channels):
        logger.error("Invalid %s channel(s) specified.", direction)
        return None
    return channels


def check_channels(usrp, args):
    """Check that the device has sufficient RX and TX channels available.

    A direction is only checked when its rate (--rx_rate / --tx_rate) was
    given; otherwise its channel list is empty.

    :param usrp: the MultiUSRP device
    :param args: parsed command-line arguments
    :returns: (rx_channels, tx_channels); both empty if any requested channel
        is negative or not provided by the device
    """
    rx_channels = []
    tx_channels = []
    if args.rx_rate:
        rx_channels = _select_channels(args.rx_channels, args.channels,
                                       usrp.get_rx_num_channels(), "RX")
        if rx_channels is None:
            return [], []
    if args.tx_rate:
        tx_channels = _select_channels(args.tx_channels, args.channels,
                                       usrp.get_tx_num_channels(), "TX")
        if tx_channels is None:
            return [], []
    return rx_channels, tx_channels
def benchmark_rx_rate(usrp, rx_streamer, random, timer_elapsed_event, rx_statistics):
    """Benchmark the receive chain.

    Receives from ``rx_streamer`` until ``timer_elapsed_event`` is set and
    tallies samples and error conditions. The counters are stored in
    ``rx_statistics`` (keys ``num_rx_samps``, ``num_rx_dropped``,
    ``num_rx_overruns``, ``num_rx_seqerr``, ``num_rx_timeouts``,
    ``num_rx_late``), and a stop command is issued on every exit path,
    including after a RuntimeError from ``recv()``.

    :param usrp: the device under test (provides the RX rate and current time)
    :param rx_streamer: streamer returned by ``usrp.get_rx_stream()``
    :param random: if True, re-issue the stream command with a random
        ``num_samps`` before each ``recv()``
    :param timer_elapsed_event: threading.Event; the test stops once it is set
    :param rx_statistics: dict that receives the final counters
    """
    num_channels = rx_streamer.get_num_channels()
    max_samps_per_packet = rx_streamer.get_max_num_samps()
    rate = usrp.get_rx_rate()
    logger.info("Testing receive rate %.3f Msps on %d channels", rate / 1e6, num_channels)
    # Make a receive buffer
    # TODO: The C++ code uses rx_cpu type here. Do we want to use that to set dtype?
    recv_buffer = np.empty((num_channels, max_samps_per_packet), dtype=np.complex64)
    metadata = uhd.types.RXMetadata()

    # Craft and send the Stream Command. A single channel starts immediately;
    # multiple channels use a timed start INIT_DELAY seconds from now.
    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
    stream_cmd.stream_now = (num_channels == 1)
    stream_cmd.time_spec = uhd.types.TimeSpec(usrp.get_time_now().get_real_secs() + INIT_DELAY)
    rx_streamer.issue_stream_cmd(stream_cmd)

    # Dropped-sample estimation: on the first overflow of a run, set
    # had_an_overflow and record that overflow's timestamp. On the next
    # error-free recv(), convert the time elapsed since then into ticks at the
    # RX rate and reset the flag.
    had_an_overflow = False
    last_overflow = uhd.types.TimeSpec(0)

    # Setup the statistic counters
    num_rx_samps = 0
    num_rx_dropped = 0
    num_rx_overruns = 0
    num_rx_seqerr = 0
    num_rx_timeouts = 0
    num_rx_late = 0

    error_codes = uhd.types.RXMetadataErrorCode
    try:
        # Receive until we get the signal to stop
        while not timer_elapsed_event.is_set():
            if random:
                # NOTE(review): this re-issues the start_cont command with the
                # original (by now stale) time_spec; on multi-channel setups this
                # may be reported as a late command. Confirm intended behavior.
                stream_cmd.num_samps = np.random.randint(1, max_samps_per_packet + 1, dtype=int)
                rx_streamer.issue_stream_cmd(stream_cmd)
            try:
                num_rx_samps += rx_streamer.recv(recv_buffer, metadata) * num_channels
            except RuntimeError as ex:
                logger.error("Runtime error in receive: %s", ex)
                return

            error_code = metadata.error_code
            if error_code == error_codes.none:
                if had_an_overflow:
                    had_an_overflow = False
                    dropped = (metadata.time_spec - last_overflow).to_ticks(rate)
                    if dropped < 0:
                        logger.warning("Timestamp after overflow recovery precedes the overflow "
                                       "timestamp (delta: %d ticks); not counting dropped samples",
                                       dropped)
                    else:
                        num_rx_dropped += dropped
            elif error_code == error_codes.overflow:
                # Only the first overflow of a run marks where dropping began.
                if not had_an_overflow:
                    had_an_overflow = True
                    # Copy the timestamp: keeping a reference to
                    # metadata.time_spec would track later recv() results.
                    last_overflow = uhd.types.TimeSpec(
                        metadata.time_spec.get_full_secs(),
                        metadata.time_spec.get_frac_secs())
                # If we had a sequence error, record it; otherwise count the overrun
                if metadata.out_of_sequence:
                    num_rx_seqerr += 1
                else:
                    num_rx_overruns += 1
            elif error_code == error_codes.late:
                logger.warning("Receiver error: %s, restarting streaming...", metadata.strerror())
                num_rx_late += 1
                # Radio core will be in the idle state. Issue stream command to restart streaming.
                stream_cmd.time_spec = uhd.types.TimeSpec(
                    usrp.get_time_now().get_real_secs() + INIT_DELAY)
                stream_cmd.stream_now = (num_channels == 1)
                rx_streamer.issue_stream_cmd(stream_cmd)
            elif error_code == error_codes.timeout:
                logger.warning("Receiver error: %s, continuing...", metadata.strerror())
                num_rx_timeouts += 1
            else:
                logger.error("Receiver error: %s", metadata.strerror())
                logger.error("Unexpected error on receive, continuing...")
    finally:
        # Return the statistics to the main thread
        rx_statistics["num_rx_samps"] = num_rx_samps
        rx_statistics["num_rx_dropped"] = num_rx_dropped
        rx_statistics["num_rx_overruns"] = num_rx_overruns
        rx_statistics["num_rx_seqerr"] = num_rx_seqerr
        rx_statistics["num_rx_timeouts"] = num_rx_timeouts
        rx_statistics["num_rx_late"] = num_rx_late
        # Stop streaming whether the loop ended normally or on an error
        try:
            rx_streamer.issue_stream_cmd(uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont))
        except RuntimeError as ex:
            logger.error("Runtime error while stopping RX stream: %s", ex)
def benchmark_tx_rate(usrp, tx_streamer, random, timer_elapsed_event, tx_statistics):
    """Benchmark the transmit chain.

    Sends zero-valued samples through ``tx_streamer`` until
    ``timer_elapsed_event`` is set. Only the first packet carries a time spec
    (INIT_DELAY seconds after the current device time). When ``random`` is
    True, each round sends a random number of samples between 1 and the
    streamer's maximum packet size; otherwise a full packet per round.

    The keys ``num_tx_samps`` and ``num_tx_timeouts`` are written into
    ``tx_statistics``, and an end-of-burst packet is sent on every exit path,
    including after a RuntimeError from ``send()``.

    :param usrp: the device under test (provides the TX rate and current time)
    :param tx_streamer: streamer returned by ``usrp.get_tx_stream()``
    :param random: if True, send randomly sized bursts
    :param timer_elapsed_event: threading.Event; the test stops once it is set
    :param tx_statistics: dict that receives the final counters
    """
    num_channels = tx_streamer.get_num_channels()
    max_samps_per_packet = tx_streamer.get_max_num_samps()
    logger.info("Testing transmit rate %.3f Msps on %d channels",
                usrp.get_tx_rate() / 1e6, num_channels)
    # Make a transmit buffer
    # TODO: The C++ code uses tx_cpu type here. Do we want to use that to set dtype?
    transmit_buffer = np.zeros((num_channels, max_samps_per_packet), dtype=np.complex64)
    metadata = uhd.types.TXMetadata()
    metadata.time_spec = uhd.types.TimeSpec(usrp.get_time_now().get_real_secs() + INIT_DELAY)
    metadata.has_time_spec = bool(num_channels)

    # Setup the statistic counters
    num_tx_samps = 0
    num_timeouts_tx = 0
    try:
        # Transmit until we get the signal to stop
        while not timer_elapsed_event.is_set():
            if random:
                samps_left = int(np.random.randint(1, max_samps_per_packet + 1))
            else:
                samps_left = max_samps_per_packet
            while samps_left > 0 and not timer_elapsed_event.is_set():
                # Column slice of the zero buffer: a view, no copy.
                num_sent = tx_streamer.send(transmit_buffer[:, :samps_left], metadata)
                num_tx_samps += num_sent * num_channels
                samps_left -= num_sent
                if num_sent == 0:
                    num_timeouts_tx += 1
                    if (num_timeouts_tx % 10000) == 1:
                        logger.warning("Tx timeouts: %d", num_timeouts_tx)
                # Only the first packet is timed; later packets are sent
                # without a time spec.
                metadata.has_time_spec = False
    except RuntimeError as ex:
        logger.error("Runtime error in transmit: %s", ex)
    finally:
        tx_statistics["num_tx_samps"] = num_tx_samps
        tx_statistics["num_tx_timeouts"] = num_timeouts_tx
        # Send a mini EOB packet
        metadata.end_of_burst = True
        try:
            tx_streamer.send(np.zeros((num_channels, 0), dtype=np.complex64), metadata)
        except RuntimeError as ex:
            logger.error("Runtime error while sending EOB: %s", ex)
def benchmark_tx_rate_async_helper(tx_streamer, timer_elapsed_event, tx_async_statistics):
    """Drain and classify the TX streamer's asynchronous event messages.

    Runs until ``timer_elapsed_event`` is set or a burst ACK is received.
    Underflow and sequence-error events are counted; any other event is
    logged as a warning. The counters (``num_tx_seqerr``, ``num_tx_underrun``
    and the not-yet-populated ``num_tx_timeouts``) are written into
    ``tx_async_statistics`` however the function exits.
    """
    async_md = uhd.types.TXAsyncMetadata()
    seqerr_count = 0
    underrun_count = 0
    timeout_count = 0  # TODO: Not populated yet
    try:
        codes = uhd.types.TXMetadataEventCode
        underflow_codes = (codes.underflow, codes.underflow_in_packet)
        seqerr_codes = (codes.seq_error, codes.seq_error_in_packet)
        while not timer_elapsed_event.is_set():
            # Poll with a 0.1 timeout; if nothing arrived, re-check the stop event
            got_msg = tx_streamer.recv_async_msg(async_md, 0.1)
            if not got_msg:
                continue
            event = async_md.event_code
            if event == codes.burst_ack:
                return
            if event in underflow_codes:
                underrun_count += 1
            elif event in seqerr_codes:
                seqerr_count += 1
            else:
                logger.warning("Unexpected event on async recv (%s), continuing.", event)
    finally:
        tx_async_statistics.update(
            num_tx_seqerr=seqerr_count,
            num_tx_underrun=underrun_count,
            num_tx_timeouts=timeout_count,
        )
def print_statistics(rx_statistics, tx_statistics, tx_async_statistics):
    """Log the collected RX/TX statistics as one formatted summary block.

    Missing keys are reported as 0, so a direction that was not tested (or a
    worker that exited early) still yields a complete summary.

    :param rx_statistics: counters written by benchmark_rx_rate()
    :param tx_statistics: counters written by benchmark_tx_rate()
    :param tx_async_statistics: counters written by benchmark_tx_rate_async_helper()
    """
    logger.debug("RX Statistics Dictionary: %s", rx_statistics)
    logger.debug("TX Statistics Dictionary: %s", tx_statistics)
    logger.debug("TX Async Statistics Dictionary: %s", tx_async_statistics)
    # Tx timeouts can be reported by the TX worker (send() returning 0) and/or
    # by the async helper; report their sum.
    num_tx_timeouts = (tx_statistics.get("num_tx_timeouts", 0)
                       + tx_async_statistics.get("num_tx_timeouts", 0))
    # Print the statistics
    statistics_msg = """Benchmark rate summary:
Num received samples: {}
Num dropped samples: {}
Num overruns detected: {}
Num transmitted samples: {}
Num sequence errors (Tx): {}
Num sequence errors (Rx): {}
Num underruns detected: {}
Num late commands: {}
Num timeouts (Tx): {}
Num timeouts (Rx): {}""".format(
        rx_statistics.get("num_rx_samps", 0),
        rx_statistics.get("num_rx_dropped", 0),
        rx_statistics.get("num_rx_overruns", 0),
        tx_statistics.get("num_tx_samps", 0),
        tx_async_statistics.get("num_tx_seqerr", 0),
        rx_statistics.get("num_rx_seqerr", 0),
        tx_async_statistics.get("num_tx_underrun", 0),
        rx_statistics.get("num_rx_late", 0),
        num_tx_timeouts,
        rx_statistics.get("num_rx_timeouts", 0))
    logger.info(statistics_msg)
def main():
    """Run the benchmarking tool.

    Parses the command line and configures the device (subdevices, clock and
    PPS sources, timestamps). It then runs the requested RX/TX worker threads
    for ``--duration`` seconds and logs the collected statistics. The workers
    are always signalled to stop and joined, even if setup or the wait is
    interrupted by an exception.

    :returns: True on success, False if argument validation or device setup failed
    """
    args = parse_args()
    # Setup some argument parsing
    if not (args.rx_rate or args.tx_rate):
        logger.error("Please specify --rx_rate and/or --tx_rate")
        return False
    # Setup a usrp device
    usrp = uhd.usrp.MultiUSRP(args.args)
    if usrp.get_mboard_name() == "USRP1":
        logger.warning(
            "Benchmark results will be inaccurate on USRP1 due to insufficient features.")
    # Always select the subdevice first, the channel mapping affects the other settings
    if args.rx_subdev:
        usrp.set_rx_subdev_spec(uhd.usrp.SubdevSpec(args.rx_subdev))
    if args.tx_subdev:
        usrp.set_tx_subdev_spec(uhd.usrp.SubdevSpec(args.tx_subdev))
    logger.info("Using Device: %s", usrp.get_pp_string())
    # Set the reference clock; give up if one was requested and it failed
    if args.ref and not setup_ref(usrp, args.ref, usrp.get_num_mboards()):
        return False
    # Set the PPS source; give up if one was requested and it failed
    if args.pps and not setup_pps(usrp, args.pps, usrp.get_num_mboards()):
        return False
    # At this point, we can assume our device has valid and locked clock and PPS
    rx_channels, tx_channels = check_channels(usrp, args)
    if not rx_channels and not tx_channels:
        # If the check returned two empty channel lists, that means something went wrong
        return False
    logger.info("Selected %s RX channels and %s TX channels",
                rx_channels if rx_channels else "no",
                tx_channels if tx_channels else "no")
    logger.info("Setting device timestamp to 0...")
    # If any of these conditions are met, we need to synchronize the channels
    if args.pps == "mimo" or args.ref == "mimo" or len(rx_channels) > 1 or len(tx_channels) > 1:
        usrp.set_time_unknown_pps(uhd.types.TimeSpec(0.0))
    else:
        usrp.set_time_now(uhd.types.TimeSpec(0.0))

    threads = []
    # Make a signal for the threads to stop running
    quit_event = threading.Event()
    # The worker threads fill these dicts without locks, so don't read them
    # from the main thread until the workers have joined.
    rx_statistics = {}
    tx_statistics = {}
    tx_async_statistics = {}

    def start_worker(name, target, *target_args):
        """Start *target* in a thread called *name* and remember it for joining."""
        # Naming the thread at construction (instead of setName() after
        # start()) makes sure its very first log lines show the right name.
        worker = threading.Thread(target=target, name=name, args=target_args)
        threads.append(worker)
        worker.start()

    try:
        # Spawn the receive test thread
        if args.rx_rate:
            usrp.set_rx_rate(args.rx_rate)
            st_args = uhd.usrp.StreamArgs(args.rx_cpu, args.rx_otw)
            st_args.channels = rx_channels
            st_args.args = uhd.types.DeviceAddr(args.rx_stream_args)
            rx_streamer = usrp.get_rx_stream(st_args)
            start_worker("bmark_rx_stream", benchmark_rx_rate,
                         usrp, rx_streamer, args.random, quit_event, rx_statistics)
        # Spawn the transmit test thread and its async-message helper
        if args.tx_rate:
            usrp.set_tx_rate(args.tx_rate)
            st_args = uhd.usrp.StreamArgs(args.tx_cpu, args.tx_otw)
            st_args.channels = tx_channels
            st_args.args = uhd.types.DeviceAddr(args.tx_stream_args)
            tx_streamer = usrp.get_tx_stream(st_args)
            start_worker("bmark_tx_stream", benchmark_tx_rate,
                         usrp, tx_streamer, args.random, quit_event, tx_statistics)
            start_worker("bmark_tx_helper", benchmark_tx_rate_async_helper,
                         tx_streamer, quit_event, tx_async_statistics)
        # Sleep for the required duration; a multichannel test gets extra
        # time for its timed start.
        duration = args.duration
        if len(rx_channels) > 1 or len(tx_channels) > 1:
            duration += INIT_DELAY
        time.sleep(duration)
    finally:
        # Interrupt and join the threads, even on an exception/Ctrl-C, so that
        # the non-daemon workers cannot keep the process alive.
        logger.debug("Sending signal to stop!")
        quit_event.set()
        for thr in threads:
            thr.join()
    print_statistics(rx_statistics, tx_statistics, tx_async_statistics)
    return True
if __name__ == "__main__":
    # Setup the logger with our custom timestamp formatting. This binds the
    # module-global name `logger` used by the functions in this file; no
    # `global` statement is needed (or meaningful) at module scope.
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    console = logging.StreamHandler()
    console.setFormatter(LogFormatter(
        fmt='[%(asctime)s] [%(levelname)s] (%(threadName)-10s) %(message)s'))
    logger.addHandler(console)
    # Vamos, vamos, vamos!
    sys.exit(not main())