mirror of
https://github.com/saymrwulf/uhd.git
synced 2026-05-14 20:58:09 +00:00
- Fix typo in company name (missing 'a') - Updated SPDX license identifier to version 3.0
94 lines
2.8 KiB
Python
Executable file
94 lines
2.8 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#
|
|
# Copyright 2017 Ettus Research, a National Instruments Company
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
import argparse
import random
import socket
import sys

from mprpc import RPCClient

import usrp_mpm as mpm
|
|
|
|
|
|
def parse_args(argv=None):
    """
    Build the command line parser and parse the arguments.

    Three sub-commands are available: "rpc" (takes a mandatory -c/--call
    option plus any number of positional arguments), "disc" and "echo".
    The -a/--address and -p/--port options belong to the top-level parser.

    argv -- list of argument strings to parse; None (the default) lets
            argparse read them from sys.argv[1:]

    Returns the argparse.Namespace holding address, port and command (and,
    for the "rpc" sub-command, call and arguments). Exits with a usage
    error when no sub-command is given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-a", "--address", default="0.0.0.0", type=str,
        help="Destination address")
    parser.add_argument(
        "-p", "--port", default=0, type=int, help="Destination port")

    sub_parsers = parser.add_subparsers(dest="command")
    # Without this a missing sub-command is accepted and leaves command set
    # to None. Assigning the attribute (rather than passing required=True)
    # also works on Python versions older than 3.7.
    sub_parsers.required = True

    rpc_parser = sub_parsers.add_parser("rpc", help="Issue RPC")
    rpc_parser.add_argument(
        "-c", "--call", required=True, help="command to issue")
    rpc_parser.add_argument("arguments", nargs="*")

    # These two sub-commands take no options of their own.
    sub_parsers.add_parser("disc", help="Issue discovery")
    sub_parsers.add_parser("echo", help="Issue UDP echo")
    return parser.parse_args(argv)
|
|
|
|
|
|
def rpc(address, port, command, *args):
    """
    Call `command` through an RPCClient connected to address:port.

    A port of 0 (or any other false value) selects mpm.types.MPM_RPC_PORT.
    Extra arguments are forwarded to the call: one that starts with "=" has
    its leading "=" characters stripped and is evaluated as a Python
    expression, every other one is passed on unchanged.

    Returns whatever client.call() returns.
    """
    port = port or mpm.types.MPM_RPC_PORT
    client = RPCClient(address, port)
    # NOTE(review): eval() runs arbitrary code taken from the command line.
    # Tolerable for a manual test tool, but never feed this untrusted input.
    args = [
        eval(arg.lstrip("=")) if arg.startswith("=") else arg
        for arg in args
    ]
    # Unpacking an empty list is the same as calling without arguments.
    return client.call(command, *args)
|
|
|
|
|
|
def discovery(address, port):
    """
    Send the MPM discovery message to address:port and print every reply.

    A port of 0 (or any other false value) selects
    mpm.types.MPM_DISCOVERY_PORT. Replies are read until no datagram
    arrives for one second; for each reply the sender address and the
    received data are printed. Returns None.
    """
    if not port:
        port = mpm.types.MPM_DISCOVERY_PORT
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(mpm.types.MPM_DISCOVERY_MESSAGE, (address, port))
        sock.settimeout(1.0)  # wait max 1 second
        while True:
            try:
                data, sender = sock.recvfrom(8000)
            except socket.timeout:
                # No further reply within the timeout: normal termination.
                break
            except OSError as ex:
                # Stay best-effort on socket errors, but report them rather
                # than hiding them; Ctrl-C and bugs now propagate.
                print("Receive failed: {}".format(ex), file=sys.stderr)
                break
            print("Received response from: {}".format(sender[0]))
            print("Dicovery data: {}".format(data))
|
|
|
|
|
|
def echo(address, port):
    """
    Send one UDP echo request to address:port and print every reply.

    A port of 0 (or any other false value) selects
    mpm.types.MPM_DISCOVERY_PORT. The request is 8000 bytes long: the
    ASCII tag "MPM-ECHO" followed by random padding. Replies are read until
    no datagram arrives for 50 ms; for each reply the sender address and
    the size of the received data are printed. Returns None.
    """
    if not port:
        port = mpm.types.MPM_DISCOVERY_PORT
    # Build the payload as bytes: concatenating a str with a bytearray
    # raises TypeError on Python 3, and sendto() needs bytes anyway.
    header = b"MPM-ECHO"
    padding = bytes(random.getrandbits(8) for _ in range(8000 - len(header)))
    message = header + padding
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message, (address, port))
        sock.settimeout(0.05)  # wait max 50 ms
        while True:
            try:
                data, sender = sock.recvfrom(9000)
            except socket.timeout:
                # No further reply within the timeout: normal termination.
                break
            except OSError as ex:
                # Stay best-effort on socket errors, but report them rather
                # than hiding them; Ctrl-C and bugs now propagate.
                print("Receive failed: {}".format(ex), file=sys.stderr)
                break
            print("Received response from: {}".format(sender[0]))
            print("Echo data size: {}".format(len(data)))
|
|
|
|
|
|
def main():
    """
    Parse the command line and run the selected sub-command.

    Returns the RPC result for "rpc" (after printing it), True for "disc"
    and "echo", and False when no known sub-command was selected.
    """
    args = parse_args()
    if args.command == "rpc":
        # args.arguments is a list (possibly empty), so it can always be
        # unpacked; no separate branch for the no-argument case is needed.
        result = rpc(args.address, args.port, args.call, *args.arguments)
        print(result)
        return result
    if args.command == "disc":
        discovery(args.address, args.port)
        return True
    if args.command == "echo":
        echo(args.address, args.port)
        return True
    # Reached when the sub-command is missing or unknown. Return a defined
    # failure value instead of reading an unassigned local.
    print("No command given, expected one of: rpc, disc, echo",
          file=sys.stderr)
    return False
|
|
|
|
|
|
if __name__ == "__main__":
    # Use sys.exit(): the bare exit() helper is installed by the site module
    # for interactive sessions and is not guaranteed to exist in a script.
    # A true result from main() gives exit status 0, a false one gives 1.
    sys.exit(not main())
|