NewClientStreamHandler constructs a [Handler] for a client streaming procedure.
( procedure string, implementation func(context.Context, *ClientStream[Req]) (*Response[Res], error), options ...HandlerOption, )
| 136 | |
| 137 | // NewClientStreamHandler constructs a [Handler] for a client streaming procedure. |
| 138 | func NewClientStreamHandler[Req, Res any]( |
| 139 | procedure string, |
| 140 | implementation func(context.Context, *ClientStream[Req]) (*Response[Res], error), |
| 141 | options ...HandlerOption, |
| 142 | ) *Handler { |
| 143 | config := newHandlerConfig(procedure, StreamTypeClient, options) |
| 144 | return newStreamHandler( |
| 145 | config, |
| 146 | func(ctx context.Context, conn StreamingHandlerConn) error { |
| 147 | stream := &ClientStream[Req]{ |
| 148 | conn: conn, |
| 149 | initializer: config.Initializer, |
| 150 | } |
| 151 | ctx = newHandlerContext(ctx, &streamingHandlerCallInfo{ |
| 152 | conn: conn, |
| 153 | }) |
| 154 | res, err := implementation(ctx, stream) |
| 155 | if err != nil { |
| 156 | return err |
| 157 | } |
| 158 | if res == nil { |
| 159 | // This is going to panic during serialization. Debugging is much easier |
| 160 | // if we panic here instead, so we can include the procedure name. |
| 161 | panic(procedure + " returned nil *connect.Response and nil error") //nolint: forbidigo |
| 162 | } |
| 163 | mergeHeaders(conn.ResponseHeader(), res.header) |
| 164 | mergeHeaders(conn.ResponseTrailer(), res.trailer) |
| 165 | return conn.Send(res.Msg) |
| 166 | }, |
| 167 | ) |
| 168 | } |
| 169 | |
| 170 | // NewClientStreamHandlerSimple constructs a [Handler] for a request-streaming procedure |
| 171 | // using the function signature associated with the "simple" generation option. |