(ctx context.Context)
| 423 | } |
| 424 | |
| 425 | func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) { |
| 426 | optionRows, err := a.CatalogPool.Query(ctx, ` |
| 427 | SELECT p.name, p.options, p.enc_key_id |
| 428 | FROM peers p |
| 429 | WHERE p.type = $1 AND EXISTS(SELECT * FROM flows f WHERE p.id = f.source_peer)`, protos.DBType_POSTGRES) |
| 430 | if err != nil { |
| 431 | return nil, err |
| 432 | } |
| 433 | |
| 434 | return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) { |
| 435 | var peerName string |
| 436 | var encPeerOptions []byte |
| 437 | var encKeyID string |
| 438 | if err := optionRows.Scan(&peerName, &encPeerOptions, &encKeyID); err != nil { |
| 439 | return nil, err |
| 440 | } |
| 441 | |
| 442 | peerOptions, err := internal.Decrypt(ctx, encKeyID, encPeerOptions) |
| 443 | if err != nil { |
| 444 | return nil, err |
| 445 | } |
| 446 | |
| 447 | var pgPeerConfig protos.PostgresConfig |
| 448 | unmarshalErr := proto.Unmarshal(peerOptions, &pgPeerConfig) |
| 449 | if unmarshalErr != nil { |
| 450 | return nil, unmarshalErr |
| 451 | } |
| 452 | return &protos.Peer{ |
| 453 | Name: peerName, |
| 454 | Type: protos.DBType_POSTGRES, |
| 455 | Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig}, |
| 456 | }, nil |
| 457 | }) |
| 458 | } |
| 459 | |
| 460 | // replicateQRepPartition replicates a QRepPartition from the source to the destination. |
| 461 | func replicateQRepPartition[TRead any, TWrite QRepStreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore]( |
no test coverage detected