MCPcopy
hub / github.com/PeerDB-io/peerdb / getPostgresPeerConfigs

Method getPostgresPeerConfigs

flow/activities/flowable_core.go:425–458  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

423}
424
425func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
426 optionRows, err := a.CatalogPool.Query(ctx, `
427 SELECT p.name, p.options, p.enc_key_id
428 FROM peers p
429 WHERE p.type = $1 AND EXISTS(SELECT * FROM flows f WHERE p.id = f.source_peer)`, protos.DBType_POSTGRES)
430 if err != nil {
431 return nil, err
432 }
433
434 return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) {
435 var peerName string
436 var encPeerOptions []byte
437 var encKeyID string
438 if err := optionRows.Scan(&peerName, &encPeerOptions, &encKeyID); err != nil {
439 return nil, err
440 }
441
442 peerOptions, err := internal.Decrypt(ctx, encKeyID, encPeerOptions)
443 if err != nil {
444 return nil, err
445 }
446
447 var pgPeerConfig protos.PostgresConfig
448 unmarshalErr := proto.Unmarshal(peerOptions, &pgPeerConfig)
449 if unmarshalErr != nil {
450 return nil, unmarshalErr
451 }
452 return &protos.Peer{
453 Name: peerName,
454 Type: protos.DBType_POSTGRES,
455 Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig},
456 }, nil
457 })
458}
459
460// replicateQRepPartition replicates a QRepPartition from the source to the destination.
461func replicateQRepPartition[TRead any, TWrite QRepStreamCloser, TSync connectors.QRepSyncConnectorCore, TPull connectors.QRepPullConnectorCore](

Callers 1

SendWALHeartbeatMethod · 0.95

Calls 3

DecryptMethod · 0.80
QueryMethod · 0.65
ScanMethod · 0.45

Tested by

no test coverage detected