Source code for giant.ufo.measurements
# Copyright 2021 United States Government as represented by the Administrator of the National Aeronautics and Space
# Administration. No copyright is claimed in the United States under Title 17, U.S. Code. All Other Rights Reserved.
from abc import ABCMeta, abstractmethod
from datetime import datetime
from uuid import uuid4
from typing import Optional, Hashable
import numpy as np
from giant.ufo.dynamics import Dynamics
from giant.image import OpNavImage
from giant.camera_models import CameraModel
from giant._typing import SCALAR_OR_ARRAY
[docs]class Measurement(metaclass=ABCMeta):
"""
This ABC defines the interface for a measurement that is expected by the :class:`.ExtendedKalmanFilter` class for a
measurement.
To implement a new measurement type, simply subclass this class, implement the abstract methods and properties (plus
whatever you need for the methods/properties) and then use it to feed measurements into the EKF.
"""
@property
@abstractmethod
def covariance(self) -> np.ndarray:
"""
Returns the covariance of the measurement as a numpy array
"""
pass
[docs] @abstractmethod
def predict(self, state: Dynamics.State) -> SCALAR_OR_ARRAY:
"""
Predicts what the measurement should be given the current state.
:param state: The state object defining the current state of the target.
:return: The predicted measurement as a scalar or numpy array
"""
pass
@property
@abstractmethod
def observed(self) -> SCALAR_OR_ARRAY:
"""
Returns the observed measurement.
"""
pass
[docs] @abstractmethod
def compute_jacobian(self, state: Dynamics.State) -> np.ndarray:
"""
Computes and returns the change in the predicted measurement given a change in the state.
This is also known as the observation matrix. It linearly maps changes in the state to changes in the
measurement. As such, it should be a matrix of length nxm where n is the number of elements in the measurement
(for instance n=2 for a single pixel measurement) and m is the length of the state vector (``len(state)``).
The state vector is always guaranteed to contain position and velocity as the first 2 components of the state
vector.
:param state: The state object defining the current state of the target.
:return: The Jacobian of the measurement as a numpy array
"""
pass
@property
@abstractmethod
def observer_location(self) -> Dynamics.State:
"""
Returns the state of the observer at the time the measurement was captured in the base dynamics frame
"""
pass
@property
@abstractmethod
def time(self) -> datetime:
"""
Returns the time of the measurement as a python datetime object
"""
pass
@property
@abstractmethod
def identity(self) -> Hashable:
"""
The identity of this measurement.
This is used primarily by the tracker to link measurement back to observation ids
"""
pass
[docs] @staticmethod
@abstractmethod
def compare_residuals(first: SCALAR_OR_ARRAY, second: SCALAR_OR_ARRAY) -> bool:
"""
This compares residuals computed using these measurement models.
If this returns ``True``, then ``first`` is smaller than or equal to ``second`` (according to the definition of
the residuals for this measurement). If ``False, then ``first`` is larger than ``second``. This is used to
check for divergence.
:param first: The first residual
:param second: The second residual
:return: ``True`` if the first residual <= second residual otherwise ``False``
"""
pass
[docs]class OpticalBearingMeasurement(Measurement):
"""
This class implements a concrete measurement model for optical bearing measurements presented as pixel/line pairs.
This class serves both as a functional measurement model and as an example for how to define your own measurement
model. Because of the power of the :class:`.CameraModel` classes in GIANT, this largely just serves as a wrapper
around the :class:`.CameraModel` that describes the camera the measurements are being generated from. As such, at
initialization the camera model must be specified, as well as the image the measurement came from.
Note that this class does not implement the ability to extract measurements from an image, just the ability to model
them. GIANT provides lots of capabilities for extracting measurements from images in the :mod:`.ufo` and
:mod:`.relative_opnav` modules that you could then feed into this class for modelling in the GIANT filter.
"""
def __init__(self, observed_measurement: np.ndarray, camera_model: CameraModel, image: OpNavImage,
observer_location: Dynamics.State, covariance: Optional[np.ndarray] = None,
identity: Optional[Hashable] = None):
"""
:param observed_measurement: The observed measurement as a length 2 array of x, y in pixels
:param camera_model: The camera model that represents the camera
:param image: The image that the measurement was extracted from
:param observer_location: The location of the camera in the base dynamics frame when the image was captured
:param covariance: The measurement covariance matrix as a 2x2 numpy array or ``None``. If this is ``None``
then the identity matrix will be assumed. The units should be pixels squared.
:param identity: A unique identity
"""
self.observed_measurement: np.ndarray = observed_measurement
"""
The observed measurement as a length 2 array of x, y pixels
"""
self.camera_model: CameraModel = camera_model
"""
The camera model that represents the camera
"""
self.image: OpNavImage = np.zeros(1, dtype=np.uint8).view(image.__class__)
"""
The image that the measurement was extracted from, which gives the temperature of the camera as well as the
orientation of the camera at the time the image was captured.
This only includes the header data for the image, not the image data itself for memory management purposes
"""
self.image.__array_finalize__(image)
self._observer_location: Dynamics.State = observer_location
"""
The location of the camera in the base dynamics frame when the measurement was captured.
"""
self._covariance: np.ndarray = covariance
"""
The measurement covariance matrix as a 2x2 numpy array or ``None``.
If this is ``None`` then the identity matrix will be assumed. The units should be pixels squared.
"""
self._identity: Hashable = identity if identity is not None else uuid4()
"""
The identity of this measurement.
This is used primarily by the tracker to link measurement back to observation ids
"""
if self._covariance is None:
self._covariance = np.eye(2, dtype=np.float64)
@property
def identity(self) -> Hashable:
"""
The identity of this measurement.
This is used primarily by the tracker to link measurement back to observation ids
"""
return self._identity
@property
def observed(self) -> np.ndarray:
"""
The observed measurement as a length 2 numpy array
"""
return self.observed_measurement
@property
def covariance(self) -> np.ndarray:
"""
The measurement covariance matrix as a 2x2 numpy array.
The units of this should be pixels squared for the main diagonal.
"""
return self._covariance
@property
def observer_location(self) -> Dynamics.State:
"""
The location of the camera when this measurement was captured in the base Dynamics frame.
"""
return self._observer_location
@property
def time(self) -> datetime:
"""
The time when this measurement was captured as a datetime object.
"""
return self.image.observation_date
[docs] def predict(self, state: Dynamics.State) -> SCALAR_OR_ARRAY:
"""
Predicts the pixel location of an observation for the given state of the target and the location of the camera
at the time the target was observed.
This is computed by getting the relative state between the ``state``` and the ``observer_location`` in the
camera frame and then projecting the relative state using :meth:`.CameraModel.project_onto_image`.
:param state: The estimated state of the target at the time the target was observed
:return: The length 2 array giving the predicted pixel location of the target in the image
"""
# compute the relative state
relative_state = state - self._observer_location
# put the relative state in the "inertial" frame
relative_state_inertial = relative_state.orientation.matrix.T @ relative_state.position
# put it into the camera frame
relative_state_camera = self.image.rotation_inertial_to_camera.matrix @ relative_state_inertial
# return the predicted location
return self.camera_model.project_onto_image(relative_state_camera, temperature=self.image.temperature)
[docs] def compute_jacobian(self, state: Dynamics.State) -> np.ndarray:
"""
Computes and returns the change in the predicted measurement given a change in the state.
This is also known as the observation matrix. It linearly maps changes in the state to changes in the
measurement. As such, it should be a matrix of length nxm where n is the number of elements in the measurement
(for instance n=2 for a single pixel measurement) and m is the length of the state vector (``len(state)``).
The state vector is always guaranteed to contain position and velocity as the first 2 components of the state
vector.
:param state: The state object defining the current state of the target.
:return: The Jacobian of the measurement as a numpy array
"""
# compute the relative state
relative_state = state - self._observer_location
# put the relative state in the "inertial" frame
relative_state_inertial = relative_state.orientation.matrix.T @ relative_state.position
# put it into the camera frame
relative_state_camera = self.image.rotation_inertial_to_camera.matrix @ relative_state_inertial
# initialize the observation matrix
obs_mat = np.zeros((2, len(state)), dtype=np.float64)
# store the jacobian with respect to the position which is the only thing that matters
obs_mat[:, :3] = (self.camera_model.compute_pixel_jacobian(relative_state_camera.reshape(3, 1)) @
self.image.rotation_inertial_to_camera.matrix)
return obs_mat
[docs] @staticmethod
def compare_residuals(first: np.ndarray, second: np.ndarray) -> bool:
"""
This compares residuals computed using these measurement models.
If this returns ``True``, then ``first`` is smaller than or equal to ``second`` (according to the definition of
the residuals for this measurement). If ``False, then ``first`` is larger than ``second``. This is used to
check for divergence.
Here the residuals are compared by their 2 norm.
:param first: The first residual
:param second: The second residual
:return: ``True`` if the first residual <= second residual otherwise ``False``
"""
return np.linalg.norm(first) <= np.linalg.norm(second)