# Source code for giant.ufo.clearable_queue

# Copyright 2021 United States Government as represented by the Administrator of the National Aeronautics and Space
# Administration.  No copyright is claimed in the United States under Title 17, U.S. Code. All Other Rights Reserved.


from multiprocessing.queues import Queue
from multiprocessing import Value, get_context

from queue import Empty

from typing import Any


class SharedCounter:
    """
    A process- and thread-safe shared integer counter.

    ``multiprocessing.Value`` already serializes individual reads and writes of the underlying
    ctypes object, but ``n += 1`` is a read followed by a write, so two processes can both read
    the old value before either writes the new one.  :meth:`increment` therefore holds the
    ``Value``'s lock around the whole read-modify-write so the update is atomic.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n: int = 0):
        # 'i' -> signed C int shared between processes, initialized to n
        self.count = Value('i', n)

    def increment(self, n: int = 1):
        """
        Atomically add ``n`` (default 1, may be negative) to the counter.
        """
        # hold the Value's lock for the full read-modify-write cycle
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self) -> int:
        """
        The current value of the counter.
        """
        return self.count.value
class ClearableQueue(Queue):
    """
    A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the
    NotImplementedError exception on Unix platforms like Mac OS X where sem_getvalue() is not
    implemented. This subclass addresses this problem by using a synchronized shared counter
    (initialized to zero) and increasing / decreasing its value every time the put() and get()
    methods are called, respectively. This not only prevents NotImplementedError from being
    raised, but also allows us to implement a reliable version of both qsize() and empty().

    Borrowed from https://github.com/keras-team/autokeras/issues/368 and
    https://stackoverflow.com/a/36018632/3431189
    """

    # shared counter tracking the number of items currently in the underlying queue
    size: SharedCounter

    def __init__(self, *args: list, **kwargs: dict):
        # multiprocessing.queues.Queue requires an explicit context when subclassed directly
        ctx = get_context()
        super().__init__(*args, **kwargs, ctx=ctx)
        self.size = SharedCounter(0)
        # per-process overflow buffer used by put_retry() when the queue is full.
        # NOTE(review): this list is local to each process (it is copied on pickle), so items
        # held in one process are not visible to qsize() in another — confirm callers rely on
        # it from a single producer process only.
        self.holder = []

    @property
    def maxsize(self) -> int:
        """
        The maximum size of the queue, or -1 if it has not been set.
        """
        if hasattr(self, '_maxsize'):
            return self._maxsize
        else:
            return -1

    def put(self, *args, **kwargs):
        """
        Put an item on the queue, incrementing the shared size counter on success.
        """
        super().put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs) -> Any:
        """
        Gets the results and tries to flush from the holder if anything is in it.

        :return: the next item from the queue
        """
        res = super().get(*args, **kwargs)
        try:
            self.size.increment(-1)
        except AttributeError:
            # best effort: if the size counter attribute is missing (e.g. partially
            # initialized/unpickled instance) keep serving items but warn that qsize()
            # is no longer reliable
            print('ClearableQueue.get: size counter is missing; qsize()/empty() are unreliable')
        # opportunistically move any held items into the queue now that space may be free
        self.flush_holder()
        return res

    def __getstate__(self):
        # append our extra state to the base queue's pickle state so both survive the
        # trip to a child process
        return super().__getstate__() + (self.size, self.holder)

    def __setstate__(self, state):
        # mirror __getstate__: last two entries are ours, the rest belongs to the base class
        self.size = state[-2]
        self.holder = state[-1]
        super().__setstate__(state[:-2])

    def flush_holder(self):
        """
        Flushes the holder into the queue if it can be.

        Items are flushed in FIFO order and flushing stops as soon as the queue reports
        itself full (when a positive maxsize is set).
        """
        flushed = 0
        for held in self.holder:
            # stop as soon as a bounded queue looks full to avoid blocking in put()
            if 0 < self.maxsize <= self.qsize():
                break
            self.put(held)
            flushed += 1
        # the flushed items are always a contiguous prefix of the holder, so drop them
        # with a single slice delete instead of repeated pop() calls (O(n) vs O(n**2))
        del self.holder[:flushed]

    def get_nowait(self) -> Any:
        """
        Non-blocking get that keeps the shared size counter and holder in sync.

        :raises queue.Empty: if no item is immediately available
        """
        res = super().get_nowait()
        self.size.increment(-1)
        self.flush_holder()
        return res

    def put_nowait(self, item: Any) -> None:
        """
        Non-blocking put that keeps the shared size counter in sync.

        :raises queue.Full: if the queue is full
        """
        res = super().put_nowait(item)
        self.size.increment(1)
        return res

    def put_retry(self, item: Any):
        """
        Attempts to put a value unless the queue is full, in which case it will hold onto it
        until its not full and then put it.

        :param item: The thing to be put
        """
        # always append first so FIFO order is preserved relative to earlier held items
        self.holder.append(item)
        self.flush_holder()

    def qsize(self) -> int:
        """
        Reliable implementation of multiprocessing.Queue.qsize(), including any items
        currently waiting in the local holder.
        """
        return self.size.value + len(self.holder)

    def empty(self) -> bool:
        """
        Reliable implementation of multiprocessing.Queue.empty().
        """
        return not self.qsize()

    def clear(self):
        """
        Clear out any data from the queue.
        """
        try:
            while True:
                self.get_nowait()
        except Empty:
            # queue (and, via flush_holder inside get_nowait, the holder) is drained
            pass