returns jobRouteId, cleanupFunc, error
(ctx context.Context, jobId string, mainServerJwtToken string)
| 43 | |
| 44 | // returns jobRouteId, cleanupFunc, error |
| 45 | func (impl *ServerImpl) connectToJobManager(ctx context.Context, jobId string, mainServerJwtToken string) (string, func(), error) { |
| 46 | socketPath := wavebase.GetRemoteJobSocketPath(jobId) |
| 47 | log.Printf("connectToJobManager: connecting to socket: %s\n", socketPath) |
| 48 | conn, err := net.Dial("unix", socketPath) |
| 49 | if err != nil { |
| 50 | log.Printf("connectToJobManager: error connecting to socket: %v\n", err) |
| 51 | return "", nil, fmt.Errorf("cannot connect to job manager socket: %w", err) |
| 52 | } |
| 53 | log.Printf("connectToJobManager: connected to socket\n") |
| 54 | |
| 55 | proxy := wshutil.MakeRpcProxy("jobmanager") |
| 56 | linkId := impl.Router.RegisterUntrustedLink(proxy) |
| 57 | |
| 58 | var cleanupOnce sync.Once |
| 59 | cleanup := func() { |
| 60 | cleanupOnce.Do(func() { |
| 61 | conn.Close() |
| 62 | impl.Router.UnregisterLink(linkId) |
| 63 | impl.removeJobManagerConnection(jobId) |
| 64 | }) |
| 65 | } |
| 66 | |
| 67 | go func() { |
| 68 | writeErr := wshutil.AdaptOutputChToStream(proxy.ToRemoteCh, conn) |
| 69 | if writeErr != nil { |
| 70 | log.Printf("connectToJobManager: error writing to job manager socket: %v\n", writeErr) |
| 71 | } |
| 72 | }() |
| 73 | go func() { |
| 74 | defer func() { |
| 75 | close(proxy.FromRemoteCh) |
| 76 | cleanup() |
| 77 | }() |
| 78 | wshutil.AdaptStreamToMsgCh(conn, proxy.FromRemoteCh, nil) |
| 79 | }() |
| 80 | |
| 81 | routeId := wshutil.MakeLinkRouteId(linkId) |
| 82 | authData := wshrpc.CommandAuthenticateToJobData{ |
| 83 | JobAccessToken: mainServerJwtToken, |
| 84 | } |
| 85 | err = wshclient.AuthenticateToJobManagerCommand(impl.RpcClient, authData, &wshrpc.RpcOpts{Route: routeId}) |
| 86 | if err != nil { |
| 87 | cleanup() |
| 88 | return "", nil, fmt.Errorf("authentication to job manager failed: %w", err) |
| 89 | } |
| 90 | |
| 91 | jobRouteId := wshutil.MakeJobRouteId(jobId) |
| 92 | waitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) |
| 93 | defer cancel() |
| 94 | err = impl.Router.WaitForRegister(waitCtx, jobRouteId) |
| 95 | if err != nil { |
| 96 | cleanup() |
| 97 | return "", nil, fmt.Errorf("timeout waiting for job route to register: %w", err) |
| 98 | } |
| 99 | |
| 100 | jobConn := &JobManagerConnection{ |
| 101 | JobId: jobId, |
| 102 | Conn: conn, |
no test coverage detected