hub / github.com/Koed00/django-q / Queue

Class Queue

django_q/queues.py:37–81

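A portable implementation of multiprocessing.Queue. Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the NotImplementedError exception on Unix platforms like Mac OS X where sem_getvalue() is not implemented. This subclass addresses this problem by using a synchronized shared counter (initialized to zero) and increasing / decreasing its value every time the put() and get() methods are called, respectively.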

Source

class Queue(multiprocessing.queues.Queue):
    """A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        if sys.version_info < (3, 0):
            super(Queue, self).__init__(*args, **kwargs)
        else:
            super(Queue, self).__init__(
                *args, ctx=multiprocessing.get_context(), **kwargs
            )
        self.size = SharedCounter(0)

    def __getstate__(self):
        return super(Queue, self).__getstate__() + (self.size,)

    def __setstate__(self, state):
        super(Queue, self).__setstate__(state[:-1])
        self.size = state[-1]

    def put(self, *args, **kwargs):
        super(Queue, self).put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs):
        x = super(Queue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return x

    def qsize(self) -> int:
        """Reliable implementation of multiprocessing.Queue.qsize()"""
        return self.size.value

    def empty(self) -> bool:
        """Reliable implementation of multiprocessing.Queue.empty()"""
        return not self.qsize() > 0
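
The class relies on SharedCounter, which is not shown in this excerpt. The following is a minimal sketch of what such a counter could look like, assuming it wraps a multiprocessing.Value and guards updates with that value's built-in lock. The body is an illustration of the "synchronized shared counter" described in the docstring; the actual definition in django_q/queues.py may differ.

```python
import multiprocessing


class SharedCounter:
    """Illustrative process-safe counter (assumed shape, not the verified source)."""

    def __init__(self, n=0):
        # "i" is a signed C int placed in shared memory; Value() attaches
        # a lock to it by default.
        self.count = multiprocessing.Value("i", n)

    def increment(self, n=1):
        # Hold the lock so the read-modify-write cannot interleave with
        # another process doing the same thing.
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        # Current counter value, as read by Queue.qsize() in the excerpt.
        return self.count.value


# Mirror what Queue does: +1 after each put(), -1 after each get().
size = SharedCounter(0)
size.increment(1)   # first put()
size.increment(1)   # second put()
size.increment(-1)  # one get()
print(size.value)         # 1
print(not size.value > 0) # False, i.e. the queue would not report empty
```

Because qsize() reads this counter rather than calling sem_getvalue(), it works on platforms where the underlying semaphore cannot be queried. The count is only updated after the parent put() or get() returns, so a reader in another process may briefly see a value that lags the true queue contents.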

Callers 11

_sync (Function) · 0.90
Conf (Class) · 0.90
__init__ (Method) · 0.90
test_scheduler (Function) · 0.90
test_cached (Function) · 0.90
test_cluster (Function) · 0.90
test_enqueue (Function) · 0.90
test_recycle (Function) · 0.90
test_max_rss (Function) · 0.90
test_bad_secret (Function) · 0.90

Calls

no outgoing calls

Tested by 8

test_scheduler (Function) · 0.72
test_cached (Function) · 0.72
test_cluster (Function) · 0.72
test_enqueue (Function) · 0.72
test_recycle (Function) · 0.72
test_max_rss (Function) · 0.72
test_bad_secret (Function) · 0.72