2020-08-05 15:00:20 +00:00
|
|
|
#
|
|
|
|
|
# Copyright 2020 Ettus Research, a National Instruments Brand
|
|
|
|
|
#
|
|
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
|
#
|
|
|
|
|
"""
|
|
|
|
|
This is a stream endpoint node, which is used in the RFNoCGraph class.
|
|
|
|
|
It also houses the logic to create output and input streams.
|
|
|
|
|
"""
|
|
|
|
|
from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \
|
2020-08-04 19:48:34 +00:00
|
|
|
ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus
|
2020-08-05 15:00:20 +00:00
|
|
|
from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER
|
|
|
|
|
from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
|
|
|
|
|
from .chdr_stream import ChdrOutputStream, ChdrInputStream
|
|
|
|
|
|
|
|
|
|
class StreamEndpointNode(Node):
|
|
|
|
|
"""Represents a Stream endpoint node
|
|
|
|
|
|
|
|
|
|
This class contains a StreamEpRegs object. To clarify, management
|
|
|
|
|
packets access these registers, while control packets access the
|
|
|
|
|
registers of the noc_blocks which are held in the RFNoCGraph and
|
|
|
|
|
passed into handle_packet as the regs parameter
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self, node_inst, source_gen, sink_gen):
|
|
|
|
|
super().__init__(node_inst)
|
|
|
|
|
self.epid = node_inst
|
|
|
|
|
self.dst_epid = None
|
|
|
|
|
self.upstream = None
|
|
|
|
|
# ---- These 4 aren't configurable right now
|
|
|
|
|
self.has_data = True
|
|
|
|
|
self.has_ctrl = True
|
|
|
|
|
self.input_ports = 1
|
|
|
|
|
self.output_ports = 1
|
|
|
|
|
# ----
|
|
|
|
|
self.input_stream = None
|
|
|
|
|
self.output_stream = None
|
|
|
|
|
self.chdr_w = None
|
|
|
|
|
self.send_wrapper = None
|
|
|
|
|
self.dst_to_addr = None
|
|
|
|
|
self.source_gen = source_gen
|
|
|
|
|
self.sink_gen = sink_gen
|
2020-08-04 19:48:34 +00:00
|
|
|
self.downstream_capacity = None
|
|
|
|
|
self.strs_handlers = {}
|
2020-08-05 15:00:20 +00:00
|
|
|
self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
|
|
|
|
|
self.ctrl_status_callback_out, self.ctrl_status_callback_in,
|
|
|
|
|
4, 4 * 8000)
|
|
|
|
|
|
|
|
|
|
def ctrl_status_callback_out(self, status):
|
|
|
|
|
"""Called by the ep_regs when on a write to the
|
|
|
|
|
REG_OSTRM_CTRL_STATUS register
|
|
|
|
|
"""
|
|
|
|
|
if status.cfg_start:
|
|
|
|
|
# This only creates a new ChdrOutputStream if the cfg_start flag is set
|
|
|
|
|
self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
|
|
|
|
|
addr = self.dst_to_addr(self)
|
|
|
|
|
self.send_strc(addr)
|
2020-08-04 19:48:34 +00:00
|
|
|
def handle_initial_strs(strs_packet):
|
|
|
|
|
payload = strs_packet.get_payload_strs()
|
|
|
|
|
assert payload.status == StrsStatus.OKAY, \
|
|
|
|
|
"Received STRS Packet Err: {}".format(payload.status)
|
|
|
|
|
self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes)
|
|
|
|
|
self.strs_handlers[self.epid] = handle_initial_strs
|
2020-08-05 15:00:20 +00:00
|
|
|
return STRM_STATUS_FC_ENABLED
|
|
|
|
|
|
|
|
|
|
def ctrl_status_callback_in(self, status):
|
|
|
|
|
"""Called by the ep_regs on a write to the
|
|
|
|
|
REG_OSTRM_CTRL_STATUS register
|
|
|
|
|
"""
|
|
|
|
|
# This always triggers the graph to create a new ChdrInputStream
|
|
|
|
|
self.begin_input()
|
|
|
|
|
return STRM_STATUS_FC_ENABLED
|
|
|
|
|
|
|
|
|
|
def graph_init(self, log, set_device_id, send_wrapper, chdr_w, dst_to_addr, **kwargs):
|
|
|
|
|
super().graph_init(log, set_device_id)
|
|
|
|
|
self.ep_regs.log = log
|
|
|
|
|
self.chdr_w = chdr_w
|
|
|
|
|
self.send_wrapper = send_wrapper
|
|
|
|
|
self.dst_to_addr = dst_to_addr
|
|
|
|
|
|
|
|
|
|
def get_type(self):
|
|
|
|
|
return NodeType.STRM_EP
|
|
|
|
|
|
|
|
|
|
def get_epid(self):
|
|
|
|
|
"""Get this endpoint's endpoint id"""
|
|
|
|
|
return self.epid
|
|
|
|
|
|
|
|
|
|
def set_epid(self, epid):
|
|
|
|
|
"""Set this endpoint's endpoint id"""
|
|
|
|
|
self.epid = epid
|
|
|
|
|
|
|
|
|
|
def set_dst_epid(self, dst_epid):
|
|
|
|
|
"""Set this endpoint's destination endpoint id"""
|
|
|
|
|
self.dst_epid = dst_epid
|
|
|
|
|
|
|
|
|
|
def _handle_mgmt_packet(self, packet, **kwargs):
|
|
|
|
|
send_upstream = False
|
|
|
|
|
payload = packet.get_payload_mgmt()
|
|
|
|
|
our_hop = payload.pop_hop()
|
|
|
|
|
for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
|
|
|
|
|
if op.op_code == MgmtOpCode.INFO_REQ:
|
|
|
|
|
ext_info = 0
|
|
|
|
|
ext_info |= (1 if self.has_ctrl else 0) << 0
|
|
|
|
|
ext_info |= (1 if self.has_data else 0) << 1
|
|
|
|
|
ext_info |= (self.input_ports & 0x3F) << 2
|
|
|
|
|
ext_info |= (self.output_ports & 0x3F) << 8
|
|
|
|
|
payload.get_hop(0).add_op(self.info_response(ext_info))
|
|
|
|
|
elif op.op_code == MgmtOpCode.RETURN:
|
|
|
|
|
send_upstream = True
|
|
|
|
|
swap_src_dst(packet, payload)
|
|
|
|
|
elif op.op_code == MgmtOpCode.NOP:
|
|
|
|
|
pass
|
|
|
|
|
elif op.op_code == MgmtOpCode.CFG_RD_REQ:
|
|
|
|
|
request = MgmtOpCfg.parse(op.get_op_payload())
|
|
|
|
|
value = self.ep_regs.read(request.addr)
|
|
|
|
|
payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
|
|
|
|
|
MgmtOpCfg(request.addr, value)))
|
|
|
|
|
elif op.op_code == MgmtOpCode.CFG_WR_REQ:
|
|
|
|
|
request = MgmtOpCfg.parse(op.get_op_payload())
|
|
|
|
|
self.ep_regs.write(request.addr, request.data)
|
|
|
|
|
else:
|
|
|
|
|
raise NotImplementedError("op_code {} is not implemented for StreamEndpointNode"
|
|
|
|
|
.format(op.op_code))
|
|
|
|
|
self.log.trace("Stream Endpoint {} processed hop:\n{}"
|
|
|
|
|
.format(self.node_inst, our_hop))
|
|
|
|
|
packet.set_payload(payload)
|
|
|
|
|
if send_upstream:
|
|
|
|
|
return RETURN_TO_SENDER
|
|
|
|
|
self.log.trace("Stream Endpoint {} received packet:\n{}"
|
|
|
|
|
.format(self.node_inst, packet))
|
|
|
|
|
|
|
|
|
|
def _handle_ctrl_packet(self, packet, regs, **kwargs):
|
|
|
|
|
payload = packet.get_payload_ctrl()
|
|
|
|
|
swap_src_dst(packet, payload)
|
|
|
|
|
if payload.status != CtrlStatus.OKAY:
|
|
|
|
|
raise RuntimeError("Control Status not OK: {}".format(payload.status))
|
|
|
|
|
if payload.op_code == CtrlOpCode.READ:
|
|
|
|
|
payload.is_ack = True
|
|
|
|
|
payload.set_data([regs.read(payload.address)])
|
|
|
|
|
elif payload.op_code == CtrlOpCode.WRITE:
|
|
|
|
|
payload.is_ack = True
|
|
|
|
|
regs.write(payload.address, payload.get_data()[0])
|
|
|
|
|
else:
|
|
|
|
|
raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
|
|
|
|
|
packet.set_payload(payload)
|
|
|
|
|
return RETURN_TO_SENDER
|
|
|
|
|
|
|
|
|
|
def _handle_data_packet(self, packet, num_bytes, sender, **kwargs):
|
|
|
|
|
assert self.input_stream is not None
|
|
|
|
|
self.input_stream.queue_packet(packet, num_bytes, sender)
|
|
|
|
|
|
|
|
|
|
def _handle_strs_packet(self, packet, **kwargs):
|
2020-08-04 19:48:34 +00:00
|
|
|
header = packet.get_header()
|
|
|
|
|
if header.dst_epid in self.strs_handlers:
|
|
|
|
|
self.strs_handlers.pop(header.dst_epid)(packet)
|
|
|
|
|
else:
|
|
|
|
|
self.output_stream.queue_packet(packet)
|
2020-08-05 15:00:20 +00:00
|
|
|
|
|
|
|
|
def _handle_strc_packet(self, packet, **kwargs):
|
|
|
|
|
self._handle_data_packet(packet, **kwargs)
|
|
|
|
|
|
|
|
|
|
def send_strc(self, addr):
|
|
|
|
|
"""Send a Stream Command packet from the specified stream_ep
|
|
|
|
|
to the specified address.
|
|
|
|
|
|
|
|
|
|
This is not handled in ChdrOutputStream because the STRC must be
|
|
|
|
|
dispatched before UHD configures the samples per packet,
|
|
|
|
|
which is required to complete a StreamSpec
|
|
|
|
|
"""
|
|
|
|
|
header = ChdrHeader()
|
|
|
|
|
header.dst_epid = self.dst_epid
|
|
|
|
|
header.pkt_type = PacketType.STRC
|
|
|
|
|
payload = StrcPayload()
|
|
|
|
|
payload.src_epid = self.epid
|
|
|
|
|
payload.op_code = StrcOpCode.INIT
|
2020-08-04 19:48:34 +00:00
|
|
|
payload.num_pkts = 10
|
|
|
|
|
payload.num_bytes = 10 * 1024 # 10 KB
|
2020-08-05 15:00:20 +00:00
|
|
|
packet = ChdrPacket(self.chdr_w, header, payload)
|
|
|
|
|
self.send_wrapper.send_packet(packet, addr)
|
|
|
|
|
|
|
|
|
|
def begin_output(self, stream_spec):
|
|
|
|
|
"""Spin up a new ChdrOutputStream thread which transmits from src_epid
|
|
|
|
|
according to stream_spec.
|
|
|
|
|
|
|
|
|
|
This is triggered from RFNoC Graph when the radio receives a
|
|
|
|
|
Stream Command
|
|
|
|
|
"""
|
|
|
|
|
# As of now, only one stream endpoint port per stream endpoint
|
|
|
|
|
# is supported.
|
|
|
|
|
assert self.output_stream is None, \
|
|
|
|
|
"Output Stream already running on epid: {}".format(self.epid)
|
|
|
|
|
stream_spec.dst_epid = self.dst_epid
|
2020-08-04 19:48:34 +00:00
|
|
|
stream_spec.capacity_packets = self.downstream_capacity[0]
|
|
|
|
|
stream_spec.capacity_bytes = self.downstream_capacity[1]
|
|
|
|
|
self.downstream_capacity = None
|
2020-08-05 15:00:20 +00:00
|
|
|
self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),
|
|
|
|
|
stream_spec, self.send_wrapper)
|
|
|
|
|
|
|
|
|
|
def end_output(self):
|
|
|
|
|
"""Stops src_epid's current transmission. This opens up the sep
|
|
|
|
|
to new transmissions in the future.
|
|
|
|
|
|
|
|
|
|
This is triggered either by the RFNoC Graph when the radio
|
|
|
|
|
receives a Stop Stream command or when the transmission has no
|
|
|
|
|
more samples to send.
|
|
|
|
|
"""
|
|
|
|
|
self.output_stream.finish()
|
|
|
|
|
self.output_stream = None
|
|
|
|
|
|
|
|
|
|
def begin_input(self):
|
|
|
|
|
"""Spin up a new ChdrInputStream thread which receives all data and strc
|
|
|
|
|
|
|
|
|
|
This is triggered by RFNoC Graph when there is a write to the
|
|
|
|
|
REG_ISTRM_CTRL_STATUS register
|
|
|
|
|
"""
|
|
|
|
|
# As of now, only one stream endpoint port per stream endpoint
|
|
|
|
|
# is supported.
|
|
|
|
|
|
|
|
|
|
# Rx streams aren't explicitly ended by UHD. If we try to start
|
|
|
|
|
# a new one on the same epid, just quietly close the old one.
|
|
|
|
|
if self.input_stream is not None:
|
|
|
|
|
self.input_stream.finish()
|
|
|
|
|
self.input_stream = ChdrInputStream(self.log, self.chdr_w,
|
|
|
|
|
self.sink_gen(), self.send_wrapper, self.epid)
|