MCPcopy
hub / github.com/wavetermdev/waveterm / connectToJobManager

Method connectToJobManager

pkg/wshrpc/wshremote/wshremote_job.go:45–109  ·  view source on GitHub ↗

returns jobRouteId, cleanupFunc, error

(ctx context.Context, jobId string, mainServerJwtToken string)

Source from the content-addressed store, hash-verified

43
44// returns jobRouteId, cleanupFunc, error
45func (impl *ServerImpl) connectToJobManager(ctx context.Context, jobId string, mainServerJwtToken string) (string, func(), error) {
46 socketPath := wavebase.GetRemoteJobSocketPath(jobId)
47 log.Printf("connectToJobManager: connecting to socket: %s\n", socketPath)
48 conn, err := net.Dial("unix", socketPath)
49 if err != nil {
50 log.Printf("connectToJobManager: error connecting to socket: %v\n", err)
51 return "", nil, fmt.Errorf("cannot connect to job manager socket: %w", err)
52 }
53 log.Printf("connectToJobManager: connected to socket\n")
54
55 proxy := wshutil.MakeRpcProxy("jobmanager")
56 linkId := impl.Router.RegisterUntrustedLink(proxy)
57
58 var cleanupOnce sync.Once
59 cleanup := func() {
60 cleanupOnce.Do(func() {
61 conn.Close()
62 impl.Router.UnregisterLink(linkId)
63 impl.removeJobManagerConnection(jobId)
64 })
65 }
66
67 go func() {
68 writeErr := wshutil.AdaptOutputChToStream(proxy.ToRemoteCh, conn)
69 if writeErr != nil {
70 log.Printf("connectToJobManager: error writing to job manager socket: %v\n", writeErr)
71 }
72 }()
73 go func() {
74 defer func() {
75 close(proxy.FromRemoteCh)
76 cleanup()
77 }()
78 wshutil.AdaptStreamToMsgCh(conn, proxy.FromRemoteCh, nil)
79 }()
80
81 routeId := wshutil.MakeLinkRouteId(linkId)
82 authData := wshrpc.CommandAuthenticateToJobData{
83 JobAccessToken: mainServerJwtToken,
84 }
85 err = wshclient.AuthenticateToJobManagerCommand(impl.RpcClient, authData, &wshrpc.RpcOpts{Route: routeId})
86 if err != nil {
87 cleanup()
88 return "", nil, fmt.Errorf("authentication to job manager failed: %w", err)
89 }
90
91 jobRouteId := wshutil.MakeJobRouteId(jobId)
92 waitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
93 defer cancel()
94 err = impl.Router.WaitForRegister(waitCtx, jobRouteId)
95 if err != nil {
96 cleanup()
97 return "", nil, fmt.Errorf("timeout waiting for job route to register: %w", err)
98 }
99
100 jobConn := &JobManagerConnection{
101 JobId: jobId,
102 Conn: conn,

Callers 2

RemoteStartJobCommandMethod · 0.95

Calls 13

GetRemoteJobSocketPathFunction · 0.92
MakeRpcProxyFunction · 0.92
AdaptOutputChToStreamFunction · 0.92
AdaptStreamToMsgChFunction · 0.92
MakeLinkRouteIdFunction · 0.92
MakeJobRouteIdFunction · 0.92
RegisterUntrustedLinkMethod · 0.80
UnregisterLinkMethod · 0.80
WaitForRegisterMethod · 0.80

Tested by

no test coverage detected