Executes tasks indefinitely.
()
| 162 | |
| 163 | |
| 164 | def task_loop(): |
| 165 | """Executes tasks indefinitely.""" |
| 166 | # Defer heavy task imports to prevent issues with multiprocessing.Process |
| 167 | from clusterfuzz._internal.bot.tasks import commands |
| 168 | |
| 169 | clean_exit = False |
| 170 | execution_count = 0 |
| 171 | max_task_executions = _get_max_task_executions() |
| 172 | |
| 173 | while True: |
| 174 | stacktrace = '' |
| 175 | exception_occurred = False |
| 176 | task = None |
| 177 | # This caches the current environment on first run. Don't move this. |
| 178 | environment.reset_environment() |
| 179 | try: |
| 180 | if update_task_enabled(): |
| 181 | logs.info("Running update task.") |
| 182 | # Run regular updates. |
| 183 | # TODO(metzman): Move this after utask_main execution |
| 184 | # so that utasks can't be updated on subsequent attempts. |
| 185 | update_task.run() |
| 186 | update_task.track_revision() |
| 187 | else: |
| 188 | logs.info( |
| 189 | "Update task not enabled. Running environment cleanup and platform " |
| 190 | "init scripts.") |
| 191 | update_task.prepare_environment_for_new_task() |
| 192 | update_task.run_platform_init_scripts() |
| 193 | |
| 194 | if environment.is_uworker(): |
| 195 | # Batch/Swarming tasks only run one at a time. |
| 196 | sys.exit(utasks.uworker_bot_main()) |
| 197 | |
| 198 | if environment.get_value('SCHEDULE_UTASK_MAINS'): |
| 199 | # If the bot is configured to schedule utask_mains, don't run any other |
| 200 | # tasks because scheduling these tasks is more important than executing |
| 201 | # any one other task. |
| 202 | |
| 203 | # TODO(metzman): Convert this to a k8s cron. |
| 204 | schedule_utask_mains() |
| 205 | continue |
| 206 | |
| 207 | if environment.is_tworker(): |
| 208 | task = tasks.tworker_get_task( |
| 209 | override_queue=_get_tworker_queue_override()) |
| 210 | else: |
| 211 | task = tasks.get_task() |
| 212 | |
| 213 | if not task: |
| 214 | continue |
| 215 | |
| 216 | with _Monitor(task): |
| 217 | with task.lease(): |
| 218 | # Execute the command and delete the task. |
| 219 | commands.process_command(task) |
| 220 | except SystemExit as e: |
| 221 | exception_occurred = True |
no test coverage detected