Source code for giant.calibration.estimators.geometric.iterative_nonlinear_lstsq

from dataclasses import dataclass

from typing import Optional, cast, Sequence

import warnings

import numpy as np
from numpy.typing import NDArray

from giant.calibration.estimators.geometric.geometric_estimator import GeometricEstimatorBC, GeometricEstimatorOptions, ModelT
from giant.camera_models import CameraModel
from giant._typing import DOUBLE_ARRAY, ARRAY_LIKE


[docs] @dataclass class IterativeNonlinearLstSqOptions(GeometricEstimatorOptions): max_iter: int = 20 """ The maximum number of iteration steps to attempt to reach convergence. If convergence has not been reached after attempting ``max_iter`` steps, a warning will be raised that the model has not converged and :attr:`successful` will be set to ``False``. """ residual_atol: float = 1e-10 """ The absolute convergence tolerance criteria for the sum of squares of the residual. """ residual_rtol: float = 1e-10 """ The relative convergence tolerance criteria for the sum of squares of the residual. """ state_atol: float = 1e-10 """ The absolute convergence tolerance criteria for the elements of the state vecto. """ state_rtol: float = 1e-10 """ The relative convergence tolerance criteria for the elements of the state vector. """
[docs] class IterativeNonlinearLSTSQ(GeometricEstimatorBC[ModelT], IterativeNonlinearLstSqOptions): r""" This concrete estimator implements iterative non-linear least squares for estimating an updated camera model. Iterative non-linear least squares estimation is done by estimating updates to the "state" vector (in this case the camera model parameters being updated) iteratively. At each step, the system is linearized about the current estimate of the state and the additive update is estimated. This iteration is repeated until convergence (or divergence) based on the pre/post update residuals and the update vector itself. The state vector that is being estimated by this class is controlled by the :attr:`.CameraModel.estimation_parameters` attribute of the provided camera model. This class does not actually use the :attr:`.CameraModel.estimation_parameters` attribute since it is handled by the :meth:`.CameraModel.compute_jacobian` and :meth:`.CameraModel.apply_update` methods of the provided camera model internally, but it is mentioned here to show how to control what exactly is being estimated. Because this class linearizes about the current estimate of the state, it requires an initial guess for the camera model that is "close enough" to the actual model to ensure convergence. Defining "close enough" in any broad sense is impossible, but based on experience, using the manufacturer defined specs for focal length/pixel pitch and assuming no distortion is generally "close enough" even for cameras with heavy distortion (star identification may require a better initial model than this anyway). As this class converges the state estimate, it updates the supplied camera model in place, therefore, if you wish to keep a copy of the original camera model, you should manually create a copy of it before calling the :meth:`estimate` method on this class. In the :meth:`estimate` method, convergence is checked on both the sum of squares of the residuals and the update vector for the state. That is convergence is reached when either of .. math:: :nowrap: \begin{gather*} \left\|\mathbf{r}_{pre}^T\mathbf{r}_{pre} - \mathbf{r}_{post}^T\mathbf{r}_{post}\right\| \le(a_r+r_r\mathbf{r}_{pre}^T\mathbf{r}_{pre}) \\ \text{all}\left[\left\|\mathbf{u}\right\|\le(a_s+r_s\mathbf{s}_{pre})\right] \end{gather*} is ``True``. Here :math:`\mathbf{r}_{pre}` is the nx1 vector of residuals before the update is applied, :math:`\mathbf{r}_{post}` is the nx1 vector of residuals after the update is applied, :math:`a_r` is the :attr:`residual_atol` absolute residual tolerance, :math:`r_r` is the :attr:`residual_rtol` relative residual tolerance, :math:`\mathbf{u}` is the update vector, :math:`\text{all}` indicates that the contained expression is ``True`` for all elements, :math:`a_s` is the :attr:`state_atol` absolute tolerance for the state vector, :math:`r_s` is the :attr:`state_rtol` relative tolerance for the state vector, and :math:`\mathbf{s}_{pre}` is the state vector before the update is applied. Divergence is only checked on the sum of squares of the residuals, that is, divergence is occurring when .. math:: \mathbf{r}_{pre}^T\mathbf{r}_{pre} < \mathbf{r}_{post}^T\mathbf{r}_{post} where all is as defined as before. If a case is diverging then a warning will be printed, the iteration will cease, and :attr:`successful` will be set to ``False``. Typically this class is not used by the user, and instead it is used internally by the :class:`.Calibration` class which handles data preparation for you. If you wish to use this externally from the :class:`.Calibration` class you must first set * :attr:`model` * :attr:`measurements` * :attr:`camera_frame_directions` * :attr:`temperatures` * :attr:`weighted_estimation` * :attr:`measurement_covariance` *if* :attr:`weighted_estimation` *is* ``True`` * :attr:`a_priori_state_covariance` *if* :attr:`~.CameraModel.use_a_priori` *is set to* ``True`` for the camera model. according to their documentation. Once those have been set, you can perform the estimation using :meth:`estimate` which will iterate until convergence (or divergence). If the fit successfully converges, :attr:`successful` will be set to ``True`` and attributes :attr:`postfit_covariance` and :attr:`postfit_residuals` will both return numpy arrays instead of ``None``. If you wish to use the same instance of this class to do another estimation you should call :meth:`reset` before setting the new data to ensure that data is not mixed between estimation runs and all flags are set correctly. """ def __init__(self, model: ModelT, options: IterativeNonlinearLstSqOptions | None = None): r""" :param model: The camera model instance to be estimated set with an initial guess of the state. :param options: the dataclass containing the options to configure the class with """ super().__init__(IterativeNonlinearLstSqOptions, options=options) self._model: ModelT = model """ The instance attribute to store the camera model being estimated. """ self._measurements: Optional[DOUBLE_ARRAY] = None """ The instance attribute to store the measurement array """ self._base_frame_directions: Optional[list[DOUBLE_ARRAY | list[list]]] = None """ The instance attribute to store the base frame direction list """ self._temperatures: Optional[list[float]] = None """ The instance attribute to store the camera temperature values for each image. """ # set the internal success flag self._successful: bool = False """ The instance attribute to store the success flag """ self._measurement_covariance: Optional[float | DOUBLE_ARRAY] = None """ The instance attribute to store the measurement covariance matrix """ self._jacobian: Optional[DOUBLE_ARRAY] = None """ A place to store the Jacobian matrix """ self._postfit_covariance: Optional[DOUBLE_ARRAY] = None """ A place to cache the post-fit covariance matrix """ self._postfit_residuals: Optional[DOUBLE_ARRAY] = None """ A place to cache the post-fit residual vector """ @property def model(self) -> ModelT: """ The camera model that is being estimated. Typically this should be a subclass of :class:`.CameraModel`. """ return self._model @model.setter def model(self, val: ModelT): if not isinstance(val, CameraModel): warnings.warn("You are setting a camera model that is not a subclass of CameraModel. We'll assume duck " "typing for now, but be sure that you have implemented all required interfaces or you'll end " "up with an error.") self._model = val @property def successful(self) -> bool: """ A boolean flag indicating whether the fit was successful or not. If the fit was successful this should return ``True``, and ``False`` if otherwise. A fit is defined as successful if convergence criteria were reached before the maximum number of iterations. Divergence and non-convergence are both considered an unsuccessful fit resulting in this being set to ``False`` """ return self._successful @property def measurement_covariance(self) -> Optional[DOUBLE_ARRAY | float]: """ A square numpy array containing the covariance matrix for the measurements or a scalar containing the variance for all of the measurements. If :attr:`weighted_estimation` is set to ``True`` then this property will contain the measurement covariance matrix as a square, full rank, numpy array or the measurement variance as a scalar float. If :attr:`weighted_estimation` is set to ``False`` then this property may be ``None`` and will be ignored. If specified as a scalar, it is treated as the **variance** for each measurement (that is ``cov = v*I(n,n)`` where ``cov`` is the covariance matrix, ``v`` is the specified scalar variance, and ``I(n,n)`` is a nxn identity matrix) in a memory efficient way. :raises ValueError: When attempting to set an array that does not have the proper shape for the :attr:`measurements` vector """ return self._measurement_covariance @measurement_covariance.setter def measurement_covariance(self, val: Optional[float | ARRAY_LIKE]): if self._measurements is not None: if not np.isscalar(val): val = np.asanyarray(val, dtype=np.float64) if val.shape[0] != self._measurements.size: raise ValueError('The measurement covariance matrix must be a square matrix of nxn where n is the ' 'number of measurements being used in estimation.\n\tmeasurement_covariance shape = {}' '\n\tnumber of measurements = {}'.format(val.shape, self._measurements.size)) self._postfit_covariance = None # drop the cache self._measurement_covariance = val # type: ignore @property def measurements(self) -> np.ndarray: """ A nx2 numpy array of the observed pixel locations for stars across all images Each column of this array corresponds to the same column of the :attr:`camera_frame_directions` concatenated down the last axis. (That is ``measurements[:, i] <-> np.concatenate(camera_frame_directions, axis=-1)[:, i]``) This must always be set before a call to :meth:`estimate`. """ assert self._measurements is not None, "measurements shouldn't be none at this point" return self._measurements @measurements.setter def measurements(self, val: Optional[ARRAY_LIKE]): if val is not None: self._measurements = np.asanyarray(val, dtype=np.float64) else: self._measurements = None @property def camera_frame_directions(self) -> list[DOUBLE_ARRAY | list[list]]: """ A length m list of unit vectors in the camera frame as numpy arrays for m images corresponding to the :attr:`measurements` attribute. Each element of this list corresponds to a unique image that is being considered for estimation and the subsequent element in the :attr:`temperatures` list. Each column of this concatenated array will correspond to the same column of the :attr:`measurements` array. (That is ``np.concatenate(camera_frame_directions, axis=-1)[:, i] <-> measurements[:, i]``). Any images for which no stars were identified (due to any number of reasons) will have a list of empty arrays in the corresponding element of this list (that is ``camera_frame_directions[i] == [[], [], []]`` where ``i`` is an image with no measurements identified). These will be automatically dropped by numpy's concatenate, but are included to notify the which temperatures/misalignments to use. This must always be set before a call to :meth:`estimate`. """ assert self._base_frame_directions is not None, 'base frame directions should not be None at this point' return self._base_frame_directions @camera_frame_directions.setter def camera_frame_directions(self, val: Optional[Sequence[DOUBLE_ARRAY | list[list]]]): self._base_frame_directions = list(val) if val is not None else None @property def temperatures(self) -> list[float]: """ A length m list of temperatures of the camera for each image being considered in estimation. Each element of this list corresponds to a unique image that is being considered for estimation and the subsequent element in the :attr:`camera_frame_directions` list. This must always be set before a call to :meth:`estimate` (although sometimes it may be a list of all zeros if temperature data is not available for the camera). """ assert self._temperatures is not None, "Temperatures should not be None at this point" return self._temperatures @temperatures.setter def temperatures(self, val: Optional[list[float]]): self._temperatures = val @property def postfit_covariance(self) -> Optional[DOUBLE_ARRAY]: """ The post-fit state covariance matrix, taking into account the measurement covariance matrix (if applicable). This returns the post-fit state covariance matrix after a call to :meth:`estimate`. The covariance matrix will be in the order according to :attr:`~.CameraModel.estimation_parameters` and if :attr:`weighted_estimation` is ``True`` will return the state covariance matrix taking into account the measurement covariance matrix. If :attr:`weighted_estimation` is ``False``, then this will return the post-fit state covariance matrix assuming no measurement weighting (that is a measurement covariance matrix of the identity matrix). If :meth:`estimate` has not been called yet or the fit was unsuccessful then this will return ``None`` """ return self._calc_covariance() @property def postfit_residuals(self) -> Optional[DOUBLE_ARRAY]: """ The post-fit observed-computed measurement residuals as a 2xn numpy array. This returns the post-fit observed minus computed measurement residuals after a call to :meth:`estimate`. If :meth:`estimate` has not been called yet or the fit was unsuccessful then this will return ``None``. """ if self._successful: return self._postfit_residuals else: return None
[docs] def reset(self) -> None: """ This method resets all of the data attributes to their default values to prepare for another estimation. Specifically * :attr:`successful` * :attr:`measurement_covariance` * :attr:`a_priori_state_covariance` * :attr:`measurements` * :attr:`camera_frame_directions` * :attr:`temperatures` * :attr:`postfit_covariance` * :attr:`postfit_residuals` are reset to their default values (typically ``None``). This also clears the caches for some internally used attributes. """ self._successful = False self._measurement_covariance = None self._measurements = None self._base_frame_directions = None self._temperatures = None self._postfit_covariance = None self._postfit_residuals = None self._jacobian = None
def _calc_covariance(self): r""" This method calculates the post fit covariance (if a fit was successful) using the cached value if available. The post-fit covariance is defined as .. math:: \mathbf{C}=\left((\mathbf{J}^T\mathbf{J})^{-1} \mathbf{J}^T\mathbf{W}\mathbf{J} (\mathbf{J}^T\mathbf{J})^{-1}\right)^{-1} where :math:`\mathbf{J}` is the Jacobian matrix evaluated at the final state estimate and :math:`\mathbf{W}=\mathbf{R}^{-1}` is the weight matrix, which is the inverse of the measurement covariance matrix (if applicable). If the fit was not successful (or it has not been performed yet) this will return ``None``. """ # if the fit was unsuccessful return None if not self.successful: return None # if the covariance is cached return it if self._postfit_covariance is not None: return self._postfit_covariance # otherwise compute it weight_matrix = self._compute_weight_matrix(len(self.model.state_vector), self.measurements.size) assert self._jacobian is not None, "The jacobian hasn't been computed, something is wrong" if not np.isscalar(weight_matrix): orthogonal_project_mat = np.linalg.inv(self._jacobian.T @ self._jacobian) @ self._jacobian.T self._postfit_covariance = np.linalg.inv(orthogonal_project_mat @ weight_matrix @ orthogonal_project_mat.T).astype(np.float64) else: self._postfit_covariance = np.linalg.inv(self._jacobian.T @ self._jacobian * weight_matrix).astype(np.float64) # type: ignore return self._postfit_covariance
[docs] def compute_residuals(self, model: Optional[CameraModel] = None) -> np.ndarray: """ This method computes the observed minus computed residuals for the current model (or an input model). The residuals are returned as a 2xn numpy array where n is the number of stars observed with units of pixels. The computed values are determined by calls to ``model.project_onto_image`` for the :attr:`camera_frame_directions` for each image. :param model: An optional model to compute the residuals using. If ``None``, then will use :attr:`model`. :return: The observed minus computed residuals as a numpy array """ # use the model attribute if necessary if model is None: model = self.model return self.measurements - np.concatenate( [model.project_onto_image(vecs, image=ind, temperature=self.temperatures[ind]) for ind, vecs in enumerate(self.camera_frame_directions)], axis=1 )
def _compute_weight_matrix(self, state_vector_size, number_of_measurements) -> float | DOUBLE_ARRAY: """ This method computes the weight matrix based on whether weighted estimation is being performed, and whether using the a priori state as a measurement. :param state_vector_size: The size of the state vector :param number_of_measurements: The number of measurements :return: the weight matrix, or 1 if a weight matrix is not needed """ if self.weighted_estimation and self.model.use_a_priori: weight_matrix = np.zeros((state_vector_size + number_of_measurements, state_vector_size + number_of_measurements), dtype=np.float64) if self.a_priori_model_covariance is not None: weight_matrix[number_of_measurements:, number_of_measurements:] = np.linalg.inv(np.asanyarray(self.a_priori_model_covariance)) else: weight_matrix[number_of_measurements:, number_of_measurements:] = np.eye(state_vector_size) if self._measurement_covariance is not None: if np.isscalar(self.measurement_covariance): measurement_info = 1.0 / cast(float, self.measurement_covariance) for i in range(number_of_measurements): weight_matrix[i, i] = measurement_info else: weight_matrix[:number_of_measurements, :number_of_measurements] = np.linalg.inv(np.asanyarray(self.measurement_covariance)) else: for i in range(number_of_measurements): weight_matrix[i, i] = 1 elif self.weighted_estimation: if self._measurement_covariance is not None: if np.isscalar(self.measurement_covariance): weight_matrix = 1.0 / cast(float, self.measurement_covariance) else: weight_matrix = np.linalg.inv(np.asanyarray(self.measurement_covariance)) else: weight_matrix = 1 elif self.model.use_a_priori: weight_matrix = np.eye(state_vector_size + number_of_measurements, dtype=np.float64) if self.a_priori_model_covariance is not None: weight_matrix[number_of_measurements:, number_of_measurements:] = np.linalg.inv(np.asanyarray(self.a_priori_model_covariance)) else: weight_matrix = 1 return weight_matrix
[docs] def estimate(self) -> None: """ Estimates an updated camera model that better transforms the camera frame directions into pixel locations to minimize the residuals between the observed and the predicted star locations. Upon successful completion, the updated camera model is stored in the :attr:`model` attribute, the :attr:`successful` will return ``True``, and :attr:`postfit_residuals` and :attr:`postfit_covariance` should both be not None. If estimation is unsuccessful, then :attr:`successful` should be set to ``False``. The estimation is done using nonlinear iterative least squares, as discussed in the class documentation (:class:`IterativeNonlinearLSTSQ`). :raises ValueError: if :attr:`measurements` or :attr:`camera_frame_directions` are ``None``. """ if self.measurements is None: raise ValueError("measurements must not be None before a call to estimate") if self.camera_frame_directions is None: raise ValueError("camera_frame_directions must not be None before a call to estimate") if self.weighted_estimation and (self.measurement_covariance is None): raise ValueError("measurement_covariance must not be None before a call to estimate " "if weighed_estimation is True") if self.model.use_a_priori and (self.a_priori_model_covariance is None): raise ValueError("a_priori_model_covariance must not be None before a call to estimate " "if model.use_a_priori is True") # get the size of the state vector a_priori_state = np.array(self.model.state_vector) state_size = len(a_priori_state) # get the number of measurements num_meas = self.measurements.size # get the weight matrix weight_matrix = self._compute_weight_matrix(state_size, num_meas) # calculate the prefit residuals prefit_residuals = self.compute_residuals() pre_ss = prefit_residuals.ravel()@prefit_residuals.ravel() for _ in range(self.max_iter): jacobian = self.model.compute_jacobian(self.camera_frame_directions, temperature=self.temperatures) if self.model.use_a_priori: residuals_vec = np.concatenate([prefit_residuals.reshape((-1, 1), order='F'), np.zeros((state_size, 1))], axis=0) else: residuals_vec = prefit_residuals.reshape((-1, 1), order='F') if np.isscalar(weight_matrix): lhs = np.sqrt(weight_matrix)*jacobian.T@jacobian rhs = np.sqrt(weight_matrix)*jacobian.T@residuals_vec else: lhs = jacobian.T@weight_matrix@jacobian rhs = jacobian.T@weight_matrix@residuals_vec update_vec = np.linalg.solve(lhs, rhs).astype(np.float64) model_copy: ModelT = self.model.copy() model_copy.apply_update(update_vec) postfit_residuals = self.compute_residuals(model=model_copy) post_ss = postfit_residuals.ravel()@postfit_residuals.ravel() resid_change = abs(pre_ss-post_ss) # check for convergence if resid_change <= (self.residual_atol+self.residual_rtol*pre_ss): self._successful = True self._postfit_residuals = postfit_residuals self.model = model_copy self._jacobian = self.model.compute_jacobian(self.camera_frame_directions, temperature=self.temperatures) return elif (np.abs(update_vec) <= (self.state_atol+self.state_rtol*a_priori_state)).all(): self._successful = True self._postfit_residuals = postfit_residuals self.model = model_copy self._jacobian = self.model.compute_jacobian(self.camera_frame_directions, temperature=self.temperatures) return elif pre_ss < post_ss: # check for divergence warnings.warn('Solution is diverging. Stopping iteration.' '\n\tpre-update residuals {}' '\n\tpost-update residuals {}'.format(pre_ss, post_ss)) self._successful = False self._postfit_residuals = None self._jacobian = None return else: # converging self.model = model_copy prefit_residuals = postfit_residuals pre_ss = post_ss a_priori_state = np.array(self.model.state_vector) warnings.warn("Solution didn't converge in the requested number of iterations") self._successful = False self._postfit_residuals = prefit_residuals self.model self._jacobian = self.model.compute_jacobian(self.camera_frame_directions, temperature=self.temperatures)