Run sets up the agent and starts the watch loop. All configurations and system dependencies should be set up _before_ the watch loop is started. nolint:maintidx
(ctx context.Context)
| 83 | // |
| 84 | //nolint:maintidx |
| 85 | func (a *Agent) run(ctx context.Context) error { |
| 86 | a.log.Trace("connecting to master") |
| 87 | socket, err := a.connect(ctx, false) |
| 88 | if err != nil { |
| 89 | return masterConnectionError{cause: fmt.Errorf("initial connection to master failed: %w", err)} |
| 90 | } |
| 91 | defer func() { |
| 92 | a.log.Trace("cleaning up socket") |
| 93 | if cErr := socket.Close(); err != nil { |
| 94 | a.log.WithError(cErr).Error("failed to close master websocket") |
| 95 | } |
| 96 | }() |
| 97 | |
| 98 | a.log.Trace("reading master set agent options message") |
| 99 | var mopts aproto.MasterSetAgentOptions |
| 100 | select { |
| 101 | case msg, ok := <-socket.Inbox: |
| 102 | switch { |
| 103 | case !ok: |
| 104 | return fmt.Errorf("socket closed while reading setup messages") |
| 105 | case msg.MasterSetAgentOptions == nil: |
| 106 | return fmt.Errorf("master did not send setup messages") |
| 107 | default: |
| 108 | mopts = *msg.MasterSetAgentOptions |
| 109 | } |
| 110 | case <-ctx.Done(): |
| 111 | return fmt.Errorf("canceled while reading setup messages: %w", ctx.Err()) |
| 112 | } |
| 113 | |
| 114 | a.log.Trace("detecting devices") |
| 115 | devices, err := detect.Detect( |
| 116 | a.opts.SlotType, a.opts.AgentID, a.opts.VisibleGPUs, a.opts.ArtificialSlots, |
| 117 | ) |
| 118 | if err != nil { |
| 119 | return fmt.Errorf("failed to detect devices: %v", devices) |
| 120 | } |
| 121 | |
| 122 | a.log.Tracef("setting up %s runtime", a.opts.ContainerRuntime) |
| 123 | if a.opts.ContainerRuntime != options.DockerContainerRuntime { |
| 124 | a.log.Error(a.opts.ContainerRuntime, |
| 125 | " container runtime is not supported, please update runtime config to use docker instead.") |
| 126 | return fmt.Errorf("container runtime not available: %s", a.opts.ContainerRuntime) |
| 127 | } |
| 128 | |
| 129 | dcl, dErr := dclient.NewClientWithOpts(dclient.WithAPIVersionNegotiation(), dclient.FromEnv) |
| 130 | if dErr != nil { |
| 131 | return fmt.Errorf("failed to build docker client: %w", dErr) |
| 132 | } |
| 133 | defer func() { |
| 134 | a.log.Trace("cleaning up docker client") |
| 135 | if cErr := dcl.Close(); cErr != nil { |
| 136 | a.log.WithError(cErr).Error("failed to close docker client") |
| 137 | } |
| 138 | }() |
| 139 | cruntime := docker.NewClient(dcl) |
| 140 | |
| 141 | a.log.Trace("setting up container manager") |
| 142 | outbox := make(chan *aproto.MasterMessage, eventChanSize) // covers many from socket lifetimes |
no test coverage detected