mirror of
https://github.com/saymrwulf/uhd.git
synced 2026-05-15 21:01:26 +00:00
- ref_clock_(int/ext) test was not changing adf400x driver settings for new ref clock frequency. Therefore, changed the implementation to use uhd_usrp_probe --sensor to set clock_source and get 'ref_locked' sensor value
853 lines
30 KiB
Python
Executable file
853 lines
30 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#
|
|
# Copyright 2018 Ettus Research, a National Instruments Company
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
"""
|
|
E320 Built-In Self Test (BIST)
|
|
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import re
|
|
import socket
|
|
import select
|
|
import time
|
|
import json
|
|
from datetime import datetime
|
|
import argparse
|
|
from six import iteritems
|
|
|
|
# Timeout values are in seconds.

# Data sheet says "about a minute"
GPS_WARMUP_TIMEOUT = 70

# Data sheet says about 15 minutes. Because our test does not necessarily
# require GPS lock to pass, we reduce this value in order for the BIST to pass
# faster by default.
GPS_LOCKOK_TIMEOUT = 2

# Temperature sensor mapping based on location: kernel thermal zone name ->
# name under which the reading is reported.
temp_sensor_map = {
    "thermal_zone0": "internal",
    "thermal_zone1": "rf_channelA",
    "thermal_zone2": "fpga",
    "thermal_zone3": "rf_channelB",
    "thermal_zone4": "main_power",
}
|
|
##############################################################################
|
|
# Aurora/SFP BIST code
|
|
##############################################################################
|
|
def get_sfp_bist_defaults():
    """
    Return the canned result dictionary used for SFP/Aurora BIST dry-runs.

    A new dictionary is built on every call, so callers may modify the
    returned value freely.
    """
    dry_run_values = {}
    dry_run_values['elapsed_time'] = 1.0
    dry_run_values['max_roundtrip_latency'] = 0.8e-6
    dry_run_values['throughput'] = 1000e6
    dry_run_values['max_ber'] = 8.5e-11
    dry_run_values['errors'] = 0
    dry_run_values['bits'] = 12012486656
    return dry_run_values
|
|
|
|
def assert_aurora_image(master, slave):
    """
    Make sure an FPGA image is loaded with which the requested Aurora tests
    can run.

    The UIO device labelled `master` is looked up, and so is the one labelled
    `slave` unless `slave` is None. If any requested lookup comes back with a
    false first element, the 'AA' image is loaded, which always satisfies all
    conditions for running Aurora tests.

    Arguments:
    master -- UIO label of the master Aurora core
    slave -- UIO label of the slave Aurora core, or None if there is none
    """
    from usrp_mpm.sys_utils import uio
    if uio.find_uio_device(master)[0]:
        # Master is present; the slave only matters if one was requested.
        if slave is None or uio.find_uio_device(slave)[0]:
            return
    load_fpga_image('AA')
|
|
|
|
def run_aurora_bist(master, slave=None):
    """
    Spawn a BER (bit error rate) loopback test on the Aurora core(s).

    Arguments:
    master -- UIO label of the master Aurora core
    slave -- UIO label of the slave Aurora core; None (the default) runs the
             test with the master only

    Returns whatever AuroraControl.run_ber_loopback_bist() returns.

    Any exception raised while preparing or running the test is printed and
    the process is terminated with exit status 1 (SystemExit).
    """
    from usrp_mpm import aurora_control
    from usrp_mpm.sys_utils.uio import open_uio

    class DummyContext(object):
        """Stand-in context manager used when there is no slave device"""
        def __enter__(self):
            return None

        def __exit__(self, exc_type, exc_value, traceback):
            # Only reports True when there was no exception, so nothing is
            # ever suppressed.
            return exc_type is None

    # Go, go, go!
    try:
        assert_aurora_image(master, slave)
        with open_uio(label=master, read_only=False) as master_au_uio:
            master_au_ctrl = aurora_control.AuroraControl(master_au_uio)
            # Pick the slave context only after the master is open, then enter
            # it; without a slave the dummy context yields None.
            if slave is not None:
                slave_context = open_uio(label=slave, read_only=False)
            else:
                slave_context = DummyContext()
            with slave_context as slave_au_uio:
                slave_au_ctrl = None
                if slave is not None:
                    slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)
                return master_au_ctrl.run_ber_loopback_bist(
                    duration=10,
                    requested_rate=1300 * 8e6,
                    slave=slave_au_ctrl,
                )
    except Exception as ex:
        print("Unexpected exception: {}".format(str(ex)))
        # sys.exit() rather than the exit() helper: the latter is injected by
        # the site module and is not guaranteed to exist.
        sys.exit(1)
|
|
|
|
|
|
def aurora_results_to_status(bist_results):
    """
    Translate a result dictionary coming from the AuroraControl BIST into the
    (status, data) pair used by this BIST.

    The status is True exactly when the master error count ('mst_errors') is
    zero. The data dictionary carries the remaining figures under this tool's
    key names.
    """
    master_errors = bist_results['mst_errors']
    passed = master_errors == 0
    status_data = {
        'elapsed_time': bist_results['time_elapsed'],
        'max_roundtrip_latency': bist_results['mst_latency_us'],
        'throughput': bist_results['approx_throughput'],
        'max_ber': bist_results['max_ber'],
        'errors': master_errors,
        'bits': bist_results['mst_samps'],
    }
    return passed, status_data
|
|
|
|
##############################################################################
|
|
# Helpers
|
|
##############################################################################
|
|
def post_results(results):
    """
    Given a dictionary, post the results.

    The dictionary is rendered as JSON (keys sorted, four-space indent) and
    printed to stdout.
    """
    rendered = json.dumps(
        results,
        sort_keys=True,
        indent=4,
        separators=(',', ': ')
    )
    print(rendered)
|
|
|
|
def filter_results_for_lv(results):
    """
    Reshape test results so that the LabView JSON parser can digest them.

    The LabView JSON parser does not support a variety of things, such as
    nested dicts, and some downstream LV applications freak out if certain
    keys are not what they expect.
    This is a long hard-coded list of how results should look like for those
    cases. Note: This list needs manual supervision and attention for the case
    where either subsystems get renamed, or other architectural changes should
    occur.

    Results of tests that have no entry in the list are passed through
    untouched.
    """
    lv_compat_format = {
        'ddr3': {
            'throughput': -1,
        },
        'gpsdo': {
            "class": "",
            "time": "",
            "ept": -1,
            "lat": -1,
            "lon": -1,
            "alt": -1,
            "epx": -1,
            "epy": -1,
            "epv": -1,
            "track": -1,
            "speed": -1,
            "climb": -1,
            "eps": -1,
            "mode": -1,
        },
        'tpm': {
            'tpm0_caps': "",
        },
        'sfp_loopback': {
            'elapsed_time': -1,
            'max_roundtrip_latency': -1,
            'throughput': -1,
            'max_ber': -1,
            'errors': -1,
            'bits': -1,
        },
        'gpio': {
            'write_patterns': [],
            'read_patterns': [],
        },
        'temp': {
            temp_sensor_map['thermal_zone0']: -1,
            temp_sensor_map['thermal_zone1']: -1,
            temp_sensor_map['thermal_zone2']: -1,
            temp_sensor_map['thermal_zone3']: -1,
            temp_sensor_map['thermal_zone4']: -1,
        },
        'fan': {
            'cooling_device0': -1,
        },
    }

    def conform(test_data, template):
        """
        Return a dictionary with exactly the keys of template (plus
        'error_msg' and 'status'): values come from test_data where present,
        and from the template otherwise. Keys of test_data that the template
        does not know are dropped.
        """
        template['error_msg'] = ""
        template['status'] = False
        conformed = {}
        for key, fallback in template.items():
            conformed[key] = test_data.get(key, fallback)
        return conformed

    # OK now go and brush up the results:
    filtered = {}
    for testname, testresults in results.items():
        if testname in lv_compat_format:
            filtered[testname] = conform(testresults, lv_compat_format[testname])
        else:
            filtered[testname] = testresults
    return filtered
|
|
|
|
def sock_read_line(my_sock, timeout=60, interval=0.1):
    """
    Read from a socket until newline and return the line (without the
    newline), decoded as ASCII.

    Arguments:
    my_sock -- connected socket to read from
    timeout -- overall time budget in seconds
    interval -- longest time in seconds to wait for data before the deadline
                is checked again

    Raises RuntimeError if no newline arrived before the timeout, or if the
    peer closed the connection before sending one.
    """
    received = bytearray()
    deadline = time.time() + timeout
    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        # Let select() do the waiting so that data is picked up as soon as it
        # arrives, instead of polling and sleeping.
        readable = select.select(
            [my_sock], [], [], min(interval, remaining))[0]
        if not readable:
            continue
        # Read byte-wise so nothing past the newline is consumed.
        next_char = my_sock.recv(1)
        if not next_char:
            # recv() returning nothing on a readable socket means the peer
            # has closed the connection; without this check the loop would
            # spin at full speed until the timeout expires.
            raise RuntimeError(
                "sock_read_line() connection closed before newline!")
        if next_char == b'\n':
            return received.decode('ascii')
        received += next_char
    raise RuntimeError("sock_read_line() exceeded read timeout!")
|
|
|
|
def poll_with_timeout(state_check, timeout_ms, interval_ms):
    """
    Call state_check() repeatedly, pausing interval_ms milliseconds between
    calls, until it returns a true value or timeout_ms milliseconds have
    passed.

    Returns True if state_check() returned a true value within the timeout,
    False otherwise. The deadline is tested before every call, so a timeout
    that has already expired means state_check() is never called.
    """
    pause_s = float(interval_ms) / 1000
    deadline = time.time() + (float(timeout_ms) / 1000)
    while True:
        if not time.time() < deadline:
            return False
        if state_check():
            return True
        time.sleep(pause_s)
|
|
|
|
def expand_options(option_list):
    """
    Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar',
    'spam': 'eggs'}.

    Only the first '=' separates key from value, so values may themselves
    contain '=' (e.g. 'args=addr=127.0.0.1'). If a key appears more than once,
    the last occurrence wins.

    Raises ValueError if an entry contains no '=' at all.
    """
    options = {}
    for option in option_list:
        key, separator, value = option.partition('=')
        if not separator:
            raise ValueError(
                "Invalid option '{}': expected KEY=VALUE".format(option))
        options[key] = value
    return options
|
|
|
|
##############################################################################
|
|
# Bist class
|
|
##############################################################################
|
|
class E320BIST(object):
    """
    BIST Tool for the USRP E320

    Every test is implemented as a method called bist_<testname>. Such a
    method returns a (status, data) tuple: a boolean pass/fail flag and a
    dictionary with the test's results. If the dictionary carries an
    'error_msg' key, it is reported along with the results.
    """
    # This defines special tests that are really collections of other tests.
    collections = {
        'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", "ref_clock_int"],
        'extended': "*",
    }
    # Default FPGA image type
    DEFAULT_FPGA_TYPE = '1G'

    @staticmethod
    def make_arg_parser():
        """
        Return the argument parser for the command line of this tool.
        """
        parser = argparse.ArgumentParser(
            description="E320 BIST Tool",
        )
        parser.add_argument(
            '-n', '--dry-run', action='store_true',
            help="Fake out the tests. All tests will return a valid"
                 " response, but will not actually interact with hardware.",
        )
        parser.add_argument(
            '-v', '--verbose', action='store_true',
            help="Crank up verbosity level",
        )
        parser.add_argument(
            '--debug', action='store_true',
            help="For debugging this tool.",
        )
        parser.add_argument(
            '--option', '-o', action='append', default=[],
            help="Option for individual test.",
        )
        parser.add_argument(
            '--lv-compat', action='store_true',
            help="Provides compatibility with the LV JSON parser. Don't run "
                 "this mode unless you know what you're doing. The JSON "
                 "output does not necessarily reflect the actual system "
                 "status when using this mode.",
        )
        parser.add_argument(
            '--skip-fpga-reload', action='store_true',
            help="Skip reloading the default FPGA image post-test. Note: by "
                 "specifying this argument, the FPGA image loaded could be "
                 "anything post-test.",
        )
        parser.add_argument(
            'tests',
            help="List the tests that should be run",
            nargs='+', # There has to be at least one
        )
        return parser

    def __init__(self):
        """
        Parse the command line and work out the set of tests to run.
        """
        self.args = E320BIST.make_arg_parser().parse_args()
        self.args.option = expand_options(self.args.option)
        # If this is true, trigger a reload of the default FPGA image
        self.reload_fpga_image = False
        try:
            from usrp_mpm.periph_manager.e320 import e320
            default_rev = e320.mboard_max_rev
        except ImportError:
            # This means we're in dry run mode or something like that, so just
            # pick something
            default_rev = 3
        self.mb_rev = int(self.args.option.get('mb_rev', default_rev))
        self.tests_to_run = set()
        for test in self.args.tests:
            if test in self.collections:
                self.tests_to_run.update(self.expand_collection(test))
            else:
                self.tests_to_run.add(test)
        # Always define the attribute, so that tests can pass it on even when
        # no logger could be set up.
        self.log = None
        try:
            # Keep this import here so we can do dry-runs without any MPM code
            from usrp_mpm import get_main_logger
            if not self.args.verbose:
                from usrp_mpm.mpmlog import WARNING
                get_main_logger().setLevel(WARNING)
            self.log = get_main_logger().getChild('main')
        except ImportError:
            # stdout is reserved for the JSON report, diagnostics go to stderr
            sys.stderr.write("No logging capability available.\n")

    def expand_collection(self, coll):
        """
        Return names of tests in a collection, as a set.

        A collection defined as "*" expands to every test for which this
        object has a bist_<testname> attribute.
        """
        tests = self.collections[coll]
        if tests == "*":
            prefix = 'bist_'
            return {
                name[len(prefix):]
                for name in dir(self)
                if name.startswith(prefix)
            }
        return set(tests)

    def run(self):
        """
        Execute tests, post the results as JSON, and reload the default FPGA
        image if a test asked for that (unless --skip-fpga-reload was given).

        Returns True on Success, i.e. if every executed test passed.
        """
        def execute_test(testname):
            """
            Actually run a test. Returns the test's (status, data) tuple, with
            'status' and 'error_msg' filled into data.
            """
            testmethod_name = "bist_{0}".format(testname)
            sys.stderr.write(
                "Executing test method: {0}\n\n".format(testmethod_name)
            )
            # Look the method up front: an AttributeError raised from within a
            # test must not be mistaken for an unknown test name.
            testmethod = getattr(self, testmethod_name, None)
            if testmethod is None:
                sys.stderr.write("Test not defined: {}\n".format(testname))
                return False, {}
            try:
                status, data = testmethod()
                data['status'] = status
                data['error_msg'] = data.get('error_msg', '')
                return status, data
            except Exception as ex:
                sys.stderr.write(
                    "Test {} failed to execute: {}\n".format(testname, str(ex))
                )
                if self.args.debug:
                    raise
                return False, {'error_msg': str(ex)}
        tests_successful = True
        result = {}
        for test in self.tests_to_run:
            status, result_data = execute_test(test)
            tests_successful = tests_successful and status
            result[test] = result_data
        if self.args.lv_compat:
            result = filter_results_for_lv(result)
        post_results(result)
        if self.reload_fpga_image and not self.args.skip_fpga_reload:
            load_fpga_image(self.DEFAULT_FPGA_TYPE)
        return tests_successful

    #############################################################################
    # BISTS
    # All bist_* methods must return True/False success values!
    #############################################################################
    def bist_rtc(self):
        """
        BIST for RTC (real time clock)

        Return dictionary:
        - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601
                format, as a string. As if running 'date -Iseconds -u'.
        - time: Same time, but in seconds since epoch.

        Return status:
        Unless reading the time throws an exception, returns True.
        """
        assert 'rtc' in self.tests_to_run
        # Take a single reading and derive both values from it. Feeding a UTC
        # time tuple to time.mktime() would be wrong, because mktime()
        # interprets its argument as local time.
        now = int(time.time())
        return True, {
            'time': float(now),
            'date': time.strftime(
                '%Y-%m-%dT%H:%M:%S', time.gmtime(now)) + "+00:00",
        }

    def bist_gyro(self):
        """
        BIST for GYRO (MPU9250)

        Return dictionary:
        - device_name: The OF_NAME property of the detected device. If
                       several devices match, the last one listed is reported.

        Return status: True if MPU9250 is detected.
        """
        assert 'gyro' in self.tests_to_run
        if self.args.dry_run:
            return True, {'device_name': 'dry_run mpu9250'}
        import pyudev
        context = pyudev.Context()
        result = {}
        for device in context.list_devices(subsystem='iio', DEVTYPE='iio_device'):
            of_name = device.get('OF_NAME')
            # Devices without an OF_NAME property yield None here
            if of_name is not None and 'mpu9250' in of_name:
                result['device_name'] = of_name
        if len(result) < 1:
            result['error_msg'] = "No GYRO detected!"
        return 'error_msg' not in result, result

    def bist_ddr3(self):
        """
        BIST for PL DDR3 DRAM
        Description: Calls a test to examine the speed of the DDR3. To be
        precise, it fires up a UHD session, which runs a DDR3 BiST internally.
        If that works, it'll return estimated throughput that was gathered
        during the DDR3 BiST.

        External Equipment: None

        Return dictionary:
        - throughput: The estimated throughput
          NOTE(review): the dry-run value and the original documentation
          suggest bytes/s, but the figure parsed from the tool output is a
          number of MB multiplied by 1000 -- confirm the intended unit.

        Return status:
        True if the DDR3 bist passed
        """
        assert 'ddr3' in self.tests_to_run
        if self.args.dry_run:
            return True, {'throughput': 1250e6}
        result = {}
        ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1'
        try:
            output = subprocess.check_output(
                ddr3_bist_executor,
                stderr=subprocess.STDOUT,
                shell=True,
            )
        except subprocess.CalledProcessError as ex:
            # Don't throw errors from uhd_usrp_probe
            output = ex.output
        output = output.decode("utf-8")
        mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output)
        if mobj is not None:
            result['throughput'] = float(mobj.group('thrup')) * 1000
        else:
            result['throughput'] = 0
            result['error_msg'] = result.get('error_msg', '') + \
                    "\n\nFailed match throughput regex!"
        return result.get('throughput', 0) > 1000e3, result

    def bist_gpsdo(self):
        """
        BIST for GPSDO
        Description: Returns GPS information
        External Equipment: None; Recommend attaching an antenna or providing
                            fake GPS information

        Options (-o): gps_warmup_timeout and gps_lockok_timeout override the
        default timeouts (both in seconds).

        Return dictionary: A TPV dictionary as returned by gpsd.
        See also: http://www.catb.org/gpsd/gpsd_json.html

        Check for mode 2 or 3 to see if it's locked.

        Raises RuntimeError if the GPS does not leave warm-up in time, or if
        gpsd does not answer in time.
        """
        assert 'gpsdo' in self.tests_to_run
        if self.args.dry_run:
            return True, {
                "class": "TPV",
                "time": "2014-30T11:48:20.10Z",
                "ept": 0.005,
                "lat": 30.407899,
                "lon": -97.726634,
                "alt": 1327.689,
                "epx": 15.319,
                "epy": 17.054,
                "epv": 124.484,
                "track": 10.3797,
                "speed": 0.091,
                "climb": -0.085,
                "eps": 34.11,
                "mode": 3
            }
        from usrp_mpm.periph_manager import e320
        # Turn on GPS, give some time to acclimatize
        mb_regs = e320.MboardRegsControl(e320.e320.mboard_regs_label, self.log)
        mb_regs.enable_gps(True)
        time.sleep(5)
        gps_warmup_timeout = float(
            self.args.option.get('gps_warmup_timeout', GPS_WARMUP_TIMEOUT))
        gps_lockok_timeout = float(
            self.args.option.get('gps_lockok_timeout', GPS_LOCKOK_TIMEOUT))
        # Wait for WARMUP to go low
        sys.stderr.write(
            "Waiting for WARMUP to go low for up to {} seconds...\n".format(
                gps_warmup_timeout))
        if not poll_with_timeout(
                lambda: not bool((mb_regs.get_gps_status() >> 4) & 0x1),
                gps_warmup_timeout*1000, 1000
            ):
            raise RuntimeError(
                "GPS-WARMUP did not go low within {} seconds!".format(
                    gps_warmup_timeout))
        sys.stderr.write("Chip is warmed up.\n")
        # Wait for LOCKOK. Data sheet says wait up to 15 minutes for GPS lock.
        sys.stderr.write(
            "Waiting for LOCKOK to go high for up to {} seconds...\n".format(
                gps_lockok_timeout))
        if not poll_with_timeout(
                mb_regs.get_gps_locked_val,
                gps_lockok_timeout*1000,
                1000
            ):
            # Not having a lock is not a failure; just report the status bits
            sys.stderr.write("No GPS-LOCKOK!\n")
        gps_status = mb_regs.get_gps_status()
        sys.stderr.write("GPS-SURVEY status: {}\n".format(
            (gps_status >> 3) & 0x1
        ))
        sys.stderr.write("GPS-PHASELOCK status: {}\n".format(
            (gps_status >> 2) & 0x1
        ))
        sys.stderr.write("GPS-ALARM status: {}\n".format(
            (gps_status >> 1) & 0x1
        ))
        # Now read back response from chip
        my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            my_sock.connect(('localhost', 2947))
            sys.stderr.write("Connected to GPSDO socket.\n")
            query_cmd = b'?WATCH={"enable":true,"json":true}'
            my_sock.sendall(query_cmd)
            sys.stderr.write("Sent query: {}\n".format(query_cmd))
            sock_read_line(my_sock, timeout=10)
            sys.stderr.write("Received initial newline.\n")
            result = {}
            while result.get('class', None) != 'TPV':
                json_result = sock_read_line(my_sock, timeout=60)
                sys.stderr.write(
                    "Received JSON response: {}\n\n".format(json_result)
                )
                result = json.loads(json_result)
            my_sock.sendall(b'?WATCH={"enable":false}')
        finally:
            # Release the socket even if connecting or reading failed
            my_sock.close()
        # If we reach this line, we have a valid result and the chip responded.
        # However, it doesn't necessarily mean we had a GPS lock.
        return True, result

    def bist_tpm(self):
        """
        BIST for TPM (Trusted Platform Module)

        This reads the caps value for all detected TPM devices.

        Return dictionary:
        - tpm<N>_caps: TPM manufacturer and version info. Is a multi-line
                       string.

        Return status: True if exactly one TPM device is detected.
        """
        assert 'tpm' in self.tests_to_run
        if self.args.dry_run:
            return True, {
                'tpm0_caps': "Fake caps value\n\nVersion 0.0.0",
            }
        result = {}
        props_to_read = ('caps',)
        base_path = '/sys/class/tpm'
        for tpm_device in os.listdir(base_path):
            if tpm_device.startswith('tpm'):
                for key in props_to_read:
                    prop_path = os.path.join(base_path, tpm_device, key)
                    # 'with' makes sure the file is closed again right away
                    with open(prop_path, 'r') as prop_file:
                        result['{}_{}'.format(tpm_device, key)] = \
                            prop_file.read().strip()
        return len(result) == 1, result

    def ref_clock_helper(self,clock_source):
        """
        Helper function to determine reference clock lock
        Description: Checks to see if we can lock to a clock source. This
        runs uhd_usrp_probe with the requested clock_source device argument
        and queries the 'ref_locked' motherboard sensor.

        Arguments:
        clock_source -- either "internal" or "external"

        External Equipment: None
        Return dictionary:
        - ref_locked: Boolean lock status
        - error_msg: Only present if the reference clock is not locked

        Note that, unlike the bist_* methods, this returns the dictionary
        only, both in dry-run mode and otherwise.
        """
        assert clock_source in ("internal", "external"),\
            "Invalid clock source selected ({}). Valid choices: {}".format(
                clock_source, ("internal", "external"))
        if self.args.dry_run:
            # Same shape as the regular return value; callers derive the
            # status from the absence of 'error_msg'.
            return {'ref_locked': True}
        result = {}
        cmd = ['uhd_usrp_probe', '--args', 'addr=127.0.0.1,clock_source=' + clock_source,
               '--sensor']
        sensor_path = '/mboards/0/sensors/ref_locked'
        cmd.append(sensor_path)
        ref_lock_executor = ' '.join(cmd)
        # stdout is reserved for the JSON report, so log the command to stderr
        sys.stderr.write(ref_lock_executor + "\n")
        try:
            output = subprocess.check_output(
                ref_lock_executor,
                stderr=subprocess.PIPE,
                shell=True,
            )
        except subprocess.CalledProcessError as ex:
            # Don't throw errors from uhd_usrp_probe
            output = ex.output
        output = output.decode("utf-8")
        # The sensor counts as locked if the output ends in "true"
        mobj = re.search(r"true$", output.strip())
        if mobj is not None:
            result['ref_locked'] = True
        else:
            result['ref_locked'] = False
            result['error_msg'] = "Reference Clock not locked"
        return result

    def bist_ref_clock_int(self):
        """
        BIST for clock lock from internal (20MHz).
        Description: Checks to see if we can lock to an internal
        clock source.

        External Equipment: None
        Return dictionary:
        - ref_locked: Boolean lock status
        - error_msg: Only present if the reference clock is not locked

        Return status: True if the reference clock is locked.
        """
        assert 'ref_clock_int' in self.tests_to_run
        result = self.ref_clock_helper('internal')
        return 'error_msg' not in result, result

    def bist_ref_clock_ext(self):
        """
        BIST for clock lock from external (10MHz) source.
        Description: Checks to see if we can lock to an external
        clock source.

        External Equipment: External 10 MHz reference clock needed (from Octoclock)
        Return dictionary:
        - ref_locked: Boolean lock status
        - error_msg: Only present if the reference clock is not locked

        Return status: True if the reference clock is locked.
        """
        assert 'ref_clock_ext' in self.tests_to_run
        result = self.ref_clock_helper('external')
        return 'error_msg' not in result, result

    def bist_sfp_loopback(self):
        """
        BIST for SFP+ ports:
        Description: Uses one SFP+ port in loopback mode.

        External Equipment: Loopback module in SFP required.

        Running this test requests a reload of the default FPGA image once
        all tests are done.

        Return dictionary:
        - elapsed_time: Float value, test time in seconds
        - max_roundtrip_latency: Float value, max roundtrip latency in seconds
        - throughput: Approximate data throughput in bytes/s
        - max_ber: Estimated maximum BER, float value.
        - errors: Number of errors
        - bits: Number of bits that were transferred
        """
        if self.args.dry_run:
            return True, get_sfp_bist_defaults()
        sfp_bist_results = run_aurora_bist(master='misc-auro-regs')
        self.reload_fpga_image = True
        return aurora_results_to_status(sfp_bist_results)

    def bist_gpio(self):
        """
        BIST for GPIO
        Description: Writes and reads the values to the GPIO

        Needed Equipment: External loopback as follows
        GPIO
        0<->4
        1<->5
        2<->6
        3<->7

        Return dictionary:
        - write_patterns: A list of patterns that were written
        - read_patterns: A list of patterns that were read back

        On a mismatch, both lists hold only the offending pattern and the
        value that was read back for it.
        """
        assert 'gpio' in self.tests_to_run
        GPIO_WIDTH = 8
        patterns = range(16)
        if self.args.dry_run:
            return True, {
                'write_patterns': list(patterns),
                'read_patterns': list(patterns),
            }
        from usrp_mpm.periph_manager import e320, e320_periphs
        mb_regs = e320_periphs.MboardRegsControl(e320.e320.mboard_regs_label, self.log)
        mb_regs.enable_fp_gpio(True)
        mb_regs.set_fp_gpio_master(0xFF)
        # Allow some time for the front-panel GPIOs to become usable
        time.sleep(1)
        # The test is run once per direction: first with the upper four pins
        # as outputs, then with the lower four.
        ddr1 = 0xf0
        ddr2 = 0x0f
        def _run_gpio(ddr, patterns):
            " Run a GPIO test for a given set of patterns "
            gpio_ctrl = e320_periphs.FrontpanelGPIO(ddr)
            for pattern in patterns:
                gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr)
                time.sleep(0.1)
                gpio_rb = gpio_ctrl.get_all()
                if pattern != gpio_rb:
                    return False, {'write_patterns': [pattern],
                                   'read_patterns': [gpio_rb]}
            return True, {'write_patterns': list(patterns),
                          'read_patterns': list(patterns)}
        status, data = _run_gpio(ddr1, patterns)
        if not status:
            return status, data
        status, data = _run_gpio(ddr2, patterns)
        return status, data

    def bist_temp(self):
        """
        BIST for temperature sensors
        Description: Reads the temperature sensors on the motherboards and
        returns their values in mC

        Return dictionary:
        - <thermal-zone-name>: temp in mC
          Zones are reported under their name from temp_sensor_map; a zone
          without an entry in that map is reported under its system name.

        Return status: True if at least one temperature could be read.
        """
        assert 'temp' in self.tests_to_run
        if self.args.dry_run:
            return True, {'internal': 30000}
        import pyudev
        context = pyudev.Context()
        result = {}
        for device in context.list_devices(subsystem='thermal'):
            if 'temp' not in device.attributes.available_attributes:
                continue
            raw_temp = device.attributes.get('temp')
            if raw_temp is None:
                continue
            # Don't fail the whole test over a zone we have no name for
            sensor_name = temp_sensor_map.get(device.sys_name, device.sys_name)
            result[sensor_name] = int(raw_temp.decode('ascii'))
        if len(result) < 1:
            result['error_msg'] = "No temperature sensors found!"
        return 'error_msg' not in result, result

    def bist_fan(self):
        """
        BIST for fan
        Description: Reads the 'cur_state' attribute of every device in the
        thermal subsystem that has one.
        NOTE(review): the original documentation calls this value the fan
        speed in RPM -- confirm what 'cur_state' reports on this platform.

        External Equipment: Fan with connector

        Return dictionary:
        - <fan-name>: Value of the device's 'cur_state' attribute

        Return status: True if exactly one such device was found.
        """
        assert 'fan' in self.tests_to_run
        if self.args.dry_run:
            return True, {'cooling_device0': 10000}
        import pyudev
        context = pyudev.Context()
        result = {
            device.sys_name: int(device.attributes.get('cur_state'))
            for device in context.list_devices(subsystem='thermal')
            if 'cur_state' in device.attributes.available_attributes \
                and device.attributes.get('cur_state') is not None
        }
        return len(result) == 1, result
|
|
|
|
|
|
def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask):
    """Helper function for set gpio.

    Converts value to a binary string and sets the individual bits on those
    pins of gpio_bank that ddr_mask marks as outputs.

    The binary string is zero-padded to as many digits as ddr_mask has bits
    set. Output pin i is given the digit at position (i modulo that count),
    counted from the most significant digit, as a one-character string.

    Arguments:
    gpio_bank -- gpio bank type; must provide set(pin_index, bit)
    value -- value to set onto gpio bank.
    gpio_size -- size of the gpio bank
    ddr_mask -- data direction register bit mask. 0 is input; 1 is output.
    """
    num_outputs = bin(ddr_mask).count("1")
    value_bits = format(value, '0' + str(num_outputs) + 'b')[-(gpio_size):]
    ddr_bits = format(ddr_mask, '0' + str(gpio_size) + 'b')[-(gpio_size):]
    # Walk the mask from its least significant digit, which belongs to pin 0
    for pin, direction in zip(range(gpio_size), reversed(ddr_bits)):
        if direction != "1":
            continue
        gpio_bank.set(pin, value_bits[pin % num_outputs])
|
|
|
|
def get_product_id():
    """
    Return the mboard product ID (e320).

    Runs the 'eeprom-id' tool and looks for 'e320' in its combined
    stdout/stderr output.

    Raises AssertionError if the output does not mention 'e320', and
    subprocess.CalledProcessError if the tool exits with an error.
    """
    raw_output = subprocess.check_output(
        ['eeprom-id'],
        stderr=subprocess.STDOUT,
        shell=True,
    )
    eeprom_info = raw_output.decode('utf-8')
    if 'e320' not in eeprom_info:
        raise AssertionError("Cannot determine product ID.")
    return 'e320'
|
|
|
|
def load_fpga_image(fpga_type):
    """
    Load an FPGA image (1G, XG, AA, ...)

    The image file name is derived from the product ID and the upper-cased
    fpga_type, and the file is expected in /usr/share/uhd/images/. Loading is
    delegated to the uhd_image_loader tool.

    Raises subprocess.CalledProcessError if uhd_image_loader exits with an
    error.
    """
    images_folder = '/usr/share/uhd/images/'
    fpga_image = images_folder + \
        'usrp_' + get_product_id() + '_fpga_' + fpga_type.upper() + '.bit'
    loader_args = [
        'uhd_image_loader',
        '--args', 'type=e3xx,addr=127.0.0.1',
        '--fpga-path', fpga_image,
    ]
    subprocess.check_output(
        ' '.join(loader_args),
        stderr=subprocess.STDOUT,
        shell=True
    )
|
|
|
|
##############################################################################
|
|
# main
|
|
##############################################################################
|
|
def main():
    """
    Set up the BIST tool from the command line and run the requested tests.

    Returns the result of E320BIST.run().
    """
    bist = E320BIST()
    return bist.run()
|
|
|
|
if __name__ == '__main__':
    # Exit status 0 if main() reports success, 1 otherwise. sys.exit() is used
    # instead of the exit() helper, which is injected by the site module and
    # not guaranteed to exist.
    sys.exit(0 if main() else 1)
|