(ctx context.Context, jobId string, blockId string)
| 1409 | } |
| 1410 | |
| 1411 | func AttachJobToBlock(ctx context.Context, jobId string, blockId string) error { |
| 1412 | err := wstore.WithTx(ctx, func(tx *wstore.TxWrap) error { |
| 1413 | var oldJobId string |
| 1414 | |
| 1415 | err := wstore.DBUpdateFn(tx.Context(), blockId, func(block *waveobj.Block) { |
| 1416 | oldJobId = block.JobId |
| 1417 | block.JobId = jobId |
| 1418 | }) |
| 1419 | if err != nil { |
| 1420 | return fmt.Errorf("failed to update block: %w", err) |
| 1421 | } |
| 1422 | |
| 1423 | if oldJobId != "" && oldJobId != jobId { |
| 1424 | err = wstore.DBUpdateFn(tx.Context(), oldJobId, func(oldJob *waveobj.Job) { |
| 1425 | if oldJob.AttachedBlockId == blockId { |
| 1426 | oldJob.AttachedBlockId = "" |
| 1427 | } |
| 1428 | }) |
| 1429 | if err != nil { |
| 1430 | log.Printf("[job:%s] warning: could not detach old job: %v", oldJobId, err) |
| 1431 | } |
| 1432 | } |
| 1433 | |
| 1434 | err = wstore.DBUpdateFnErr(tx.Context(), jobId, func(job *waveobj.Job) error { |
| 1435 | if job.AttachedBlockId != "" && job.AttachedBlockId != blockId { |
| 1436 | return fmt.Errorf("job %s already attached to block %s", jobId, job.AttachedBlockId) |
| 1437 | } |
| 1438 | job.AttachedBlockId = blockId |
| 1439 | return nil |
| 1440 | }) |
| 1441 | if err != nil { |
| 1442 | return fmt.Errorf("failed to update job: %w", err) |
| 1443 | } |
| 1444 | |
| 1445 | log.Printf("[job:%s] attached to block:%s", jobId, blockId) |
| 1446 | return nil |
| 1447 | }) |
| 1448 | if err != nil { |
| 1449 | return err |
| 1450 | } |
| 1451 | |
| 1452 | SendBlockJobStatusEvent(ctx, blockId) |
| 1453 | wcore.SendWaveObjUpdate(waveobj.MakeORef(waveobj.OType_Block, blockId)) |
| 1454 | return nil |
| 1455 | } |
| 1456 | |
| 1457 | func DetachJobFromBlock(ctx context.Context, jobId string, updateBlock bool) error { |
| 1458 | var blockId string |
no test coverage detected