(sess *Session, msg *ClientComMessage)
| 351 | } |
| 352 | |
| 353 | func pluginFireHose(sess *Session, msg *ClientComMessage) (*ClientComMessage, *ServerComMessage) { |
| 354 | if globals.plugins == nil { |
| 355 | // Return the original message to continue processing without changes |
| 356 | return msg, nil |
| 357 | } |
| 358 | |
| 359 | var req *pbx.ClientReq |
| 360 | |
| 361 | id, topic := pluginIDAndTopic(msg) |
| 362 | ts := time.Now().UTC().Round(time.Millisecond) |
| 363 | for i := range globals.plugins { |
| 364 | p := &globals.plugins[i] |
| 365 | if !pluginDoFiltering(p.filterFireHose, msg) { |
| 366 | // Plugin is not interested in FireHose |
| 367 | continue |
| 368 | } |
| 369 | |
| 370 | if req == nil { |
| 371 | // Generate request only if needed |
| 372 | req = pluginGenerateClientReq(sess, msg) |
| 373 | if req == nil { |
| 374 | // Failed to serialize message. Most likely the message is invalid. |
| 375 | break |
| 376 | } |
| 377 | } |
| 378 | |
| 379 | var ctx context.Context |
| 380 | var cancel context.CancelFunc |
| 381 | if p.timeout > 0 { |
| 382 | ctx, cancel = context.WithTimeout(context.Background(), p.timeout) |
| 383 | defer cancel() |
| 384 | } else { |
| 385 | ctx = context.Background() |
| 386 | } |
| 387 | if resp, err := p.client.FireHose(ctx, req); err == nil { |
| 388 | respStatus := resp.GetStatus() |
| 389 | // CONTINUE means default processing |
| 390 | if respStatus == pbx.RespCode_CONTINUE { |
| 391 | continue |
| 392 | } |
| 393 | // DROP means stop processing of the message |
| 394 | if respStatus == pbx.RespCode_DROP { |
| 395 | return nil, nil |
| 396 | } |
| 397 | // REPLACE: ClientMsg was updated by the plugin. Use the new one for further processing. |
| 398 | if respStatus == pbx.RespCode_REPLACE { |
| 399 | return pbCliDeserialize(resp.GetClmsg()), nil |
| 400 | } |
| 401 | |
| 402 | // RESPOND: Plugin provided an alternative response message. Use it |
| 403 | return nil, pbServDeserialize(resp.GetSrvmsg()) |
| 404 | |
| 405 | } else if p.failureCode != 0 { |
| 406 | // Plugin failed and it's configured to stop further processing. |
| 407 | logs.Err.Println("plugin: failed,", p.name, err) |
| 408 | return nil, &ServerComMessage{ |
| 409 | Ctrl: &MsgServerCtrl{ |
| 410 | Id: id, |
no test coverage detected
searching dependent graphs…