uhd/mpm/python/tests/test_utilities.py
Martin Braun 21107ff36c mpm: Fix test utilities
- Fix MockGpiod object: This class should mock the gpiod module, but it was
  missing the LINE_REQ_EV_FALLING_EDGE attribute.
- While we're at it, fix some pylint warnings.
2023-02-08 16:59:34 -06:00

220 lines
6 KiB
Python
Executable file

#
# Copyright 2019 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
""" Utility classes to facilitate unit testing """
import queue
import platform
import os
def on_linux():
    """
    Tell whether the current platform is Linux.

    Returns True if the lower-cased system name reported by
    platform.system() contains 'linux', False otherwise.
    """
    system_name = platform.system().lower()
    return 'linux' in system_name
def on_usrp():
    """
    Returns True if this is being executed on an USRP

    The decision is based on the device tree 'compatible' property: if
    the property file does not exist, or its contents do not contain the
    string 'ettus', this returns False.
    """
    # Check device tree standard property for manufacturer info
    path = '/sys/firmware/devicetree/base/compatible'
    if not os.path.exists(path):
        return False
    with open(path, 'r') as f:
        s = f.read()
    # String returned is actually a list of null-terminated strings,
    # replace the null-terminations with a separator. Strings are
    # immutable and str.replace() returns a new string, so the result
    # must be assigned back (it was previously discarded).
    s = s.replace('\x00', ';')
    return 'ettus' in s
def _mock_gpiod_pkg():
    """
    Install MockGpiod under the name "gpiod" in sys.modules when the
    real gpiod python package cannot be imported.

    The gpiod package is not available on every OS version these tests
    are meant to run on. If the import fails while running on a USRP
    (as reported by on_usrp()), the original exception is re-raised
    instead of installing the mock.
    """
    try:
        import gpiod
    except Exception as import_error:
        running_on_device = on_usrp()
        if not running_on_device:
            # Not on a USRP: fall back to the mock so that later
            # imports of gpiod resolve to it.
            import sys
            sys.modules["gpiod"] = MockGpiod
            return
        # The gpiod package should be available if testing on a USRP
        raise import_error
class MockGpiod:
    """
    Stand-in for a portion of the gpiod python package that never
    touches GPIO hardware.

    Lines are created lazily by name and simply remember the last
    value written to them.
    """
    # Placeholder values for the gpiod request-type attributes that
    # code under test refers to.
    LINE_REQ_DIR_IN = 0
    LINE_REQ_DIR_OUT = 1
    LINE_REQ_EV_FALLING_EDGE = 4

    # Value a line reports until set_value() is called on it
    _DEFAULT_LINE_VAL = 0

    class MockLine(object):
        """
        A single mocked GPIO line holding one value.
        """
        def __init__(self, val):
            self.val = val

        def request(self):
            """ No-op; there is no hardware to request. """

        def release(self):
            """ No-op; there is no hardware to release. """

        def get_value(self):
            """ Return the value currently stored for this line. """
            return self.val

        def set_value(self, val):
            """ Store a new value for this line. """
            self.val = val

    def __init__(self):
        # Maps line name -> MockLine instance
        self.lines = {}

    def find_line(self, name):
        """
        Return the MockLine registered under `name`, creating it with
        the default value on first use.
        """
        try:
            return self.lines[name]
        except KeyError:
            new_line = self.MockLine(self._DEFAULT_LINE_VAL)
            self.lines[name] = new_line
            return new_line
