MCPcopy
hub / github.com/crewAIInc/crewAI / emit

Method emit

lib/crewai/src/crewai/events/event_bus.py:572–647  ·  view source on GitHub ↗

Emit an event to all registered handlers. If handlers have dependencies (registered with depends_on), they execute in dependency order. Otherwise, handlers execute as before (sync in thread pool, async fire-and-forget). Stream chunk events always execute synchronous

(self, source: Any, event: BaseEvent)

Source from the content-addressed store, hash-verified

570 self._record_event(event)
571
572 def emit(self, source: Any, event: BaseEvent) -> Future[None] | None:
573 """Emit an event to all registered handlers.
574
575 If handlers have dependencies (registered with depends_on), they execute
576 in dependency order. Otherwise, handlers execute as before (sync in thread
577 pool, async fire-and-forget).
578
579 Stream chunk events always execute synchronously to preserve ordering.
580
581 Args:
582 source: The emitting object
583 event: The event instance to emit
584
585 Returns:
586 Future that completes when handlers finish. Returns:
587 - Future for sync-only handlers (ThreadPoolExecutor future)
588 - Future for async handlers or mixed handlers (asyncio future)
589 - Future for dependency-managed handlers (asyncio future)
590 - None if no handlers or sync stream chunk events
591
592 Example:
593 >>> future = crewai_event_bus.emit(source, event)
594 >>> if future:
595 ... await asyncio.wrap_future(future) # In async test
596 ... # or future.result(timeout=5.0) in sync code
597 """
598 self._prepare_event(source, event)
599
600 event_type = type(event)
601
602 with self._rwlock.r_locked():
603 if self._shutting_down:
604 self._console.print(
605 "[CrewAIEventsBus] Warning: Attempted to emit event during shutdown. Ignoring."
606 )
607 return None
608 has_dependencies = event_type in self._handler_dependencies
609 sync_handlers = self._sync_handlers.get(event_type, frozenset())
610 async_handlers = self._async_handlers.get(event_type, frozenset())
611
612 if not sync_handlers and not async_handlers:
613 return None
614
615 self._ensure_executor_initialized()
616 self._has_pending_events = True
617
618 state = self._runtime_state
619
620 if has_dependencies:
621 return self._track_future(
622 asyncio.run_coroutine_threadsafe(
623 self._emit_with_dependencies(source, event, state),
624 self._loop,
625 )
626 )
627
628 if sync_handlers:
629 if event_type is LLMStreamChunkEvent:

Calls 9

_prepare_eventMethod · 0.95
_track_futureMethod · 0.95
_call_handlersMethod · 0.95
_acall_handlersMethod · 0.95
r_lockedMethod · 0.80
printMethod · 0.45
getMethod · 0.45