MCPcopy
hub / github.com/mastra-ai/mastra / MastraStateAdapter

Class MastraStateAdapter

packages/core/src/channels/state-adapter.ts:19–244  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

17 * short-lived (seconds to minutes) and don't need persistence.
18 */
19export class MastraStateAdapter implements StateAdapter {
20 private memoryStore: MemoryStorage;
21 private connected = false;
22 private connectPromise: Promise<void> | null = null;
23
24 // In-memory ephemeral state (cache, locks, lists, queues)
25 private readonly cache = new Map<string, CachedValue>();
26 private readonly locks = new Map<string, Lock>();
27 private readonly lists = new Map<string, { values: unknown[]; expiresAt: number | null }>();
28 private readonly queues = new Map<string, QueueEntry[]>();
29
30 constructor(memoryStore: MemoryStorage) {
31 this.memoryStore = memoryStore;
32 }
33
34 async connect(): Promise<void> {
35 if (this.connected) return;
36 if (!this.connectPromise) {
37 this.connectPromise = Promise.resolve().then(() => {
38 this.connected = true;
39 });
40 }
41 await this.connectPromise;
42 }
43
44 async disconnect(): Promise<void> {
45 this.connected = false;
46 this.connectPromise = null;
47 this.cache.clear();
48 this.locks.clear();
49 this.lists.clear();
50 this.queues.clear();
51 }
52
53 // ---------------------------------------------------------------------------
54 // Subscriptions — persisted via Mastra thread metadata
55 // ---------------------------------------------------------------------------
56
57 async subscribe(threadId: string): Promise<void> {
58 // Find the Mastra thread mapped to this external thread ID and mark it
59 const thread = await this.findThreadByExternalId(threadId);
60 if (!thread) return; // Thread not yet mapped — subscribe will be a no-op
61 await this.memoryStore.updateThread({
62 id: thread.id,
63 title: thread.title ?? '',
64 metadata: { ...thread.metadata, channel_subscribed: 'true' },
65 });
66 }
67
68 async unsubscribe(threadId: string): Promise<void> {
69 const thread = await this.findThreadByExternalId(threadId);
70 if (!thread) return;
71 await this.memoryStore.updateThread({
72 id: thread.id,
73 title: thread.title ?? '',
74 metadata: { ...((thread.metadata ?? {}) as Record<string, unknown>), channel_subscribed: 'false' },
75 });
76 }

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…