MCPcopy
hub / github.com/determined-ai/determined / run

Method run

agent/internal/agent.go:85–221  ·  view source on GitHub ↗

Run sets up the agent and starts the watch loop. All configurations and system dependencies should be set up _before_ the watch loop is started. nolint:maintidx

(ctx context.Context)

Source from the content-addressed store, hash-verified

83//
84//nolint:maintidx
85func (a *Agent) run(ctx context.Context) error {
86 a.log.Trace("connecting to master")
87 socket, err := a.connect(ctx, false)
88 if err != nil {
89 return masterConnectionError{cause: fmt.Errorf("initial connection to master failed: %w", err)}
90 }
91 defer func() {
92 a.log.Trace("cleaning up socket")
93 if cErr := socket.Close(); err != nil {
94 a.log.WithError(cErr).Error("failed to close master websocket")
95 }
96 }()
97
98 a.log.Trace("reading master set agent options message")
99 var mopts aproto.MasterSetAgentOptions
100 select {
101 case msg, ok := <-socket.Inbox:
102 switch {
103 case !ok:
104 return fmt.Errorf("socket closed while reading setup messages")
105 case msg.MasterSetAgentOptions == nil:
106 return fmt.Errorf("master did not send setup messages")
107 default:
108 mopts = *msg.MasterSetAgentOptions
109 }
110 case <-ctx.Done():
111 return fmt.Errorf("canceled while reading setup messages: %w", ctx.Err())
112 }
113
114 a.log.Trace("detecting devices")
115 devices, err := detect.Detect(
116 a.opts.SlotType, a.opts.AgentID, a.opts.VisibleGPUs, a.opts.ArtificialSlots,
117 )
118 if err != nil {
119 return fmt.Errorf("failed to detect devices: %v", devices)
120 }
121
122 a.log.Tracef("setting up %s runtime", a.opts.ContainerRuntime)
123 if a.opts.ContainerRuntime != options.DockerContainerRuntime {
124 a.log.Error(a.opts.ContainerRuntime,
125 " container runtime is not supported, please update runtime config to use docker instead.")
126 return fmt.Errorf("container runtime not available: %s", a.opts.ContainerRuntime)
127 }
128
129 dcl, dErr := dclient.NewClientWithOpts(dclient.WithAPIVersionNegotiation(), dclient.FromEnv)
130 if dErr != nil {
131 return fmt.Errorf("failed to build docker client: %w", dErr)
132 }
133 defer func() {
134 a.log.Trace("cleaning up docker client")
135 if cErr := dcl.Close(); cErr != nil {
136 a.log.WithError(cErr).Error("failed to close docker client")
137 }
138 }()
139 cruntime := docker.NewClient(dcl)
140
141 a.log.Trace("setting up container manager")
142 outbox := make(chan *aproto.MasterMessage, eventChanSize) // covers many from socket lifetimes

Callers 1

NewAgentFunction · 0.95

Calls 15

connectMethod · 0.95
senderMethod · 0.95
reconnectFlowMethod · 0.95
DetectFunction · 0.92
NewClientFunction · 0.92
NewFunction · 0.92
makeFunction · 0.85
ErrMethod · 0.80
ReattachContainersMethod · 0.80
StartContainerMethod · 0.80
NewMethod · 0.80
CloseMethod · 0.65

Tested by

no test coverage detected