Animate a thinking spinner while a task is being executed. It uses a daemon thread to run the animation, so it does not block the main thread. Colors are the same as in pretty_print.
(text, color="status", duration=120)
| 59 | print(colored(text, color_map[color]), end='' if no_newline else "\n") |
| 60 | |
def animate_thinking(text, color="status", duration=120):
    """
    Animate a thinking spinner while a task is being executed.

    The animation runs in a daemon thread, so this call returns right after
    starting it and does not block the main thread. An animation started by a
    previous call is signalled to stop and joined before the new one begins.
    Colors are the same as in pretty_print.

    Args:
        text: Label printed after the spinner symbol.
        color: One of "success", "failure", "status", "code", "warning",
            "output", "default" or "info". Unknown values fall back to
            "default".
        duration: Maximum number of seconds the spinner keeps running when
            it is not stopped earlier through ``thinking_event``.
    """
    global current_animation_thread

    # Ask any running animation to stop, wait for it to finish, then re-arm
    # the event so the new animation can run.
    thinking_event.set()
    if current_animation_thread and current_animation_thread.is_alive():
        current_animation_thread.join()
    thinking_event.clear()

    def _animate():
        # Each entry is (colorama foreground code, termcolor color name).
        color_map = {
            "success": (Fore.GREEN, "green"),
            "failure": (Fore.RED, "red"),
            "status": (Fore.LIGHTGREEN_EX, "light_green"),
            "code": (Fore.LIGHTBLUE_EX, "light_blue"),
            "warning": (Fore.YELLOW, "yellow"),
            "output": (Fore.LIGHTCYAN_EX, "cyan"),
            "default": (Fore.RESET, "black"),
            "info": (Fore.CYAN, "cyan")
        }
        fore_color, term_color = color_map.get(color, color_map["default"])
        spinner = itertools.cycle([
            '▉▁▁▁▁▁', '▉▉▂▁▁▁', '▉▉▉▃▁▁', '▉▉▉▉▅▁', '▉▉▉▉▉▇', '▉▉▉▉▉▉',
            '▉▉▉▉▇▅', '▉▉▉▆▃▁', '▉▉▅▃▁▁', '▉▇▃▁▁▁', '▇▃▁▁▁▁', '▃▁▁▁▁▁',
            '▁▃▅▃▁▁', '▁▅▉▅▁▁', '▃▉▉▉▃▁', '▅▉▁▉▅▃', '▇▃▁▃▇▅', '▉▁▁▁▉▇',
            '▉▅▃▁▃▅', '▇▉▅▃▅▇', '▅▉▇▅▇▉', '▃▇▉▇▉▅', '▁▅▇▉▇▃', '▁▃▅▇▅▁'
        ])
        # The platform cannot change while the spinner runs; decide once.
        use_colorama = platform.system().lower() != "windows"
        # Monotonic clock: the deadline is unaffected by system clock changes.
        end_time = time.monotonic() + duration

        while not thinking_event.is_set() and time.monotonic() < end_time:
            symbol = next(spinner)
            if use_colorama:
                print(f"\r{fore_color}{symbol} {text}{Fore.RESET}", end="", flush=True)
            else:
                print(f"\r{colored(f'{symbol} {text}', term_color)}", end="", flush=True)
            # Wait on the event instead of sleeping so that a stop request
            # wakes the thread immediately rather than after the full delay.
            thinking_event.wait(0.2)
        # Erase the spinner line: 6 spinner cells + 1 space + the label.
        print("\r" + " " * (len(text) + 7) + "\r", end="", flush=True)

    current_animation_thread = threading.Thread(target=_animate, daemon=True)
    current_animation_thread.start()
| 104 | |
| 105 | def timer_decorator(func): |
| 106 | """ |
no test coverage detected