blob: 72c8bc67c3c46a69833b36dbf60e0a193d5f90ca [file] [log] [blame]
package testcluster
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/mitchellh/mapstructure"
)
func GetPerformanceToken(pri VaultCluster, id, secondaryPublicKey string) (string, error) {
client := pri.Nodes()[0].APIClient()
req := map[string]interface{}{
"id": id,
}
if secondaryPublicKey != "" {
req["secondary_public_key"] = secondaryPublicKey
}
secret, err := client.Logical().Write("sys/replication/performance/primary/secondary-token", req)
if err != nil {
return "", err
}
if secondaryPublicKey != "" {
return secret.Data["token"].(string), nil
}
return secret.WrapInfo.Token, nil
}
func EnablePerfPrimary(ctx context.Context, pri VaultCluster) error {
client := pri.Nodes()[0].APIClient()
_, err := client.Logical().WriteWithContext(ctx, "sys/replication/performance/primary/enable", nil)
if err != nil {
return err
}
err = WaitForPerfReplicationState(ctx, pri, consts.ReplicationPerformancePrimary)
if err != nil {
return err
}
return WaitForActiveNodeAndPerfStandbys(ctx, pri)
}
func WaitForPerfReplicationState(ctx context.Context, cluster VaultCluster, state consts.ReplicationState) error {
client := cluster.Nodes()[0].APIClient()
var health *api.HealthResponse
var err error
for ctx.Err() == nil {
health, err = client.Sys().HealthWithContext(ctx)
if err == nil && health.ReplicationPerformanceMode == state.GetPerformanceString() {
return nil
}
time.Sleep(500 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return err
}
func EnablePerformanceSecondaryNoWait(ctx context.Context, perfToken string, pri, sec VaultCluster, updatePrimary bool) error {
postData := map[string]interface{}{
"token": perfToken,
"ca_file": DefaultCAFile,
}
path := "sys/replication/performance/secondary/enable"
if updatePrimary {
path = "sys/replication/performance/secondary/update-primary"
}
err := WaitForActiveNodeAndPerfStandbys(ctx, sec)
if err != nil {
return err
}
_, err = sec.Nodes()[0].APIClient().Logical().Write(path, postData)
if err != nil {
return err
}
return WaitForPerfReplicationState(ctx, sec, consts.ReplicationPerformanceSecondary)
}
func EnablePerformanceSecondary(ctx context.Context, perfToken string, pri, sec VaultCluster, updatePrimary, skipPoisonPill bool) (string, error) {
if err := EnablePerformanceSecondaryNoWait(ctx, perfToken, pri, sec, updatePrimary); err != nil {
return "", err
}
if err := WaitForMatchingMerkleRoots(ctx, "sys/replication/performance/", pri, sec); err != nil {
return "", err
}
root, err := WaitForPerformanceSecondary(ctx, pri, sec, skipPoisonPill)
if err != nil {
return "", err
}
if err := WaitForPerfReplicationWorking(ctx, pri, sec); err != nil {
return "", err
}
return root, nil
}
func WaitForMatchingMerkleRoots(ctx context.Context, endpoint string, pri, sec VaultCluster) error {
getRoot := func(mode string, cli *api.Client) (string, error) {
status, err := cli.Logical().Read(endpoint + "status")
if err != nil {
return "", err
}
if status == nil || status.Data == nil || status.Data["mode"] == nil {
return "", fmt.Errorf("got nil secret or data")
}
if status.Data["mode"].(string) != mode {
return "", fmt.Errorf("expected mode=%s, got %s", mode, status.Data["mode"].(string))
}
return status.Data["merkle_root"].(string), nil
}
secClient := sec.Nodes()[0].APIClient()
priClient := pri.Nodes()[0].APIClient()
for i := 0; i < 30; i++ {
secRoot, err := getRoot("secondary", secClient)
if err != nil {
return err
}
priRoot, err := getRoot("primary", priClient)
if err != nil {
return err
}
if reflect.DeepEqual(priRoot, secRoot) {
return nil
}
time.Sleep(time.Second)
}
return fmt.Errorf("roots did not become equal")
}
func WaitForPerformanceWAL(ctx context.Context, pri, sec VaultCluster) error {
endpoint := "sys/replication/performance/"
if err := WaitForMatchingMerkleRoots(ctx, endpoint, pri, sec); err != nil {
return nil
}
getWAL := func(mode, walKey string, cli *api.Client) (int64, error) {
status, err := cli.Logical().Read(endpoint + "status")
if err != nil {
return 0, err
}
if status == nil || status.Data == nil || status.Data["mode"] == nil {
return 0, fmt.Errorf("got nil secret or data")
}
if status.Data["mode"].(string) != mode {
return 0, fmt.Errorf("expected mode=%s, got %s", mode, status.Data["mode"].(string))
}
return status.Data[walKey].(json.Number).Int64()
}
secClient := sec.Nodes()[0].APIClient()
priClient := pri.Nodes()[0].APIClient()
for ctx.Err() == nil {
secLastRemoteWAL, err := getWAL("secondary", "last_remote_wal", secClient)
if err != nil {
return err
}
priLastPerfWAL, err := getWAL("primary", "last_performance_wal", priClient)
if err != nil {
return err
}
if secLastRemoteWAL >= priLastPerfWAL {
return nil
}
time.Sleep(time.Second)
}
return fmt.Errorf("performance WALs on the secondary did not catch up with the primary, context err: %w", ctx.Err())
}
func WaitForPerformanceSecondary(ctx context.Context, pri, sec VaultCluster, skipPoisonPill bool) (string, error) {
if len(pri.GetRecoveryKeys()) > 0 {
sec.SetBarrierKeys(pri.GetRecoveryKeys())
sec.SetRecoveryKeys(pri.GetRecoveryKeys())
} else {
sec.SetBarrierKeys(pri.GetBarrierKeys())
sec.SetRecoveryKeys(pri.GetBarrierKeys())
}
if len(sec.Nodes()) > 1 {
if skipPoisonPill {
// As part of prepareSecondary on the active node the keyring is
// deleted from storage. Its absence can cause standbys to seal
// themselves. But it's not reliable, so we'll seal them
// ourselves to force the issue.
for i := range sec.Nodes()[1:] {
if err := SealNode(ctx, sec, i+1); err != nil {
return "", err
}
}
} else {
// We want to make sure we unseal all the nodes so we first need to wait
// until two of the nodes seal due to the poison pill being written
if err := WaitForNCoresSealed(ctx, sec, len(sec.Nodes())-1); err != nil {
return "", err
}
}
}
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return "", err
}
if err := UnsealAllNodes(ctx, sec); err != nil {
return "", err
}
perfSecondaryRootToken, err := GenerateRoot(sec, GenerateRootRegular)
if err != nil {
return "", err
}
sec.SetRootToken(perfSecondaryRootToken)
if err := WaitForActiveNodeAndPerfStandbys(ctx, sec); err != nil {
return "", err
}
return perfSecondaryRootToken, nil
}
func WaitForPerfReplicationWorking(ctx context.Context, pri, sec VaultCluster) error {
priActiveIdx, err := WaitForActiveNode(ctx, pri)
if err != nil {
return err
}
secActiveIdx, err := WaitForActiveNode(ctx, sec)
if err != nil {
return err
}
priClient, secClient := pri.Nodes()[priActiveIdx].APIClient(), sec.Nodes()[secActiveIdx].APIClient()
mountPoint, err := uuid.GenerateUUID()
if err != nil {
return err
}
err = priClient.Sys().Mount(mountPoint, &api.MountInput{
Type: "kv",
Local: false,
})
if err != nil {
return fmt.Errorf("unable to mount KV engine on primary")
}
path := mountPoint + "/foo"
_, err = priClient.Logical().Write(path, map[string]interface{}{
"bar": 1,
})
if err != nil {
return fmt.Errorf("unable to write KV on primary", "path", path)
}
for ctx.Err() == nil {
var secret *api.Secret
secret, err = secClient.Logical().Read(path)
if err == nil && secret != nil {
err = priClient.Sys().Unmount(mountPoint)
if err != nil {
return fmt.Errorf("unable to unmount KV engine on primary")
}
return nil
}
time.Sleep(100 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return fmt.Errorf("unable to read replicated KV on secondary, path=%s, err=%v", path, err)
}
func SetupTwoClusterPerfReplication(ctx context.Context, pri, sec VaultCluster) error {
if err := EnablePerfPrimary(ctx, pri); err != nil {
return err
}
perfToken, err := GetPerformanceToken(pri, sec.ClusterID(), "")
if err != nil {
return err
}
_, err = EnablePerformanceSecondary(ctx, perfToken, pri, sec, false, false)
return err
}
// PassiveWaitForActiveNodeAndPerfStandbys should be used instead of
// WaitForActiveNodeAndPerfStandbys when you don't want to do any writes
// as a side-effect. This returns perfStandby nodes in the cluster and
// an error.
func PassiveWaitForActiveNodeAndPerfStandbys(ctx context.Context, pri VaultCluster) (VaultClusterNode, []VaultClusterNode, error) {
leaderNode, standbys, err := GetActiveAndStandbys(ctx, pri)
if err != nil {
return nil, nil, fmt.Errorf("failed to derive standby nodes, %w", err)
}
for i, node := range standbys {
client := node.APIClient()
// Make sure we get perf standby nodes
if err = EnsureCoreIsPerfStandby(ctx, client); err != nil {
return nil, nil, fmt.Errorf("standby node %d is not a perfStandby, %w", i, err)
}
}
return leaderNode, standbys, nil
}
func GetActiveAndStandbys(ctx context.Context, cluster VaultCluster) (VaultClusterNode, []VaultClusterNode, error) {
var leaderIndex int
var err error
if leaderIndex, err = WaitForActiveNode(ctx, cluster); err != nil {
return nil, nil, err
}
var leaderNode VaultClusterNode
var nodes []VaultClusterNode
for i, node := range cluster.Nodes() {
if i == leaderIndex {
leaderNode = node
continue
}
nodes = append(nodes, node)
}
return leaderNode, nodes, nil
}
func EnsureCoreIsPerfStandby(ctx context.Context, client *api.Client) error {
var err error
var health *api.HealthResponse
for ctx.Err() == nil {
health, err = client.Sys().HealthWithContext(ctx)
if err == nil && health.PerformanceStandby {
return nil
}
time.Sleep(time.Millisecond * 500)
}
if err == nil {
err = ctx.Err()
}
return err
}
func WaitForDRReplicationState(ctx context.Context, cluster VaultCluster, state consts.ReplicationState) error {
client := cluster.Nodes()[0].APIClient()
var health *api.HealthResponse
var err error
for ctx.Err() == nil {
health, err = client.Sys().HealthWithContext(ctx)
if err == nil && health.ReplicationDRMode == state.GetDRString() {
return nil
}
time.Sleep(500 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return err
}
func EnableDrPrimary(ctx context.Context, pri VaultCluster) error {
client := pri.Nodes()[0].APIClient()
_, err := client.Logical().Write("sys/replication/dr/primary/enable", nil)
if err != nil {
return err
}
err = WaitForDRReplicationState(ctx, pri, consts.ReplicationDRPrimary)
if err != nil {
return err
}
return WaitForActiveNodeAndPerfStandbys(ctx, pri)
}
func GenerateDRActivationToken(pri VaultCluster, id, secondaryPublicKey string) (string, error) {
client := pri.Nodes()[0].APIClient()
req := map[string]interface{}{
"id": id,
}
if secondaryPublicKey != "" {
req["secondary_public_key"] = secondaryPublicKey
}
secret, err := client.Logical().Write("sys/replication/dr/primary/secondary-token", req)
if err != nil {
return "", err
}
if secondaryPublicKey != "" {
return secret.Data["token"].(string), nil
}
return secret.WrapInfo.Token, nil
}
func WaitForDRSecondary(ctx context.Context, pri, sec VaultCluster, skipPoisonPill bool) error {
if len(pri.GetRecoveryKeys()) > 0 {
sec.SetBarrierKeys(pri.GetRecoveryKeys())
sec.SetRecoveryKeys(pri.GetRecoveryKeys())
} else {
sec.SetBarrierKeys(pri.GetBarrierKeys())
sec.SetRecoveryKeys(pri.GetBarrierKeys())
}
if len(sec.Nodes()) > 1 {
if skipPoisonPill {
// As part of prepareSecondary on the active node the keyring is
// deleted from storage. Its absence can cause standbys to seal
// themselves. But it's not reliable, so we'll seal them
// ourselves to force the issue.
for i := range sec.Nodes()[1:] {
if err := SealNode(ctx, sec, i+1); err != nil {
return err
}
}
} else {
// We want to make sure we unseal all the nodes so we first need to wait
// until two of the nodes seal due to the poison pill being written
if err := WaitForNCoresSealed(ctx, sec, len(sec.Nodes())-1); err != nil {
return err
}
}
}
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return err
}
// unseal nodes
for i := range sec.Nodes() {
if err := UnsealNode(ctx, sec, i); err != nil {
// Sometimes when we get here it's already unsealed on its own
// and then this fails for DR secondaries so check again
// The error is "path disabled in replication DR secondary mode".
if healthErr := NodeHealthy(ctx, sec, i); healthErr != nil {
// return the original error
return err
}
}
}
sec.SetRootToken(pri.GetRootToken())
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return err
}
return nil
}
func EnableDRSecondaryNoWait(ctx context.Context, sec VaultCluster, drToken string) error {
postData := map[string]interface{}{
"token": drToken,
"ca_file": DefaultCAFile,
}
_, err := sec.Nodes()[0].APIClient().Logical().Write("sys/replication/dr/secondary/enable", postData)
if err != nil {
return err
}
return WaitForDRReplicationState(ctx, sec, consts.ReplicationDRSecondary)
}
func WaitForReplicationStatus(ctx context.Context, client *api.Client, dr bool, accept func(map[string]interface{}) bool) error {
url := "sys/replication/performance/status"
if dr {
url = "sys/replication/dr/status"
}
var err error
var secret *api.Secret
for ctx.Err() == nil {
secret, err = client.Logical().Read(url)
if err == nil && secret != nil && secret.Data != nil {
if accept(secret.Data) {
return nil
}
}
time.Sleep(500 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return fmt.Errorf("unable to get acceptable replication status: error=%v secret=%#v", err, secret)
}
func WaitForDRReplicationWorking(ctx context.Context, pri, sec VaultCluster) error {
priClient := pri.Nodes()[0].APIClient()
secClient := sec.Nodes()[0].APIClient()
// Make sure we've entered stream-wals mode
err := WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
return secret["state"] == string("stream-wals")
})
if err != nil {
return err
}
// Now write some data and make sure that we see last_remote_wal nonzero, i.e.
// at least one WAL has been streamed.
secret, err := priClient.Auth().Token().Create(&api.TokenCreateRequest{})
if err != nil {
return err
}
// Revoke the token since some tests won't be happy to see it.
err = priClient.Auth().Token().RevokeTree(secret.Auth.ClientToken)
if err != nil {
return err
}
err = WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
if secret["state"] != string("stream-wals") {
return false
}
if secret["last_remote_wal"] != nil {
lastRemoteWal, _ := secret["last_remote_wal"].(json.Number).Int64()
return lastRemoteWal > 0
}
return false
})
if err != nil {
return err
}
return nil
}
func EnableDrSecondary(ctx context.Context, pri, sec VaultCluster, drToken string) error {
err := EnableDRSecondaryNoWait(ctx, sec, drToken)
if err != nil {
return err
}
if err = WaitForMatchingMerkleRoots(ctx, "sys/replication/dr/", pri, sec); err != nil {
return err
}
err = WaitForDRSecondary(ctx, pri, sec, false)
if err != nil {
return err
}
if err = WaitForDRReplicationWorking(ctx, pri, sec); err != nil {
return err
}
return nil
}
func SetupTwoClusterDRReplication(ctx context.Context, pri, sec VaultCluster) error {
if err := EnableDrPrimary(ctx, pri); err != nil {
return err
}
drToken, err := GenerateDRActivationToken(pri, sec.ClusterID(), "")
if err != nil {
return err
}
err = EnableDrSecondary(ctx, pri, sec, drToken)
if err != nil {
return err
}
return nil
}
func DemoteDRPrimary(client *api.Client) error {
_, err := client.Logical().Write("sys/replication/dr/primary/demote", map[string]interface{}{})
return err
}
func createBatchToken(client *api.Client, path string) (string, error) {
// TODO: should these be more random in case more than one batch token needs to be created?
suffix := strings.Replace(path, "/", "", -1)
policyName := "path-batch-policy-" + suffix
roleName := "path-batch-role-" + suffix
rules := fmt.Sprintf(`path "%s" { capabilities = [ "read", "update" ] }`, path)
// create policy
_, err := client.Logical().Write("sys/policy/"+policyName, map[string]interface{}{
"policy": rules,
})
if err != nil {
return "", err
}
// create a role
_, err = client.Logical().Write("auth/token/roles/"+roleName, map[string]interface{}{
"allowed_policies": policyName,
"orphan": true,
"renewable": false,
"token_type": "batch",
})
if err != nil {
return "", err
}
// create batch token
secret, err := client.Logical().Write("auth/token/create/"+roleName, nil)
if err != nil {
return "", err
}
return secret.Auth.ClientToken, nil
}
// PromoteDRSecondaryWithBatchToken creates a batch token for DR promotion
// before promotion, it demotes the primary cluster. The primary cluster needs
// to be functional for the generation of the batch token
func PromoteDRSecondaryWithBatchToken(ctx context.Context, pri, sec VaultCluster) error {
client := pri.Nodes()[0].APIClient()
drToken, err := createBatchToken(client, "sys/replication/dr/secondary/promote")
if err != nil {
return err
}
err = DemoteDRPrimary(client)
if err != nil {
return err
}
return promoteDRSecondaryInternal(ctx, sec, drToken)
}
// PromoteDRSecondary generates a DR operation token on the secondary using
// unseal/recovery keys. Therefore, the primary cluster could potentially
// be out of service.
func PromoteDRSecondary(ctx context.Context, sec VaultCluster) error {
// generate DR operation token to do update primary on vC to point to
// the new perfSec primary vD
drToken, err := GenerateRoot(sec, GenerateRootDR)
if err != nil {
return err
}
return promoteDRSecondaryInternal(ctx, sec, drToken)
}
func promoteDRSecondaryInternal(ctx context.Context, sec VaultCluster, drToken string) error {
secClient := sec.Nodes()[0].APIClient()
// Allow retries of 503s, e.g.: replication is still catching up,
// try again later or provide the "force" argument
oldMaxRetries := secClient.MaxRetries()
secClient.SetMaxRetries(10)
defer secClient.SetMaxRetries(oldMaxRetries)
resp, err := secClient.Logical().Write("sys/replication/dr/secondary/promote", map[string]interface{}{
"dr_operation_token": drToken,
})
if err != nil {
return err
}
if resp == nil {
return fmt.Errorf("nil status response during DR promotion")
}
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return err
}
return WaitForDRReplicationState(ctx, sec, consts.ReplicationDRPrimary)
}
func checkClusterAddr(ctx context.Context, pri, sec VaultCluster) error {
priClient := pri.Nodes()[0].APIClient()
priLeader, err := priClient.Sys().LeaderWithContext(ctx)
if err != nil {
return err
}
secClient := sec.Nodes()[0].APIClient()
endpoint := "sys/replication/dr/"
status, err := secClient.Logical().Read(endpoint + "status")
if err != nil {
return err
}
if status == nil || status.Data == nil {
return fmt.Errorf("got nil secret or data")
}
var priAddrs []string
err = mapstructure.Decode(status.Data["known_primary_cluster_addrs"], &priAddrs)
if err != nil {
return err
}
if !strutil.StrListContains(priAddrs, priLeader.LeaderClusterAddress) {
return fmt.Errorf("failed to fine the expected primary cluster address %v in known_primary_cluster_addrs", priLeader.LeaderClusterAddress)
}
return nil
}
func UpdatePrimary(ctx context.Context, pri, sec VaultCluster) error {
// generate DR operation token to do update primary on vC to point to
// the new perfSec primary vD
rootToken, err := GenerateRoot(sec, GenerateRootDR)
if err != nil {
return err
}
// secondary activation token
drToken, err := GenerateDRActivationToken(pri, sec.ClusterID(), "")
if err != nil {
return err
}
// update-primary on vC (new perfSec Dr secondary) to point to
// the new perfSec Dr primary
secClient := sec.Nodes()[0].APIClient()
resp, err := secClient.Logical().Write("sys/replication/dr/secondary/update-primary", map[string]interface{}{
"dr_operation_token": rootToken,
"token": drToken,
"ca_file": DefaultCAFile,
})
if err != nil {
return err
}
if resp == nil {
return fmt.Errorf("nil status response during update primary")
}
if _, err = WaitForActiveNode(ctx, sec); err != nil {
return err
}
if err = WaitForDRReplicationState(ctx, sec, consts.ReplicationDRSecondary); err != nil {
return err
}
if err = checkClusterAddr(ctx, pri, sec); err != nil {
return err
}
return nil
}
func SetupFourClusterReplication(ctx context.Context, pri, sec, pridr, secdr VaultCluster) error {
err := SetupTwoClusterPerfReplication(ctx, pri, sec)
if err != nil {
return err
}
err = SetupTwoClusterDRReplication(ctx, pri, pridr)
if err != nil {
return err
}
err = SetupTwoClusterDRReplication(ctx, sec, secdr)
if err != nil {
return err
}
return nil
}
type ReplicationSet struct {
// By convention, we recommend the following naming scheme for
// clusters in this map:
// A: perf primary
// B: primary's DR
// C: first perf secondary of A
// D: C's DR
// E: second perf secondary of A
// F: E's DR
// ... etc.
//
// We use generic names rather than role-specific names because
// that's less confusing when promotions take place that result in role
// changes. In other words, if D gets promoted to replace C as a perf
// secondary, and C gets demoted and updated to become D's DR secondary,
// they should maintain their initial names of D and C throughout.
Clusters map[string]VaultCluster
Builder ClusterBuilder
Logger hclog.Logger
CA *CA
}
type ClusterBuilder func(ctx context.Context, name string, logger hclog.Logger) (VaultCluster, error)
func NewReplicationSet(b ClusterBuilder) (*ReplicationSet, error) {
return &ReplicationSet{
Clusters: map[string]VaultCluster{},
Builder: b,
Logger: hclog.NewNullLogger(),
}, nil
}
func (r *ReplicationSet) StandardPerfReplication(ctx context.Context) error {
for _, name := range []string{"A", "C"} {
if _, ok := r.Clusters[name]; !ok {
cluster, err := r.Builder(ctx, name, r.Logger)
if err != nil {
return err
}
r.Clusters[name] = cluster
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err := SetupTwoClusterPerfReplication(ctx, r.Clusters["A"], r.Clusters["C"])
if err != nil {
return err
}
return nil
}
func (r *ReplicationSet) StandardDRReplication(ctx context.Context) error {
for _, name := range []string{"A", "B"} {
if _, ok := r.Clusters[name]; !ok {
cluster, err := r.Builder(ctx, name, r.Logger)
if err != nil {
return err
}
r.Clusters[name] = cluster
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err := SetupTwoClusterDRReplication(ctx, r.Clusters["A"], r.Clusters["B"])
if err != nil {
return err
}
return nil
}
func (r *ReplicationSet) GetFourReplicationCluster(ctx context.Context) error {
for _, name := range []string{"A", "B", "C", "D"} {
if _, ok := r.Clusters[name]; !ok {
cluster, err := r.Builder(ctx, name, r.Logger)
if err != nil {
return err
}
r.Clusters[name] = cluster
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err := SetupFourClusterReplication(ctx, r.Clusters["A"], r.Clusters["C"], r.Clusters["B"], r.Clusters["D"])
if err != nil {
return err
}
return nil
}
func (r *ReplicationSet) Cleanup() {
for _, cluster := range r.Clusters {
cluster.Cleanup()
}
}
func WaitForPerfReplicationConnectionStatus(ctx context.Context, client *api.Client) error {
type Primary struct {
APIAddress string `mapstructure:"api_address"`
ConnectionStatus string `mapstructure:"connection_status"`
ClusterAddress string `mapstructure:"cluster_address"`
LastHeartbeat string `mapstructure:"last_heartbeat"`
}
type Status struct {
Primaries []Primary `mapstructure:"primaries"`
}
return WaitForPerfReplicationStatus(ctx, client, func(m map[string]interface{}) error {
var status Status
err := mapstructure.Decode(m, &status)
if err != nil {
return err
}
if len(status.Primaries) == 0 {
return fmt.Errorf("primaries is zero")
}
for _, v := range status.Primaries {
if v.ConnectionStatus == "connected" {
return nil
}
}
return fmt.Errorf("no primaries connected")
})
}
func WaitForPerfReplicationStatus(ctx context.Context, client *api.Client, accept func(map[string]interface{}) error) error {
var err error
var secret *api.Secret
for ctx.Err() == nil {
secret, err = client.Logical().Read("sys/replication/performance/status")
if err == nil && secret != nil && secret.Data != nil {
if err = accept(secret.Data); err == nil {
return nil
}
}
time.Sleep(500 * time.Millisecond)
}
return fmt.Errorf("unable to get acceptable replication status within allotted time: error=%v secret=%#v", err, secret)
}