Emit an event to all registered handlers. If handlers have dependencies (registered with depends_on), they execute in dependency order. Otherwise, handlers execute as before (sync in thread pool, async fire-and-forget). Stream chunk events always execute synchronously to preserve ordering.
(self, source: Any, event: BaseEvent)
| 570 | self._record_event(event) |
| 571 | |
| 572 | def emit(self, source: Any, event: BaseEvent) -> Future[None] | None: |
| 573 | """Emit an event to all registered handlers. |
| 574 | |
| 575 | If handlers have dependencies (registered with depends_on), they execute |
| 576 | in dependency order. Otherwise, handlers execute as before (sync in thread |
| 577 | pool, async fire-and-forget). |
| 578 | |
| 579 | Stream chunk events always execute synchronously to preserve ordering. |
| 580 | |
| 581 | Args: |
| 582 | source: The emitting object |
| 583 | event: The event instance to emit |
| 584 | |
| 585 | Returns: |
| 586 | Future that completes when handlers finish. Returns: |
| 587 | - Future for sync-only handlers (ThreadPoolExecutor future) |
| 588 | - Future for async handlers or mixed handlers (asyncio future) |
| 589 | - Future for dependency-managed handlers (asyncio future) |
| 590 | - None if no handlers or sync stream chunk events |
| 591 | |
| 592 | Example: |
| 593 | >>> future = crewai_event_bus.emit(source, event) |
| 594 | >>> if future: |
| 595 | ... await asyncio.wrap_future(future) # In async test |
| 596 | ... # or future.result(timeout=5.0) in sync code |
| 597 | """ |
| 598 | self._prepare_event(source, event) |
| 599 | |
| 600 | event_type = type(event) |
| 601 | |
| 602 | with self._rwlock.r_locked(): |
| 603 | if self._shutting_down: |
| 604 | self._console.print( |
| 605 | "[CrewAIEventsBus] Warning: Attempted to emit event during shutdown. Ignoring." |
| 606 | ) |
| 607 | return None |
| 608 | has_dependencies = event_type in self._handler_dependencies |
| 609 | sync_handlers = self._sync_handlers.get(event_type, frozenset()) |
| 610 | async_handlers = self._async_handlers.get(event_type, frozenset()) |
| 611 | |
| 612 | if not sync_handlers and not async_handlers: |
| 613 | return None |
| 614 | |
| 615 | self._ensure_executor_initialized() |
| 616 | self._has_pending_events = True |
| 617 | |
| 618 | state = self._runtime_state |
| 619 | |
| 620 | if has_dependencies: |
| 621 | return self._track_future( |
| 622 | asyncio.run_coroutine_threadsafe( |
| 623 | self._emit_with_dependencies(source, event, state), |
| 624 | self._loop, |
| 625 | ) |
| 626 | ) |
| 627 | |
| 628 | if sync_handlers: |
| 629 | if event_type is LLMStreamChunkEvent: |