InstallBackend sends a backend.install request-reply to a worker node. The operation is idempotent on the worker: if the (modelID, replica) process is already running, the worker short-circuits and returns its address; if the binary is on disk, the worker just spawns a process; only a missing binary triggers a full install.
( nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent), )
| 118 | // it lives on a different NATS subject so it cannot head-of-line-block |
| 119 | // routine load traffic on the same worker. |
| 120 | func (a *RemoteUnloaderAdapter) InstallBackend( |
| 121 | nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, |
| 122 | replicaIndex int, |
| 123 | opID string, |
| 124 | onProgress func(messaging.BackendInstallProgressEvent), |
| 125 | ) (*messaging.BackendInstallReply, error) { |
| 126 | subject := messaging.SubjectNodeBackendInstall(nodeID) |
| 127 | xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "opID", opID) |
| 128 | |
| 129 | // Subscribe to the per-op progress subject BEFORE publishing the install |
| 130 | // request so we don't miss early events. |
| 131 | sub := a.subscribeProgress(nodeID, opID, onProgress) |
| 132 | |
| 133 | reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ |
| 134 | Backend: backendType, |
| 135 | ModelID: modelID, |
| 136 | BackendGalleries: galleriesJSON, |
| 137 | URI: uri, |
| 138 | Name: name, |
| 139 | Alias: alias, |
| 140 | ReplicaIndex: int32(replicaIndex), |
| 141 | OpID: opID, |
| 142 | }, a.installTimeout) |
| 143 | |
| 144 | if sub != nil { |
| 145 | _ = sub.Unsubscribe() |
| 146 | } |
| 147 | |
| 148 | if err != nil && isNATSTimeout(err) { |
| 149 | return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v", |
| 150 | galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err) |
| 151 | } |
| 152 | return reply, err |
| 153 | } |
| 154 | |
| 155 | // subscribeProgress subscribes to the per-op backend-install progress subject |
| 156 | // so the master can stream per-node download ticks while a worker installs or |
nothing calls this directly
no test coverage detected