MCPcopy
hub / github.com/mastra-ai/mastra / ThreadStateLibSQL

Class ThreadStateLibSQL

stores/libsql/src/storage/domains/thread-state/index.ts:22–139  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

20 * payload (e.g. the task list for `type = 'task'`).
21 */
22export class ThreadStateLibSQL extends ThreadStateStorage {
23 /**
24 * `thread_state` grows as a side effect of thread activity (one row per
25 * thread per state type). It anchors on `updatedAt` (last activity), so state
26 * for a thread that is still being appended to is not pruned by creation age.
27 */
28 static readonly retentionTables: RetentionTablesDescriptor = {
29 threadState: { table: TABLE_THREAD_STATE, column: 'updatedAt', indexed: true },
30 };
31
32 #db: LibSQLDB;
33 #client: Client;
34
35 constructor(config: LibSQLDomainConfig) {
36 super();
37 const client = resolveClient(config);
38 this.#client = client;
39 this.#db = new LibSQLDB({ client, maxRetries: config.maxRetries, initialBackoffMs: config.initialBackoffMs });
40 }
41
42 async init(): Promise<void> {
43 await this.#db.createTable({
44 tableName: TABLE_THREAD_STATE,
45 schema: THREAD_STATE_SCHEMA,
46 compositePrimaryKey: ['threadId', 'type'],
47 });
48 }
49
50 /** Delete thread state older than the `threadState` policy's `maxAge`, batched. */
51 async prune(policies: Record<string, TableRetentionPolicy>, options?: PruneOptions): Promise<PruneResult[]> {
52 const targets = resolveTargets({
53 policies,
54 descriptor: ThreadStateLibSQL.retentionTables,
55 order: ['threadState'],
56 });
57 return runPrune({ db: this.#db, domain: 'threadState', targets, options, logger: this.logger });
58 }
59
60 async dangerouslyClearAll(): Promise<void> {
61 try {
62 await this.#client.execute(`DELETE FROM "${TABLE_THREAD_STATE}"`);
63 } catch (error) {
64 throw new MastraError(
65 {
66 id: createStorageErrorId('LIBSQL', 'THREAD_STATE_CLEAR_ALL', 'FAILED'),
67 domain: ErrorDomain.STORAGE,
68 category: ErrorCategory.THIRD_PARTY,
69 },
70 error,
71 );
72 }
73 }
74
75 async getState<T = unknown>({ threadId, type }: { threadId: string; type: string }): Promise<T | undefined> {
76 try {
77 const result = await this.#client.execute({
78 sql: `SELECT "value" FROM "${TABLE_THREAD_STATE}" WHERE "threadId" = ? AND "type" = ? LIMIT 1`,
79 args: [threadId, type],

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…