Fork copies the event log of a source conversation, up to and including a checkpoint sequence number, into a destination conversation.
(ctx context.Context, srcConversationID string, srcSeq int32, destConversationID string)
| 270 | |
| 271 | // Fork forks an event log from a specific conversation up to a checkpoint. |
| 272 | func (d *Controller) Fork(ctx context.Context, srcConversationID string, srcSeq int32, destConversationID string) (string, error) { |
| 273 | if srcConversationID == "" { |
| 274 | return "", fmt.Errorf("src_conversation_id is required") |
| 275 | } |
| 276 | // TODO(anj-s): Check whether destination ID already exists and reject collisions. |
| 277 | if destConversationID == "" { |
| 278 | destConversationID = uuid.NewString() |
| 279 | } |
| 280 | |
| 281 | events, err := d.eventLog.Events(ctx, srcConversationID) |
| 282 | if err != nil { |
| 283 | return "", fmt.Errorf("failed to retrieve source events: %w", err) |
| 284 | } |
| 285 | if len(events) == 0 { |
| 286 | return "", fmt.Errorf("source conversation %s not found or has no events", srcConversationID) |
| 287 | } |
| 288 | |
| 289 | // When the caller specifies srcSeq, require that it actually exists in |
| 290 | // the source event log. Without this check a typo or stale checkpoint |
| 291 | // silently degrades to "fork all events", which is misleading. Walk |
| 292 | // the events once: stop as soon as we pass the requested seq, and |
| 293 | // truncate the slice on an exact match so the copy loop below doesn't |
| 294 | // need to re-check the bound. |
| 295 | if srcSeq > 0 { |
| 296 | found := false |
| 297 | for i, ev := range events { |
| 298 | if ev.Seq == srcSeq { |
| 299 | events = events[:i+1] |
| 300 | found = true |
| 301 | break |
| 302 | } |
| 303 | if ev.Seq > srcSeq { |
| 304 | break |
| 305 | } |
| 306 | } |
| 307 | if !found { |
| 308 | return "", fmt.Errorf("src_seq %d not found in conversation %s", srcSeq, srcConversationID) |
| 309 | } |
| 310 | } |
| 311 | |
| 312 | for _, ev := range events { |
| 313 | // Clone the event to update the conversation ID. |
| 314 | newEvent := &proto.ConversationEvent{ |
| 315 | ConversationId: destConversationID, |
| 316 | Seq: ev.Seq, |
| 317 | ExecId: ev.ExecId, |
| 318 | Messages: ev.Messages, |
| 319 | State: ev.State, |
| 320 | } |
| 321 | if _, err := d.eventLog.Append(ctx, newEvent); err != nil { |
| 322 | return "", fmt.Errorf("failed to append forked event: %w", err) |
| 323 | } |
| 324 | } |
| 325 | |
| 326 | return destConversationID, nil |
| 327 | } |
| 328 | |
| 329 | // Registry returns the agent registry. |