Source code for giant.ufo.ekf_tracker

# Copyright 2021 United States Government as represented by the Administrator of the National Aeronautics and Space
# Administration.  No copyright is claimed in the United States under Title 17, U.S. Code. All Other Rights Reserved.


"""
This module provides a class for tracking UFO detections across monocular images.

Description
-----------

UFO tracking is done using Extended Kalman Filters to predict the locations of UFOs in subsequent images and then match
UFO detections in those images to the predicted locations.  Multiple paths are followed for each possible particle,
resulting in many possible tracks across all of the images.  Each of these possible tracks is then filtered to remove
extraneous tracks (and tracks that are subsets of other tracks), leaving only the tracks whose quality we are
relatively confident in.  Further description of the process can be found in the paper at
https://agupubs.onlinelibrary.wiley.com/doi/pdf/10.1029/2019EA000843

Use
---

To track UFOs from image to image, initialize the :class:`.Tracker` class with the required information and then call
method :meth:`.Tracker.track`.  This will go through the whole process of initial tracking and then filtering for all
the possible particles in the loaded data, storing the results in the :attr:`.confirmed_filters` and
:attr:`.confirmed_standard_deviations` lists.  You can also save the results to a csv file using method
:meth:`~.Tracker.save_results`.

You may also be interested in using the :class:`.UFO` class which combines both detection and tracking instead of using
this class directly.
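
Example
-------

The following is a minimal sketch of direct use.  The camera, scene, dynamics model, state initializer, detection
KD-Trees, detection ids, and kernel list shown here are placeholders that you must construct for your own data::

    from giant.ufo.ekf_tracker import Tracker

    # camera, scene, dynamics, and state_initializer are assumed to already be configured for your mission
    tracker = Tracker(camera, scene, dynamics, state_initializer,
                      search_distance_function=lambda ekf: 10.0,  # constant 10 pixel search radius
                      observation_trees=detection_trees,  # list of scipy KDTrees, one per image (or None)
                      observation_ids=detection_ids,  # list of lists of unique hashable ids, one per image
                      kernels_to_load=['path/to/meta_kernel.tm'])  # only needed if your dynamics use spice

    tracker.track()
    tracker.save_results('ufo_tracks.csv')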
"""

import logging

from copy import deepcopy, copy

from multiprocessing import Process, cpu_count, Array

from tempfile import TemporaryFile

from time import time, sleep

from pathlib import Path

from queue import Empty, Full

from datetime import timedelta

# no real risk here because the pickle files are created by this script itself
import pickle  # nosec

from typing import List, Optional, Callable, Union, Tuple, Hashable, Iterable

from scipy.spatial.kdtree import KDTree
import numpy as np

import spiceypy as spice

from giant.image import OpNavImage
from giant.ray_tracer.scene import Scene
from giant.camera import Camera
from giant.ufo.extended_kalman_filter import ExtendedKalmanFilter, STATE_INITIALIZER_TYPE
from giant.ufo.measurements import OpticalBearingMeasurement
from giant.ufo.dynamics import Dynamics
from giant.ufo.clearable_queue import ClearableQueue

from giant._typing import Real, PATH


_LOGGER: logging.Logger = logging.getLogger(__name__)
"""
The logger to use to report status/errors/warning/results/etc
"""


_ZERO_TIMEDELTA: timedelta = timedelta()
"""
The 0 timedelta constant
"""


def _pickle_generator(file: TemporaryFile) -> Iterable[ExtendedKalmanFilter]:
    """
    Simple generator to work through the pickled objects in a file

    :param file: the file object to work on
    """

    while True:
        try:
            # we are using pickle for swap essentially therefore there is no real risk
            yield pickle.load(file)  # nosec
        except EOFError:
            break


class Tracker:
    """
    This class provides an interface for autonomously tracking UFOs through subsequent images in time.

    There are 2 main components to this tracker.  The first is the use of EKFs to follow most of the possible paths
    forward from a single starting particle in an image.  This can result in hundreds of thousands (if not millions)
    of possible tracks for a set of images with fairly dense UFO detections.  The second component is then the
    filtering of these EKFs, which is done based on length (the number of measurements included in the EKF), post-fit
    residual statistics, and uniqueness (each starting particle only gets the best track assigned to it, and once a
    particle has been assigned to a track it can't be assigned to others).  This filtering process normally brings the
    number of tracks down to a much more manageable 10s to 100s.

    Explicit details on how this tracker works are provided in the paper at
    https://agupubs.onlinelibrary.wiley.com/doi/pdf/10.1029/2019EA000843 and are not repeated here.

    To use this class, provide the (numerous) initialization values (the defaults will be fine for many cases) and
    then call method :meth:`track`.  The results will then be stored in the :attr:`confirmed_filters`,
    :attr:`confirmed_particles`, and :attr:`confirmed_standard_deviations` attributes.

    In general, however, you may not directly interact with this class, and instead will use the :class:`UFO`
    interface class to both detect and track particles.
    """

    def __init__(self, camera: Camera, scene: Scene, dynamics: Dynamics, state_initializer: STATE_INITIALIZER_TYPE,
                 search_distance_function: Callable[[ExtendedKalmanFilter], Real],
                 observation_trees: Optional[List[Optional[KDTree]]] = None,
                 observation_ids: Optional[List[Optional[List[int]]]] = None,
                 initial_euclidean_threshold: Real = 300,
                 measurement_covariance: Optional[np.ndarray] = None,
                 maximum_image_timedelta: timedelta = timedelta(hours=1),
                 maximum_paths_per_image: int = 10, maximum_paths_total: int = 15, maximum_forward_images: int = 2,
                 maximum_track_length: int = 15, maximum_mahalanobis_distance_squared: float = 25,
                 expected_convergence_number: int = 4, reduced_paths_forward_per_image: int = 3,
                 minimum_number_of_measurements: int = 4, maximum_residual_standard_deviation: Real = 20,
                 maximum_time_outs: int = 50, maximum_tracking_time_per_image: Real = 3600,
                 kernels_to_load: Optional[List[str]] = None):
        """
        :param camera: The :class:`.Camera` containing the images to process and the camera model
        :param scene: The :class:`.Scene` describing the location of the central body with respect to the camera.
        :param dynamics: The dynamics model to use in the EKF for propagating the state from one time to another
        :param state_initializer: A callable which takes in a :class:`.Measurement` instance and
                                  :class:`.Dynamics.State` class object and returns an initialized state.
        :param search_distance_function: A callable which takes in an :class:`.ExtendedKalmanFilter` and returns what
                                         the Euclidean search distance should be for that EKF in pixels.  This is only
                                         applied after the first pair has been made
        :param observation_trees: A list of scipy.spatial KDTree objects that are built on the pixel locations of the
                                  detections for each image
        :param observation_ids: A list of the ids for each observation contained in the trees in order.
        :param initial_euclidean_threshold: The threshold in pixels for points to be paired from the first image to a
                                            subsequent image.  This is not applied after the first pair has been made.
        :param measurement_covariance: The covariance matrix of the measurements as a 2x2 array.  This should have
                                       units of pixels squared.
        :param maximum_image_timedelta: The maximum time separation between images to attempt either initial or
                                        subsequent pairings as a ``timedelta`` object
        :param maximum_paths_per_image: The maximum number of forward paths for a single particle to the next image to
                                        consider
        :param maximum_paths_total: The total maximum number of forward paths for a single image across all images to
                                    consider
        :param maximum_forward_images: The maximum number of images to retrieve potential pairs from (images will be
                                       processed in time order)
        :param maximum_track_length: The maximum length of a track before it is artificially terminated.  The length
                                     of a track is defined as the number of measurements that it has ingested
        :param maximum_mahalanobis_distance_squared: The maximum squared Mahalanobis distance for subsequent (not
                                                     initial) observations to be paired to a track.
        :param expected_convergence_number: The number of ingested measurements at which point tracks are expected to
                                            be fairly converged.  At this point, more stringent filtering is used in
                                            determining forward paths.
        :param reduced_paths_forward_per_image: The number of paths forward per image that are considered once a track
                                                is considered converged.
        :param minimum_number_of_measurements: The minimum number of measurements in a track for it to be considered a
                                               potential good track.  Typically 4 should be the absolute minimum, but
                                               in some cases it may need to be even higher than that.
        :param maximum_residual_standard_deviation: The maximum standard deviation of the post-fit residuals in an EKF
                                                    for it to be considered a potentially good track.
        :param maximum_time_outs: The maximum number of time outs that can occur when attempting to retrieve tracking
                                  results from the children processes before we assume something has gone wrong and
                                  terminate all of the processes.
        :param maximum_tracking_time_per_image: The maximum amount of time to attempt tracking in the image before we
                                                assume something has gone wrong and terminate all of the processes.
        :param kernels_to_load: The spice kernels to load in each subprocess.  This is required if you are using spice
                                because the kernel pool does not subsist across processes.  It should either be
                                ``None`` if you're not using spice or a list of strings if you are.
        """

        self.camera: Camera = camera
        """
        The :class:`.Camera` containing the images to process and the camera model
        """

        self.scene: Scene = scene
        """
        The :class:`.Scene` describing the location of the central body with respect to the camera.

        The central body should be the first "target" in the scene.  It doesn't need to have a shape associated with
        it (it can be a ``Point`` object) and in fact is encouraged not to (to avoid large memory overhead)
        """

        self.observation_trees: List[Optional[KDTree]] = observation_trees
        """
        A list of KDTrees built on the observed UFO locations.

        If any elements are ``None`` then it is assumed that no detections exist for that image.
        """

        self.observation_ids: List[Optional[List[Hashable]]] = observation_ids
        """
        A list of the ids for each observation contained in the trees.

        These should be unique ``Hashable`` objects like ints or strings, not mutable like lists/arrays/dictionaries.
        They should uniquely identify an observation.
        """

        self.processes: List[Process] = []
        """
        A List of Processes that are working
        """

        self.confirmed_particles = set()
        """
        A set of particles that have been assigned to a track already
        """

        self.dynamics: Dynamics = dynamics
        """
        The dynamics model to use to propagate the states from one time to another.
        """

        self.measurement_covariance: Optional[np.ndarray] = measurement_covariance
        """
        The 2x2 measurement covariance matrix (or ``None`` to use the identity matrix) with units of pixels squared.
        """

        self.state_initializer: STATE_INITIALIZER_TYPE = state_initializer
        """
        The state initializer callable to use to initialize the state vector given the initial measurement and the
        type of the state vector to be initialized
        """

        self.maximum_image_timedelta: timedelta = maximum_image_timedelta
        """
        The maximum separation in time between images for them to be paired subsequently
        """

        self.initial_euclidean_threshold: float = float(initial_euclidean_threshold)
        """
        The threshold in pixels for points to be paired from the first image to a subsequent image
        """

        self.maximum_paths_per_image: int = maximum_paths_per_image
        """
        The maximum number of pairs from one image to the next for a single possible particle
        """

        self.maximum_paths_total: int = maximum_paths_total
        """
        The maximum number of pairs from one image to all images for a single particle
        """

        self.maximum_forward_images: int = maximum_forward_images
        """
        The maximum number of images to pair with
        """

        self.maximum_track_length: int = maximum_track_length
        """
        The maximum length of a track to consider before terminating.  The length of a track is defined as the number
        of measurements that it has ingested.

        This is necessary for memory management purposes as long tracks frequently have many variants that are being
        tracked and can take up a lot of memory.
        """

        self.search_distance_function: Callable[[ExtendedKalmanFilter], Real] = search_distance_function
        """
        A function which returns what the search distance should be in pixels when trying to match the current EKF to
        new images.
        """

        self.maximum_mahalanobis_distance_squared: float = maximum_mahalanobis_distance_squared
        """
        This specifies the maximum Mahalanobis distance squared for a potential detection to be paired to a track

        The Mahalanobis distance is roughly equivalent to the sigma normalized error between the predicted and
        observed location.  Therefore a Mahalanobis distance squared of 25 roughly corresponds to only accepting
        pairings that are within 5 sigma of the predicted location.
        """

        self.expected_convergence_number: int = expected_convergence_number
        """
        This specifies the number of measurements at which it is expected the filter will have mostly converged and we
        should become more selective with which paths forward we follow.
        """

        self.reduced_paths_forward_per_image: int = reduced_paths_forward_per_image
        """
        This specifies the reduced number of paths forward we should follow once the filter should be converged.
        """

        self.minimum_number_of_measurements: int = minimum_number_of_measurements
        """
        The minimum number of measurements for an EKF to be considered a track.
        """

        self.maximum_residual_standard_deviation: float = float(maximum_residual_standard_deviation)
        """
        The maximum post-fit standard deviation of the residuals in an EKF for it to be considered a valid fit
        """

        self.maximum_time_outs: int = maximum_time_outs
        """
        The maximum number of time outs in a row before we stop trying to process an image
        """

        self.maximum_tracking_time_per_image: float = float(maximum_tracking_time_per_image)
        """
        The maximum amount of time to attempt tracking in an image in seconds.
        """

        self._ekfs_to_process: Optional[ClearableQueue] = None
        """
        A Queue used to specify what ekfs need to be processed
        """

        self._working: Optional[Array] = None
        """
        A shared array of boolean values specifying whether each processor is actively working
        """

        self._results: Optional[ClearableQueue] = None
        """
        A Queue used to communicate the results for the processes.
        """

        self._smoothing_input: Optional[ClearableQueue] = None
        self._smoothing_output: Optional[ClearableQueue] = None
        self._smoothing_working: Optional[Array] = None

        self.smoothing_processes: Optional[List[Process]] = None
        """
        A List of Processes that are working on smoothing
        """

        self.confirmed_filters: List[ExtendedKalmanFilter] = []
        """
        This list stores the confirmed EKFs
        """

        self.confirmed_standard_deviations: List[float] = []
        """
        This list stores the standard deviation of the post-fit residuals of the confirmed EKFs
        """

        self._n_respawns: int = 0
        """
        A counter for the number of times we have respawned our processes
        """

        self._n_smoothing_respawns: int = 0
        """
        A counter for the number of times we have respawned our smoothing processes
        """

        self.kernels_to_load: Optional[List[str]] = kernels_to_load
        """
        The spice kernels to load in each subprocess.

        This is required if you are using spice because the kernel pool does not subsist across processes.  It should
        either be ``None`` if you're not using spice or a list of strings if you are.
        """
[docs] def find_initial_pairs(self, image_ind: int, image: OpNavImage): """ This method finds the initial pairs for the input image. This is done by identifying all observations in subsequent images within a specified number of pixels of the first identification (corrected for the movement of the observer in the time period) :param image_ind: The index of the image we are identifying the initial pairs for :param image: The image that we are identifying the initial pairs for """ if self._ekfs_to_process is None: _LOGGER.warning("The work queue is not initialized. Initializing it") self._initialize_tracking_workers() _LOGGER.info(f'Processing image {image_ind}, {image.observation_date}') # get the detections and ids detections = self.observation_trees[image_ind].data if self.observation_trees[image_ind] is not None else None detection_ids = self.observation_ids[image_ind] if detections is None: _LOGGER.warning(f'No detections for {image_ind}, {image.observation_date}') return # initialize a list to store the ekfs in filters = [] # update the scene to the current time self.scene.update(image) # get the camera position in the central body centered inertial frame camera_position = -image.rotation_inertial_to_camera.matrix.T @ self.scene.target_objs[0].position.ravel() # loop through and create the filter for each possible point in the current image for possible_id, possible_detection in zip(detection_ids, detections): if possible_id in self.confirmed_particles: continue camera_location = self.dynamics.State(image.observation_date, camera_position) initial_measurement = OpticalBearingMeasurement(possible_detection, self.camera.model, image, camera_location, self.measurement_covariance, identity=possible_id) filters.append(ExtendedKalmanFilter(copy(self.dynamics), self.state_initializer, initial_measurement=initial_measurement)) number_of_linked_images = 0 for next_image_index, next_image in self.camera: if next_image_index <= image_ind: # don't want to go backwards continue if (next_image.observation_date - image.observation_date) >= self.maximum_image_timedelta: # if this image too much later then break because we're done processing break _LOGGER.info(f'linking image {image_ind}, {image.observation_date} with ' f'{next_image_index}, {next_image.observation_date}') # update the scene to the time for the next image self.scene.update(next_image) # get the camera position for this image wrt the central body next_camera_position = (-next_image.rotation_inertial_to_camera.matrix.T @ self.scene.target_objs[0].position.ravel()) next_camera_state = self.dynamics.State(next_image.observation_date, next_camera_position) # make a temporary measurement for telling the filter how to predict temporary_next_measurement = OpticalBearingMeasurement(np.zeros(2), self.camera.model, next_image, next_camera_state, self.measurement_covariance) # get the predicted location of each point from the first image in this image predicted_pixels = [] predicted_states = [] removes = [] for ind, ekf in enumerate(filters): predicted_state, predicted_observation = ekf.propagate_and_predict(temporary_next_measurement) if predicted_state is None: removes.append(ind) continue predicted_states.append(predicted_state) predicted_pixels.append(predicted_observation) for rm in removes[::-1]: filters.pop(rm) if not predicted_pixels: _LOGGER.debug(f'No predicted locations for image {image_ind} to {next_image_index}') continue # build the kdtree for the predicted locations in the next image so we can do a ball query predicted_kdtree = 
KDTree(np.vstack(predicted_pixels)) # do a ball query with the initial euclidean tolerance to figure out the initial pairs full_pairs = predicted_kdtree.query_ball_tree(self.observation_trees[next_image_index], self.initial_euclidean_threshold) # loop through each pair and create an EKF to follow the path valid_pairs = False for predicted_index, (ekf, pairs) in enumerate(zip(filters, full_pairs)): number_of_pairs = len(pairs) if number_of_pairs == 0: continue if number_of_pairs > self.maximum_paths_per_image: _LOGGER.debug(f'Too many forward paths {number_of_pairs}. ' f'Only taking {self.maximum_paths_per_image} closest') # pairs = self.observation_trees[next_image_index].query_ball_point( # predicted_kdtree.data[predicted_index], self.initial_euclidean_threshold / 2 # ) sorted_pairs = sorted(zip(np.linalg.norm(self.observation_trees[next_image_index].data[pairs] - predicted_kdtree.data[predicted_index], axis=-1), pairs)) pairs = [x[1] for x in list(sorted_pairs)[:self.maximum_paths_per_image]] _LOGGER.debug(f'{len(pairs)} paths forward for track {ekf.identity}') valid_pairs = True for pair in pairs: if self.observation_ids[next_image_index][pair] in self.confirmed_particles: # skip particles we have already tracked continue # clone the ekf to follow the path new_ekf = deepcopy(ekf) # get rid of the state initializer because it isn't needed anymore and makes problems with pickling new_ekf.state_initializer = None # do the measurement update new_measurement = OpticalBearingMeasurement(self.observation_trees[next_image_index].data[pair], self.camera.model, next_image, next_camera_state, self.measurement_covariance, identity=self.observation_ids[next_image_index][pair]) state_update = new_ekf.process_measurement( new_measurement, pre_update_state=predicted_states[predicted_index], pre_update_predicted_measurement=predicted_pixels[predicted_index] ) if state_update is None: _LOGGER.debug(f'Failed to propagate EKF {new_ekf.identity}.') else: self._ekfs_to_process.put_retry(new_ekf) if valid_pairs: # update the image counter number_of_linked_images += 1 if number_of_linked_images >= self.maximum_forward_images: break
def _initialize_tracking_workers(self) -> None: """ This method sets up the processes and queues for working on data in parallel for initial tracking. """ self._results = ClearableQueue() self._ekfs_to_process = ClearableQueue() self._working = Array('i', cpu_count()) self.processes = [Process(target=self._follow, args=(pid,), name=f'EKF Follower {pid}') for pid in range(cpu_count())] def _reset_tracking_workers(self) -> None: """ This method resets the Pool of workers, respawning any that have died and clears the queues/the work array for tracking workers """ self._n_respawns += 1 self._results.clear() self._ekfs_to_process.clear() # fix any dead processes for ind, process in enumerate(self.processes): self._working[ind] = False try: process.join() process.close() except ValueError: pass self.processes[ind] = Process(target=self._follow, args=(ind,), name=f'EKF Follower {ind}.{self._n_respawns}') def _tear_down_tracking_workers(self): """ This method terminates the pool of workers and the queues used to communicate with them """ for process in self.processes: try: process.join(timeout=0.1) except TimeoutError: process.terminate() except ValueError: continue process.close() self._results.close() self._ekfs_to_process.close() def _initialize_smoothing_workers(self) -> None: """ This method sets up the processes and queues for working on data in parallel for initial tracking. """ self._smoothing_input = ClearableQueue() self._smoothing_output = ClearableQueue() self._smoothing_working = Array('i', cpu_count()) self.smoothing_processes = [Process(target=self._smooth, args=(pid,), name=f'Smoother {pid}') for pid in range(cpu_count())] def _reset_smoothing_workers(self) -> None: """ This method resets the Pool of workers, respawning any that have died and clears the queues/the work array for smoothing workers """ self._n_smoothing_respawns += 1 self._smoothing_input.clear() self._smoothing_output.clear() # fix any dead processes for ind, process in enumerate(self.smoothing_processes): self._smoothing_working[ind] = False process.join() process.close() self.smoothing_processes[ind] = Process(target=self._smooth, args=(ind,), name=f'Smoother {ind}.{self._n_smoothing_respawns}') def _tear_down_smoothing_workers(self): """ This method terminates the pool of workers and the queues used to communicate with them """ for process in self.processes: try: process.join(timeout=0.1) except TimeoutError: process.terminate() except ValueError: continue process.close() self._results.close() self._ekfs_to_process.close() def _follow(self, process_number: int): """ This method retrieves an EKF from the queue and follows it to completion This is used as the target for the Processes that are created. :param process_number: The number of the process. """ if self.kernels_to_load is not None: for k in self.kernels_to_load: spice.furnsh(k) if self._ekfs_to_process is None: raise ValueError("The work queue hasn't been initialized") if self._results is None: raise ValueError("The results queue hasn't been initialized") if self._working is None: raise ValueError("The working array hasn't been initialized") while True: try: # retrieve an ekf from the queue to process ekf: ExtendedKalmanFilter = self._ekfs_to_process.get(timeout=2) self._working[process_number] = True except Empty: self._working[process_number] = False if True in self._working: _LOGGER.debug(f'QUEUE was empty for process {process_number} but another process is still working.' 
f'Process status: {str(list(self._working))}') continue else: _LOGGER.info('No more EKFs to follow') self._results.put_retry('DONE') break if len(ekf.measurement_history) > self.maximum_track_length: _LOGGER.warning(f'Too long of a track for ekf {ekf.identity}, length {len(ekf.measurement_history)}, ' f'stopping') continue # create a list to store the paths forward for the current ekf local_splits = [] _LOGGER.info(f'Following EKF: {ekf.identity}. Current length {len(ekf.measurement_history)}') # get the time of the last image included in the ekf last_image_time = ekf.measurement_history[-1].time # determine the accuracy this filter should have search_distance = float(self.search_distance_function(ekf)) # start a counter for the number of images we've linked with number_of_images_considered = 0 for next_image_index, next_image in self.camera: time_delta = next_image.observation_date - last_image_time if time_delta <= _ZERO_TIMEDELTA: # skip if we are still before the current image. continue if time_delta >= self.maximum_image_timedelta: # stop if the next image is too far ahead in time break if self.observation_trees[next_image_index] is None: # skip if there are no observations for this image continue # update the scene to the time of the current image self.scene.update(next_image) # get the current camera position with respect to the central body in the inertial frame next_camera_position = (-next_image.rotation_inertial_to_camera.matrix.T @ self.scene.target_objs[0].position) next_camera_state = self.dynamics.State(next_image.observation_date, next_camera_position) temporary_next_measurement = OpticalBearingMeasurement(np.zeros(2), self.camera.model, next_image, next_camera_state, self.measurement_covariance) # predict the measurement at this time predicted_state, predicted_pixels = ekf.propagate_and_predict(temporary_next_measurement) # the state failed to propagate if predicted_state is None: continue # get the innovation matrix last_measurement = ekf.measurement_history[-1] measurement_jacobian = last_measurement.compute_jacobian(predicted_state) innovation_covariance = (last_measurement.covariance + measurement_jacobian@predicted_state.covariance@measurement_jacobian.T) # get the innovation information matrix by inverting the covariance matrix try: information_matrix = np.linalg.inv(innovation_covariance) except np.linalg.linalg.LinAlgError: information_matrix = np.linalg.pinv(innovation_covariance) # query the tree to get the potential next points using the search distance pairs = self.observation_trees[next_image_index].query_ball_point(predicted_pixels.ravel(), search_distance) # loop through and filter out paths that are already taken and don't meet the mahalanobis distance # create these lists to store pairs that require more consideration keep_pairs = [] keep_mahalanobis_distances = [] for pair in pairs: if self.observation_ids[next_image_index][pair] in self.confirmed_particles: # skip if we've already used this particle continue # compute the mahalanobis distance pixel_separation = (self.observation_trees[next_image_index].data[pair].ravel() - predicted_pixels.ravel()) mahalanobis_distance_squared = pixel_separation @ information_matrix @ pixel_separation # check the squared mahalanobis distance if mahalanobis_distance_squared > self.maximum_mahalanobis_distance_squared: _LOGGER.debug(f'Rejected potential pair for {ekf.identity} due to mahalanobis distance ' f'of {mahalanobis_distance_squared}') continue # store this as a valid path forward keep_pairs.append(pair) 
keep_mahalanobis_distances.append(mahalanobis_distance_squared) number_of_paths = len(keep_pairs) if ((len(ekf.measurement_history) > self.expected_convergence_number) and (number_of_paths > self.reduced_paths_forward_per_image)): _LOGGER.warning(f"The filter should be converged but there are still {number_of_paths} forward." f"Only keeping the {self.reduced_paths_forward_per_image} best ones") keep_pairs = [x[1] for x in sorted(zip(keep_mahalanobis_distances, keep_pairs))[:self.reduced_paths_forward_per_image]] elif number_of_paths > self.maximum_paths_per_image: _LOGGER.warning(f"There are {number_of_paths} paths forward but only " f"{self.maximum_paths_per_image} are allowed." f"Only keeping the {self.maximum_paths_per_image} best ones") keep_pairs = [x[1] for x in sorted(zip(keep_mahalanobis_distances, keep_pairs))[:self.maximum_paths_per_image]] # loop through all of the paths we are going to follow for pair in keep_pairs: # clone the ekf to follow this path new_ekf = deepcopy(ekf) # make a measurement for this pair new_measurement = OpticalBearingMeasurement(self.observation_trees[next_image_index].data[pair], self.camera.model, next_image, next_camera_state, self.measurement_covariance, identity=self.observation_ids[next_image_index][pair]) # perform a measurement update using this path new_ekf.process_measurement(new_measurement, predicted_state, predicted_pixels) # add the new ekf to the queue to be processed _LOGGER.debug(f'Adding {new_ekf.identity} to the queue. {self._ekfs_to_process.size.value}') self._ekfs_to_process.put_retry(new_ekf) # store how many times we've spit this ekf local_splits.append(new_ekf) if len(local_splits) > self.maximum_paths_total: _LOGGER.warning(f'Too many forward splits for EKF {ekf.identity}. {len(local_splits)} already. ' f'Skipping more splits') break number_of_images_considered += 1 if number_of_images_considered > self.maximum_forward_images: break # put the split ekfs as potential ending ekfs that need to be smoothed and analyzed further self._results.put_retry([x for x in local_splits if len(x.measurement_history) > self.minimum_number_of_measurements]) self._working[process_number] = False spice.kclear() def _smooth(self, process_number: int): """ This method retrieves an EKF from the queue and smooths it. The smoothed EKF is then placed into the output queue This is used as the target for the Processes that are created for smoothing. :param process_number: The number of the process. """ if self.kernels_to_load is not None: for k in self.kernels_to_load: spice.furnsh(k) if self._smoothing_input is None: raise ValueError("The work queue hasn't been initialized") if self._smoothing_output is None: raise ValueError("The results queue hasn't been initialized") if self._smoothing_working is None: raise ValueError("The working array hasn't been initialized") num_time_out = 0 while True: try: incoming: Union[Tuple[int, ExtendedKalmanFilter], str] = self._smoothing_input.get(timeout=2) if isinstance(incoming, str) and incoming == "END": self._smoothing_working[process_number] = False self._smoothing_output.put_retry('DONE') break ind, ekf = incoming num_time_out = 0 # retrieve an ekf form the queue to process self._smoothing_working[process_number] = True except Empty: self._smoothing_working[process_number] = False num_time_out += 1 if True in self._smoothing_working: _LOGGER.debug(f'QUEUE was empty for process {process_number} but another process is still working.' 
f'Process status: {str(list(self._smoothing_working))}') continue elif num_time_out >= self.maximum_time_outs: _LOGGER.info('No more EKFs to smooth') self._smoothing_output.put_retry('DONE') break else: _LOGGER.debug(f'{process_number} timed out {num_time_out} times') continue smooth_successful = ekf.smooth() sent = False number_failed = 0 while not sent: try: self._smoothing_output.put_retry((smooth_successful, ind, ekf)) sent = True except Full: number_failed += 1 if number_failed > 4: _LOGGER.warning('Unable to put the smoothed results on the queue') break spice.kclear()
[docs] def filter_ekfs(self, ekfs_to_filter: Iterable[ExtendedKalmanFilter], number_of_ekfs: int): """ This method does backwards smoothing on each EKF and figures out which are actually valid :param ekfs_to_filter: The list of EKFs to filter :param number_of_ekfs: The number of ekfs there are to filter """ removes = [] residual_means = [] residual_stds = [] smoothed_ekfs: List[Optional[ExtendedKalmanFilter]] = [None] * number_of_ekfs processes: Optional[List[Process]] = self.processes smooth_process: List[Process] = self.smoothing_processes self.processes = None self.smoothing_processes = None for p in smooth_process: try: p.start() except AssertionError: pass self.processes = processes self.smoothing_processes = smooth_process for ind, ekf in enumerate(ekfs_to_filter): if len(ekf.measurement_history) < self.minimum_number_of_measurements: removes.append(ind) continue self._smoothing_input.put_retry((ind, ekf)) for _ in range(cpu_count()): self._smoothing_input.put_retry('END') number_timed_out = 0 number_not_done = len(self.processes) while number_not_done: try: result: Union[str, Tuple[bool, int, ExtendedKalmanFilter]] = self._smoothing_output.get(timeout=1) # reset the counter number_timed_out = 0 if isinstance(result, str) and (result == 'DONE'): number_not_done -= 1 _LOGGER.info(f'{number_not_done} processes still smoothing. Approximately ' f'{self._smoothing_input.qsize()} things still to do') elif result[0]: # compute and store the standard deviation and mean resid_mean, resid_std = result[2].compute_residual_statistics() residual_means.append(resid_mean) residual_stds.append(resid_std) # store the smoothed ekf smoothed_ekfs[result[1]] = result[2] else: # we didn't smooth so remove this removes.append(result[1]) except Empty: number_timed_out += 1 _LOGGER.warning(f'Timed out trying to retrieve smoothed results {number_timed_out} times') # get rid of things that were too short or didn't smooth for rm in sorted(removes, reverse=True): smoothed_ekfs.pop(rm) # kill the processes for p in self.smoothing_processes: try: p.join(timeout=0.2) except TimeoutError: p.terminate() _LOGGER.info(f'Identifying valid EKFs from {len(smoothed_ekfs)} total') id_sets = [] # filter out where the residuals are greater than the maximum standard deviation # also prepare the sets of particle ids removes = [] for ind, (std, ekf) in enumerate(zip(residual_stds, smoothed_ekfs)): if std > self.maximum_residual_standard_deviation: removes.append(ind) continue # store the set of the measurement ids id_sets.append({meas.identity for meas in ekf.measurement_history}) for rm in removes[::-1]: smoothed_ekfs.pop(rm) residual_stds.pop(rm) residual_means.pop(rm) removes = [] # loop through and figure out which ekfs are subsets of the others for first_ind, first_meas_id_set in enumerate(id_sets): for second_ind, second_meas_id_set in enumerate(id_sets): if first_ind == second_ind: # if this is the same set then skip it continue # get the points which are in the first ekf but not the second ekf first_unique = first_meas_id_set - second_meas_id_set if len(first_unique) == 0: # if the first ekf is a full subset of the later ekf then remove it removes.append(first_ind) break # we no longer need to consider this ekf # if the first ekf only has 1 unique point and the other has multiple unique points then then other # is a longer track and should be kept over this one if (len(first_unique) == 1) and len(second_meas_id_set - first_meas_id_set) >= 2: removes.append(first_ind) break # we no longer need to consider this ekf # 
remove things that should be removed (we might have duplicates so ignore them and unique sorts for us) for rm in np.unique(removes)[::-1]: smoothed_ekfs.pop(rm) residual_stds.pop(rm) residual_means.pop(rm) id_sets.pop(rm) # now figure out which track is best for each starting particle starting_dictionary = {} for ind, ekf in enumerate(smoothed_ekfs): # get the initial particle in this track start_id = ekf.measurement_history[0].identity # see if we've considered other tracks that contain this particle already current_best = starting_dictionary.get(start_id, None) # if this is the first track starting on this particle keep it for now if current_best is None: current_best = (ind, residual_stds[ind]) starting_dictionary[start_id] = current_best continue # compare the number of measurements of the current best track starting with this particle and the track # under consideration length_current_best = len(id_sets[current_best[0]]) length_under_consideration = len(id_sets[ind]) if length_under_consideration > (length_current_best + 1): # if the EKF under consideration has 2 or more measurements than the current best keep it starting_dictionary[start_id] = (ind, residual_stds[ind]) elif length_current_best > (length_under_consideration + 1): # if the current best has 2 or more measurements than the EKF under consideration keep it continue # if the EKF under consideration and the current best are within 1 measurement of each other keep the # one with a lower post-fit std elif current_best[-1] < residual_stds[ind]: continue else: starting_dictionary[start_id] = (ind, residual_stds[ind]) # store the confirmed EKFs and ingested particles for ind, std in starting_dictionary.values(): self.confirmed_filters.append(smoothed_ekfs[ind]) self.confirmed_standard_deviations.append(std) self.confirmed_particles.update(id_sets[ind])
[docs] def track(self) -> None: """ This method tracks particles from image to image using the EKF. This works by finding initial pairs for each image and then following those tracks to termination. It also handles setup and teardown of the working pool. It then filters all of the tracks, saving only the good ones. """ # initialize the worker pool self._initialize_tracking_workers() self._initialize_smoothing_workers() # loop through each image and identify the tracks that begin in that image first = True for ind, image in self.camera: start = time() # do the initial pairing self.find_initial_pairs(ind, image) # reset the tracking workers for the next image if not first: self._reset_tracking_workers() # start the process of tracking along these initial pairs processes: List[Process] = self.processes smooth_process = self.smoothing_processes self.processes = None self.smoothing_processes = None for process in processes: try: process.start() except AssertionError: pass self.processes = processes self.smoothing_processes = smooth_process # self._follow(-1) # loop through, retrieving the results ekfs = 0 number_to_complete = len(self.processes) number_timed_out = 0 with TemporaryFile('wb+') as ekfs_to_smooth: while number_to_complete: try: results = self._results.get(timeout=1) # if the process said it is done processing then decrease the number to complete and move along if results == 'DONE': number_to_complete -= 1 if number_to_complete == 0: _LOGGER.info(f'All processes completed. {ekfs} generated') break else: _LOGGER.info(f'Tracking process has completed, {number_to_complete} still working') else: # otherwise we got new results and should store them # ekfs.extend(results) for ekf in results: pickle.dump(ekf, ekfs_to_smooth) ekfs += 1 # reset the number timed out number_timed_out = 0 except Empty: number_timed_out += 1 if number_timed_out > self.maximum_time_outs: _LOGGER.warning('Exceeded the maximum number of time outs. Stopping') for process in self.processes: process.terminate() sleep(0.1) try: process.close() except ValueError: pass break # check if we've reached our overall time lime if (time() - start) > self.maximum_tracking_time_per_image: _LOGGER.warning('Exceeded maximum execution time. Stopping') for process in self.processes: process.terminate() sleep(0.5) try: process.close() except ValueError: pass break _LOGGER.info(f'Tracking complete for image {ind} of {sum(self.camera.image_mask)} in ' f'{(time()-start):.3f} seconds') # filter the ekfs and save only the good ones if not first: self._reset_smoothing_workers() else: first = False ekfs_to_smooth.seek(0) self.filter_ekfs(_pickle_generator(ekfs_to_smooth), ekfs) # tear down the workers self._tear_down_smoothing_workers() self._tear_down_tracking_workers()
    def save_results(self, out: PATH):
        """
        This method saves the final ekfs to a csv file.

        The columns in the csv file are

        ================== ===========================================================================================
        Column             Description
        ================== ===========================================================================================
        id                 The ID for the EKF.  This is usually a UUID hash string
        length             The length of the EKF (number of measurements it ingested)
        residual std       The standard deviation of the post-fit residuals of the EKF
        initial time       The UTC time of the initial state for the EKF
        initial position x The x position of the initial state for the EKF
        initial position y The y position of the initial state for the EKF
        initial position z The z position of the initial state for the EKF
        initial velocity x The x velocity of the initial state for the EKF
        initial velocity y The y velocity of the initial state for the EKF
        initial velocity z The z velocity of the initial state for the EKF
        detection ids      A list of detection ids (dependent on the IDs of the UFO detections) separated by '|'
        ================== ===========================================================================================

        :param out: The file to save the csv to
        """

        with Path(out).open('w') as out_file:

            out_format = '{},{},{},{},{},{},{},{},{},{},{}\n'

            out_file.write(out_format.format('id', 'length', 'residual std', 'initial time',
                                             'initial position x', 'initial position y', 'initial position z',
                                             'initial velocity x', 'initial velocity y', 'initial velocity z',
                                             'detection ids'))

            for ekf, std in zip(self.confirmed_filters, self.confirmed_standard_deviations):

                initial_state: Dynamics.State = ekf.state_history[0][1]

                out_file.write(out_format.format(ekf.identity, len(ekf.measurement_history), std,
                                                 initial_state.time.isoformat(),
                                                 *initial_state.position, *initial_state.velocity,
                                                 '|'.join(str(meas.identity)
                                                          for meas in ekf.measurement_history)))