mirror of
https://github.com/saymrwulf/uhd.git
synced 2026-05-16 21:10:10 +00:00
gpsd connection is not reliable. Adding more error handling to re-connect during polling. Add control flows to get_gps_time in order to give an effect of getting the value on pps edge.
294 lines
11 KiB
Python
294 lines
11 KiB
Python
#
|
|
# Copyright 2017-2018 Ettus Research, a National Instruments Company
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
"""
|
|
GPS service daemon (GPSd) interface class
|
|
"""
|
|
|
|
from __future__ import print_function
|
|
import socket
|
|
import json
|
|
import time
|
|
import select
|
|
import datetime
|
|
from usrp_mpm.mpmlog import get_logger
|
|
|
|
|
|
class GPSDIface(object):
|
|
"""
|
|
Interface to the GPS service daemon (GPSd).
|
|
|
|
The GPSDIface implementation can be used as a context manager, and GPSD results should be
|
|
gotten with the get_gps_info() function. This will filter by the GPSD response class
|
|
(resp_class) and return that class message. If no filter is provided, this function will return
|
|
the first response (not counting the VERSION message).
|
|
|
|
The MPM SKY sensors returns the first available response- there shouldn't be anything tricky
|
|
about this. The MPM TPV sensor, however, returns an interesting value in the shortest time
|
|
possible. If the GPSD has received a TPV report since we connected (and sent the start
|
|
command), this function should return immediately. However, if no report is ready, the function
|
|
waits until an interesting result is ready and returns that. This is achieved by discarding
|
|
`mode=0` responses.
|
|
"""
|
|
def __init__(self):
|
|
# Make a logger
|
|
try:
|
|
self.log = get_logger('GPSDIface')
|
|
except AssertionError:
|
|
from usrp_mpm.mpmlog import get_main_logger
|
|
self.log = get_main_logger('GPSDIface')
|
|
# Make a socket to connect to GPSD
|
|
self.gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
def __enter__(self):
|
|
self.open()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self.disable_watch()
|
|
self.close()
|
|
return exc_type is None
|
|
|
|
def open(self):
|
|
"""Open the socket to GPSD"""
|
|
self.gpsd_socket.connect(('localhost', 2947))
|
|
version_str = self.read_class("VERSION")
|
|
self.enable_watch()
|
|
self.log.trace("GPSD version: %s", version_str)
|
|
|
|
def close(self):
|
|
"""Close the socket"""
|
|
self.gpsd_socket.close()
|
|
self.log.trace("Closing the connection to GPSD.")
|
|
|
|
def enable_watch(self):
|
|
"""Send a WATCH command, which starts operation"""
|
|
self.gpsd_socket.sendall(b'?WATCH={"enable":true};')
|
|
self.log.trace(self.read_class("DEVICES"))
|
|
self.log.trace(self.read_class("WATCH"))
|
|
|
|
def poll_request(self, socket_timeout=60, num_retry=10):
|
|
"""Send a POLL command
|
|
|
|
Raises
|
|
------
|
|
json.JSONDecodeError
|
|
If the data returned from GPSd cannot be decoded with JSON.
|
|
RuntimeError
|
|
If unsuccessfully connecting to GPSD within num_retry.
|
|
"""
|
|
query_cmd = b'?POLL;'
|
|
for _ in range(num_retry):
|
|
try:
|
|
self.gpsd_socket.sendall(query_cmd)
|
|
return self.read_class("POLL", socket_timeout)
|
|
except socket.error:
|
|
self.log.warning("Reconnecting to GPSD.")
|
|
try:
|
|
self.gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.open()
|
|
except socket.error:
|
|
self.log.warning("Error during GPSD reconnect.")
|
|
continue
|
|
raise RuntimeError("Unsuccessfully connecting to GPSD within {} tries".format(num_retry))
|
|
|
|
def disable_watch(self):
|
|
"""Send the command to stop operation"""
|
|
query_cmd = b'?WATCH={"enable":false};'
|
|
self.gpsd_socket.sendall(query_cmd)
|
|
|
|
def socket_read_line(self, timeout=60, interval=0):
|
|
"""
|
|
Read from a socket until newline. If there was no newline until the timeout
|
|
occurs, raise an error. Otherwise, return the line.
|
|
"""
|
|
line = b''
|
|
end_time = time.time() + timeout
|
|
while time.time() < end_time:
|
|
socket_ready = select.select([self.gpsd_socket], [], [], 0)[0]
|
|
if socket_ready:
|
|
next_char = self.gpsd_socket.recv(1)
|
|
if next_char == b'\n':
|
|
return line.decode('ascii')
|
|
line += next_char
|
|
else:
|
|
time.sleep(interval)
|
|
raise socket.timeout
|
|
|
|
def read_class(self, class_name, socket_timeout=60):
|
|
"""return json data for spcecfic key of 'class'
|
|
This function will read until socket timeout (or no data on socket)
|
|
|
|
Raises
|
|
------
|
|
json.JSONDecodeError
|
|
If the data returned from GPSd cannot be decoded with JSON.
|
|
"""
|
|
while True:
|
|
json_result = json.loads(self.socket_read_line(socket_timeout))
|
|
if json_result.get('class', '') == class_name:
|
|
return json_result
|
|
|
|
def get_gps_info(self, resp_class='', timeout=60):
|
|
"""Convenience function for getting a response which contains a response class"""
|
|
# Read results until we see one which contains the requested response class, ie TPV or SKY
|
|
result = {}
|
|
end_time = time.time() + timeout
|
|
while not result.get(resp_class, {}):
|
|
try:
|
|
# Do poll request with socket timeout of 5s here.
|
|
# It should not be that long, since GPSD should send POLL object promptly.
|
|
result = self.poll_request(5)
|
|
if (resp_class == "") or (time.time() > end_time):
|
|
# If we don't have a response class filter, just return the first response
|
|
# or if we timeout
|
|
break
|
|
except json.JSONDecodeError:
|
|
# If we get an incomplete packet, this will trigger
|
|
# In this case, just retry
|
|
self.log.warning("JSON decode error: %s", result)
|
|
continue
|
|
# Filter the result by resp_class or return the entire thing
|
|
# In the filtered case, the result contains a list of 'resp_class' responses,
|
|
# so we need to get one valid one.
|
|
return result if (resp_class == "") else result.get(resp_class, [{}])[0]
|
|
|
|
|
|
class GPSDIfaceExtension(object):
|
|
"""
|
|
Wrapper class that facilitates the 'extension' of a `context` object. The
|
|
intention here is for a object to instantiate a GPSDIfaceExtension object,
|
|
then call the `extend` method with `context` object as an argument. This
|
|
will then add the GPSDIfaceExtension methods to the `context` object's
|
|
methods. The `context` object can then call the convenience functions to
|
|
retrieve GPS information from GPSd.
|
|
For example:
|
|
|
|
class foo:
|
|
def __init__(self):
|
|
self.gps_ext = GPSDIfaceExtension()
|
|
self.gps_ext.extend(self)
|
|
# The GPSDIfaceExtension methods are now registered with foo, so
|
|
# we can call `get_gps_time`
|
|
print(self.get_gps_time())
|
|
"""
|
|
def __init__(self):
|
|
self._gpsd_iface = GPSDIface()
|
|
self._gpsd_iface.open()
|
|
self._log = self._gpsd_iface.log
|
|
|
|
def __del__(self):
|
|
self._gpsd_iface.close()
|
|
|
|
def extend(self, context):
|
|
"""Register the GSPDIfaceExtension object's public function with `context`"""
|
|
new_methods = [method_name for method_name in dir(self)
|
|
if not method_name.startswith('_') \
|
|
and callable(getattr(self, method_name)) \
|
|
and method_name != "extend"]
|
|
for method_name in new_methods:
|
|
new_method = getattr(self, method_name)
|
|
self._log.trace("%s: Adding %s method", context, method_name)
|
|
setattr(context, method_name, new_method)
|
|
return new_methods
|
|
|
|
def get_gps_time_sensor(self):
|
|
"""
|
|
Retrieve the GPS time using a TPV response from GPSd, and returns as a sensor dict.
|
|
This returns a sensor dictionary on the second edge containing the latest second.
|
|
For example, if we call this function at the gps time of 1.001s it will wait until
|
|
just after 2.000s to return 2 second. This effect is similar to get gps time on
|
|
the next edge of pps.
|
|
"""
|
|
def parse_time(time_str):
|
|
"""parse a string of time in format of %Y-%m-%dT%H:%M:%S.%fZ
|
|
return in unit second
|
|
"""
|
|
time_dt = datetime.datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
|
|
epoch_dt = datetime.datetime(1970, 1, 1)
|
|
return (time_dt - epoch_dt).total_seconds()
|
|
# Read responses from GPSD until we get a non-trivial mode and until next second.
|
|
gps_time_prev = 0
|
|
while True:
|
|
gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15)
|
|
gps_mode = gps_info.get("mode", 0)
|
|
gps_time = parse_time(gps_info.get("time", ""))
|
|
if gps_mode == 0:
|
|
self._log.warning("GPSD reported invalid mode."
|
|
"Return from GPSD is %s", gps_info)
|
|
continue
|
|
if gps_time_prev == 0:
|
|
gps_time_prev = gps_time
|
|
continue
|
|
else:
|
|
if int(gps_time) - int(gps_time_prev) >= 1:
|
|
return {
|
|
'name': 'gps_time',
|
|
'type': 'INTEGER',
|
|
'unit': 'seconds',
|
|
'value': str(int(gps_time)),
|
|
}
|
|
|
|
def get_gps_tpv_sensor(self):
|
|
"""Get a TPV response from GPSd as a sensor dict"""
|
|
self._log.trace("Polling GPS TPV results from GPSD")
|
|
# Read responses from GPSD until we get a non-trivial mode
|
|
while True:
|
|
gps_info = self._gpsd_iface.get_gps_info(resp_class='tpv', timeout=15)
|
|
self._log.trace("GPS info: {}".format(gps_info))
|
|
if gps_info.get("mode", 0) > 0:
|
|
break
|
|
# Return the JSON'd results
|
|
gps_tpv = json.dumps(gps_info)
|
|
return {
|
|
'name': 'gps_tpv',
|
|
'type': 'STRING',
|
|
'unit': '',
|
|
'value': gps_tpv,
|
|
}
|
|
|
|
def get_gps_sky_sensor(self):
|
|
"""Get a SKY response from GPSd as a sensor dict"""
|
|
self._log.trace("Polling GPS SKY results from GPSD")
|
|
# Just get the first SKY result
|
|
gps_info = self._gpsd_iface.get_gps_info(resp_class='sky', timeout=15)
|
|
# Return the JSON'd results
|
|
gps_sky = json.dumps(gps_info)
|
|
return {
|
|
'name': 'gps_sky',
|
|
'type': 'STRING',
|
|
'unit': '',
|
|
'value': gps_sky,
|
|
}
|
|
|
|
|
|
def main():
|
|
"""Test functionality of the GPSDIface class"""
|
|
# Do some setup
|
|
import argparse
|
|
|
|
def parse_args():
|
|
"""Parse the command-line arguments"""
|
|
parser = argparse.ArgumentParser(description="Read messages from GPSD")
|
|
parser.add_argument("--timeout", help="Timeout for the GPSD read", default=20)
|
|
return parser.parse_args()
|
|
|
|
args = parse_args()
|
|
|
|
with GPSDIface() as gps_iface:
|
|
result = gps_iface.get_gps_info(resp_class='', timeout=args.timeout)
|
|
tpv_result = gps_iface.get_gps_info(resp_class='tpv', timeout=args.timeout)
|
|
sky_result = gps_iface.get_gps_info(resp_class='sky', timeout=args.timeout)
|
|
print("Sample result: {}".format(result))
|
|
print("TPV: {}".format(tpv_result))
|
|
print("SKY: {}".format(sky_result))
|
|
gps_ext = GPSDIfaceExtension()
|
|
for _ in range(10):
|
|
print(gps_ext.get_gps_time_sensor().get('value'))
|
|
#TODO Add GPSDIfaceExtension code
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|