MCPcopy
hub / github.com/apache/devlake / Stream

Method Stream

backend/server/services/remote/bridge/cmd.go:89–147  ·  view source on GitHub ↗
(methodName string, ctx plugin.ExecContext, args ...any)

Source from the content-addressed store, hash-verified

87}
88
89func (c *CmdInvoker) Stream(methodName string, ctx plugin.ExecContext, args ...any) *MethodStream {
90 recvChannel := make(chan *StreamResult)
91 stream := &MethodStream{
92 outbound: nil,
93 inbound: recvChannel,
94 }
95 serializedArgs, err := serialize(append([]any{DefaultContext.GetRemoteConfig()}, args...)...)
96 if err != nil {
97 recvChannel <- NewStreamResult(nil, err)
98 return stream
99 }
100 executable, inputArgs := c.resolveCmd(methodName, serializedArgs...)
101 cmdCtx := DefaultContext.GetContext() // grabbing context off of ctx kills the cmd after a couple of seconds... why?
102 cmd := exec.CommandContext(cmdCtx, executable, inputArgs...)
103 if c.workingPath != "" {
104 cmd.Dir = c.workingPath
105 }
106 processHandle, err := utils.StreamProcess(cmd, &utils.StreamProcessOptions{
107 OnStdout: func(b []byte) {
108 msg := string(b)
109 c.logRemoteMessage(ctx.GetLogger(), msg)
110 },
111 OnStderr: func(b []byte) {
112 msg := string(b)
113 c.logRemoteError(ctx.GetLogger(), msg)
114 },
115 UseFdOut: true,
116 })
117 if err != nil {
118 recvChannel <- NewStreamResult(nil, err)
119 return stream
120 }
121 go func() {
122 defer close(recvChannel)
123 for msg := range processHandle.Receive() {
124 if err = msg.GetError(); err != nil {
125 recvChannel <- NewStreamResult(nil, err)
126 }
127 if !c.cancelled {
128 select {
129 case <-ctx.GetContext().Done():
130 err = processHandle.Cancel()
131 if err != nil {
132 recvChannel <- NewStreamResult(nil, errors.Default.Wrap(err, "error cancelling python target"))
133 return
134 }
135 c.cancelled = true
136 // continue until the stream gets closed by the child
137 default:
138 }
139 }
140 response := msg.GetFdOut()
141 if response != nil {
142 recvChannel <- NewStreamResult(response, nil)
143 }
144 }
145 }()
146 return stream

Callers

nothing calls this directly

Calls 12

logRemoteMessageMethod · 0.95
logRemoteErrorMethod · 0.95
serializeFunction · 0.85
NewStreamResultFunction · 0.85
GetErrorMethod · 0.80
CancelMethod · 0.80
WrapMethod · 0.80
GetFdOutMethod · 0.80
GetRemoteConfigMethod · 0.65
GetContextMethod · 0.65
GetLoggerMethod · 0.65
ReceiveMethod · 0.45

Tested by

no test coverage detected