class MockRegsIface(object):
    """
    Register interface double backed by a register map object.

    Reads come from the map (via get_reg) unless values have been
    queued for the address; writes go to the map (via set_reg) and are
    recorded so tests can inspect them afterwards.
    """
    def __init__(self, register_map):
        self.map = register_map
        # addr -> list of values written, in write order
        self.recent_vals = {}
        # addr -> queue.Queue of values to hand out on upcoming reads
        self.next_vals = {}
        # every address written, in write order
        self.recent_addrs = []

    def peek32(self, addr):
        """
        Read a register: the next queued value for `addr` if one is
        pending, otherwise whatever the register map returns.
        """
        pending = self.next_vals.get(addr)
        if pending is not None and not pending.empty():
            return pending.get_nowait()
        return self.map.get_reg(addr)

    def poke32(self, addr, value):
        """
        Write `value` to the register map and record both the address
        and the value in the write history.
        """
        self.map.set_reg(addr, value)
        self.recent_addrs.append(addr)
        # Append to this address' history, starting one if needed
        self.recent_vals.setdefault(addr, []).append(value)

    def peek16(self, addr):
        """
        Read through peek32 and keep only the lower 16 bits.
        """
        word = self.peek32(addr)
        return word & 0xFFFF

    def poke16(self, addr, value):
        """
        Forward the write unchanged to poke32.
        """
        self.poke32(addr, value)

    def get_recent_addrs(self):
        """ Return the list of addresses written so far. """
        return self.recent_addrs

    def clear_recent_addrs(self):
        """ Start a fresh, empty list of written addresses. """
        self.recent_addrs = []

    def get_recent_vals(self, addr):
        """
        Return the values written to `addr` so far, or an empty list
        if nothing is recorded for it.
        Useful for validating HW interaction
        """
        try:
            return self.recent_vals[addr]
        except KeyError:
            return []

    def clear_recent_vals(self, addr):
        """
        Reset the recorded write history of `addr` to an empty list.
        Useful for validating HW interaction
        """
        self.recent_vals[addr] = []

    def set_next_vals(self, addr, vals):
        """
        Queue `vals`, in order, to be returned by upcoming reads of
        `addr`. Values queued by earlier calls stay ahead of them.
        Useful for mocking HW interaction.
        """
        pending = self.next_vals.get(addr)
        if pending is None:
            pending = queue.Queue()
            self.next_vals[addr] = pending
        for item in vals:
            pending.put_nowait(item)
class MockLog:
    """
    Logger double for tests: every message is stored in a queue that
    belongs to its log level instead of being emitted.
    """
    # The MockLog class is not currently implemented to be thread safe
    def __init__(self):
        # One queue per level, named '<level>_log'
        self.error_log = queue.Queue()
        self.warning_log = queue.Queue()
        self.info_log = queue.Queue()
        self.trace_log = queue.Queue()
        self.debug_log = queue.Queue()

    def error(self, msg):
        """ Queue an error-level message. """
        self.error_log.put_nowait(msg)

    def warning(self, msg):
        """ Queue a warning-level message. """
        self.warning_log.put_nowait(msg)

    def info(self, msg):
        """ Queue an info-level message. """
        self.info_log.put_nowait(msg)

    def trace(self, msg):
        """ Queue a trace-level message. """
        self.trace_log.put_nowait(msg)

    def debug(self, msg):
        """ Queue a debug-level message. """
        self.debug_log.put_nowait(msg)

    def clear_all(self):
        """ Clears all log queues """
        all_logs = (
            self.error_log,
            self.warning_log,
            self.info_log,
            self.trace_log,
            self.debug_log,
        )
        for log in all_logs:
            log.queue.clear()

    def get_last_msg(self, log_level):
        """
        Removes and returns the next message from the queue of the
        given log level. Messages come out in the order they were
        logged (oldest first). Returns an empty string if the queue is
        empty, and raises RuntimeError if no queue exists for
        log_level.
        """
        queue_name = log_level + '_log'
        log_messages = getattr(self, queue_name, None)
        if log_messages is None:
            raise RuntimeError(
                "Log level {} does not exist in mock log".format(log_level))
        return '' if log_messages.empty() else log_messages.get_nowait()
# Importing this utilities module mocks out the gpiod package if
# necessary, so that usrp_mpm can be imported on devices without
# gpiod support for the OS. This call runs at import time.
_mock_gpiod_pkg()