(self, key, value, is_result=False)
| 635 | self.counter_key = lambda k: cp + encode(k) |
| 636 | |
| 637 | def put_data(self, key, value, is_result=False): |
| 638 | if is_result: |
| 639 | # We only want to expire task result data. If we are storing an |
| 640 | # important metadata like a revocation key, we need to preserve it. |
| 641 | self.conn.setex(self.result_key(key), self._expire_time, value) |
| 642 | if isinstance(key, bytes): |
| 643 | key = key.decode('utf8') |
| 644 | if self.notify_result: |
| 645 | nkey = self.notify_prefix + key |
| 646 | pipe = self.conn.pipeline() |
| 647 | pipe.lpush(nkey, b'1') |
| 648 | pipe.expire(nkey, self.notify_result_ttl) |
| 649 | pipe.execute() |
| 650 | else: |
| 651 | self.conn.set(self.result_key(key), value) |
| 652 | |
| 653 | def peek_data(self, key): |
| 654 | pipe = self.conn.pipeline() |
nothing calls this directly
no test coverage detected