mirror of
https://github.com/saymrwulf/uhd.git
synced 2026-05-14 20:58:09 +00:00
ci: add devtest e320 support
This commit adds devtest support for e320 via tftp. The e320 has a hardware incompatibility with sdmuxes that we use for the n3xx devices, which makes them unreliable. Instead this loads a small Linux OS into the e320 system memory and reimages the sd card from there. Signed-off-by: Steven Koo <steven.koo@ni.com>
This commit is contained in:
parent
9bd160663c
commit
510803153c
9 changed files with 380 additions and 14 deletions
|
|
@ -10,7 +10,7 @@ parameters:
|
|||
default: current
|
||||
- name: testDevices
|
||||
type: string
|
||||
default: 'x3xx,b2xx,n3xx'
|
||||
default: 'x3xx,b2xx,n3xx,e320'
|
||||
|
||||
jobs:
|
||||
- template: job-uhd-devtest.yml
|
||||
|
|
@ -136,3 +136,20 @@ jobs:
|
|||
devtestPattern: 'n3x0'
|
||||
devSDImage: gnuradio-image-ni-sulfur-rev11-mender.sdimg.bz2
|
||||
devLabgridConfig: .ci/templates/tests/rhombus-labgrid/device-configs/rhombus-n321-0.yml
|
||||
|
||||
${{ if contains(parameters.testDevices, 'e320') }}:
|
||||
rhombus-e320-0:
|
||||
devAgent: rhombus-e320-0
|
||||
devType: 'e3xx'
|
||||
devModel: 'e320'
|
||||
devName: rhombus-e320-0
|
||||
devSerial: '31A8171'
|
||||
devHostname: 'ni-e320-31a8171'
|
||||
devBus: 'ip'
|
||||
devAddr: '192.168.20.7'
|
||||
sfpAddrs: '192.168.20.7'
|
||||
devFpga: 'XG'
|
||||
devtestPattern: 'e320'
|
||||
devInitramfsImage: fitImage-manufacturing
|
||||
devSDImage: gnuradio-image-ni-neon-rev2-mender.sdimg.bz2
|
||||
devLabgridConfig: .ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml
|
||||
|
|
|
|||
|
|
@ -67,18 +67,30 @@ jobs:
|
|||
cleanDestinationFolder: true
|
||||
|
||||
- download: ${{ parameters.uhdArtifactSource }}
|
||||
artifact: $(devType)-images
|
||||
artifact: n3xx-images
|
||||
# Only sync the bz2 sdimg since the bmap
|
||||
# is incompatible with mender
|
||||
patterns: '**/*.bz2'
|
||||
patterns: |
|
||||
**/*.bz2
|
||||
fitImage-manufacturing
|
||||
displayName: Download $(devType)-images artifact
|
||||
condition: and(succeeded(), eq(variables.devType, 'n3xx'))
|
||||
|
||||
- download: ${{ parameters.uhdArtifactSource }}
|
||||
artifact: e320-images
|
||||
# Only sync the bz2 sdimg since the bmap
|
||||
# is incompatible with mender
|
||||
patterns: |
|
||||
**/*.bz2
|
||||
fitImage-manufacturing
|
||||
displayName: Download $(devType)-images artifact
|
||||
condition: and(succeeded(), eq(variables.devModel, 'e320'))
|
||||
|
||||
- script: |
|
||||
cd $(Build.BinariesDirectory)/uhddev/build
|
||||
mkdir -p fpga_images
|
||||
rm -rf fpga_images/*
|
||||
python3 utils/uhd_images_downloader.py -t $(devModel) -i fpga_images \
|
||||
python3 utils/uhd_images_downloader.py -t $(devModel)_fpga -i fpga_images \
|
||||
-b $(sdr-fileserver)
|
||||
if [ "$(devType)" = "b200" ]; then
|
||||
python3 utils/uhd_images_downloader.py -t b2xx_common -i fpga_images \
|
||||
|
|
@ -93,7 +105,7 @@ jobs:
|
|||
export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH
|
||||
export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images
|
||||
python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \
|
||||
--sdimage $(devType),$(devModel),$(uhd_artifact_directory)/$(devType)-images/$(devSDImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig),$(devHostname) \
|
||||
--sdimage_sdmux $(devType),$(devModel),$(uhd_artifact_directory)/$(devType)-images/$(devSDImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig),$(devHostname) \
|
||||
--fpgas $(devFpga) \
|
||||
--sfp_addrs $(sfpAddrs) \
|
||||
${{ parameters.redisHost }} $(devName) \
|
||||
|
|
@ -107,6 +119,27 @@ jobs:
|
|||
condition: and(succeeded(), eq(variables.devType, 'n3xx'), eq(variables.devBus, 'ip'))
|
||||
displayName: Run n3xx devtest on $(devName)
|
||||
|
||||
- script: |
|
||||
mkdir -p $(Common.TestResultsDirectory)/devtest
|
||||
cd $(Common.TestResultsDirectory)/devtest
|
||||
export PATH=$(Build.BinariesDirectory)/uhddev/build/utils:$(Build.BinariesDirectory)/uhddev/build/examples:$PATH
|
||||
export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH
|
||||
export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images
|
||||
python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \
|
||||
--sdimage_tftp $(devType),$(devModel),$(uhd_artifact_directory)/e320-images/$(devSDImage),$(uhd_artifact_directory)/e320-images/$(devInitramfsImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig) \
|
||||
--fpgas $(devFpga) \
|
||||
--sfp_addrs $(sfpAddrs) \
|
||||
${{ parameters.redisHost }} $(devName) \
|
||||
"$(Build.BinariesDirectory)/uhddev/build/utils/uhd_usrp_probe --args addr=$(devAddr)" \
|
||||
"python3 ${{ parameters.uhdSrcDir }}/host/tests/devtest/run_testsuite.py \
|
||||
--src-dir ${{ parameters.uhdSrcDir }}/host/tests/devtest \
|
||||
--devtest-pattern $(devtestPattern) --args addr=$(devAddr),type=$(devType) \
|
||||
--build-type Release --build-dir $(Build.BinariesDirectory)/uhddev/build \
|
||||
--python-interp python3 --xml"
|
||||
continueOnError: true
|
||||
condition: and(succeeded(), eq(variables.devModel, 'e320'), eq(variables.devBus, 'ip'))
|
||||
displayName: Run e320 devtest on $(devName)
|
||||
|
||||
- script: |
|
||||
mkdir -p $(Common.TestResultsDirectory)/devtest
|
||||
cd $(Common.TestResultsDirectory)/devtest
|
||||
|
|
|
|||
|
|
@ -246,8 +246,8 @@ stages:
|
|||
uhdSrcDir: $(Build.SourcesDirectory)
|
||||
testDevices: 'x3xx,b2xx'
|
||||
|
||||
- stage: devtest_uhd_n3xx_stage
|
||||
displayName: devtest UHD n3xx
|
||||
- stage: devtest_uhd_n3xx_e320_stage
|
||||
displayName: devtest UHD n3xx e320
|
||||
dependsOn:
|
||||
- build_uhd_stage_linux
|
||||
- build_uhd_embedded_system_images
|
||||
|
|
@ -256,7 +256,7 @@ stages:
|
|||
parameters:
|
||||
testOS: ubuntu2004
|
||||
uhdSrcDir: $(Build.SourcesDirectory)
|
||||
testDevices: 'n3xx'
|
||||
testDevices: 'n3xx,e320'
|
||||
|
||||
- stage: test_uhd_x4xx_stage
|
||||
displayName: Test UHD x4xx
|
||||
|
|
|
|||
|
|
@ -32,3 +32,19 @@ rhombus-n321-0:
|
|||
reservation: null
|
||||
tags: {}
|
||||
|
||||
rhombus-e320-0:
|
||||
acquired: null
|
||||
acquired_resources: []
|
||||
aliases: []
|
||||
allowed: []
|
||||
changed: 1654034475.1935894
|
||||
comment: ''
|
||||
created: 1654034136.0077882
|
||||
matches:
|
||||
- cls: '*'
|
||||
exporter: '*'
|
||||
group: rhombus-e320-0-group
|
||||
name: null
|
||||
rename: null
|
||||
reservation: null
|
||||
tags: {}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
targets:
|
||||
main:
|
||||
resources:
|
||||
RemotePlace:
|
||||
name: 'rhombus-e320-0'
|
||||
drivers:
|
||||
- SerialDriver:
|
||||
name: 'linux_serial_driver'
|
||||
bindings:
|
||||
port: 'console-linux'
|
||||
- SerialDriver:
|
||||
name: 'scu_serial_driver'
|
||||
bindings:
|
||||
port: 'console-scu'
|
||||
|
|
@ -32,3 +32,16 @@ rhombus-n321-0-group:
|
|||
match:
|
||||
ID_SERIAL_SHORT: '000000001140'
|
||||
|
||||
rhombus-e320-0-group:
|
||||
console-scu:
|
||||
cls: USBSerialPort
|
||||
match:
|
||||
ID_SERIAL: 'Silicon_Labs_CP2105_Dual_USB_to_UART_Bridge_Controller_0097841B'
|
||||
ID_USB_INTERFACE_NUM: '00'
|
||||
speed: 115200
|
||||
console-linux:
|
||||
cls: USBSerialPort
|
||||
match:
|
||||
ID_SERIAL: 'Silicon_Labs_CP2105_Dual_USB_to_UART_Bridge_Controller_0097841B'
|
||||
ID_USB_INTERFACE_NUM: '01'
|
||||
speed: 115200
|
||||
|
|
|
|||
63
.ci/utils/httpd.py
Normal file
63
.ci/utils/httpd.py
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
import http.server
|
||||
import time
|
||||
import os
|
||||
import pyroute2
|
||||
import socket
|
||||
import socketserver
|
||||
import threading
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
|
||||
class ThreadingHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
|
||||
pass
|
||||
|
||||
class HTTPServer:
|
||||
def __init__(self, path, remote_ip):
|
||||
self.path = path
|
||||
self.port = None
|
||||
self.old_path = None
|
||||
self.httpd = None
|
||||
|
||||
with pyroute2.IPRoute() as ipr:
|
||||
r = ipr.route('get', dst=remote_ip)
|
||||
for attr in r[0]['attrs']:
|
||||
if attr[0] == 'RTA_PREFSRC':
|
||||
self.ip = attr[1]
|
||||
with socket.socket() as s:
|
||||
s.bind(('', 0))
|
||||
self.port = s.getsockname()[1]
|
||||
|
||||
def get_url(self, filename):
|
||||
path = Path(self.path) / filename
|
||||
assert path.exists()
|
||||
return f"http://{self.ip}:{self.port}/{filename}"
|
||||
|
||||
def __enter__(self):
|
||||
def start_server():
|
||||
Handler = http.server.SimpleHTTPRequestHandler
|
||||
self.httpd = ThreadingHTTPServer(("", self.port), Handler)
|
||||
self.httpd.serve_forever()
|
||||
|
||||
# Kind of annoying, but to work with older pythons where
|
||||
# SimpleHTTPRequestHandler doesn't take a directory parameter but only
|
||||
# serves the current directory:
|
||||
self.old_path = os.getcwd()
|
||||
os.chdir(self.path)
|
||||
|
||||
self.thread = threading.Thread(target=start_server)
|
||||
self.thread.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, exc):
|
||||
if self.httpd is not None:
|
||||
self.httpd.shutdown()
|
||||
self.httpd.server_close()
|
||||
if self.old_path is not None:
|
||||
os.chdir(self.old_path)
|
||||
|
||||
if __name__ == '__main__':
|
||||
with HTTPServer("/tmp", "127.0.0.1") as server:
|
||||
print("server ip", server.ip)
|
||||
print("server port", server.port)
|
||||
time.sleep(300)
|
||||
|
||||
|
|
@ -8,17 +8,22 @@ import labgrid
|
|||
import os
|
||||
import pathlib
|
||||
import shlex
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from fabric import Connection
|
||||
from httpd import HTTPServer
|
||||
from pottery import Redlock
|
||||
from redis import Redis
|
||||
from tftp import TFTPServer
|
||||
|
||||
bitfile_name = "usrp_{}_fpga_{}.bit"
|
||||
|
||||
def jtag_x3xx(dev_type, dev_model, jtag_server, jtag_serial, fpga_folder, fpga, redis_server):
|
||||
if dev_model not in ["x300", "x310"]:
|
||||
raise RuntimeError(f'{dev_type} not supported with jtag_x3xx')
|
||||
remote_working_dir = "pipeline_fpga"
|
||||
vivado_program_jtag = "/opt/Xilinx/Vivado_Lab/2020.1/bin/vivado_lab -mode batch -source {}/viv_hardware_utils.tcl -nolog -nojournal -tclargs program".format(
|
||||
remote_working_dir)
|
||||
|
|
@ -50,7 +55,12 @@ def set_sfp_addrs(mgmt_addr, sfp_addrs):
|
|||
dut.run(f"ip link set sfp{idx} up")
|
||||
time.sleep(30)
|
||||
|
||||
def flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs):
|
||||
def flash_sdimage_sdmux(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs):
|
||||
""" This method uses an sdmux (https://linux-automation.com/en/products/usb-sd-mux.html)
|
||||
to reimage the sd card.
|
||||
"""
|
||||
if dev_model not in ["n300", "n310", "n320", "n321"]:
|
||||
raise RuntimeError(f'{dev_model} not supported with sdimage_sdmux')
|
||||
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release --kick"))
|
||||
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} acquire"))
|
||||
env = labgrid.Environment(labgrid_device_yaml)
|
||||
|
|
@ -101,6 +111,97 @@ def flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_a
|
|||
|
||||
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release"))
|
||||
|
||||
def flash_sdimage_tftp(dev_model, sdimage_path, initramfs_path, labgrid_device_yaml, sfp_addrs, redis_server):
|
||||
""" This method uses tftp to boot the device into a small Linux envionment to
|
||||
write to the device's sd card. This method is used on the E320 since it has
|
||||
a hardware incompatibility with sdmuxes.
|
||||
"""
|
||||
if dev_model not in ["e320"]:
|
||||
raise RuntimeError(f'{dev_model} not supported with sdimage_tftp')
|
||||
|
||||
if dev_model == "e320":
|
||||
dev_ram_address = '0x20000000'
|
||||
dev_bootm_config = 'conf@zynq-ni-${mboard}.dtb'
|
||||
|
||||
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release --kick"))
|
||||
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} acquire"))
|
||||
env = labgrid.Environment(labgrid_device_yaml)
|
||||
target = env.get_target()
|
||||
|
||||
cp_scu = target.get_driver(labgrid.protocol.ConsoleProtocol, name="scu_serial_driver")
|
||||
cp_linux = target.get_driver(labgrid.protocol.ConsoleProtocol, name="linux_serial_driver")
|
||||
|
||||
print("Powering down DUT", flush=True)
|
||||
cp_scu.write("\napshutdown\n".encode())
|
||||
time.sleep(10)
|
||||
|
||||
print("Powering on DUT", flush=True)
|
||||
cp_scu.write("\npowerbtn\n".encode())
|
||||
# Sometimes it requires multiple powerbtn calls to turn on device
|
||||
try:
|
||||
cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=5)
|
||||
except Exception:
|
||||
print("Device didn't power on with first attempt. Trying again...", flush=True)
|
||||
cp_scu.write("\npowerbtn\n".encode())
|
||||
cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=5)
|
||||
|
||||
print("Attempting to get into uboot console", flush=True)
|
||||
cp_linux.write("noautoboot".encode())
|
||||
# Handle if the watchdog triggers
|
||||
try:
|
||||
cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=30)
|
||||
cp_linux.write("noautoboot".encode())
|
||||
except Exception:
|
||||
pass
|
||||
cp_linux.expect("uboot>")
|
||||
print("Waiting for NIC to come up", flush=True)
|
||||
time.sleep(10)
|
||||
cp_linux.write(f"setenv autoload no; dhcp;\n".encode())
|
||||
cp_linux.expect("DHCP client bound to address")
|
||||
expect_index, expect_before, expect_match , expect_after = cp_linux.expect(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b")
|
||||
mgmt_addr = expect_match[0].decode()
|
||||
print(f"Dev got IP Address {mgmt_addr}")
|
||||
|
||||
with TFTPServer(initramfs_path, mgmt_addr) as server:
|
||||
time.sleep(10)
|
||||
cp_linux.expect("uboot>")
|
||||
cp_linux.write(f"setenv tftpdstp {server.port}\n".encode())
|
||||
cp_linux.expect("uboot>")
|
||||
print("TFTPing initramfs image", flush=True)
|
||||
cp_linux.write(f"tftpboot {dev_ram_address} {server.ip}:{os.path.basename(initramfs_path)}\n".encode())
|
||||
cp_linux.expect("uboot>", timeout=120)
|
||||
print("Booting into initramfs", flush=True)
|
||||
cp_linux.write(f"bootm {dev_ram_address}#{dev_bootm_config}\n".encode())
|
||||
cp_linux.expect("mender login:", timeout=120)
|
||||
print("Logging into Linux", flush=True)
|
||||
cp_linux.write("root\n".encode())
|
||||
cp_linux.expect("mender:~#")
|
||||
print("Waiting for NIC to DHCP", flush=True)
|
||||
time.sleep(10)
|
||||
|
||||
with HTTPServer(os.path.dirname(sdimage_path), mgmt_addr) as server:
|
||||
print(f"Writing SD Card using {sdimage_path}", flush=True)
|
||||
print("Running bmaptool... This will take awhile", flush=True)
|
||||
cp_linux.write(f"bmaptool copy --nobmap {server.get_url(os.path.basename(sdimage_path))} /dev/mmcblk0\n".encode())
|
||||
cp_linux.expect("mender:~#", timeout=1800)
|
||||
cp_linux.write("echo bmaptool exit code: $?\n".encode())
|
||||
cp_linux.expect("bmaptool exit code: 0", timeout=10)
|
||||
time.sleep(10)
|
||||
print("Rebooting into new image from sd card", flush=True)
|
||||
cp_linux.write("reboot\n".encode())
|
||||
|
||||
print("Waiting 2 minutes for device to boot", flush=True)
|
||||
time.sleep(120)
|
||||
cp_linux.expect("login:", timeout=30)
|
||||
known_hosts_path = os.path.expanduser("~/.ssh/known_hosts")
|
||||
subprocess.run(shlex.split(f"ssh-keygen -f \"{known_hosts_path}\" -R \"{mgmt_addr}\""))
|
||||
|
||||
if sfp_addrs:
|
||||
set_sfp_addrs(mgmt_addr, sfp_addrs)
|
||||
|
||||
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release"))
|
||||
return mgmt_addr
|
||||
|
||||
def main(args):
|
||||
redis_server = {Redis.from_url(
|
||||
"redis://{}:6379/0".format(args.redis_server))}
|
||||
|
|
@ -108,13 +209,21 @@ def main(args):
|
|||
with Redlock(key=args.dut_name, masters=redis_server, auto_release_time=1000 * 60 * args.dut_timeout):
|
||||
print("Got mutex for {}".format(args.dut_name), flush=True)
|
||||
|
||||
if args.sdimage:
|
||||
dev_type, dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr = args.sdimage.split(',')
|
||||
if args.sdimage_sdmux:
|
||||
dev_type, dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr = args.sdimage_sdmux.split(',')
|
||||
if args.sfp_addrs:
|
||||
sfp_addrs = args.sfp_addrs.split(',')
|
||||
else:
|
||||
sfp_addrs = None
|
||||
flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs)
|
||||
flash_sdimage_sdmux(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs)
|
||||
|
||||
if args.sdimage_tftp:
|
||||
dev_type, dev_model, sdimage_path, initramfs_path, labgrid_device_yaml = args.sdimage_tftp.split(',')
|
||||
if args.sfp_addrs:
|
||||
sfp_addrs = args.sfp_addrs.split(',')
|
||||
else:
|
||||
sfp_addrs = None
|
||||
mgmt_addr = flash_sdimage_tftp(dev_model, sdimage_path, initramfs_path, labgrid_device_yaml, sfp_addrs, redis_server)
|
||||
|
||||
if args.fpgas:
|
||||
working_dir = os.getcwd()
|
||||
|
|
@ -146,14 +255,17 @@ def main(args):
|
|||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
# jtag_x3xx will flash the fpga for a given jtag_serial using
|
||||
# Vivado on jtag_server. It uses SSH to control jtag_server.
|
||||
# Provide fpga_path as a local path and it will be copied
|
||||
# to jtag_server.
|
||||
parser.add_argument("--jtag_x3xx", type=str,
|
||||
group.add_argument("--jtag_x3xx", type=str,
|
||||
help="dev_type,dev_model,user@jtag_server,jtag_serial,fpga_folder")
|
||||
parser.add_argument("--sdimage", type=str,
|
||||
group.add_argument("--sdimage_sdmux", type=str,
|
||||
help="dev_type,dev_model,sdimg_path,labgrid_device_yaml,mgmt_addr")
|
||||
group.add_argument("--sdimage_tftp", type=str,
|
||||
help="dev_type,dev_model,sdimg_path,initramfs_path,labgrid_device_yaml")
|
||||
parser.add_argument("--sfp_addrs", type=str,
|
||||
help="sfp0ip,sfp1ip,...")
|
||||
parser.add_argument("--fpgas", type=str,
|
||||
|
|
|
|||
98
.ci/utils/tftp.py
Normal file
98
.ci/utils/tftp.py
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import py3tftp.protocols
|
||||
import pyroute2
|
||||
import socket
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class FileReaderSingle:
|
||||
def __init__(self, path, fname_req, chunk_size=0):
|
||||
self.path = path
|
||||
# TODO: Should check fname_req against actual name
|
||||
self.chunk_size = chunk_size
|
||||
self._f = None
|
||||
self._f = open(self.path, 'rb')
|
||||
self.finished = False
|
||||
|
||||
def file_size(self):
|
||||
return self.path.stat().st_size
|
||||
|
||||
def read_chunk(self, size=None):
|
||||
size = size or self.chunk_size
|
||||
if self.finished:
|
||||
return b''
|
||||
|
||||
data = self._f.read(size)
|
||||
if not data or (size > 0 and len(data) < size):
|
||||
self._f.close()
|
||||
self.finished = True
|
||||
|
||||
return data
|
||||
|
||||
def __del__(self):
|
||||
if self._f and not self._f.closed:
|
||||
self._f.close()
|
||||
|
||||
|
||||
class TFTPServerSingle(py3tftp.protocols.BaseTFTPServerProtocol):
|
||||
def __init__(self, path, host_interface, loop, extra_opts):
|
||||
super().__init__(host_interface, loop, extra_opts)
|
||||
self.path = path
|
||||
|
||||
def select_protocol(self, packet):
|
||||
if packet.is_rrq():
|
||||
return py3tftp.protocols.RRQProtocol
|
||||
raise py3tftp.protocols.ProtocolException("Unhandled protocol")
|
||||
|
||||
def select_file_handler(self, packet):
|
||||
if packet.is_rrq():
|
||||
return lambda filename, opts: FileReaderSingle(self.path, filename, opts)
|
||||
|
||||
|
||||
class TFTPServer:
|
||||
"""
|
||||
Simple TFTP server, meant to be short-lived and capable of serving a single
|
||||
file only
|
||||
"""
|
||||
def __init__(self, filename, remote_ip, port=None):
|
||||
self.path = Path(filename).absolute()
|
||||
assert self.path.exists()
|
||||
assert self.path.is_file()
|
||||
|
||||
self.filename = self.path.name
|
||||
|
||||
if port == None:
|
||||
with socket.socket() as s:
|
||||
s.bind(('', 0))
|
||||
self.port = s.getsockname()[1]
|
||||
else:
|
||||
self.port = port
|
||||
|
||||
with pyroute2.IPRoute() as ipr:
|
||||
r = ipr.route('get', dst=remote_ip)
|
||||
for attr in r[0]['attrs']:
|
||||
if attr[0] == 'RTA_PREFSRC':
|
||||
self.ip = attr[1]
|
||||
|
||||
def __enter__(self):
|
||||
self.loop = asyncio.new_event_loop()
|
||||
listen = self.loop.create_datagram_endpoint(
|
||||
lambda: TFTPServerSingle(self.path, self.ip, self.loop, {}),
|
||||
local_addr=(self.ip, self.port))
|
||||
|
||||
def start_loop(loop):
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_forever()
|
||||
|
||||
self.transport, protocol = self.loop.run_until_complete(listen)
|
||||
self.thread = threading.Thread(target=start_loop, args=(self.loop,))
|
||||
self.thread.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, exc):
|
||||
self.transport.close()
|
||||
self.loop.call_soon_threadsafe(self.loop.stop)
|
||||
self.thread.join()
|
||||
Loading…
Reference in a new issue