uhd/host/tests/devtest/recv_stability_test.py
Martin Braun d0e75f66fa devtest: Add receive stability test
At this point, this test chokes an RX streamer to force an overrun. It
then confirms that the overrun message is returned to the call site, and
that the streamer returns to continuous streaming after the overrun
handling.
2021-09-28 06:24:05 -07:00

174 lines
5.9 KiB
Python

#!/usr/bin/env python3
#
# Copyright 2021 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
""" Test recv, including some corner cases. """
import sys
import time
import argparse
import numpy as np
import uhd
try:
from ruamel import yaml
except:
import yaml
def run_recv_stability_test(usrp, device_config):
    """
    Run tests related to recv().

    Creates an RX streamer for the configured channels, sets the RX rate and
    then runs the choke (overrun and recovery) test on that streamer.

    Arguments:
    usrp -- device object the streamer is created from
    device_config -- dict-like test settings. Keys read here (all optional):
                     'num_channels' (default: all RX channels of the device),
                     'cpu_type' (default 'fc32'), 'otw_type' (default 'sc16')
                     and 'rate' (default 1e6)

    Returns True when the tests ran through. Test failures are not returned,
    they surface as exceptions from run_choke_test().
    """
    num_chans = device_config.get('num_channels', usrp.get_rx_num_channels())
    # StreamArgs takes the CPU format first and the over-the-wire format
    # second. The OTW format has its own config key; it used to be read from
    # 'cpu_type', so any CPU format override was also applied on the wire.
    str_args = uhd.usrp.StreamArgs(
        device_config.get('cpu_type', 'fc32'),
        device_config.get('otw_type', 'sc16'),
    )
    str_args.channels = list(range(num_chans))
    rx_streamer = usrp.get_rx_stream(str_args)
    usrp.set_rx_rate(device_config.get('rate', 1e6))
    # Run tests
    run_choke_test(usrp, rx_streamer, device_config)
    return True
