MCPcopy
hub / github.com/rq/django-rq / register

Method register

django_rq/cron.py:67–139  ·  view source on GitHub ↗

Register a function to be run at regular intervals. On first call, this sets the Redis connection for the scheduler. Subsequent calls validate that the queue uses the same Redis connection. Args: func: Function to be scheduled; queue_name: Name of the django_rq queue (must exist in RQ_QUEUES); …

(
        self,
        func: Callable[..., Any],
        queue_name: str,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        interval: Optional[int] = None,
        cron: Optional[str] = None,
        job_timeout: Optional[int] = None,
        result_ttl: int = 500,
        ttl: Optional[int] = None,
        failure_ttl: Optional[int] = None,
        meta: Optional[dict[str, Any]] = None,
    )

Source from the content-addressed store, hash-verified

65 return {key: kwargs.get(key) for key in essential_params if key in kwargs}
66
67 def register(
68 self,
69 func: Callable[..., Any],
70 queue_name: str,
71 args: Optional[tuple[Any, ...]] = None,
72 kwargs: Optional[dict[str, Any]] = None,
73 interval: Optional[int] = None,
74 cron: Optional[str] = None,
75 job_timeout: Optional[int] = None,
76 result_ttl: int = 500,
77 ttl: Optional[int] = None,
78 failure_ttl: Optional[int] = None,
79 meta: Optional[dict[str, Any]] = None,
80 ):
81 """
82 Register a function to be run at regular intervals.
83
84 On first call, this sets the Redis connection for the scheduler.
85 Subsequent calls validate that the queue uses the same Redis connection.
86
87 Args:
88 func: Function to be scheduled
89 queue_name: Name of the django_rq queue (must exist in RQ_QUEUES)
90 args: Arguments to pass to the function
91 kwargs: Keyword arguments to pass to the function
92 interval: Interval in seconds (mutually exclusive with cron)
93 cron: Cron expression (mutually exclusive with interval)
94 job_timeout: Job timeout in seconds
95 result_ttl: How long to keep job results
96 ttl: Job time-to-live
97 failure_ttl: How long to keep failed job info
98 meta: Additional job metadata
99
100 Returns:
101 CronJob instance
102
103 Raises:
104 ValueError: If queue not found or uses different Redis connection
105 """
106 # Get connection for this queue
107 connection = get_connection(queue_name)
108 current_config = self._get_connection_config(connection)
109
110 if self._connection_config:
111 # Validate that this queue uses the same Redis connection
112 if current_config != self._connection_config:
113 raise ValueError(
114 f"Queue '{queue_name}' uses a different Redis connection than previously "
115 + 'registered queues. All jobs in a DjangoCronScheduler instance must use '
116 + 'queues with the same Redis connection.'
117 )
118 else:
119 # First registration - set connection
120 self.connection = connection
121 self._connection_config = current_config
122 # Clear cached_property so it recalculates with new connection
123 if 'connection_index' in self.__dict__:
124 del self.__dict__['connection_index']

Calls 2

get_connection · Function · 0.85