(self)
| 23 | |
| 24 | class TestRateLimit: |
def testCall(self):
    """Check the timing of RateLimit.call / RateLimit.isAllowed for one event.

    Every call uses the "counting" event with a 0.1 second interval. Using
    the ``around`` helper, the test asserts that the first call returns
    right away, that a repeated call is held back, that three concurrently
    spawned calls complete after roughly 0.2 seconds in total, and that a
    call made once the cooldown has elapsed is instant again.
    """
    first_counter = ExampleClass()
    second_counter = ExampleClass()

    # Very first call: expected to go through without any delay
    started = time.time()
    outcome = RateLimit.call("counting", allowed_again=0.1, func=first_counter.count)
    assert outcome == "counted"
    assert around(time.time() - started, 0.0)
    assert first_counter.counted == 1

    # Same event right away: "counting" is blocked, an unrelated event name is not
    assert not RateLimit.isAllowed("counting", 0.1)
    assert RateLimit.isAllowed("something else", 0.1)
    outcome = RateLimit.call("counting", allowed_again=0.1, func=first_counter.count)
    assert outcome == "counted"
    # Elapsed time is still measured from the start of the first call
    assert around(time.time() - started, 0.1)
    assert first_counter.counted == 2
    time.sleep(0.1)  # Let the cooldown pass before the next phase

    # Three calls spawned concurrently; the timing assertion below expects
    # them to finish about 0.2s after the first one was spawned
    started = time.time()
    assert second_counter.counted == 0
    greenlets = [
        gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=second_counter.count))
        for _ in range(3)
    ]
    gevent.joinall(greenlets)
    assert [greenlet.value for greenlet in greenlets] == ["counted"] * 3
    assert around(time.time() - started, 0.2)

    # Right after the last call the event is blocked; after 0.11s it is allowed
    assert not RateLimit.isAllowed("counting", 0.1)
    time.sleep(0.11)
    assert RateLimit.isAllowed("counting", 0.1)

    # Nothing pending any more, so the next call is instant again
    started = time.time()
    assert RateLimit.isAllowed("counting", 0.1)
    outcome = RateLimit.call("counting", allowed_again=0.1, func=second_counter.count)
    assert outcome == "counted"
    assert around(time.time() - started, 0.0)

    # Three concurrent calls plus the final one
    assert second_counter.counted == 4
| 66 | |
| 67 | def testCallAsync(self): |
| 68 | obj1 = ExampleClass() |
nothing calls this directly
no test coverage detected