| // Copyright (c) HashiCorp, Inc. |
| // SPDX-License-Identifier: MPL-2.0 |
| |
| package raft |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "math" |
| "os" |
| "strconv" |
| "sync" |
| "time" |
| |
| "github.com/armon/go-metrics" |
| "github.com/hashicorp/go-secure-stdlib/parseutil" |
| "github.com/hashicorp/go-secure-stdlib/strutil" |
| "github.com/hashicorp/raft" |
| autopilot "github.com/hashicorp/raft-autopilot" |
| "github.com/mitchellh/mapstructure" |
| "go.uber.org/atomic" |
| ) |
| |
| type CleanupDeadServersValue int |
| |
| const ( |
| CleanupDeadServersUnset CleanupDeadServersValue = 0 |
| CleanupDeadServersTrue CleanupDeadServersValue = 1 |
| CleanupDeadServersFalse CleanupDeadServersValue = 2 |
| AutopilotUpgradeVersionTag string = "upgrade_version" |
| AutopilotRedundancyZoneTag string = "redundancy_zone" |
| ) |
| |
| func (c CleanupDeadServersValue) Value() bool { |
| switch c { |
| case CleanupDeadServersTrue: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // AutopilotConfig is used for querying/setting the Autopilot configuration. |
| type AutopilotConfig struct { |
| // CleanupDeadServers controls whether to remove dead servers from the Raft |
| // peer list periodically or when a new server joins. |
| CleanupDeadServers bool `mapstructure:"cleanup_dead_servers"` |
| |
| // CleanupDeadServersValue is used to shadow the CleanupDeadServers field in |
| // storage. Having it as an int helps in knowing if the value was set explicitly |
| // using the API or not. |
| CleanupDeadServersValue CleanupDeadServersValue `mapstructure:"cleanup_dead_servers_value"` |
| |
| // LastContactThreshold is the limit on the amount of time a server can go |
| // without leader contact before being considered unhealthy. |
| LastContactThreshold time.Duration `mapstructure:"-"` |
| |
| // DeadServerLastContactThreshold is the limit on the amount of time a server |
| // can go without leader contact before being considered failed. This takes |
| // effect only when CleanupDeadServers is set. |
| DeadServerLastContactThreshold time.Duration `mapstructure:"-"` |
| |
| // MaxTrailingLogs is the amount of entries in the Raft Log that a server can |
| // be behind before being considered unhealthy. |
| MaxTrailingLogs uint64 `mapstructure:"max_trailing_logs"` |
| |
| // MinQuorum sets the minimum number of servers allowed in a cluster before |
| // autopilot can prune dead servers. |
| MinQuorum uint `mapstructure:"min_quorum"` |
| |
| // ServerStabilizationTime is the minimum amount of time a server must be in a |
| // stable, healthy state before it can be added to the cluster. Only applicable |
| // with Raft protocol version 3 or higher. |
| ServerStabilizationTime time.Duration `mapstructure:"-"` |
| |
| // (Enterprise-only) DisableUpgradeMigration will disable Autopilot's upgrade migration |
| // strategy of waiting until enough newer-versioned servers have been added to the |
| // cluster before promoting them to voters. |
| DisableUpgradeMigration bool `mapstructure:"disable_upgrade_migration"` |
| |
| // (Enterprise-only) RedundancyZoneTag is the node tag to use for separating |
| // servers into zones for redundancy. If left blank, this feature will be disabled. |
| RedundancyZoneTag string `mapstructure:"redundancy_zone_tag"` |
| |
| // (Enterprise-only) UpgradeVersionTag is the node tag to use for version info when |
| // performing upgrade migrations. If left blank, the Consul version will be used. |
| UpgradeVersionTag string `mapstructure:"upgrade_version_tag"` |
| } |
| |
| // Merge combines the supplied config with the receiver. Supplied ones take |
| // priority. |
| func (to *AutopilotConfig) Merge(from *AutopilotConfig) { |
| if from == nil { |
| return |
| } |
| if from.CleanupDeadServersValue != CleanupDeadServersUnset { |
| to.CleanupDeadServers = from.CleanupDeadServersValue.Value() |
| } |
| if from.MinQuorum != 0 { |
| to.MinQuorum = from.MinQuorum |
| } |
| if from.LastContactThreshold != 0 { |
| to.LastContactThreshold = from.LastContactThreshold |
| } |
| if from.DeadServerLastContactThreshold != 0 { |
| to.DeadServerLastContactThreshold = from.DeadServerLastContactThreshold |
| } |
| if from.MaxTrailingLogs != 0 { |
| to.MaxTrailingLogs = from.MaxTrailingLogs |
| } |
| if from.ServerStabilizationTime != 0 { |
| to.ServerStabilizationTime = from.ServerStabilizationTime |
| } |
| |
| // UpgradeVersionTag and RedundancyZoneTag are purposely not included here since those values aren't user |
| // controllable and should never change. |
| to.DisableUpgradeMigration = from.DisableUpgradeMigration |
| } |
| |
| // Clone returns a duplicate instance of AutopilotConfig with the exact same values. |
| func (ac *AutopilotConfig) Clone() *AutopilotConfig { |
| if ac == nil { |
| return nil |
| } |
| return &AutopilotConfig{ |
| CleanupDeadServers: ac.CleanupDeadServers, |
| LastContactThreshold: ac.LastContactThreshold, |
| DeadServerLastContactThreshold: ac.DeadServerLastContactThreshold, |
| MaxTrailingLogs: ac.MaxTrailingLogs, |
| MinQuorum: ac.MinQuorum, |
| ServerStabilizationTime: ac.ServerStabilizationTime, |
| UpgradeVersionTag: ac.UpgradeVersionTag, |
| RedundancyZoneTag: ac.RedundancyZoneTag, |
| DisableUpgradeMigration: ac.DisableUpgradeMigration, |
| } |
| } |
| |
| // MarshalJSON makes the autopilot config fields JSON compatible |
| func (ac *AutopilotConfig) MarshalJSON() ([]byte, error) { |
| return json.Marshal(map[string]interface{}{ |
| "cleanup_dead_servers": ac.CleanupDeadServers, |
| "cleanup_dead_servers_value": ac.CleanupDeadServersValue, |
| "last_contact_threshold": ac.LastContactThreshold.String(), |
| "dead_server_last_contact_threshold": ac.DeadServerLastContactThreshold.String(), |
| "max_trailing_logs": ac.MaxTrailingLogs, |
| "min_quorum": ac.MinQuorum, |
| "server_stabilization_time": ac.ServerStabilizationTime.String(), |
| "upgrade_version_tag": ac.UpgradeVersionTag, |
| "redundancy_zone_tag": ac.RedundancyZoneTag, |
| "disable_upgrade_migration": ac.DisableUpgradeMigration, |
| }) |
| } |
| |
| // UnmarshalJSON parses the autopilot config JSON blob |
| func (ac *AutopilotConfig) UnmarshalJSON(b []byte) error { |
| var data interface{} |
| err := json.Unmarshal(b, &data) |
| if err != nil { |
| return err |
| } |
| |
| conf := data.(map[string]interface{}) |
| if err = mapstructure.WeakDecode(conf, ac); err != nil { |
| return err |
| } |
| if ac.LastContactThreshold, err = parseutil.ParseDurationSecond(conf["last_contact_threshold"]); err != nil { |
| return err |
| } |
| if ac.DeadServerLastContactThreshold, err = parseutil.ParseDurationSecond(conf["dead_server_last_contact_threshold"]); err != nil { |
| return err |
| } |
| if ac.ServerStabilizationTime, err = parseutil.ParseDurationSecond(conf["server_stabilization_time"]); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| // FollowerState represents the information about peer that the leader tracks. |
| type FollowerState struct { |
| AppliedIndex uint64 |
| LastHeartbeat time.Time |
| LastTerm uint64 |
| IsDead *atomic.Bool |
| DesiredSuffrage string |
| Version string |
| UpgradeVersion string |
| RedundancyZone string |
| } |
| |
| // EchoRequestUpdate is here to avoid 1) the list of arguments to Update() getting huge 2) an import cycle on the vault package |
| type EchoRequestUpdate struct { |
| NodeID string |
| AppliedIndex uint64 |
| Term uint64 |
| DesiredSuffrage string |
| UpgradeVersion string |
| SDKVersion string |
| RedundancyZone string |
| } |
| |
| // FollowerStates holds information about all the followers in the raft cluster |
| // tracked by the leader. |
| type FollowerStates struct { |
| l sync.RWMutex |
| followers map[string]*FollowerState |
| } |
| |
| // NewFollowerStates creates a new FollowerStates object |
| func NewFollowerStates() *FollowerStates { |
| return &FollowerStates{ |
| followers: make(map[string]*FollowerState), |
| } |
| } |
| |
| // Update the peer information in the follower states. Note that this function |
| // runs on the active node. Returns true if a new entry was added, as opposed |
| // to modifying one already present. |
| func (s *FollowerStates) Update(req *EchoRequestUpdate) bool { |
| s.l.Lock() |
| defer s.l.Unlock() |
| |
| state, present := s.followers[req.NodeID] |
| if !present { |
| state = &FollowerState{ |
| IsDead: atomic.NewBool(false), |
| } |
| s.followers[req.NodeID] = state |
| } |
| |
| state.IsDead.Store(false) |
| state.AppliedIndex = req.AppliedIndex |
| state.LastTerm = req.Term |
| state.DesiredSuffrage = req.DesiredSuffrage |
| state.LastHeartbeat = time.Now() |
| state.Version = req.SDKVersion |
| state.UpgradeVersion = req.UpgradeVersion |
| state.RedundancyZone = req.RedundancyZone |
| |
| return !present |
| } |
| |
| // Clear wipes all the information regarding peers in the follower states. |
| func (s *FollowerStates) Clear() { |
| s.l.Lock() |
| for i := range s.followers { |
| delete(s.followers, i) |
| } |
| s.l.Unlock() |
| } |
| |
| // Delete the entry of a peer represented by the nodeID from follower states. |
| func (s *FollowerStates) Delete(nodeID string) { |
| s.l.Lock() |
| delete(s.followers, nodeID) |
| s.l.Unlock() |
| } |
| |
| // MinIndex returns the minimum raft index applied in the raft cluster. |
| func (s *FollowerStates) MinIndex() uint64 { |
| var min uint64 = math.MaxUint64 |
| minFunc := func(a, b uint64) uint64 { |
| if a > b { |
| return b |
| } |
| return a |
| } |
| |
| s.l.RLock() |
| for _, state := range s.followers { |
| min = minFunc(min, state.AppliedIndex) |
| } |
| s.l.RUnlock() |
| |
| if min == math.MaxUint64 { |
| return 0 |
| } |
| |
| return min |
| } |
| |
| // Ensure that the Delegate implements the ApplicationIntegration interface |
| var _ autopilot.ApplicationIntegration = (*Delegate)(nil) |
| |
| // Delegate is an implementation of autopilot.ApplicationIntegration interface. |
| // This is used by the autopilot library to retrieve information and to have |
| // application specific tasks performed. |
| type Delegate struct { |
| *RaftBackend |
| |
| // dl is a lock dedicated for guarding delegate's fields |
| dl sync.RWMutex |
| inflightRemovals map[raft.ServerID]bool |
| emptyVersionLogs map[raft.ServerID]struct{} |
| } |
| |
| func NewDelegate(b *RaftBackend) *Delegate { |
| return &Delegate{ |
| RaftBackend: b, |
| inflightRemovals: make(map[raft.ServerID]bool), |
| emptyVersionLogs: make(map[raft.ServerID]struct{}), |
| } |
| } |
| |
| // AutopilotConfig is called by the autopilot library to know the desired |
| // autopilot configuration. |
| func (d *Delegate) AutopilotConfig() *autopilot.Config { |
| d.l.RLock() |
| config := &autopilot.Config{ |
| CleanupDeadServers: d.autopilotConfig.CleanupDeadServers, |
| LastContactThreshold: d.autopilotConfig.LastContactThreshold, |
| MaxTrailingLogs: d.autopilotConfig.MaxTrailingLogs, |
| MinQuorum: d.autopilotConfig.MinQuorum, |
| ServerStabilizationTime: d.autopilotConfig.ServerStabilizationTime, |
| Ext: d.autopilotConfigExt(), |
| } |
| d.l.RUnlock() |
| return config |
| } |
| |
| // NotifyState is called by the autopilot library whenever there is a state |
| // change. We update a few metrics when this happens. |
| func (d *Delegate) NotifyState(state *autopilot.State) { |
| if d.raft.State() == raft.Leader { |
| metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(state.FailureTolerance)) |
| if state.Healthy { |
| metrics.SetGauge([]string{"autopilot", "healthy"}, 1) |
| } else { |
| metrics.SetGauge([]string{"autopilot", "healthy"}, 0) |
| } |
| |
| for id, state := range state.Servers { |
| labels := []metrics.Label{ |
| { |
| Name: "node_id", |
| Value: string(id), |
| }, |
| } |
| if state.Health.Healthy { |
| metrics.SetGaugeWithLabels([]string{"autopilot", "node", "healthy"}, 1, labels) |
| } else { |
| metrics.SetGaugeWithLabels([]string{"autopilot", "node", "healthy"}, 0, labels) |
| } |
| } |
| } |
| } |
| |
| // FetchServerStats is called by the autopilot library to retrieve information |
| // about all the nodes in the raft cluster. |
| func (d *Delegate) FetchServerStats(ctx context.Context, servers map[raft.ServerID]*autopilot.Server) map[raft.ServerID]*autopilot.ServerStats { |
| ret := make(map[raft.ServerID]*autopilot.ServerStats) |
| |
| d.l.RLock() |
| followerStates := d.followerStates |
| d.l.RUnlock() |
| |
| followerStates.l.RLock() |
| defer followerStates.l.RUnlock() |
| |
| now := time.Now() |
| for id, followerState := range followerStates.followers { |
| ret[raft.ServerID(id)] = &autopilot.ServerStats{ |
| LastContact: now.Sub(followerState.LastHeartbeat), |
| LastTerm: followerState.LastTerm, |
| LastIndex: followerState.AppliedIndex, |
| } |
| } |
| |
| leaderState, _ := d.fsm.LatestState() |
| ret[raft.ServerID(d.localID)] = &autopilot.ServerStats{ |
| LastTerm: leaderState.Term, |
| LastIndex: leaderState.Index, |
| } |
| |
| return ret |
| } |
| |
| // KnownServers is called by the autopilot library to know the status of each |
| // node in the raft cluster. If the application thinks that certain nodes left, |
| // it is here that we let the autopilot library know of the same. |
| func (d *Delegate) KnownServers() map[raft.ServerID]*autopilot.Server { |
| d.l.RLock() |
| defer d.l.RUnlock() |
| future := d.raft.GetConfiguration() |
| if err := future.Error(); err != nil { |
| d.logger.Error("failed to get raft configuration when computing known servers", "error", err) |
| return nil |
| } |
| |
| apServerStates := d.autopilot.GetState().Servers |
| servers := future.Configuration().Servers |
| serverIDs := make([]string, 0, len(servers)) |
| for _, server := range servers { |
| serverIDs = append(serverIDs, string(server.ID)) |
| } |
| |
| d.followerStates.l.RLock() |
| defer d.followerStates.l.RUnlock() |
| |
| ret := make(map[raft.ServerID]*autopilot.Server) |
| for id, state := range d.followerStates.followers { |
| // If the server is not in raft configuration, even if we received a follower |
| // heartbeat, it shouldn't be a known server for autopilot. |
| if !strutil.StrListContains(serverIDs, id) { |
| continue |
| } |
| |
| // If version isn't found in the state, fake it using the version from the leader so that autopilot |
| // doesn't demote the node to a non-voter, just because of a missed heartbeat. |
| currentServerID := raft.ServerID(id) |
| followerVersion := state.Version |
| leaderVersion := d.effectiveSDKVersion |
| d.dl.Lock() |
| if followerVersion == "" { |
| if _, ok := d.emptyVersionLogs[currentServerID]; !ok { |
| d.logger.Trace("received empty Vault version in heartbeat state. faking it with the leader version for now", "id", id, "leader version", leaderVersion) |
| d.emptyVersionLogs[currentServerID] = struct{}{} |
| } |
| followerVersion = leaderVersion |
| } else { |
| delete(d.emptyVersionLogs, currentServerID) |
| } |
| d.dl.Unlock() |
| |
| server := &autopilot.Server{ |
| ID: currentServerID, |
| Name: id, |
| RaftVersion: raft.ProtocolVersionMax, |
| Meta: d.meta(state), |
| Version: followerVersion, |
| Ext: d.autopilotServerExt(state), |
| } |
| |
| // As KnownServers is a delegate called by autopilot let's check if we already |
| // had this data in the correct format and use it. If we don't (which sounds a |
| // bit sad, unless this ISN'T a voter) then as a fail-safe, let's try what we've |
| // done elsewhere in code to check the desired suffrage and manually set NodeType |
| // based on whether that's a voter or not. If we don't do either of these |
| // things, NodeType isn't set which means technically it's not a voter. |
| // It shouldn't be a voter and end up in this state. |
| if apServerState, found := apServerStates[raft.ServerID(id)]; found && apServerState.Server.NodeType != "" { |
| server.NodeType = apServerState.Server.NodeType |
| } else if state.DesiredSuffrage == "voter" { |
| server.NodeType = autopilot.NodeVoter |
| } |
| |
| switch state.IsDead.Load() { |
| case true: |
| d.logger.Debug("informing autopilot that the node left", "id", id) |
| server.NodeStatus = autopilot.NodeLeft |
| default: |
| server.NodeStatus = autopilot.NodeAlive |
| } |
| |
| ret[raft.ServerID(id)] = server |
| } |
| |
| // Add the leader |
| ret[raft.ServerID(d.localID)] = &autopilot.Server{ |
| ID: raft.ServerID(d.localID), |
| Name: d.localID, |
| RaftVersion: raft.ProtocolVersionMax, |
| NodeStatus: autopilot.NodeAlive, |
| NodeType: autopilot.NodeVoter, // The leader must be a voter |
| Meta: d.meta(&FollowerState{ |
| UpgradeVersion: d.EffectiveVersion(), |
| RedundancyZone: d.RedundancyZone(), |
| }), |
| Version: d.effectiveSDKVersion, |
| Ext: d.autopilotServerExt(nil), |
| IsLeader: true, |
| } |
| |
| return ret |
| } |
| |
| // RemoveFailedServer is called by the autopilot library when it desires a node |
| // to be removed from the raft configuration. This function removes the node |
| // from the raft cluster and stops tracking its information in follower states. |
| // This function needs to return quickly. Hence removal is performed in a |
| // goroutine. |
| func (d *Delegate) RemoveFailedServer(server *autopilot.Server) { |
| go func() { |
| added := false |
| defer func() { |
| if added { |
| d.dl.Lock() |
| delete(d.inflightRemovals, server.ID) |
| d.dl.Unlock() |
| } |
| }() |
| |
| d.dl.Lock() |
| _, ok := d.inflightRemovals[server.ID] |
| if ok { |
| d.logger.Info("removal of dead server is already initiated", "id", server.ID) |
| d.dl.Unlock() |
| return |
| } |
| |
| added = true |
| d.inflightRemovals[server.ID] = true |
| d.dl.Unlock() |
| |
| d.logger.Info("removing dead server from raft configuration", "id", server.ID) |
| if future := d.raft.RemoveServer(server.ID, 0, 0); future.Error() != nil { |
| d.logger.Error("failed to remove server", "server_id", server.ID, "server_address", server.Address, "server_name", server.Name, "error", future.Error()) |
| return |
| } |
| |
| d.followerStates.Delete(string(server.ID)) |
| }() |
| } |
| |
| // SetFollowerStates sets the followerStates field in the backend to track peers |
| // in the raft cluster. |
| func (b *RaftBackend) SetFollowerStates(states *FollowerStates) { |
| b.l.Lock() |
| b.followerStates = states |
| b.l.Unlock() |
| } |
| |
| // SetAutopilotConfig updates the autopilot configuration in the backend. |
| func (b *RaftBackend) SetAutopilotConfig(config *AutopilotConfig) { |
| b.l.Lock() |
| b.autopilotConfig = config |
| b.logger.Info("updated autopilot configuration", "config", b.autopilotConfig) |
| b.l.Unlock() |
| } |
| |
| // AutopilotConfig returns the autopilot configuration in the backend. |
| func (b *RaftBackend) AutopilotConfig() *AutopilotConfig { |
| b.l.RLock() |
| defer b.l.RUnlock() |
| return b.autopilotConfig.Clone() |
| } |
| |
| func (b *RaftBackend) defaultAutopilotConfig() *AutopilotConfig { |
| return &AutopilotConfig{ |
| CleanupDeadServers: false, |
| LastContactThreshold: 10 * time.Second, |
| DeadServerLastContactThreshold: 24 * time.Hour, |
| MaxTrailingLogs: 1000, |
| ServerStabilizationTime: 10 * time.Second, |
| DisableUpgradeMigration: false, |
| UpgradeVersionTag: AutopilotUpgradeVersionTag, |
| RedundancyZoneTag: AutopilotRedundancyZoneTag, |
| } |
| } |
| |
| func (b *RaftBackend) AutopilotDisabled() bool { |
| b.l.RLock() |
| disabled := b.disableAutopilot |
| b.l.RUnlock() |
| return disabled |
| } |
| |
| func (b *RaftBackend) startFollowerHeartbeatTracker() { |
| b.l.RLock() |
| tickerCh := b.followerHeartbeatTicker.C |
| b.l.RUnlock() |
| |
| followerGauge := func(peerID string, suffix string, value float32) { |
| labels := []metrics.Label{ |
| { |
| Name: "peer_id", |
| Value: peerID, |
| }, |
| } |
| metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels) |
| } |
| for range tickerCh { |
| b.l.RLock() |
| if b.raft == nil { |
| // We could be racing with teardown, which will stop the ticker |
| // but that doesn't guarantee that we won't reach this line with a nil |
| // b.raft. |
| b.l.RUnlock() |
| return |
| } |
| b.followerStates.l.RLock() |
| myAppliedIndex := b.raft.AppliedIndex() |
| for peerID, state := range b.followerStates.followers { |
| timeSinceLastHeartbeat := time.Now().Sub(state.LastHeartbeat) / time.Millisecond |
| followerGauge(peerID, "last_heartbeat_ms", float32(timeSinceLastHeartbeat)) |
| followerGauge(peerID, "applied_index_delta", float32(myAppliedIndex-state.AppliedIndex)) |
| |
| if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 { |
| if state.LastHeartbeat.IsZero() || state.IsDead.Load() { |
| continue |
| } |
| now := time.Now() |
| if now.After(state.LastHeartbeat.Add(b.autopilotConfig.DeadServerLastContactThreshold)) { |
| state.IsDead.Store(true) |
| } |
| } |
| } |
| b.followerStates.l.RUnlock() |
| b.l.RUnlock() |
| } |
| } |
| |
| // StopAutopilot stops a running autopilot instance. This should only be called |
| // on the active node. |
| func (b *RaftBackend) StopAutopilot() { |
| b.l.Lock() |
| defer b.l.Unlock() |
| |
| if b.autopilot == nil { |
| return |
| } |
| b.autopilot.Stop() |
| b.autopilot = nil |
| b.followerHeartbeatTicker.Stop() |
| } |
| |
| // AutopilotState represents the health information retrieved from autopilot. |
| type AutopilotState struct { |
| Healthy bool `json:"healthy" mapstructure:"healthy"` |
| FailureTolerance int `json:"failure_tolerance" mapstructure:"failure_tolerance"` |
| Servers map[string]*AutopilotServer `json:"servers" mapstructure:"servers"` |
| Leader string `json:"leader" mapstructure:"leader"` |
| Voters []string `json:"voters" mapstructure:"voters"` |
| NonVoters []string `json:"non_voters,omitempty" mapstructure:"non_voters,omitempty"` |
| RedundancyZones map[string]AutopilotZone `json:"redundancy_zones,omitempty" mapstructure:"redundancy_zones,omitempty"` |
| Upgrade *AutopilotUpgrade `json:"upgrade_info,omitempty" mapstructure:"upgrade_info,omitempty"` |
| OptimisticFailureTolerance int `json:"optimistic_failure_tolerance,omitempty" mapstructure:"optimistic_failure_tolerance,omitempty"` |
| } |
| |
| // AutopilotServer represents the health information of individual server node |
| // retrieved from autopilot. |
| type AutopilotServer struct { |
| ID string `json:"id" mapstructure:"id"` |
| Name string `json:"name" mapstructure:"name"` |
| Address string `json:"address" mapstructure:"address"` |
| NodeStatus string `json:"node_status" mapstructure:"node_status"` |
| LastContact *ReadableDuration `json:"last_contact" mapstructure:"last_contact"` |
| LastTerm uint64 `json:"last_term" mapstructure:"last_term"` |
| LastIndex uint64 `json:"last_index" mapstructure:"last_index"` |
| Healthy bool `json:"healthy" mapstructure:"healthy"` |
| StableSince time.Time `json:"stable_since" mapstructure:"stable_since"` |
| Status string `json:"status" mapstructure:"status"` |
| Version string `json:"version" mapstructure:"version"` |
| RedundancyZone string `json:"redundancy_zone,omitempty" mapstructure:"redundancy_zone,omitempty"` |
| UpgradeVersion string `json:"upgrade_version,omitempty" mapstructure:"upgrade_version,omitempty"` |
| ReadReplica bool `json:"read_replica,omitempty" mapstructure:"read_replica,omitempty"` |
| NodeType string `json:"node_type,omitempty" mapstructure:"node_type,omitempty"` |
| } |
| |
| type AutopilotZone struct { |
| Servers []string `json:"servers,omitempty" mapstructure:"servers,omitempty"` |
| Voters []string `json:"voters,omitempty" mapstructure:"voters,omitempty"` |
| FailureTolerance int `json:"failure_tolerance,omitempty" mapstructure:"failure_tolerance,omitempty"` |
| } |
| |
| type AutopilotUpgrade struct { |
| Status string `json:"status" mapstructure:"status"` |
| TargetVersion string `json:"target_version,omitempty" mapstructure:"target_version,omitempty"` |
| TargetVersionVoters []string `json:"target_version_voters,omitempty" mapstructure:"target_version_voters,omitempty"` |
| TargetVersionNonVoters []string `json:"target_version_non_voters,omitempty" mapstructure:"target_version_non_voters,omitempty"` |
| TargetVersionReadReplicas []string `json:"target_version_read_replicas,omitempty" mapstructure:"target_version_read_replicas,omitempty"` |
| OtherVersionVoters []string `json:"other_version_voters,omitempty" mapstructure:"other_version_voters,omitempty"` |
| OtherVersionNonVoters []string `json:"other_version_non_voters,omitempty" mapstructure:"other_version_non_voters,omitempty"` |
| OtherVersionReadReplicas []string `json:"other_version_read_replicas,omitempty" mapstructure:"other_version_read_replicas,omitempty"` |
| RedundancyZones map[string]AutopilotZoneUpgradeVersions `json:"redundancy_zones,omitempty" mapstructure:"redundancy_zones,omitempty"` |
| } |
| |
| type AutopilotZoneUpgradeVersions struct { |
| TargetVersionVoters []string `json:"target_version_voters,omitempty" mapstructure:"target_version_voters,omitempty"` |
| TargetVersionNonVoters []string `json:"target_version_non_voters,omitempty" mapstructure:"target_version_non_voters,omitempty"` |
| OtherVersionVoters []string `json:"other_version_voters,omitempty" mapstructure:"other_version_voters,omitempty"` |
| OtherVersionNonVoters []string `json:"other_version_non_voters,omitempty" mapstructure:"other_version_non_voters,omitempty"` |
| } |
| |
| // ReadableDuration is a duration type that is serialized to JSON in human readable format. |
| type ReadableDuration time.Duration |
| |
| func NewReadableDuration(dur time.Duration) *ReadableDuration { |
| d := ReadableDuration(dur) |
| return &d |
| } |
| |
| func (d *ReadableDuration) String() string { |
| return d.Duration().String() |
| } |
| |
| func (d *ReadableDuration) Duration() time.Duration { |
| if d == nil { |
| return time.Duration(0) |
| } |
| return time.Duration(*d) |
| } |
| |
| func (d *ReadableDuration) MarshalJSON() ([]byte, error) { |
| return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil |
| } |
| |
| func (d *ReadableDuration) UnmarshalJSON(raw []byte) (err error) { |
| if d == nil { |
| return fmt.Errorf("cannot unmarshal to nil pointer") |
| } |
| |
| var dur time.Duration |
| str := string(raw) |
| if len(str) >= 2 && str[0] == '"' && str[len(str)-1] == '"' { |
| // quoted string |
| dur, err = parseutil.ParseDurationSecond(str[1 : len(str)-1]) |
| if err != nil { |
| return err |
| } |
| } else { |
| // no quotes, not a string |
| v, err := strconv.ParseFloat(str, 64) |
| if err != nil { |
| return err |
| } |
| dur = time.Duration(v) |
| } |
| |
| *d = ReadableDuration(dur) |
| return nil |
| } |
| |
| func stringIDs(ids []raft.ServerID) []string { |
| out := make([]string, len(ids)) |
| for i, id := range ids { |
| out[i] = string(id) |
| } |
| return out |
| } |
| |
| func autopilotToAPIState(state *autopilot.State) (*AutopilotState, error) { |
| out := &AutopilotState{ |
| Healthy: state.Healthy, |
| FailureTolerance: state.FailureTolerance, |
| Leader: string(state.Leader), |
| Voters: stringIDs(state.Voters), |
| Servers: make(map[string]*AutopilotServer), |
| } |
| |
| for id, srv := range state.Servers { |
| aps, err := autopilotToAPIServer(srv) |
| if err != nil { |
| return nil, err |
| } |
| out.Servers[string(id)] = aps |
| } |
| |
| err := autopilotToAPIStateEnterprise(state, out) |
| if err != nil { |
| return nil, err |
| } |
| |
| return out, nil |
| } |
| |
| func autopilotToAPIServer(srv *autopilot.ServerState) (*AutopilotServer, error) { |
| apiSrv := &AutopilotServer{ |
| ID: string(srv.Server.ID), |
| Name: srv.Server.Name, |
| Address: string(srv.Server.Address), |
| NodeStatus: string(srv.Server.NodeStatus), |
| LastContact: NewReadableDuration(srv.Stats.LastContact), |
| LastTerm: srv.Stats.LastTerm, |
| LastIndex: srv.Stats.LastIndex, |
| Healthy: srv.Health.Healthy, |
| StableSince: srv.Health.StableSince, |
| Status: string(srv.State), |
| Version: srv.Server.Version, |
| NodeType: string(srv.Server.NodeType), |
| } |
| |
| err := autopilotToAPIServerEnterprise(&srv.Server, apiSrv) |
| if err != nil { |
| return nil, err |
| } |
| |
| return apiSrv, nil |
| } |
| |
| // GetAutopilotServerState retrieves raft cluster state from autopilot to |
| // return over the API. |
| func (b *RaftBackend) GetAutopilotServerState(ctx context.Context) (*AutopilotState, error) { |
| b.l.RLock() |
| defer b.l.RUnlock() |
| |
| if b.raft == nil { |
| return nil, errors.New("raft storage is not initialized") |
| } |
| |
| if b.autopilot == nil { |
| return nil, nil |
| } |
| |
| apState := b.autopilot.GetState() |
| if apState == nil { |
| return nil, nil |
| } |
| |
| return autopilotToAPIState(apState) |
| } |
| |
| func (b *RaftBackend) DisableAutopilot() { |
| b.l.Lock() |
| b.disableAutopilot = true |
| b.l.Unlock() |
| } |
| |
| // SetupAutopilot gathers information required to configure autopilot and starts |
| // it. If autopilot is disabled, this function does nothing. |
| func (b *RaftBackend) SetupAutopilot(ctx context.Context, storageConfig *AutopilotConfig, followerStates *FollowerStates, disable bool) { |
| b.l.Lock() |
| if disable || os.Getenv("VAULT_RAFT_AUTOPILOT_DISABLE") != "" { |
| b.disableAutopilot = true |
| } |
| |
| if b.disableAutopilot { |
| b.logger.Info("disabling autopilot") |
| b.l.Unlock() |
| return |
| } |
| |
| // Start with a default config |
| b.autopilotConfig = b.defaultAutopilotConfig() |
| |
| // Merge the setting provided over the API |
| b.autopilotConfig.Merge(storageConfig) |
| |
| // Create the autopilot instance |
| options := []autopilot.Option{ |
| autopilot.WithLogger(b.logger), |
| autopilot.WithPromoter(b.autopilotPromoter()), |
| } |
| if b.autopilotReconcileInterval != 0 { |
| options = append(options, autopilot.WithReconcileInterval(b.autopilotReconcileInterval)) |
| } |
| if b.autopilotUpdateInterval != 0 { |
| options = append(options, autopilot.WithUpdateInterval(b.autopilotUpdateInterval)) |
| } |
| b.autopilot = autopilot.New(b.raft, NewDelegate(b), options...) |
| b.followerStates = followerStates |
| b.followerHeartbeatTicker = time.NewTicker(1 * time.Second) |
| |
| b.l.Unlock() |
| |
| b.logger.Info("starting autopilot", "config", b.autopilotConfig, "reconcile_interval", b.autopilotReconcileInterval) |
| b.autopilot.Start(ctx) |
| |
| go b.startFollowerHeartbeatTracker() |
| } |