mirror of
https://github.com/saymrwulf/uhd.git
synced 2026-05-16 21:10:10 +00:00
476 lines
22 KiB
Python
476 lines
22 KiB
Python
#
|
|
# Copyright 2019-2020 Ettus Research, a National Instruments Brand
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
"""
|
|
ZBX dboard implementation module
|
|
"""
|
|
|
|
import time
|
|
from usrp_mpm import tlv_eeprom
|
|
from usrp_mpm.dboard_manager import DboardManagerBase
|
|
from usrp_mpm.mpmlog import get_logger
|
|
from usrp_mpm.chips.ic_reg_maps import zbx_cpld_regs_t
|
|
from usrp_mpm.periph_manager.x4xx_periphs import get_temp_sensor
|
|
from usrp_mpm.sys_utils.udev import get_eeprom_paths_by_symbol
|
|
from usrp_mpm.mpmutils import parse_encoded_git_hash
|
|
|
|
###############################################################################
|
|
# Helpers
|
|
###############################################################################
|
|
# pylint: disable=too-few-public-methods
|
|
class EepromTagMap:
|
|
"""
|
|
Defines the tagmap for EEPROMs matching this magic.
|
|
The tagmap is a dictionary mapping an 8-bit tag to a NamedStruct instance.
|
|
The canonical list of tags and the binary layout of the associated structs
|
|
is defined in mpm/tools/tlv_eeprom/usrp_eeprom.h. Only the subset relevant
|
|
to MPM are included below.
|
|
"""
|
|
magic = 0x55535250
|
|
tagmap = {
|
|
# 0x10: usrp_eeprom_board_info
|
|
0x10: tlv_eeprom.NamedStruct('< H H H 7s 1x',
|
|
['pid', 'rev', 'rev_compat', 'serial']),
|
|
}
|
|
|
|
|
|
###############################################################################
|
|
# Main dboard control class
|
|
###############################################################################
|
|
class ZBX(DboardManagerBase):
|
|
"""
|
|
Holds all dboard specific information and methods of the ZBX dboard
|
|
"""
|
|
#########################################################################
|
|
# Overridables
|
|
#
|
|
# See DboardManagerBase for documentation on these fields
|
|
#########################################################################
|
|
pids = [0x4002]
|
|
rx_sensor_callback_map = {
|
|
'temperature': 'get_rf_temp_sensor',
|
|
}
|
|
tx_sensor_callback_map = {
|
|
'temperature': 'get_rf_temp_sensor',
|
|
}
|
|
### End of overridables #################################################
|
|
|
|
### Daughterboard driver/hardware compatibility value
|
|
# The ZBX has a field in its EEPROM which stores a rev_compat value. This
|
|
# tells us which other revisions of the ZBX this revision is compatible with.
|
|
#
|
|
# In theory, we could make the revision compatibility check a simple "less
|
|
# or equal than comparison", i.e., we can support a certain revision and all
|
|
# previous revisions. However, we deliberately don't support Revision A (0x1),
|
|
# and we prefer to explicitly list the valid compat revision numbers we
|
|
# know exist. No matter how, we need to change this line everytime we add a
|
|
# new revision that is incompatible with the previous.
|
|
#
|
|
# In the EEPROM, we only change this number for hardware revisions that are
|
|
# not compatible with this software version. Note the CPLD image has its own
|
|
# compat number (see below).
|
|
#
|
|
# RevB and all compatible revisions are supported (that includes RevC). RevA
|
|
# is not supported.
|
|
DBOARD_SUPPORTED_COMPAT_REVS = (0x2,)
|
|
|
|
# CPLD compatibility revision
|
|
# Change this revision only on breaking changes.
|
|
REQ_OLDEST_COMPAT_REV = 0x20110611
|
|
REQ_COMPAT_REV = 0x20110611
|
|
|
|
#########################################################################
|
|
# MPM Initialization
|
|
#########################################################################
|
|
def __init__(self, slot_idx, **kwargs):
|
|
DboardManagerBase.__init__(self, slot_idx, **kwargs)
|
|
self.log = get_logger("ZBX-{}".format(slot_idx))
|
|
self.log.trace("Initializing ZBX daughterboard, slot index %d",
|
|
self.slot_idx)
|
|
|
|
# local variable to track if PLL ref clock is enabled for the CPLD logic
|
|
self._clock_enabled = False
|
|
|
|
# Interface with MB HW
|
|
if 'db_iface' not in kwargs:
|
|
self.log.error("Required DB Iface was not provided!")
|
|
raise RuntimeError("Required DB Iface was not provided!")
|
|
self.db_iface = kwargs['db_iface']
|
|
|
|
self.eeprom_symbol = f"db{slot_idx}_eeprom"
|
|
eeprom = self._get_eeprom()
|
|
self._assert_rev_compatibility(eeprom["rev_compat"])
|
|
# Initialize daughterboard CPLD control
|
|
self.poke_cpld = self.db_iface.poke_db_cpld
|
|
self.peek_cpld = self.db_iface.peek_db_cpld
|
|
self.regs = zbx_cpld_regs_t()
|
|
self._spi_addr = self.regs.SPI_READY_addr
|
|
self._enable_base_power()
|
|
# Check register map compatibility
|
|
self._check_compat_version()
|
|
self.log.debug("ZBX CPLD build git hash: %s", self._get_cpld_git_hash())
|
|
# Power up the DB
|
|
self._enable_power()
|
|
# enable PLL reference clock
|
|
self.reset_clock(False)
|
|
self._cpld_set_safe_defaults()
|
|
|
|
def _get_eeprom(self):
|
|
"""
|
|
Return the eeprom data.
|
|
"""
|
|
path = get_eeprom_paths_by_symbol(self.eeprom_symbol)[self.eeprom_symbol]
|
|
eeprom, _ = tlv_eeprom.read_eeprom(path, EepromTagMap.tagmap, EepromTagMap.magic, None)
|
|
return eeprom
|
|
|
|
def _assert_rev_compatibility(self, rev_compat):
|
|
"""
|
|
Check the ZBX hardware revision compatibility with this driver.
|
|
|
|
Throws a RuntimeError() if this version of MPM does not recognize the
|
|
hardware.
|
|
|
|
Note: The CPLD image version is checked separately.
|
|
"""
|
|
if rev_compat not in self.DBOARD_SUPPORTED_COMPAT_REVS:
|
|
err = "This MPM version is not compatible with this ZBX daughterboard. " \
|
|
f"Found rev_compat value: 0x{rev_compat:02x}. " \
|
|
"Please update your MPM version to support this daughterboard revision."
|
|
self.log.error(err)
|
|
raise RuntimeError(err)
|
|
|
|
def _enable_base_power(self, enable=True):
|
|
"""
|
|
Enables or disables power to the DB which enables communication to DB CPLD
|
|
"""
|
|
if enable:
|
|
self.db_iface.enable_daughterboard(enable=True)
|
|
if not self.db_iface.check_enable_daughterboard():
|
|
self.db_iface.enable_daughterboard(enable=False)
|
|
self.log.error('ZBX {} power up failed'.format(self.slot_idx))
|
|
raise RuntimeError('ZBX {} power up failed'.format(self.slot_idx))
|
|
else: # disable
|
|
# Removing power from the CPLD will set all the the output pins to open and the
|
|
# supplies default to disabled on power up.
|
|
self.db_iface.enable_daughterboard(enable=False)
|
|
if self.db_iface.check_enable_daughterboard():
|
|
self.log.error('ZBX {} power down failed'.format(self.slot_idx))
|
|
|
|
def _enable_power(self, enable=True):
|
|
""" Enables or disables power switches internal to the DB CPLD """
|
|
self.regs.ENABLE_TX_POS_7V0 = self.regs.ENABLE_TX_POS_7V0_t(int(enable))
|
|
self.regs.ENABLE_RX_POS_7V0 = self.regs.ENABLE_RX_POS_7V0_t(int(enable))
|
|
self.regs.ENABLE_POS_3V3 = self.regs.ENABLE_POS_3V3_t(int(enable))
|
|
self.poke_cpld(
|
|
self.regs.ENABLE_POS_3V3_addr,
|
|
self.regs.get_reg(self.regs.ENABLE_POS_3V3_addr))
|
|
|
|
def _check_compat_version(self):
|
|
""" Check compatibility of DB CPLD image and SW regmap """
|
|
compat_revision_addr = self.regs.OLDEST_COMPAT_REVISION_addr
|
|
cpld_oldest_compat_revision = self.peek_cpld(compat_revision_addr)
|
|
if cpld_oldest_compat_revision < self.REQ_OLDEST_COMPAT_REV:
|
|
err_msg = (
|
|
f'DB CPLD oldest compatible revision 0x{cpld_oldest_compat_revision:x}'
|
|
f' is out of date, the required revision is 0x{self.REQ_OLDEST_COMPAT_REV:x}. '
|
|
f'Update your CPLD image.')
|
|
self.log.error(err_msg)
|
|
raise RuntimeError(err_msg)
|
|
if cpld_oldest_compat_revision > self.REQ_OLDEST_COMPAT_REV:
|
|
err_msg = (
|
|
f'DB CPLD oldest compatible revision 0x{cpld_oldest_compat_revision:x}'
|
|
f' is newer than the expected revision 0x{self.REQ_OLDEST_COMPAT_REV:x}.'
|
|
' Downgrade your CPLD image or update MPM.')
|
|
self.log.error(err_msg)
|
|
raise RuntimeError(err_msg)
|
|
|
|
if not self.has_compat_version(self.REQ_COMPAT_REV):
|
|
err_msg = (
|
|
"ZBX DB CPLD revision is too old. Update your"
|
|
f" CPLD image to at least 0x{self.REQ_COMPAT_REV:08x}.")
|
|
self.log.error(err_msg)
|
|
raise RuntimeError(err_msg)
|
|
|
|
def has_compat_version(self, min_required_version):
|
|
"""
|
|
Check for a minimum required version.
|
|
"""
|
|
cpld_image_compat_revision = self.peek_cpld(self.regs.REVISION_addr)
|
|
return cpld_image_compat_revision >= min_required_version
|
|
|
|
# pylint: disable=too-many-statements
|
|
def _cpld_set_safe_defaults(self):
|
|
"""
|
|
Set the CPLD into a safe state.
|
|
"""
|
|
cpld_regs = zbx_cpld_regs_t()
|
|
# We un-configure some registers to force a change later. None of these
|
|
# values get written to the CPLD!
|
|
cpld_regs.RF0_OPTION = cpld_regs.RF0_OPTION.RF0_OPTION_FPGA_STATE
|
|
cpld_regs.RF1_OPTION = cpld_regs.RF1_OPTION.RF1_OPTION_FPGA_STATE
|
|
cpld_regs.SW_RF0_CONFIG = 255
|
|
cpld_regs.SW_RF1_CONFIG = 255
|
|
cpld_regs.TX0_DSA1[0] = 0
|
|
cpld_regs.TX0_DSA2[0] = 0
|
|
cpld_regs.RX0_DSA1[0] = 0
|
|
cpld_regs.RX0_DSA2[0] = 0
|
|
cpld_regs.RX0_DSA3_A[0] = 0
|
|
cpld_regs.RX0_DSA3_B[0] = 0
|
|
cpld_regs.save_state()
|
|
# Now all the registers we touch will be enumerated by get_changed_addrs()
|
|
# Everything below *will* get written to the CPLD:
|
|
# ATR control
|
|
cpld_regs.RF0_OPTION = cpld_regs.RF0_OPTION.RF0_OPTION_SW_DEFINED
|
|
cpld_regs.RF1_OPTION = cpld_regs.RF1_OPTION.RF1_OPTION_SW_DEFINED
|
|
# Back to state 0 and sw-defined. That means nothing will get configured
|
|
# until UHD boots again.
|
|
cpld_regs.SW_RF0_CONFIG = 0
|
|
cpld_regs.SW_RF1_CONFIG = 0
|
|
# TX0 path control
|
|
cpld_regs.TX0_IF2_1_2[0] = cpld_regs.TX0_IF2_1_2[0].TX0_IF2_1_2_FILTER_2
|
|
cpld_regs.TX0_IF1_3[0] = cpld_regs.TX0_IF1_3[0].TX0_IF1_3_FILTER_0_3
|
|
cpld_regs.TX0_IF1_4[0] = cpld_regs.TX0_IF1_4[0].TX0_IF1_4_TERMINATION
|
|
cpld_regs.TX0_IF1_5[0] = cpld_regs.TX0_IF1_5[0].TX0_IF1_5_TERMINATION
|
|
cpld_regs.TX0_IF1_6[0] = cpld_regs.TX0_IF1_6[0].TX0_IF1_6_FILTER_0_3
|
|
cpld_regs.TX0_7[0] = cpld_regs.TX0_7[0].TX0_7_TERMINATION
|
|
cpld_regs.TX0_RF_8[0] = cpld_regs.TX0_RF_8[0].TX0_RF_8_RF_1
|
|
cpld_regs.TX0_RF_9[0] = cpld_regs.TX0_RF_9[0].TX0_RF_9_RF_1
|
|
cpld_regs.TX0_ANT_10[0] = cpld_regs.TX0_ANT_10[0].TX0_ANT_10_BYPASS_AMP
|
|
cpld_regs.TX0_ANT_11[0] = cpld_regs.TX0_ANT_11[0].TX0_ANT_11_BYPASS_AMP
|
|
cpld_regs.TX0_LO_13[0] = cpld_regs.TX0_LO_13[0].TX0_LO_13_INTERNAL
|
|
cpld_regs.TX0_LO_14[0] = cpld_regs.TX0_LO_14[0].TX0_LO_14_INTERNAL
|
|
# TX1 path control
|
|
cpld_regs.TX1_IF2_1_2[0] = cpld_regs.TX1_IF2_1_2[0].TX1_IF2_1_2_FILTER_2
|
|
cpld_regs.TX1_IF1_3[0] = cpld_regs.TX1_IF1_3[0].TX1_IF1_3_FILTER_0_3
|
|
cpld_regs.TX1_IF1_4[0] = cpld_regs.TX1_IF1_4[0].TX1_IF1_4_TERMINATION
|
|
cpld_regs.TX1_IF1_5[0] = cpld_regs.TX1_IF1_5[0].TX1_IF1_5_TERMINATION
|
|
cpld_regs.TX1_IF1_6[0] = cpld_regs.TX1_IF1_6[0].TX1_IF1_6_FILTER_0_3
|
|
cpld_regs.TX1_7[0] = cpld_regs.TX1_7[0].TX1_7_TERMINATION
|
|
cpld_regs.TX1_RF_8[0] = cpld_regs.TX1_RF_8[0].TX1_RF_8_RF_1
|
|
cpld_regs.TX1_RF_9[0] = cpld_regs.TX1_RF_9[0].TX1_RF_9_RF_1
|
|
cpld_regs.TX1_ANT_10[0] = cpld_regs.TX1_ANT_10[0].TX1_ANT_10_BYPASS_AMP
|
|
cpld_regs.TX1_ANT_11[0] = cpld_regs.TX1_ANT_11[0].TX1_ANT_11_BYPASS_AMP
|
|
cpld_regs.TX1_LO_13[0] = cpld_regs.TX1_LO_13[0].TX1_LO_13_INTERNAL
|
|
cpld_regs.TX1_LO_14[0] = cpld_regs.TX1_LO_14[0].TX1_LO_14_INTERNAL
|
|
# RX0 path control
|
|
cpld_regs.RX0_ANT_1[0] = cpld_regs.RX0_ANT_1[0].RX0_ANT_1_TERMINATION
|
|
cpld_regs.RX0_2[0] = cpld_regs.RX0_2[0].RX0_2_LOWBAND
|
|
cpld_regs.RX0_RF_3[0] = cpld_regs.RX0_RF_3[0].RX0_RF_3_RF_1
|
|
cpld_regs.RX0_4[0] = cpld_regs.RX0_4[0].RX0_4_LOWBAND
|
|
cpld_regs.RX0_IF1_5[0] = cpld_regs.RX0_IF1_5[0].RX0_IF1_5_FILTER_1
|
|
cpld_regs.RX0_IF1_6[0] = cpld_regs.RX0_IF1_6[0].RX0_IF1_6_FILTER_1
|
|
cpld_regs.RX0_LO_9[0] = cpld_regs.RX0_LO_9[0].RX0_LO_9_INTERNAL
|
|
cpld_regs.RX0_LO_10[0] = cpld_regs.RX0_LO_10[0].RX0_LO_10_INTERNAL
|
|
cpld_regs.RX0_RF_11[0] = cpld_regs.RX0_RF_11[0].RX0_RF_11_RF_3
|
|
# RX1 path control
|
|
cpld_regs.RX1_ANT_1[0] = cpld_regs.RX1_ANT_1[0].RX1_ANT_1_TERMINATION
|
|
cpld_regs.RX1_2[0] = cpld_regs.RX1_2[0].RX1_2_LOWBAND
|
|
cpld_regs.RX1_RF_3[0] = cpld_regs.RX1_RF_3[0].RX1_RF_3_RF_1
|
|
cpld_regs.RX1_4[0] = cpld_regs.RX1_4[0].RX1_4_LOWBAND
|
|
cpld_regs.RX1_IF1_5[0] = cpld_regs.RX1_IF1_5[0].RX1_IF1_5_FILTER_1
|
|
cpld_regs.RX1_IF1_6[0] = cpld_regs.RX1_IF1_6[0].RX1_IF1_6_FILTER_1
|
|
cpld_regs.RX1_LO_9[0] = cpld_regs.RX1_LO_9[0].RX1_LO_9_INTERNAL
|
|
cpld_regs.RX1_LO_10[0] = cpld_regs.RX1_LO_10[0].RX1_LO_10_INTERNAL
|
|
cpld_regs.RX1_RF_11[0] = cpld_regs.RX1_RF_11[0].RX1_RF_11_RF_3
|
|
# TX DSA
|
|
cpld_regs.TX0_DSA1[0] = 31
|
|
cpld_regs.TX0_DSA2[0] = 31
|
|
# RX DSA
|
|
cpld_regs.RX0_DSA1[0] = 15
|
|
cpld_regs.RX0_DSA2[0] = 15
|
|
cpld_regs.RX0_DSA3_A[0] = 15
|
|
cpld_regs.RX0_DSA3_B[0] = 15
|
|
for addr in cpld_regs.get_changed_addrs():
|
|
self.poke_cpld(addr, cpld_regs.get_reg(addr))
|
|
# pylint: enable=too-many-statements
|
|
|
|
#########################################################################
|
|
# UHD (De-)Initialization
|
|
#########################################################################
|
|
def init(self, args):
|
|
"""
|
|
Execute necessary init dance to bring up dboard. This happens when a UHD
|
|
session starts.
|
|
"""
|
|
self.log.debug("init() called with args `{}'".format(
|
|
",".join(['{}={}'.format(x, args[x]) for x in args])
|
|
))
|
|
return True
|
|
|
|
def deinit(self):
|
|
"""
|
|
De-initialize after UHD session completes
|
|
"""
|
|
self.log.debug("Setting CPLD back to safe defaults after UHD session.")
|
|
self._cpld_set_safe_defaults()
|
|
|
|
def tear_down(self):
|
|
self.db_iface.tear_down()
|
|
|
|
#########################################################################
|
|
# API calls needed by the zbx_dboard driver
|
|
#########################################################################
|
|
def enable_iq_swap(self, enable, trx, channel):
|
|
"""
|
|
Turn on IQ swapping in the RFDC
|
|
"""
|
|
self.db_iface.enable_iq_swap(enable, trx, channel)
|
|
|
|
def get_dboard_sample_rate(self):
|
|
"""
|
|
Return the RFDC rate. This is usually a big number in the 3 GHz range.
|
|
"""
|
|
return self.db_iface.get_sample_rate()
|
|
|
|
def get_dboard_prc_rate(self):
|
|
"""
|
|
Return the PRC rate. The CPLD and LOs are clocked with this.
|
|
"""
|
|
return self.db_iface.get_prc_rate()
|
|
|
|
def _has_compat_version(self, min_required_version):
|
|
"""
|
|
Check for a minimum required version.
|
|
"""
|
|
cpld_image_compat_revision = self.peek_cpld(self.regs.REVISION_addr)
|
|
return cpld_image_compat_revision >= min_required_version
|
|
|
|
def _get_cpld_git_hash(self):
|
|
"""
|
|
Trace build of MB CPLD
|
|
"""
|
|
git_hash_rb = self.peek_cpld(self.regs.GIT_HASH_addr)
|
|
(git_hash, dirtiness_qualifier) = parse_encoded_git_hash(git_hash_rb)
|
|
return "{:07x} ({})".format(git_hash, dirtiness_qualifier)
|
|
|
|
def reset_clock(self, value):
|
|
"""
|
|
Disable PLL reference clock to enable SPLL reconfiguration
|
|
|
|
Puts the clock into reset if value is True, takes it out of reset
|
|
otherwise.
|
|
"""
|
|
if self._clock_enabled != bool(value):
|
|
return
|
|
addr = self.regs.get_addr("PLL_REF_CLOCK_ENABLE")
|
|
enum = self.regs.PLL_REF_CLOCK_ENABLE_t
|
|
if value:
|
|
reg_value = enum.PLL_REF_CLOCK_ENABLE_DISABLE.value
|
|
else:
|
|
reg_value = enum.PLL_REF_CLOCK_ENABLE_ENABLE.value
|
|
self.poke_cpld(addr, reg_value)
|
|
self._clock_enabled = not bool(value)
|
|
|
|
#########################################################################
|
|
# LO SPI API
|
|
#
|
|
# We keep a LO peek/poke interface for debugging purposes.
|
|
#########################################################################
|
|
def _wait_for_spi_ready(self, timeout):
|
|
""" Returns False if a timeout occurred. timeout is in ms """
|
|
for _ in range(timeout):
|
|
if (self.peek_cpld(self._spi_addr) >> self.regs.SPI_READY_shift) \
|
|
& self.regs.SPI_READY_mask:
|
|
return True
|
|
time.sleep(0.001)
|
|
return False
|
|
|
|
def _lo_spi_send_tx(self, lo_name, write, addr, data=None):
|
|
""" Wait for SPI Ready and setup the TX data for a LO SPI transaction """
|
|
if not self._wait_for_spi_ready(timeout=100):
|
|
self.log.error('Timeout before LO SPI transaction waiting for SPI Ready')
|
|
raise RuntimeError('Timeout before LO SPI transaction waiting for SPI Ready')
|
|
lo_enum_name = 'LO_SELECT_' + lo_name.upper()
|
|
assert hasattr(self.regs.LO_SELECT_t, lo_enum_name), \
|
|
"Invalid LO name: {}".format(lo_name)
|
|
self.regs.LO_SELECT = getattr(self.regs.LO_SELECT_t, lo_enum_name)
|
|
if write:
|
|
self.regs.READ_FLAG = self.regs.READ_FLAG_t.READ_FLAG_WRITE
|
|
else:
|
|
self.regs.READ_FLAG = self.regs.READ_FLAG_t.READ_FLAG_READ
|
|
if data is not None:
|
|
self.regs.DATA = data
|
|
else:
|
|
self.regs.DATA = 0
|
|
self.regs.ADDRESS = addr
|
|
self.regs.START_TRANSACTION = \
|
|
self.regs.START_TRANSACTION_t.START_TRANSACTION_ENABLE
|
|
self.poke_cpld(self._spi_addr, self.regs.get_reg(self._spi_addr))
|
|
|
|
def _lo_spi_check_status(self, lo_name, addr, write=False):
|
|
""" Wait for SPI Ready and check the success of the LO SPI transaction """
|
|
# SPI Ready indicates that the previous transaction has completed
|
|
# and the RX data is ready to be consumed
|
|
if not write and not self._wait_for_spi_ready(timeout=100):
|
|
self.log.error('Timeout after LO SPI transaction waiting for SPI Ready')
|
|
raise RuntimeError('Timeout after LO SPI transaction waiting for SPI Ready')
|
|
# If the address or CS are not the same as what we set, there
|
|
# was interference during the SPI transaction
|
|
lo_select = self.regs.LO_SELECT.name[len('LO_SELECT_'):]
|
|
if self.regs.ADDRESS != addr or lo_select != lo_name.upper():
|
|
self.log.error('SPI transaction to LO failed!')
|
|
raise RuntimeError('SPI transaction to LO failed!')
|
|
|
|
def _lo_spi_get_rx(self):
|
|
""" Return RX data read from the LO SPI transaction """
|
|
spi_reg = self.peek_cpld(self._spi_addr)
|
|
return (spi_reg >> self.regs.DATA_shift) & self.regs.DATA_mask
|
|
|
|
def peek_lo_spi(self, lo_name, addr):
|
|
""" Perform a register read access to an LO via SPI """
|
|
self._lo_spi_send_tx(lo_name=lo_name, write=False, addr=addr)
|
|
self._lo_spi_check_status(lo_name, addr)
|
|
return self._lo_spi_get_rx()
|
|
|
|
def poke_lo_spi(self, lo_name, addr, val):
|
|
""" Perform a register write access to an LO via SPI """
|
|
self._lo_spi_send_tx(lo_name=lo_name, write=True, addr=addr, data=val)
|
|
self._lo_spi_check_status(lo_name, addr, write=True)
|
|
|
|
###########################################################################
|
|
# LEDs
|
|
###########################################################################
|
|
def set_leds(self, channel, rx, trx_rx, trx_tx):
|
|
""" Set the frontpanel LEDs """
|
|
assert channel in (0, 1)
|
|
|
|
self.regs.save_state()
|
|
if channel == 0:
|
|
# ensure to be in SW controlled mode
|
|
self.regs.RF0_OPTION = self.regs.RF0_OPTION.RF0_OPTION_SW_DEFINED
|
|
self.regs.SW_RF0_CONFIG = 0
|
|
self.regs.RX0_RX_LED[0] = self.regs.RX0_RX_LED[0].RX0_RX_LED_ENABLE \
|
|
if bool(rx) else self.regs.RX0_RX_LED[0].RX0_RX_LED_DISABLE
|
|
self.regs.RX0_TRX_LED[0] = self.regs.RX0_TRX_LED[0].RX0_TRX_LED_ENABLE \
|
|
if bool(trx_rx) else self.regs.RX0_TRX_LED[0].RX0_TRX_LED_DISABLE
|
|
self.regs.TX0_TRX_LED[0] = self.regs.TX0_TRX_LED[0].TX0_TRX_LED_ENABLE \
|
|
if bool(trx_tx) else self.regs.TX0_TRX_LED[0].TX0_TRX_LED_DISABLE
|
|
else:
|
|
# ensure to be in SW controlled mode
|
|
self.regs.RF1_OPTION = self.regs.RF1_OPTION.RF1_OPTION_SW_DEFINED
|
|
self.regs.SW_RF1_CONFIG = 0
|
|
self.regs.RX1_RX_LED[0] = self.regs.RX1_RX_LED[0].RX1_RX_LED_ENABLE \
|
|
if bool(rx) else self.regs.RX1_RX_LED[0].RX1_RX_LED_DISABLE
|
|
self.regs.RX1_TRX_LED[0] = self.regs.RX1_TRX_LED[0].RX1_TRX_LED_ENABLE \
|
|
if bool(trx_rx) else self.regs.RX1_TRX_LED[0].RX1_TRX_LED_DISABLE
|
|
self.regs.TX1_TRX_LED[0] = self.regs.TX1_TRX_LED[0].TX1_TRX_LED_ENABLE \
|
|
if bool(trx_tx) else self.regs.TX1_TRX_LED[0].TX1_TRX_LED_DISABLE
|
|
|
|
for addr in self.regs.get_changed_addrs():
|
|
self.poke_cpld(addr, self.regs.get_reg(addr))
|
|
|
|
###########################################################################
|
|
# Sensors
|
|
###########################################################################
|
|
def get_rf_temp_sensor(self, _):
|
|
"""
|
|
Return the RF temperature sensor value
|
|
"""
|
|
self.log.trace("Reading RF daughterboard temperature.")
|
|
sensor_names = [
|
|
f"TMP112 DB{self.slot_idx} Top",
|
|
f"TMP112 DB{self.slot_idx} Bottom",
|
|
]
|
|
return get_temp_sensor(sensor_names, log=self.log)
|