def run_choke_test(usrp, rx_streamer, device_config):
    """
    This will kick off a continuous stream, then interrupt it, then resume it.
    We verify that:
    - We get an overrun message (metadata)
    - The stream resumes.

    Arguments:
    usrp -- device object; only used here to read the current device time
    rx_streamer -- RX streamer the test is run on
    device_config -- dict-like test settings. Keys read here (all optional):
                     'init_delay' (seconds before streaming starts, default
                     1.0), 'max_num_samps_before_o' (samples to read while
                     waiting for the overrun, default 1000000) and
                     'max_empty_recvs' (consecutive recv() calls without any
                     samples after which waiting for the overrun is
                     abandoned, default 50)

    Raises RuntimeError if any check failed. All checks are run and reported
    via print() before the exception is raised, and the stream is stopped
    and flushed first.
    """
    bufsize = 100 * rx_streamer.get_max_num_samps()
    # NOTE(review): complex64 matches an 'fc32' CPU format -- confirm the
    # streamer was created with that format.
    recv_buf = np.zeros(
        (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
    # Start streaming at a point in the future (timed command)
    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
    stream_cmd.stream_now = False
    init_delay = device_config.get('init_delay', 1.0)
    stream_cmd.time_spec = usrp.get_time_now() + init_delay
    rx_streamer.issue_stream_cmd(stream_cmd)
    metadata = uhd.types.RXMetadata()
    # The timeout has to cover the start delay plus the time to fill a buffer
    long_timeout = init_delay + 1.0
    num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
    test_pass = True
    # Large timeout, small rate: This should not fail
    if num_samps_recvd != bufsize:
        test_pass = False
        print(f"run_choke_test(): First buffer recv() failed, rx'd "
              f"{num_samps_recvd}/{bufsize} samples!")
    if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
        test_pass = False
        print(
            f"run_choke_test(): First buffer recv() failed, rx'd "
            f"error code: {metadata.strerror()}")
    print("Choking RX...")
    # Not calling recv() while the stream keeps running is what provokes
    # the overrun.
    time.sleep(1)
    print("Now, collecting the overrun (you should see an 'O' now).")
    # On one of the next recv(), we should get an overrun. It may take a bit,
    # because there will already be data in the pipe and depending on the USRP,
    # overruns will be inline.
    max_num_samps_before_o = device_config.get('max_num_samps_before_o', 1000000)
    # Without this bound, the loop below would never terminate if recv() keeps
    # returning zero samples (e.g. permanent timeouts), because the sample
    # counter would never reach max_num_samps_before_o.
    max_empty_recvs = device_config.get('max_empty_recvs', 50)
    empty_recvs = 0
    num_samps_recvd = 0
    overrun_recvd = False
    while num_samps_recvd < max_num_samps_before_o:
        num_new_samps = rx_streamer.recv(recv_buf, metadata, timeout=.1)
        num_samps_recvd += num_new_samps
        if metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
            overrun_recvd = True
            break
        if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
            test_pass = False
            print(
                f"run_choke_test(): Second buffer recv() failed, rx'd "
                f"error code: {metadata.strerror()} (should have been overflow or none)")
        if num_new_samps:
            empty_recvs = 0
            continue
        empty_recvs += 1
        if empty_recvs >= max_empty_recvs:
            print(
                f"run_choke_test(): Second buffer recv() returned no samples "
                f"{empty_recvs} times in a row, giving up waiting for an overflow")
            break
    if not overrun_recvd:
        test_pass = False
        print(
            "run_choke_test(): Second buffer recv() failed, never rx'd "
            "an overflow")
    # It should recover now:
    num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
    if num_samps_recvd != bufsize:
        test_pass = False
        print(
            f"run_choke_test(): Third buffer recv() failed, rx'd "
            f"{num_samps_recvd}/{bufsize} samples!")
    if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
        test_pass = False
        print(
            f"run_choke_test(): Third buffer recv() failed, rx'd "
            f"error code: {metadata.strerror()}")
    # Always stop the stream before reporting the result
    stop_and_flush(rx_streamer)
    if not test_pass:
        raise RuntimeError("run_choke_test(): Test failed.")
def stop_and_flush(rx_streamer):
    """
    Stop a continuous stream and drain whatever is still queued up.

    Issues a stop_cont stream command, then keeps calling recv() with a zero
    timeout until recv() reports a timeout error code, or until 5 seconds
    have passed, whichever comes first. The received samples are discarded.
    """
    print("Flushing FIFOs...")
    stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
    samps_per_recv = rx_streamer.get_max_num_samps()
    num_chans = rx_streamer.get_num_channels()
    # Scratch buffer only; its content is never looked at
    scratch_buf = np.zeros((num_chans, samps_per_recv), dtype=np.complex64)
    rx_streamer.issue_stream_cmd(stop_cmd)
    rx_md = uhd.types.RXMetadata()
    deadline = time.monotonic() + 5.0
    drained = False
    while not drained and time.monotonic() < deadline:
        rx_streamer.recv(scratch_buf, rx_md, timeout=0)
        # A timeout means there was nothing left to read
        drained = rx_md.error_code == uhd.types.RXMetadataErrorCode.timeout
def parse_args():
    """
    Build the command line parser and parse the command line.

    Options: --args (default: empty string), --dump-defaults and
    --device-config (both default to None). Returns the parsed namespace.
    """
    # (flag, keyword arguments for add_argument), in the order they are added
    option_specs = (
        ('--args', {'default': ''}),
        ('--dump-defaults', {
            'help': "Specify a device type, and the default config will be dumped as YAML",
        }),
        ('--device-config', {
            'help': "Specify path to YAML file to use as device config",
        }),
    )
    arg_parser = argparse.ArgumentParser()
    for flag, options in option_specs:
        arg_parser.add_argument(flag, **options)
    return arg_parser.parse_args()
def get_device_config(usrp_type, device_config_path=None):
    """
    Return a device configuration object.

    Arguments:
    usrp_type -- device type; currently unused, since there are no
                 per-device defaults in here
    device_config_path -- optional path to a YAML file with the config

    Returns the content of the YAML file if a path was given, and an empty
    dict if there is no path or the file is empty.
    """
    if not device_config_path:
        return {}
    with open(device_config_path, 'r') as yaml_f:
        # Plain yaml.load(stream) is not usable here: PyYAML >= 6 requires an
        # explicit Loader, recent ruamel.yaml removed the function, and the
        # default loader can construct arbitrary objects. Use a safe loader
        # through whichever API the imported module offers.
        if hasattr(yaml, 'YAML'):
            # ruamel.yaml object API
            config = yaml.YAML(typ='safe').load(yaml_f)
        else:
            # PyYAML (or a ruamel.yaml too old to have the YAML class)
            config = yaml.safe_load(yaml_f)
    # An empty YAML document loads as None, but callers use dict methods
    return {} if config is None else config
def dump_defaults(usrp_type):
    """
    Print the default device configuration for the given USRP type as YAML.

    The config is whatever get_device_config() returns when it is called
    without a config file path.
    """
    default_config = get_device_config(usrp_type)
    yaml_text = yaml.dump(default_config, default_flow_style=False)
    print(yaml_text)
def main():
    """
    Returns True on Success

    With --dump-defaults, the default device config is printed and nothing
    else is done (no device is opened). Otherwise, a device is opened with
    the --args string, the device config is loaded (from --device-config, if
    given) and the recv stability test is run.

    Raises Exception if the stability test does not report success.
    """
    args = parse_args()
    if args.dump_defaults:
        dump_defaults(args.dump_defaults)
        # This must be truthy: the script exits with `not main()`, so
        # returning 0 here made a successful dump exit with status 1.
        return True
    usrp = uhd.usrp.MultiUSRP(args.args)
    usrp_type = usrp.get_usrp_rx_info().get('mboard_id')
    device_config = get_device_config(usrp_type, args.device_config)
    ret_val = run_recv_stability_test(usrp, device_config)
    if not ret_val:
        raise Exception("Python API Tester Received Errors")
    return ret_val
if __name__ == "__main__":
    # main() is truthy on success; the exit status is the negation of that,
    # i.e. a truthy result exits with 0 and a falsy result exits with 1.
    succeeded = main()
    sys.exit(not succeeded)