mirror of
https://github.com/saymrwulf/uhd.git
synced 2026-05-14 20:58:09 +00:00
1069 lines
38 KiB
Python
1069 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# Copyright 2020 Ettus Research, a National Instruments Brand
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
"""
|
|
X4XX Built-In Self Test (BIST)
|
|
|
|
Will work on all derivatives of the X4xx series.
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
from usrp_mpm import bist
|
|
|
|
# Timeout values are in seconds:
|
|
GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute"
|
|
GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test
|
|
# does not necessarily require GPS lock to pass, we
|
|
# reduce this value in order for the BIST to pass faster
|
|
# by default.
|
|
DEFAULT_DB_ID = 0
|
|
|
|
# We will import stuff as late as possible, intentionally, so let's calm down
|
|
# PyLint
|
|
# pylint: disable=import-outside-toplevel
|
|
|
|
|
|
def db_bist(test_fn):
|
|
def inner(self):
|
|
db_ids = [0, 1]
|
|
|
|
db_id = self.args.option.get('db_id')
|
|
if db_id is not None:
|
|
db_id = int(db_id)
|
|
assert db_id in [0, 1]
|
|
db_ids = [db_id]
|
|
|
|
test_status = True
|
|
test_result = {}
|
|
for db_id in db_ids:
|
|
status, result = test_fn(self, db_id)
|
|
test_status &= status
|
|
test_result[f"db{db_id}"] = result
|
|
if 'error_msg' in result:
|
|
msg = f"db{db_id}: " + result['error_msg'] + "\n"
|
|
test_result['error_msg'] = test_result.get('error_msg', '') + msg
|
|
return test_status, test_result
|
|
return inner
|
|
|
|
|
|
##############################################################################
|
|
# Bist class
|
|
##############################################################################
|
|
class X4XXBIST(bist.UsrpBIST):
|
|
"""
|
|
BIST Tool for the USRP X4xx series
|
|
"""
|
|
usrp_type = "X4XX"
|
|
# This defines special tests that are really collections of other tests.
|
|
collections = {
|
|
'standard': ["gpsdo", "rtc", "temp", "fan"],
|
|
'extended': "*",
|
|
}
|
|
# Default FPGA image type
|
|
DEFAULT_FPGA_TYPE = 'X4_200'
|
|
lv_compat_format = {
|
|
'dram': {
|
|
'throughput': -1,
|
|
},
|
|
'gpsdo': {
|
|
"class": "",
|
|
"time": "",
|
|
"ept": -1,
|
|
"lat": -1,
|
|
"lon": -1,
|
|
"alt": -1,
|
|
"epx": -1,
|
|
"epy": -1,
|
|
"epv": -1,
|
|
"track": -1,
|
|
"speed": -1,
|
|
"climb": -1,
|
|
"eps": -1,
|
|
"mode": -1,
|
|
},
|
|
'gpio': {
|
|
'write_patterns': [],
|
|
'read_patterns': [],
|
|
},
|
|
'temp': {
|
|
"DRAM PCB": 40000,
|
|
"EC Internal": 41000,
|
|
"PMBUS-0": 42000,
|
|
"PMBUS-1": 43000,
|
|
"Power Supply PCB": 44000,
|
|
"RFSoC": 45000,
|
|
"Sample Clock PCB": 46000,
|
|
"TMP464 Internal": 47000,
|
|
},
|
|
'fan': {
|
|
'fan0': -1,
|
|
'fan1': -1,
|
|
},
|
|
}
|
|
device_args = "type=x4xx,mgmt_addr=127.0.0.1"
|
|
|
|
def __init__(self):
|
|
bist.UsrpBIST.__init__(self)
|
|
self.product_id = bist.get_product_id_from_eeprom(
|
|
valid_ids=['x410',],
|
|
cmd='eeprom-id mb')
|
|
|
|
def get_mb_periph_mgr(self):
|
|
"""Return reference to an x4xx periph manager"""
|
|
from usrp_mpm.periph_manager.x4xx import x4xx
|
|
return x4xx
|
|
|
|
def get_product_id(self):
|
|
"""Return the mboard product ID:"""
|
|
return self.product_id
|
|
|
|
def reload_fpga(self, fpga_type, sfp0_addrs):
|
|
"""
|
|
Loads the specified fpga and checks for sfp0 address, if the sfp0 address
|
|
existed, restores the original sfp0 address
|
|
"""
|
|
from pyroute2 import IPRoute
|
|
import errno
|
|
bist.load_fpga_image(
|
|
fpga_type,
|
|
self.device_args,
|
|
self.product_id,
|
|
)
|
|
if sfp0_addrs:
|
|
ipr = IPRoute()
|
|
dev = ipr.link_lookup(ifname='sfp0')[0]
|
|
ipr.link('set', index=dev, state='down')
|
|
try:
|
|
ipr.addr('add', index=dev, address=sfp0_addrs[0], mask=24, label='sfp0')
|
|
except Exception as ex:
|
|
# If the addr already exists then ignore the error
|
|
if ex.code == errno.EEXIST:
|
|
pass
|
|
ipr.link('set', index=dev, state='up')
|
|
|
|
def ensure_oss_image(self):
|
|
"""
|
|
If a LV image is present, load the default OSS image. If we already have
|
|
an OSS image, do nothing.
|
|
"""
|
|
from usrp_mpm.periph_manager import x4xx, x4xx_periphs
|
|
mboard_regs_control = x4xx_periphs.MboardRegsControl(
|
|
x4xx.x4xx.mboard_regs_label, self.log)
|
|
fpga_str = mboard_regs_control.get_fpga_type()
|
|
if not fpga_str or fpga_str == 'LV':
|
|
bist.load_fpga_image(
|
|
self.DEFAULT_FPGA_TYPE,
|
|
self.device_args,
|
|
self.product_id,
|
|
)
|
|
|
|
def get_valid_mcrs(self):
|
|
"""
|
|
Return all valid MCRs for the currently loaded bitfile.
|
|
"""
|
|
from usrp_mpm.periph_manager import x4xx_rfdc_regs
|
|
from usrp_mpm.periph_manager import x4xx_rfdc_ctrl
|
|
rfdc_regs_control = x4xx_rfdc_regs.RfdcRegsControl(
|
|
x4xx_rfdc_ctrl.X4xxRfdcCtrl.rfdc_regs_label, self.log)
|
|
bandwidth = rfdc_regs_control.get_rfdc_info(0)['bw']
|
|
return {
|
|
100: [122.88e6, 125e6],
|
|
200: [122.88e6*2, 125e6*2],
|
|
400: [122.88e6*4, 125e6*4],
|
|
}[bandwidth]
|
|
|
|
def get_default_mcr(self):
|
|
"""
|
|
Return a valid MCR for the currently loaded bitfile.
|
|
"""
|
|
return min(self.get_valid_mcrs())
|
|
|
|
#############################################################################
|
|
# BISTS
|
|
# All bist_* methods must return True/False success values!
|
|
#############################################################################
|
|
def bist_dram(self):
|
|
"""
|
|
BIST for PL DDR4 DRAM
|
|
Description: Calls a test to examine the speed of the DRAM. To be
|
|
precise, it fires up a UHD session, which runs a DRAM BiST internally.
|
|
If that works, it'll return estimated throughput that was gathered
|
|
during the DRAM BiST.
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
- throughput: The estimated throughput in bytes/s
|
|
|
|
Return status:
|
|
True if the DRAM bist passed
|
|
"""
|
|
assert 'dram' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {'throughput': 1700e6}
|
|
result = bist.test_ddr3_with_usrp_probe()
|
|
return result.get('throughput', 0) > 1500e6, result
|
|
|
|
def bist_gpsdo(self):
|
|
"""
|
|
BIST for GPSDO
|
|
Description: Returns GPS information
|
|
External Equipment: None; Recommend attaching an antenna or providing
|
|
fake GPS information
|
|
|
|
Return dictionary: A TPV dictionary as returned by gpsd.
|
|
See also: http://www.catb.org/gpsd/gpsd_json.html
|
|
|
|
Check for mode 2 or 3 to see if it's locked.
|
|
"""
|
|
assert 'gpsdo' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {
|
|
"class": "TPV",
|
|
"time": "2017-04-30T11:48:20.10Z",
|
|
"ept": 0.005,
|
|
"lat": 30.407899,
|
|
"lon": -97.726634,
|
|
"alt": 1327.689,
|
|
"epx": 15.319,
|
|
"epy": 17.054,
|
|
"epv": 124.484,
|
|
"track": 10.3797,
|
|
"speed": 0.091,
|
|
"climb": -0.085,
|
|
"eps": 34.11,
|
|
"mode": 3
|
|
}
|
|
from usrp_mpm.periph_manager import x4xx
|
|
# Turn on GPS, give some time to acclimatize
|
|
clk_aux_brd = x4xx.ClockingAuxBrdControl(default_source="gpsdo")
|
|
time.sleep(5)
|
|
gps_warmup_timeout = float(
|
|
self.args.option.get('gps_warmup_timeout', GPS_WARMUP_TIMEOUT))
|
|
gps_lockok_timeout = float(
|
|
self.args.option.get('gps_lockok_timeout', GPS_LOCKOK_TIMEOUT))
|
|
# Wait for WARMUP to go low
|
|
sys.stderr.write(
|
|
"Waiting for WARMUP to go low for up to {} seconds...\n".format(
|
|
gps_warmup_timeout))
|
|
if not bist.poll_with_timeout(
|
|
lambda: not clk_aux_brd.get_gps_warmup(),
|
|
gps_warmup_timeout*1000, 1000
|
|
):
|
|
raise RuntimeError(
|
|
"GPS-WARMUP did not go low within {} seconds!".format(
|
|
gps_warmup_timeout))
|
|
sys.stderr.write("Chip is warmed up.\n")
|
|
# Wait for LOCKOK. Data sheet says wait up to 15 minutes for GPS lock.
|
|
sys.stderr.write(
|
|
"Waiting for LOCKOK to go high for up to {} seconds...\n".format(
|
|
gps_lockok_timeout))
|
|
if not bist.poll_with_timeout(
|
|
clk_aux_brd.get_gps_lock,
|
|
gps_lockok_timeout*1000,
|
|
1000
|
|
):
|
|
sys.stderr.write("No GPS-LOCKOK!\n")
|
|
sys.stderr.write("GPS-SURVEY status: {}\n".format(
|
|
clk_aux_brd.get_gps_survey()
|
|
))
|
|
sys.stderr.write("GPS-PHASELOCK status: {}\n".format(
|
|
clk_aux_brd.get_gps_phase_lock()
|
|
))
|
|
sys.stderr.write("GPS-ALARM status: {}\n".format(
|
|
clk_aux_brd.get_gps_alarm()
|
|
))
|
|
# Now the chip is on, read back the TPV result
|
|
result = bist.get_gpsd_tpv_result()
|
|
# If we reach this line, we have a valid result and the chip responded.
|
|
# However, it doesn't necessarily mean we had a GPS lock.
|
|
return True, result
|
|
|
|
def bist_ref_clock_mboard(self):
|
|
"""
|
|
BIST for clock lock from mboard clock source (local to the motherboard)
|
|
|
|
Description: Checks to see if the motherboard can lock to its internal
|
|
clock source.
|
|
|
|
External Equipment: None
|
|
Return dictionary:
|
|
- <sensor-name>:
|
|
- locked: Boolean lock status
|
|
|
|
There can be multiple ref lock sensors; for a pass condition they all
|
|
need to be asserted.
|
|
"""
|
|
assert 'ref_clock_mboard' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {'ref_locked': True}
|
|
mcrs_to_test = self.get_valid_mcrs()
|
|
for master_clock_rate in mcrs_to_test:
|
|
sys.stderr.write(
|
|
f"Testing master_clock_rate {master_clock_rate/1e6} Msps")
|
|
result = bist.get_ref_clock_prop(
|
|
'mboard',
|
|
'internal',
|
|
extra_args={
|
|
'addr': '169.254.0.2',
|
|
'mgmt_addr': '127.0.0.1',
|
|
'master_clock_rate': master_clock_rate,
|
|
}
|
|
)
|
|
if 'error_msg' in result:
|
|
return False, result
|
|
return True, result
|
|
|
|
def bist_ref_clock_ext(self):
|
|
"""
|
|
BIST for clock lock from external source.
|
|
|
|
Description: Checks to see if the motherboard can lock to the external
|
|
reference clock.
|
|
|
|
External Equipment: 10 MHz reference source connected to "ref in".
|
|
|
|
Return dictionary:
|
|
- <sensor-name>:
|
|
- locked: Boolean lock status
|
|
|
|
There can be multiple ref lock sensors; for a pass condition they all
|
|
need to be asserted.
|
|
"""
|
|
assert 'ref_clock_ext' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {'ref_locked': True}
|
|
result = bist.get_ref_clock_prop(
|
|
'external',
|
|
'external',
|
|
extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'}
|
|
)
|
|
return 'error_msg' not in result, result
|
|
|
|
def bist_ref_clock_gpsdo(self):
|
|
"""
|
|
BIST for clock lock from gpsdo source.
|
|
|
|
Description: Checks to see if the motherboard can lock to the gpsdo
|
|
reference clock.
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
- <sensor-name>:
|
|
- locked: Boolean lock status
|
|
|
|
There can be multiple ref lock sensors; for a pass condition they all
|
|
need to be asserted.
|
|
"""
|
|
assert 'ref_clock_gpsdo' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {'ref_locked': True}
|
|
result = bist.get_ref_clock_prop(
|
|
'gpsdo',
|
|
'gpsdo',
|
|
extra_args={}
|
|
)
|
|
return 'error_msg' not in result, result
|
|
|
|
def bist_ref_clock_int(self):
|
|
"""
|
|
BIST for clock lock from internal source.
|
|
|
|
Description: Checks to see if the motherboard can lock to the
|
|
clocking aux board's internal reference clock.
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
- <sensor-name>:
|
|
- locked: Boolean lock status
|
|
|
|
There can be multiple ref lock sensors; for a pass condition they all
|
|
need to be asserted.
|
|
"""
|
|
assert 'ref_clock_int' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {'ref_locked': True}
|
|
result = bist.get_ref_clock_prop(
|
|
'internal',
|
|
'internal',
|
|
extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'}
|
|
)
|
|
return 'error_msg' not in result, result
|
|
|
|
def bist_ref_clock_nsync(self):
|
|
"""
|
|
BIST for clock lock from nsync source.
|
|
|
|
Description: Checks to see if the motherboard can lock to the nsync
|
|
reference clock.
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
- <sensor-name>:
|
|
- locked: Boolean lock status
|
|
|
|
There can be multiple ref lock sensors; for a pass condition they all
|
|
need to be asserted.
|
|
"""
|
|
assert 'ref_clock_nsync' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {'ref_locked': True}
|
|
result = bist.get_ref_clock_prop(
|
|
'nsync',
|
|
'internal',
|
|
extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'}
|
|
)
|
|
return 'error_msg' not in result, result
|
|
|
|
def bist_nsync_fabric(self):
|
|
"""
|
|
BIST for testing the fabric_clk signal from the motherboard
|
|
|
|
Description: Checks to see if the LMK will detect the experimental fabric_clk
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
- fabric_clk successfully enabled and validated: signal has been validated
|
|
- fabric_clk successfully disabled: signal has been disabled
|
|
"""
|
|
assert 'nsync_fabric' in self.tests_to_run
|
|
import uhd
|
|
from uhd.usrp import multi_usrp
|
|
|
|
try:
|
|
self.ensure_oss_image()
|
|
mcr = self.get_default_mcr()
|
|
usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
|
|
"master_clock_rate={}".format(str(mcr)))
|
|
except Exception as ex:
|
|
return False, {
|
|
'error_msg': "Failed to create usrp device: {}".format(str(ex))
|
|
}
|
|
|
|
mpm_c = usrp_dev.get_mpm_client()
|
|
|
|
# Make sure fabric_clk is turned off
|
|
mpm_c.enable_ecpri_clocks(False)
|
|
# Check validation bit - reg 0x19B second bit as per datasheet
|
|
pri_ref_disabled_condition = not (int(mpm_c.peek_clkaux(0x19B),16) & 0x4)
|
|
# Turn on fabric_clk
|
|
mpm_c.nsync_change_input_source('fabric_clk')
|
|
# Give it some time
|
|
time.sleep(0.5)
|
|
# Status 1 checks that the lmk is configured to use the priref signal
|
|
# True = secref, False = priref
|
|
using_pri_ref = not mpm_c.clkaux_get_nsync_status1()
|
|
# Check validation bit - reg 0x19B second bit as per datasheet
|
|
pri_ref_enabled = (int(mpm_c.peek_clkaux(0x19B),16) & 0x4) == 0x4
|
|
# Turn off clock
|
|
mpm_c.enable_ecpri_clocks(False)
|
|
# Check validation bit - reg 0x19B second bit as per datasheet
|
|
pri_ref_disabled = not (int(mpm_c.peek_clkaux(0x19B),16) & 0x4)
|
|
|
|
result = pri_ref_disabled and using_pri_ref and pri_ref_enabled and pri_ref_disabled_condition
|
|
|
|
return result, {"fabric_clock_successfully_enabled": pri_ref_enabled,
|
|
"fabric_clock_successfully_disabled": pri_ref_disabled}
|
|
|
|
def bist_nsync_gty(self):
|
|
"""
|
|
BIST for testing the gty_rcv_clk signal from the motherboard
|
|
|
|
Description: Checks to see if the LMK on the clocking auxiliary board
|
|
can lock to the gty_rcv clock signal output by the motherboard. We check
|
|
this by verifying that the pri_ref signal is being used, the dpll is locked,
|
|
the apll1 is locked, and apll2 is unlocked.
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
- gty_rcv_clk pll lock: Did the clkaux lmk lock to the gty_rcv_clk signal
|
|
"""
|
|
assert 'nsync_gty' in self.tests_to_run
|
|
import uhd
|
|
from uhd.usrp import multi_usrp
|
|
|
|
try:
|
|
self.ensure_oss_image()
|
|
mcr = self.get_default_mcr()
|
|
usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
|
|
"master_clock_rate={}".format(str(mcr)))
|
|
except Exception as ex:
|
|
return False, {
|
|
'error_msg': "Failed to create usrp device: {}".format(str(ex))
|
|
}
|
|
|
|
mpm_c = usrp_dev.get_mpm_client()
|
|
|
|
mpm_c.nsync_change_input_source('gty_rcv_clk')
|
|
# The lock should happen fast, but give it half a second
|
|
time.sleep(0.5)
|
|
# Status 1 checks that the lmk is configured to use the priref signal
|
|
# True = secref, False = priref
|
|
using_pri_ref = not mpm_c.clkaux_get_nsync_status1()
|
|
# Status 0 checks dpll loss of lock
|
|
dpll_lock = not mpm_c.clkaux_get_nsync_status0()
|
|
# Reg 0xD checks several APLL statuses, we need to make sure APLL1 is
|
|
# locked and APLL2 is unlocked
|
|
apll_lock = mpm_c.peek_clkaux(0xD) == '0x8'
|
|
|
|
result = using_pri_ref and dpll_lock and apll_lock
|
|
|
|
mpm_c.enable_ecpri_clocks(False)
|
|
# Set the clock source back to internal, as the register values
|
|
# for locking to the gty_rcv_clk cause the mboard to lose ref_lock
|
|
# to the nsync lmk.
|
|
mpm_c.set_clock_source('internal')
|
|
|
|
return result, {"pri_ref_selected": using_pri_ref,
|
|
"dpll_locked": dpll_lock,
|
|
"apll1_locked_apll2_unlocked": apll_lock}
|
|
|
|
def bist_clkaux_fpga_aux_ref(self):
|
|
"""
|
|
BIST for testing the fpga_aux_ref pps source
|
|
|
|
Description: Checks to see if the fpga_aux_ref can output a valid pps signal
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
"""
|
|
assert 'clkaux_fpga_aux_ref' in self.tests_to_run
|
|
import uhd
|
|
from uhd.usrp import multi_usrp
|
|
|
|
try:
|
|
self.ensure_oss_image()
|
|
mcr = self.get_default_mcr()
|
|
usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
|
|
"master_clock_rate={}".format(str(mcr)))
|
|
except Exception as ex:
|
|
return False, {
|
|
'error_msg': "Failed to create usrp device: {}".format(str(ex))
|
|
}
|
|
|
|
mpm_c = usrp_dev.get_mpm_client()
|
|
|
|
count = mpm_c.get_fpga_aux_ref_freq()
|
|
|
|
# We expect a pps signal, pulse is reported in 40 MHz clock ticks, so
|
|
# 1 PPS is expected to return 40 million ticks, this gives 1% tolerance
|
|
return 39600000 < count < 40400000, {}
|
|
|
|
def bist_nsync_rpll_config(self):
|
|
"""
|
|
BIST for testing that the LMK28PRIRefClk can be used as a source for the
|
|
motherboard RPLL
|
|
|
|
Description: Enable the LMK on the clocking auxiliary board to output a signal
|
|
on the LMK28PRIRefClk line to the motherboard RPLL. Configure the RPLL to use
|
|
this source, and check to see if the RPLL can lock to the source.
|
|
|
|
External Equipment: None
|
|
|
|
Return dictionary:
|
|
"""
|
|
assert 'nsync_rpll_config' in self.tests_to_run
|
|
import uhd
|
|
from uhd.usrp import multi_usrp
|
|
from usrp_mpm.sys_utils import net
|
|
|
|
try:
|
|
self.ensure_oss_image()
|
|
mcr = self.get_default_mcr()
|
|
usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
|
|
"master_clock_rate={}".format(str(mcr)))
|
|
except Exception as ex:
|
|
return False, {
|
|
'error_msg': "Failed to create usrp device: {}".format(str(ex))
|
|
}
|
|
|
|
sfp0_addrs = net.get_iface_info('sfp0')['ip_addrs']
|
|
|
|
mpm_c = usrp_dev.get_mpm_client()
|
|
|
|
mpm_c.config_rpll_to_nsync()
|
|
|
|
ref_locked = mpm_c.get_ref_lock_sensor()
|
|
|
|
result = ref_locked.get('value') == 'true'
|
|
|
|
fpga_type = mpm_c.get_device_info()['fpga']
|
|
|
|
del usrp_dev
|
|
|
|
# This is a brute-force way of making sure the device gets back into a clean state after
|
|
# we touched the rpll configuration
|
|
self.reload_fpga(fpga_type, sfp0_addrs)
|
|
|
|
return result, ref_locked
|
|
|
|
def bist_gpio(self):
|
|
"""
|
|
BIST for GPIO
|
|
Description: Writes and reads the values to the GPIO
|
|
|
|
Needed Equipment: External loopback cable between port 0 and port 1
|
|
|
|
Notes:
|
|
- X4xx has two FP-GPIO connectors (HDMI connectors) with 12 programmable
|
|
pins each
|
|
|
|
Return dictionary:
|
|
- write_patterns: A list of patterns that were written
|
|
- read_patterns: A list of patterns that were read back
|
|
"""
|
|
assert 'gpio' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
patterns = range(64)
|
|
return True, {
|
|
'write_patterns': list(patterns),
|
|
'read_patterns': list(patterns),
|
|
}
|
|
from uhd.usrp import multi_usrp
|
|
try:
|
|
usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost")
|
|
except Exception as ex:
|
|
return False, {
|
|
'error_msg': "Failed to create usrp device: {}".format(str(ex))
|
|
}
|
|
mpm_c = usrp_dev.get_mpm_client()
|
|
def _run_sub_test(inport, outport, pin_mode, voltage, pattern):
|
|
"""
|
|
Closure to run an actual test. The GPIO control object is enclosed.
|
|
|
|
Arguments:
|
|
inport: "port" argument for DioControl, input port
|
|
outport: "port" argument for DioControl, input port
|
|
pin_mode: HDMI or DIO (see DioControl)
|
|
voltage: Valid arg for DioControl.set_voltage_level()
|
|
pattern: Bits to write to the inport, should be read back at outport
|
|
"""
|
|
mpm_c.dio_set_port_mapping(pin_mode)
|
|
# We set all pins to be driven by the PS
|
|
# in HDMI mode not all pins can be accessed by the user
|
|
if pin_mode == "HDMI":
|
|
mask = 0xDB6D
|
|
else:
|
|
mask = 0xFFF
|
|
bank_convert = {
|
|
"PORTA": "GPIO0",
|
|
"PORTB": "GPIO1",
|
|
}
|
|
ps_control_args = ["MPM"] * 12
|
|
mpm_c.dio_set_gpio_src(bank_convert[inport], ps_control_args)
|
|
mpm_c.dio_set_gpio_src(bank_convert[outport], ps_control_args)
|
|
mpm_c.dio_set_voltage_level(inport, voltage)
|
|
mpm_c.dio_set_voltage_level(outport, voltage)
|
|
mpm_c.dio_set_pin_directions(inport, 0x00000)
|
|
mpm_c.dio_set_pin_directions(outport, 0xFFFFF)
|
|
mpm_c.dio_set_pin_outputs(outport, pattern)
|
|
read_values = mpm_c.dio_get_pin_inputs(inport)
|
|
if (pattern & mask) != read_values:
|
|
sys.stderr.write(mpm_c.dio_status())
|
|
return False, {'write_patterns': ["0x{:04X}".format(pattern)],
|
|
'read_patterns': ["0x{:04X}".format(read_values)]}
|
|
return True, {'write_patterns': ["0x{:04X}".format(pattern)],
|
|
'read_patterns': ["0x{:04X}".format(read_values)]}
|
|
# Now run tests:
|
|
for voltage in ["1V8", "2V5", "3V3"]:
|
|
for mode in ["DIO", "HDMI"]:
|
|
for pattern in [0xFFFF, 0xA5A5, 0x5A5A, 0x0000]:
|
|
sys.stderr.write("test: PortA -> PortB, {}, {}, 0x{:04X}\n"
|
|
.format(voltage, mode, pattern))
|
|
status, data = _run_sub_test(
|
|
"PORTB", "PORTA", mode, voltage, pattern)
|
|
if not status:
|
|
return status, data
|
|
sys.stderr.write("test: PortB -> PortA, {}, {}, 0x{:04X}\n"
|
|
.format(voltage, mode, pattern))
|
|
status, data = _run_sub_test(
|
|
"PORTA", "PORTB", mode, voltage, pattern)
|
|
if not status:
|
|
return status, data
|
|
return status, data
|
|
|
|
|
|
def bist_qsfp(self):
|
|
"""
|
|
BiST for QSFP status and property read out.
|
|
|
|
Description: Tests MODSEL (write) and MODPRS (read) pin at QSFP port
|
|
and I2C communication with QSFP module. By default all
|
|
ports are tested. The user can add module to the option
|
|
argument when calling the test to select a specific
|
|
port out of [0,1]. A negative number will test all
|
|
ports (the default)
|
|
|
|
Example: The following example will run the test on port 0:
|
|
> x4xx_bist qsfp --option module=0
|
|
|
|
Needed Equipment: None, for exhaustive test results the test should
|
|
be run with loopback test modules.
|
|
|
|
Notes: The test ensures consistency of I2C communication and the
|
|
MODSEL and MODPRS pins. If MODPRS pin is active low
|
|
(module present) I2C communication must return valid values for
|
|
all QSFP properties. On the other hand when MODPRS pin is high
|
|
QSFP properties should report None as the only valid value.
|
|
The test also disables the I2C communication (unsetting MODSEL
|
|
pin) and checks that the low level I2C communication with the
|
|
module fails. The communication is reenabled after 3sec. This
|
|
window allows a tester to check whether a loopback adapter
|
|
signals the state of MODSEL correctly.
|
|
"""
|
|
|
|
assert 'qsfp' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {}
|
|
|
|
from usrp_mpm.periph_manager import x4xx
|
|
from usrp_mpm.periph_manager.x4xx_periphs import QSFPModule
|
|
|
|
def add_error_msg(msg):
|
|
# add error message to result map
|
|
if "error_msg" not in infos:
|
|
infos["error_msg"] = []
|
|
infos["error_msg"].append(msg)
|
|
|
|
def modules_to_test():
|
|
module = self.args.option.get("module", -1)
|
|
try:
|
|
module = int(module)
|
|
except ValueError:
|
|
add_error_msg("Module '{}' is not an int".format(module))
|
|
return None
|
|
if module < 0:
|
|
return x4xx.X400_QSFP_I2C_CONFIGS
|
|
if module not in [0, 1]:
|
|
add_error_msg("Module to test must be 0 or 1")
|
|
return None
|
|
return [x4xx.X400_QSFP_I2C_CONFIGS[module]]
|
|
|
|
|
|
infos = {}
|
|
modules_to_test = modules_to_test()
|
|
if not modules_to_test:
|
|
return False, infos
|
|
|
|
result = True
|
|
|
|
for config in modules_to_test:
|
|
qsfp = QSFPModule(
|
|
config.modprs, config.modsel, config.devsymbol, self.log)
|
|
info = {}
|
|
info["available"] = qsfp.is_available()
|
|
if info["available"]:
|
|
# if adapter is available, read out prop via I2C and
|
|
# verify they are all valid (not None)
|
|
props = [qsfp.decoded_status,
|
|
qsfp.vendor_name,
|
|
qsfp.connector_type]
|
|
for prop in props:
|
|
info[prop.__name__] = prop()
|
|
if not all(info.values()):
|
|
result = False
|
|
add_error_msg("QSFP adapter at {} is present but has "
|
|
"None property.".format(config.devsymbol))
|
|
else:
|
|
# if adapter is not available
|
|
# reading a property *must* return None
|
|
if qsfp.connector_type() is not None:
|
|
result = False
|
|
add_error_msg("QSFP adapter at {} reports valid property "
|
|
"but is not present.".format(config.devsymbol))
|
|
|
|
infos[config.devsymbol] = info
|
|
|
|
# disable i2c communication and check for failure on low level read
|
|
qsfp.enable_i2c(False)
|
|
try:
|
|
qsfp.qsfp_regs.peek8(0)
|
|
result = False
|
|
add_error_msg("Reading I2C when disabled should fail with "
|
|
"RuntimeError at {}\n".format(config.devsymbol))
|
|
except RuntimeError:
|
|
pass
|
|
sys.stderr.write("A loopback QSFP adapter at {} should report MODSEL "
|
|
"inactive for 3 sec.".format(config.devsymbol))
|
|
time.sleep(3)
|
|
qsfp.enable_i2c(True)
|
|
|
|
return result, infos
|
|
|
|
|
|
def bist_temp(self):
|
|
"""
|
|
BIST for temperature sensors
|
|
Description: Reads the temperature sensors on the motherboards and
|
|
returns their values in mC
|
|
|
|
Return dictionary:
|
|
- <thermal-zone-name>: temp in mC
|
|
"""
|
|
assert 'temp' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {"DRAM PCB": 40000,
|
|
"EC Internal": 41000,
|
|
"PMBUS-0": 42000,
|
|
"PMBUS-1": 43000,
|
|
"Power Supply PCB": 44000,
|
|
"RFSoC": 45000,
|
|
"Sample Clock PCB": 46000,
|
|
"TMP464 Internal": 47000,
|
|
}
|
|
|
|
result = bist.get_iio_temp_sensor_values()
|
|
|
|
if len(result) < 1:
|
|
result['error_msg'] = "No temperature sensors found!"
|
|
|
|
return 'error_msg' not in result, result
|
|
|
|
def bist_fan(self):
|
|
"""
|
|
BIST for fans
|
|
Description: Reads the RPM values of the fans on the motherboard
|
|
|
|
Return dictionary:
|
|
- <fan-name>: Fan speed in RPM
|
|
|
|
External Equipment: None
|
|
"""
|
|
assert 'fan' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {'fan0': 10000, 'fan1': 10000}
|
|
result = bist.get_ectool_fan_values()
|
|
return len(result) == 2, result
|
|
|
|
def _db_flash_init(self, db_flash):
|
|
"""
|
|
Initialize the specified DB Flash and verify
|
|
its state
|
|
"""
|
|
db_flash.init()
|
|
if not db_flash.initialized:
|
|
raise RuntimeError()
|
|
|
|
def _db_flash_deinit(self, db_flash):
|
|
"""
|
|
De-initialize the specified DB Flash and verify
|
|
its state
|
|
"""
|
|
db_flash.deinit()
|
|
if db_flash.initialized:
|
|
raise RuntimeError()
|
|
|
|
@db_bist
|
|
def bist_spi_flash_integrity(self, db_id):
|
|
"""
|
|
BIST for SPI flash on DB
|
|
Description: Performs data integrity test on a section of
|
|
the flash memory. Stop the MPM service before running this
|
|
test using "systemctl stop usrp-hwd" command.
|
|
|
|
External Equipment: None, but at least one daughterboard
|
|
should be installed in the X410 unit.
|
|
|
|
Return dictionary:
|
|
|
|
"""
|
|
assert 'spi_flash_integrity' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {}
|
|
|
|
import os
|
|
from usrp_mpm.sys_utils.db_flash import DBFlash
|
|
FIXED_MEMORY_PATTERN = 'fixed'
|
|
RANDOM_MEMORY_PATTERN = 'random'
|
|
|
|
db_flash = DBFlash(db_id, log=None)
|
|
|
|
buf_size = 100
|
|
memory_pattern = str(self.args.option.get('memory_pattern', RANDOM_MEMORY_PATTERN))
|
|
assert memory_pattern in (FIXED_MEMORY_PATTERN, RANDOM_MEMORY_PATTERN)
|
|
if memory_pattern == RANDOM_MEMORY_PATTERN:
|
|
buff = os.urandom(buf_size)
|
|
else:
|
|
buff = [0xA5] * buf_size
|
|
|
|
data_valid = False
|
|
|
|
try:
|
|
self._db_flash_init(db_flash)
|
|
except (ValueError, RuntimeError) as ex:
|
|
return False, {"error_msg": "Error while initializing flash storage: " + str(ex)}
|
|
|
|
file_path = f'/mnt/db{db_id}_flash/test.bin'
|
|
|
|
sys.stderr.write("Testing DB{} with {} memory pattern..".format(db_id, memory_pattern))
|
|
with open(file_path, "wb") as f:
|
|
f.write(bytearray(buff))
|
|
|
|
try:
|
|
self._db_flash_deinit(db_flash)
|
|
self._db_flash_init(db_flash)
|
|
except (ValueError, RuntimeError) as ex:
|
|
return False, {"error_msg": "Error while init(deinit)ializing flash storage: " + str(ex)}
|
|
|
|
with open(file_path, "rb") as f:
|
|
read_data = f.read()
|
|
data_valid = read_data == bytearray(buff)
|
|
os.remove(file_path)
|
|
self._db_flash_deinit(db_flash)
|
|
|
|
return data_valid, {}
|
|
|
|
@db_bist
|
|
def bist_spi_flash_speed(self, db_id):
|
|
"""
|
|
BIST for SPI flash on DB
|
|
Description: Performs read and write speed test on the SPI flash
|
|
memory on DB. Stop the MPM service before running this test
|
|
using "systemctl stop usrp-hwd" command.
|
|
|
|
External Equipment: None, but at least one daughterboard
|
|
should be installed in the X410 unit.
|
|
|
|
Return dictionary:
|
|
|
|
"""
|
|
assert 'spi_flash_speed' in self.tests_to_run
|
|
if self.args.dry_run:
|
|
return True, {}
|
|
|
|
import os
|
|
import re
|
|
from usrp_mpm.sys_utils.db_flash import DBFlash
|
|
|
|
MIN_WRITE_SPEED = 5000 #B/s
|
|
MIN_READ_SPEED = 5000 #B/s
|
|
def parse_speed(cmd_output):
|
|
mobj = re.search(
|
|
r"(.*records in\n)(.*records out\n)(.* (?P<speed>[0-9.]+) (?P<order>\S?)B\/s)",
|
|
cmd_output)
|
|
if mobj is None:
|
|
return 0
|
|
scale = {'': 1, 'k': 1024, 'M': 1024*1024, 'G': 1024*1024*1024}
|
|
order = mobj.group('order')
|
|
if order not in scale:
|
|
raise ValueError(f"unsupported unit '{order}B/s'")
|
|
return float(mobj.group('speed')) * scale[order]
|
|
|
|
|
|
db_flash = DBFlash(db_id, log=None)
|
|
file_path = f'/mnt/db{db_id}_flash/test.bin'
|
|
|
|
try:
|
|
self._db_flash_init(db_flash)
|
|
except (ValueError, RuntimeError) as ex:
|
|
return False, {
|
|
"error_msg": "Error while initializing flash storage: {}".format(str(ex))
|
|
}
|
|
|
|
sys.stderr.write("Testing DB{}..".format(db_id))
|
|
write_error_msg = None
|
|
sys.stderr.write("Write Speed Test:")
|
|
cmd = [
|
|
'dd',
|
|
'if=/dev/zero',
|
|
'of=' + file_path,
|
|
'bs=512',
|
|
'count=1000',
|
|
'oflag=dsync'
|
|
]
|
|
|
|
try:
|
|
output = subprocess.check_output(
|
|
cmd,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
except subprocess.CalledProcessError as ex:
|
|
output = ex.output
|
|
write_error_msg = "Error during Write Test: {}".format(output)
|
|
write_test_output = output.decode("utf-8")
|
|
sys.stderr.write(write_test_output)
|
|
|
|
# De-init and init flash here to mitigate any effects
|
|
# caching may have on the read speed.
|
|
try:
|
|
self._db_flash_deinit(db_flash)
|
|
self._db_flash_init(db_flash)
|
|
except (ValueError, RuntimeError) as ex:
|
|
return False, {
|
|
"error_msg": "Error while init(deinit)ializing flash storage: {}".format(str(ex))}
|
|
|
|
sys.stderr.write("Read Speed Test:")
|
|
read_error_msg = None
|
|
cmd = [
|
|
'dd',
|
|
'if=' + file_path,
|
|
'of=/dev/null',
|
|
'bs=512',
|
|
'count=1000',
|
|
'oflag=dsync'
|
|
]
|
|
|
|
try:
|
|
output = subprocess.check_output(
|
|
cmd,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
except subprocess.CalledProcessError as ex:
|
|
output = ex.output
|
|
read_error_msg = "Error during Read Test: {}".format(output)
|
|
read_test_output = output.decode("utf-8")
|
|
sys.stderr.write(read_test_output)
|
|
|
|
# Clean up.
|
|
os.remove(file_path)
|
|
self._db_flash_deinit(db_flash)
|
|
|
|
test_status = bool(write_error_msg is None and read_error_msg is None)
|
|
|
|
if test_status:
|
|
write_speed = parse_speed(write_test_output)
|
|
read_speed = parse_speed(read_test_output)
|
|
|
|
if write_speed == 0:
|
|
test_status = False
|
|
write_error_msg = "Write speed parse error"
|
|
elif write_speed < MIN_WRITE_SPEED:
|
|
test_status = False
|
|
write_error_msg = \
|
|
"Write speed {} B/s is below minimum requirement of {} B/s".format(
|
|
write_speed, MIN_WRITE_SPEED)
|
|
|
|
if read_speed == 0:
|
|
test_status = False
|
|
read_error_msg = "Read speed parse error"
|
|
elif read_speed < MIN_READ_SPEED:
|
|
test_status = False
|
|
read_error_msg = \
|
|
"Read speed {} B/s is below minimum requirement of {} B/s".format(
|
|
read_speed, MIN_READ_SPEED)
|
|
|
|
return test_status, {
|
|
"Write Test": write_test_output if write_error_msg is None else write_error_msg,
|
|
"Read Test": read_test_output if read_error_msg is None else read_error_msg
|
|
}
|
|
|
|
##############################################################################
|
|
# main
|
|
##############################################################################
|
|
def main():
|
|
" Go, go, go! "
|
|
return X4XXBIST().run()
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(not main())
|