| // Copyright (c) HashiCorp, Inc. |
| // SPDX-License-Identifier: MPL-2.0 |
| |
| package raft |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/hex" |
| "errors" |
| "fmt" |
| "io" |
| "os" |
| "path/filepath" |
| "strconv" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "github.com/armon/go-metrics" |
| "github.com/golang/protobuf/proto" |
| log "github.com/hashicorp/go-hclog" |
| "github.com/hashicorp/go-multierror" |
| "github.com/hashicorp/go-raftchunking" |
| "github.com/hashicorp/go-secure-stdlib/strutil" |
| "github.com/hashicorp/raft" |
| "github.com/hashicorp/vault/sdk/helper/jsonutil" |
| "github.com/hashicorp/vault/sdk/physical" |
| "github.com/hashicorp/vault/sdk/plugin/pb" |
| bolt "go.etcd.io/bbolt" |
| ) |
| |
| const ( |
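| // Operation types recorded in raft log entries. Each is assigned a |
| // distinct bit via 1 << iota. |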
| deleteOp uint32 = 1 << iota |
| putOp |
| restoreCallbackOp |
| getOp |
| |
| chunkingPrefix = "raftchunking/" |
| databaseFilename = "vault.db" |
| ) |
| |
| var ( |
| // dataBucketName is the name of the bolt bucket that holds Vault's key/value data |
| dataBucketName = []byte("data") |
| configBucketName = []byte("config") |
| latestIndexKey = []byte("latest_indexes") |
| latestConfigKey = []byte("latest_config") |
| localNodeConfigKey = []byte("local_node_config") |
| ) |
| |
| // Verify FSM satisfies the correct interfaces |
| var ( |
| _ physical.Backend = (*FSM)(nil) |
| _ physical.Transactional = (*FSM)(nil) |
| _ raft.FSM = (*FSM)(nil) |
| _ raft.BatchingFSM = (*FSM)(nil) |
| ) |
| |
| type restoreCallback func(context.Context) error |
| |
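| // FSMEntry is a single key/value pair returned from get operations applied |
| // through the FSM. |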
| type FSMEntry struct { |
| Key string |
| Value []byte |
| } |
| |
| func (f *FSMEntry) String() string { |
| return fmt.Sprintf("Key: %s. Value: %s", f.Key, hex.EncodeToString(f.Value)) |
| } |
| |
| // FSMApplyResponse is returned from an FSM apply. It indicates if the apply was |
| // successful or not. EntrySlice contains the keys/values from the Get operations. |
| type FSMApplyResponse struct { |
| Success bool |
| EntrySlice []*FSMEntry |
| } |
| |
| // FSM is Vault's primary state storage. It writes updates to a bolt db file |
| // that lives on local disk. FSM implements raft.FSM and physical.Backend |
| // interfaces. |
| type FSM struct { |
| // latestIndex and latestTerm must stay at the top of this struct to be |
| // properly 64-bit aligned. |
| |
| // latestIndex and latestTerm are the index and term of the last log we |
| // received |
| latestIndex *uint64 |
| latestTerm *uint64 |
| // latestConfig is the latest server configuration we've seen |
| latestConfig atomic.Value |
| |
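| // l guards the fields below; it is held exclusively whenever the underlying |
| // bolt database handle is replaced, such as during a snapshot restore. |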
| l sync.RWMutex |
| path string |
| logger log.Logger |
| noopRestore bool |
| |
| // applyCallback is used to control the pace of applies in tests |
| applyCallback func() |
| |
| db *bolt.DB |
| |
| // restoreCb is called after we've restored a snapshot |
| restoreCb restoreCallback |
| |
| chunker *raftchunking.ChunkingBatchingFSM |
| |
| localID string |
| desiredSuffrage string |
| unknownOpTypes sync.Map |
| } |
| |
| // NewFSM constructs an FSM using the given directory |
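| // |
| // A minimal usage sketch (illustrative only; the directory and node ID below |
| // are placeholders, and error handling is elided): |
| // |
| //   fsm, err := NewFSM("/path/to/raft", "node-1", log.Default()) |
| //   if err != nil { |
| //       // handle error |
| //   } |
| //   defer fsm.Close() |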
| func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) { |
| // Initialize the latest term, index, and config values |
| latestTerm := new(uint64) |
| latestIndex := new(uint64) |
| latestConfig := atomic.Value{} |
| atomic.StoreUint64(latestTerm, 0) |
| atomic.StoreUint64(latestIndex, 0) |
| latestConfig.Store((*ConfigurationValue)(nil)) |
| |
| f := &FSM{ |
| path: path, |
| logger: logger, |
| |
| latestTerm: latestTerm, |
| latestIndex: latestIndex, |
| latestConfig: latestConfig, |
| // Assume that the default intent is to join as a voter. This will be updated |
| // when this node joins a cluster with a different suffrage, or during cluster |
| // setup if this is already part of a cluster with a desired suffrage. |
| desiredSuffrage: "voter", |
| localID: localID, |
| } |
| |
| f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{ |
| f: f, |
| ctx: context.Background(), |
| }) |
| |
| dbPath := filepath.Join(path, databaseFilename) |
| f.l.Lock() |
| defer f.l.Unlock() |
| if err := f.openDBFile(dbPath); err != nil { |
| return nil, fmt.Errorf("failed to open bolt file: %w", err) |
| } |
| |
| return f, nil |
| } |
| |
| func (f *FSM) getDB() *bolt.DB { |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| return f.db |
| } |
| |
| // SetFSMDelay adds a delay to the FSM apply. This is used in tests to simulate |
| // a slow apply. |
| func (r *RaftBackend) SetFSMDelay(delay time.Duration) { |
| r.SetFSMApplyCallback(func() { time.Sleep(delay) }) |
| } |
| |
| func (r *RaftBackend) SetFSMApplyCallback(f func()) { |
| r.fsm.l.Lock() |
| r.fsm.applyCallback = f |
| r.fsm.l.Unlock() |
| } |
| |
| func (f *FSM) openDBFile(dbPath string) error { |
| if len(dbPath) == 0 { |
| return errors.New("cannot open empty filename") |
| } |
| |
| st, err := os.Stat(dbPath) |
| switch { |
| case err != nil && os.IsNotExist(err): |
| case err != nil: |
| return fmt.Errorf("error checking raft FSM db file %q: %v", dbPath, err) |
| default: |
| perms := st.Mode() & os.ModePerm |
| if perms&0o077 != 0 { |
| f.logger.Warn("raft FSM db file has wider permissions than needed", |
| "needed", os.FileMode(0o600), "existing", perms) |
| } |
| } |
| |
| opts := boltOptions(dbPath) |
| start := time.Now() |
| boltDB, err := bolt.Open(dbPath, 0o600, opts) |
| if err != nil { |
| return err |
| } |
| elapsed := time.Since(start) |
| f.logger.Debug("time to open database", "elapsed", elapsed, "path", dbPath) |
| metrics.MeasureSince([]string{"raft_storage", "fsm", "open_db_file"}, start) |
| |
| err = boltDB.Update(func(tx *bolt.Tx) error { |
| // make sure we have the necessary buckets created |
| _, err := tx.CreateBucketIfNotExists(dataBucketName) |
| if err != nil { |
| return fmt.Errorf("failed to create bucket: %v", err) |
| } |
| b, err := tx.CreateBucketIfNotExists(configBucketName) |
| if err != nil { |
| return fmt.Errorf("failed to create bucket: %v", err) |
| } |
| |
| // Read in our latest index and term and populate them in memory |
| val := b.Get(latestIndexKey) |
| if val != nil { |
| var latest IndexValue |
| err := proto.Unmarshal(val, &latest) |
| if err != nil { |
| return err |
| } |
| |
| atomic.StoreUint64(f.latestTerm, latest.Term) |
| atomic.StoreUint64(f.latestIndex, latest.Index) |
| } |
| |
| // Read in our latest config and populate it in memory |
| val = b.Get(latestConfigKey) |
| if val != nil { |
| var latest ConfigurationValue |
| err := proto.Unmarshal(val, &latest) |
| if err != nil { |
| return err |
| } |
| |
| f.latestConfig.Store(&latest) |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| f.db = boltDB |
| return nil |
| } |
| |
| func (f *FSM) Stats() bolt.Stats { |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| return f.db.Stats() |
| } |
| |
| func (f *FSM) Close() error { |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| return f.db.Close() |
| } |
| |
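| // writeSnapshotMetaToDB persists the snapshot's latest index/term and its |
| // configuration into the config bucket of the given bolt DB. |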
| func writeSnapshotMetaToDB(metadata *raft.SnapshotMeta, db *bolt.DB) error { |
| latestIndex := &IndexValue{ |
| Term: metadata.Term, |
| Index: metadata.Index, |
| } |
| indexBytes, err := proto.Marshal(latestIndex) |
| if err != nil { |
| return err |
| } |
| |
| protoConfig := raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration) |
| configBytes, err := proto.Marshal(protoConfig) |
| if err != nil { |
| return err |
| } |
| |
| err = db.Update(func(tx *bolt.Tx) error { |
| b, err := tx.CreateBucketIfNotExists(configBucketName) |
| if err != nil { |
| return err |
| } |
| |
| err = b.Put(latestConfigKey, configBytes) |
| if err != nil { |
| return err |
| } |
| |
| err = b.Put(latestIndexKey, indexBytes) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
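| // localNodeConfig reads the locally persisted node configuration from the |
| // config bucket, returning nil if no entry has been stored yet. If an entry |
| // exists, f.desiredSuffrage is refreshed from it. Callers must hold f.l. |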
| func (f *FSM) localNodeConfig() (*LocalNodeConfigValue, error) { |
| var configBytes []byte |
| if err := f.db.View(func(tx *bolt.Tx) error { |
| value := tx.Bucket(configBucketName).Get(localNodeConfigKey) |
| if value != nil { |
| configBytes = make([]byte, len(value)) |
| copy(configBytes, value) |
| } |
| return nil |
| }); err != nil { |
| return nil, err |
| } |
| if configBytes == nil { |
| return nil, nil |
| } |
| |
| var lnConfig LocalNodeConfigValue |
| if err := proto.Unmarshal(configBytes, &lnConfig); err != nil { |
| return nil, err |
| } |
| f.desiredSuffrage = lnConfig.DesiredSuffrage |
| |
| return &lnConfig, nil |
| } |
| |
| func (f *FSM) DesiredSuffrage() string { |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| return f.desiredSuffrage |
| } |
| |
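| // upgradeLocalNodeConfig ensures a local node config entry exists, deriving |
| // the desired suffrage from the latest known raft configuration when a node |
| // from an older version has not persisted one yet. |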
| func (f *FSM) upgradeLocalNodeConfig() error { |
| f.l.Lock() |
| defer f.l.Unlock() |
| |
| // Read the local node config |
| lnConfig, err := f.localNodeConfig() |
| if err != nil { |
| return err |
| } |
| |
| // Entry is already present. Get the suffrage value. |
| if lnConfig != nil { |
| f.desiredSuffrage = lnConfig.DesiredSuffrage |
| return nil |
| } |
| |
| // |
| // This is the upgrade case where there is no entry |
| // |
| |
| lnConfig = &LocalNodeConfigValue{} |
| |
| // Refer to the persisted latest raft config |
| config := f.latestConfig.Load().(*ConfigurationValue) |
| |
| // If there is no config, then this is a fresh node coming up. This could end up |
| // being a voter or non-voter. But by default assume that this is a voter. It |
| // will be changed if this node joins the cluster as a non-voter. |
| if config == nil { |
| f.desiredSuffrage = "voter" |
| lnConfig.DesiredSuffrage = f.desiredSuffrage |
| return f.persistDesiredSuffrage(lnConfig) |
| } |
| |
| // Get the last known suffrage of the node and assume that it is the desired |
| // suffrage. There is no better alternative here. |
| for _, srv := range config.Servers { |
| if srv.Id == f.localID { |
| switch srv.Suffrage { |
| case int32(raft.Nonvoter): |
| lnConfig.DesiredSuffrage = "non-voter" |
| default: |
| lnConfig.DesiredSuffrage = "voter" |
| } |
| // Bring the intent to the fsm instance. |
| f.desiredSuffrage = lnConfig.DesiredSuffrage |
| break |
| } |
| } |
| |
| return f.persistDesiredSuffrage(lnConfig) |
| } |
| |
| // recordSuffrage is called when a node successfully joins the cluster. This |
| // intent should land in the stored configuration. If the config isn't available |
| // yet, we still go ahead and store the intent in the fsm. During the next |
| // update to the configuration, this intent will be persisted. |
| func (f *FSM) recordSuffrage(desiredSuffrage string) error { |
| f.l.Lock() |
| defer f.l.Unlock() |
| |
| if err := f.persistDesiredSuffrage(&LocalNodeConfigValue{ |
| DesiredSuffrage: desiredSuffrage, |
| }); err != nil { |
| return err |
| } |
| |
| f.desiredSuffrage = desiredSuffrage |
| return nil |
| } |
| |
| func (f *FSM) persistDesiredSuffrage(lnconfig *LocalNodeConfigValue) error { |
| dsBytes, err := proto.Marshal(lnconfig) |
| if err != nil { |
| return err |
| } |
| |
| return f.db.Update(func(tx *bolt.Tx) error { |
| return tx.Bucket(configBucketName).Put(localNodeConfigKey, dsBytes) |
| }) |
| } |
| |
| func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error { |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| err := writeSnapshotMetaToDB(metadata, f.db) |
| if err != nil { |
| return err |
| } |
| |
| atomic.StoreUint64(f.latestIndex, metadata.Index) |
| atomic.StoreUint64(f.latestTerm, metadata.Term) |
| f.latestConfig.Store(raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration)) |
| |
| return nil |
| } |
| |
| // LatestState returns the latest index and configuration values we have seen on |
| // this FSM. |
| func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) { |
| return &IndexValue{ |
| Term: atomic.LoadUint64(f.latestTerm), |
| Index: atomic.LoadUint64(f.latestIndex), |
| }, f.latestConfig.Load().(*ConfigurationValue) |
| } |
| |
| // Delete deletes the given key from the bolt file. |
| func (f *FSM) Delete(ctx context.Context, path string) error { |
| defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now()) |
| |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| return f.db.Update(func(tx *bolt.Tx) error { |
| return tx.Bucket(dataBucketName).Delete([]byte(path)) |
| }) |
| } |
| |
| // DeletePrefix deletes all keys with the given prefix from the bolt file. |
| func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error { |
| defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete_prefix"}, time.Now()) |
| |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| err := f.db.Update(func(tx *bolt.Tx) error { |
| // Assume bucket exists and has keys |
| c := tx.Bucket(dataBucketName).Cursor() |
| |
| prefixBytes := []byte(prefix) |
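| // Keys are stored in byte order, so seek to the prefix and delete entries |
| // until the prefix no longer matches. |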
| for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() { |
| if err := c.Delete(); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| }) |
| |
| return err |
| } |
| |
| // Get retrieves the value at the given path from the bolt file. |
| func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) { |
| // TODO: Remove this outdated metric name in a future release |
| defer metrics.MeasureSince([]string{"raft", "get"}, time.Now()) |
| defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now()) |
| |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| var valCopy []byte |
| var found bool |
| |
| err := f.db.View(func(tx *bolt.Tx) error { |
| value := tx.Bucket(dataBucketName).Get([]byte(path)) |
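| // Copy the value out of the transaction: byte slices returned by bolt are |
| // only valid while the transaction is open. |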
| if value != nil { |
| found = true |
| valCopy = make([]byte, len(value)) |
| copy(valCopy, value) |
| } |
| |
| return nil |
| }) |
| if err != nil { |
| return nil, err |
| } |
| if !found { |
| return nil, nil |
| } |
| |
| return &physical.Entry{ |
| Key: path, |
| Value: valCopy, |
| }, nil |
| } |
| |
| // Put writes the given entry to the bolt file. |
| func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error { |
| defer metrics.MeasureSince([]string{"raft_storage", "fsm", "put"}, time.Now()) |
| |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| // Start a write transaction. |
| return f.db.Update(func(tx *bolt.Tx) error { |
| return tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value) |
| }) |
| } |
| |
| // List retrieves the set of keys with the given prefix from the bolt file. |
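| // For example, given the keys "foo/bar" and "foo/baz/zip", listing the prefix |
| // "foo/" returns "bar" and the truncated folder "baz/". |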
| func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) { |
| // TODO: Remove this outdated metric name in a future release |
| defer metrics.MeasureSince([]string{"raft", "list"}, time.Now()) |
| defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now()) |
| |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| var keys []string |
| |
| err := f.db.View(func(tx *bolt.Tx) error { |
| // Assume bucket exists and has keys |
| c := tx.Bucket(dataBucketName).Cursor() |
| |
| prefixBytes := []byte(prefix) |
| for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() { |
| key := string(k) |
| key = strings.TrimPrefix(key, prefix) |
| if i := strings.Index(key, "/"); i == -1 { |
| // Add objects only from the current 'folder' |
| keys = append(keys, key) |
| } else { |
| // Add truncated 'folder' paths |
| if len(keys) == 0 || keys[len(keys)-1] != key[:i+1] { |
| keys = append(keys, string(key[:i+1])) |
| } |
| } |
| } |
| |
| return nil |
| }) |
| |
| return keys, err |
| } |
| |
| // Transaction writes all the operations in the provided transaction to the bolt |
| // file. |
| func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| // Start a write transaction. |
| err := f.db.Update(func(tx *bolt.Tx) error { |
| b := tx.Bucket(dataBucketName) |
| for _, txn := range txns { |
| var err error |
| switch txn.Operation { |
| case physical.PutOperation: |
| err = b.Put([]byte(txn.Entry.Key), txn.Entry.Value) |
| case physical.DeleteOperation: |
| err = b.Delete([]byte(txn.Entry.Key)) |
| default: |
| return fmt.Errorf("%q is not a supported transaction operation", txn.Operation) |
| } |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| }) |
| |
| return err |
| } |
| |
| // ApplyBatch will apply a set of logs to the FSM. This is called from the raft |
| // library. |
| func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} { |
| numLogs := len(logs) |
| |
| if numLogs == 0 { |
| return []interface{}{} |
| } |
| |
| // We will construct one slice of results per log; each inner slice holds the |
| // entries read by that log's get operations. |
| entrySlices := make([][]*FSMEntry, 0, numLogs) |
| |
| // Do the unmarshalling first so we don't hold locks |
| var latestConfiguration *ConfigurationValue |
| commands := make([]interface{}, 0, numLogs) |
| for _, l := range logs { |
| switch l.Type { |
| case raft.LogCommand: |
| command := &LogData{} |
| err := proto.Unmarshal(l.Data, command) |
| if err != nil { |
| f.logger.Error("error proto unmarshaling log data", "error", err) |
| panic("error proto unmarshaling log data") |
| } |
| commands = append(commands, command) |
| case raft.LogConfiguration: |
| configuration := raft.DecodeConfiguration(l.Data) |
| config := raftConfigurationToProtoConfiguration(l.Index, configuration) |
| |
| commands = append(commands, config) |
| |
| // Update the latest configuration the fsm has received; we will |
| // store this after it has been committed to storage. |
| latestConfiguration = config |
| |
| default: |
| panic(fmt.Sprintf("got unexpected log type: %d", l.Type)) |
| } |
| } |
| |
| // Only advance the latest index/term if the last log in this batch has a |
| // higher index than what we have seen in the past. |
| var logIndex []byte |
| var err error |
| latestIndex, _ := f.LatestState() |
| lastLog := logs[numLogs-1] |
| if latestIndex.Index < lastLog.Index { |
| logIndex, err = proto.Marshal(&IndexValue{ |
| Term: lastLog.Term, |
| Index: lastLog.Index, |
| }) |
| if err != nil { |
| f.logger.Error("unable to marshal latest index", "error", err) |
| panic("unable to marshal latest index") |
| } |
| } |
| |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| if f.applyCallback != nil { |
| f.applyCallback() |
| } |
| |
| err = f.db.Update(func(tx *bolt.Tx) error { |
| b := tx.Bucket(dataBucketName) |
| for _, commandRaw := range commands { |
| entrySlice := make([]*FSMEntry, 0) |
| switch command := commandRaw.(type) { |
| case *LogData: |
| for _, op := range command.Operations { |
| var err error |
| switch op.OpType { |
| case putOp: |
| err = b.Put([]byte(op.Key), op.Value) |
| case deleteOp: |
| err = b.Delete([]byte(op.Key)) |
| case getOp: |
| fsmEntry := &FSMEntry{ |
| Key: op.Key, |
| } |
| val := b.Get([]byte(op.Key)) |
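| // Copy the value: the slice returned by bolt is only valid for the |
| // duration of the transaction. |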
| if len(val) > 0 { |
| newVal := make([]byte, len(val)) |
| copy(newVal, val) |
| fsmEntry.Value = newVal |
| } |
| entrySlice = append(entrySlice, fsmEntry) |
| case restoreCallbackOp: |
| if f.restoreCb != nil { |
| // Kick off the restore callback function in a goroutine |
| go f.restoreCb(context.Background()) |
| } |
| default: |
| if _, ok := f.unknownOpTypes.Load(op.OpType); !ok { |
| f.logger.Error("unsupported transaction operation", "op", op.OpType) |
| f.unknownOpTypes.Store(op.OpType, struct{}{}) |
| } |
| } |
| if err != nil { |
| return err |
| } |
| } |
| |
| case *ConfigurationValue: |
| b := tx.Bucket(configBucketName) |
| configBytes, err := proto.Marshal(command) |
| if err != nil { |
| return err |
| } |
| if err := b.Put(latestConfigKey, configBytes); err != nil { |
| return err |
| } |
| } |
| |
| entrySlices = append(entrySlices, entrySlice) |
| } |
| |
| if len(logIndex) > 0 { |
| b := tx.Bucket(configBucketName) |
| err = b.Put(latestIndexKey, logIndex) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| }) |
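| // A storage failure here would leave the FSM out of sync with the raft log, |
| // so treat it as fatal. |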
| if err != nil { |
| f.logger.Error("failed to store data", "error", err) |
| panic("failed to store data") |
| } |
| |
| // If we advanced the latest value, update the in-memory representation too. |
| if len(logIndex) > 0 { |
| atomic.StoreUint64(f.latestTerm, lastLog.Term) |
| atomic.StoreUint64(f.latestIndex, lastLog.Index) |
| } |
| |
| // If one or more configuration changes were processed, store the latest one. |
| if latestConfiguration != nil { |
| f.latestConfig.Store(latestConfiguration) |
| } |
| |
| // Build the responses. The logs slice is used here to ensure we reply to |
| // every log entry, even those that are not of the types we expect. This |
| // should future-proof this function against new log types being added. |
| resp := make([]interface{}, numLogs) |
| for i := range logs { |
| resp[i] = &FSMApplyResponse{ |
| Success: true, |
| EntrySlice: entrySlices[i], |
| } |
| } |
| |
| return resp |
| } |
| |
| // Apply will apply a log value to the FSM. This is called from the raft |
| // library. |
| func (f *FSM) Apply(log *raft.Log) interface{} { |
| return f.ApplyBatch([]*raft.Log{log})[0] |
| } |
| |
| type writeErrorCloser interface { |
| io.WriteCloser |
| CloseWithError(error) error |
| } |
| |
| // writeTo will copy the FSM's content to a remote sink. The data is written |
| // twice, once to determine various metadata attributes of the dataset |
| // (size, checksum, etc.) and a second time for the actual data sink. We also |
| // use a proto delimited writer so we can stream proto messages to the sink. |
| func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) { |
| defer metrics.MeasureSince([]string{"raft_storage", "fsm", "write_snapshot"}, time.Now()) |
| |
| protoWriter := NewDelimitedWriter(sink) |
| metadataProtoWriter := NewDelimitedWriter(metaSink) |
| |
| f.l.RLock() |
| defer f.l.RUnlock() |
| |
| err := f.db.View(func(tx *bolt.Tx) error { |
| b := tx.Bucket(dataBucketName) |
| |
| c := b.Cursor() |
| |
| // Do the first scan of the data for metadata purposes. |
| for k, v := c.First(); k != nil; k, v = c.Next() { |
| err := metadataProtoWriter.WriteMsg(&pb.StorageEntry{ |
| Key: string(k), |
| Value: v, |
| }) |
| if err != nil { |
| metaSink.CloseWithError(err) |
| return err |
| } |
| } |
| metaSink.Close() |
| |
| // Do the second scan for copy purposes. |
| for k, v := c.First(); k != nil; k, v = c.Next() { |
| err := protoWriter.WriteMsg(&pb.StorageEntry{ |
| Key: string(k), |
| Value: v, |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| }) |
| sink.CloseWithError(err) |
| } |
| |
| // Snapshot implements the FSM interface. It returns a noop snapshot object. |
| func (f *FSM) Snapshot() (raft.FSMSnapshot, error) { |
| return &noopSnapshotter{ |
| fsm: f, |
| }, nil |
| } |
| |
| // SetNoopRestore is used to disable restore operations on raft startup. Because |
| // we are using persistent storage in our FSM we do not need to issue a restore |
| // on startup. |
| func (f *FSM) SetNoopRestore(enabled bool) { |
| f.l.Lock() |
| f.noopRestore = enabled |
| f.l.Unlock() |
| } |
| |
| // Restore installs a new snapshot from the provided reader. It does an atomic |
| // rename of the snapshot file into the database filepath. While a restore is |
| // happening the FSM is locked and no writes or reads can be performed. |
| func (f *FSM) Restore(r io.ReadCloser) error { |
| defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now()) |
| |
| if f.noopRestore { |
| return nil |
| } |
| |
| snapshotInstaller, ok := r.(*boltSnapshotInstaller) |
| if !ok { |
| wrapper, ok := r.(raft.ReadCloserWrapper) |
| if !ok { |
| return fmt.Errorf("expected ReadCloserWrapper object, got: %T", r) |
| } |
| snapshotInstallerRaw := wrapper.WrappedReadCloser() |
| snapshotInstaller, ok = snapshotInstallerRaw.(*boltSnapshotInstaller) |
| if !ok { |
| return fmt.Errorf("expected snapshot installer object, got: %T", snapshotInstallerRaw) |
| } |
| } |
| |
| f.l.Lock() |
| defer f.l.Unlock() |
| |
| // Cache the local node config before closing the db file |
| lnConfig, err := f.localNodeConfig() |
| if err != nil { |
| return err |
| } |
| |
| // Close the db file |
| if err := f.db.Close(); err != nil { |
| f.logger.Error("failed to close database file", "error", err) |
| return err |
| } |
| |
| dbPath := filepath.Join(f.path, databaseFilename) |
| |
| f.logger.Info("installing snapshot to FSM") |
| |
| // Install the new boltdb file |
| var retErr *multierror.Error |
| if err := snapshotInstaller.Install(dbPath); err != nil { |
| f.logger.Error("failed to install snapshot", "error", err) |
| retErr = multierror.Append(retErr, fmt.Errorf("failed to install snapshot database: %w", err)) |
| } else { |
| f.logger.Info("snapshot installed") |
| } |
| |
| // Open the db file. We want to do this regardless of whether the above |
| // install worked. If the install failed we should try to open the old DB file. |
| if err := f.openDBFile(dbPath); err != nil { |
| f.logger.Error("failed to open new database file", "error", err) |
| retErr = multierror.Append(retErr, fmt.Errorf("failed to open new bolt file: %w", err)) |
| } |
| |
| // Handle local node config restore. lnConfig should not be nil here, but |
| // add the nil check anyway for safety. |
| if lnConfig != nil { |
| // Persist the local node config on the restored fsm. |
| if err := f.persistDesiredSuffrage(lnConfig); err != nil { |
| f.logger.Error("failed to persist local node config from before the restore", "error", err) |
| retErr = multierror.Append(retErr, fmt.Errorf("failed to persist local node config from before the restore: %w", err)) |
| } |
| } |
| |
| return retErr.ErrorOrNil() |
| } |
| |
| // noopSnapshotter implements the raft.FSMSnapshot interface. It doesn't do anything |
| // since our SnapshotStore reads data out of the FSM on Open(). |
| type noopSnapshotter struct { |
| fsm *FSM |
| } |
| |
| // Persist implements the raft.FSMSnapshot interface. It doesn't need to persist any |
| // state data, but it does persist the raft metadata. This is necessary so we |
| // can be sure to capture indexes for operation types that are not sent to the |
| // FSM. |
| func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error { |
| boltSnapshotSink := sink.(*BoltSnapshotSink) |
| |
| // We are processing a snapshot; fast-forward the index, term, and |
| // configuration to the latest seen by the raft system. |
| if err := s.fsm.witnessSnapshot(&boltSnapshotSink.meta); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| // Release doesn't do anything. |
| func (s *noopSnapshotter) Release() {} |
| |
| // raftConfigurationToProtoConfiguration converts a raft configuration object to |
| // a proto value. |
| func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue { |
| servers := make([]*Server, len(configuration.Servers)) |
| for i, s := range configuration.Servers { |
| servers[i] = &Server{ |
| Suffrage: int32(s.Suffrage), |
| Id: string(s.ID), |
| Address: string(s.Address), |
| } |
| } |
| return &ConfigurationValue{ |
| Index: index, |
| Servers: servers, |
| } |
| } |
| |
| // protoConfigurationToRaftConfiguration converts a proto configuration object |
| // to a raft object. |
| func protoConfigurationToRaftConfiguration(configuration *ConfigurationValue) (uint64, raft.Configuration) { |
| servers := make([]raft.Server, len(configuration.Servers)) |
| for i, s := range configuration.Servers { |
| servers[i] = raft.Server{ |
| Suffrage: raft.ServerSuffrage(s.Suffrage), |
| ID: raft.ServerID(s.Id), |
| Address: raft.ServerAddress(s.Address), |
| } |
| } |
| return configuration.Index, raft.Configuration{ |
| Servers: servers, |
| } |
| } |
| |
| type FSMChunkStorage struct { |
| f *FSM |
| ctx context.Context |
| } |
| |
| // chunkPaths returns a disk prefix and key for the given chunk |
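| // For example, a chunk with OpNum 12 and SequenceNum 3 yields the prefix |
| // "raftchunking/12/" and the key "raftchunking/12/3". |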
| func (f *FSMChunkStorage) chunkPaths(chunk *raftchunking.ChunkInfo) (string, string) { |
| prefix := fmt.Sprintf("%s%d/", chunkingPrefix, chunk.OpNum) |
| key := fmt.Sprintf("%s%d", prefix, chunk.SequenceNum) |
| return prefix, key |
| } |
| |
| func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error) { |
| b, err := jsonutil.EncodeJSON(chunk) |
| if err != nil { |
| return false, fmt.Errorf("error encoding chunk info: %w", err) |
| } |
| |
| prefix, key := f.chunkPaths(chunk) |
| |
| entry := &physical.Entry{ |
| Key: key, |
| Value: b, |
| } |
| |
| f.f.l.RLock() |
| defer f.f.l.RUnlock() |
| |
| // Start a write transaction. |
| done := new(bool) |
| if err := f.f.db.Update(func(tx *bolt.Tx) error { |
| if err := tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value); err != nil { |
| return fmt.Errorf("error storing chunk info: %w", err) |
| } |
| |
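| // Scan all chunks stored under this op's prefix so we can tell whether the |
| // full set has now arrived. |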
| // Assume bucket exists and has keys |
| c := tx.Bucket(dataBucketName).Cursor() |
| |
| var keys []string |
| prefixBytes := []byte(prefix) |
| for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() { |
| key := string(k) |
| key = strings.TrimPrefix(key, prefix) |
| if i := strings.Index(key, "/"); i == -1 { |
| // Add objects only from the current 'folder' |
| keys = append(keys, key) |
| } else { |
| // Add truncated 'folder' paths |
| keys = strutil.AppendIfMissing(keys, string(key[:i+1])) |
| } |
| } |
| |
| *done = uint32(len(keys)) == chunk.NumChunks |
| |
| return nil |
| }); err != nil { |
| return false, err |
| } |
| |
| return *done, nil |
| } |
| |
| func (f *FSMChunkStorage) FinalizeOp(opNum uint64) ([]*raftchunking.ChunkInfo, error) { |
| ret, err := f.chunksForOpNum(opNum) |
| if err != nil { |
| return nil, fmt.Errorf("error getting chunks for op keys: %w", err) |
| } |
| |
| prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum}) |
| if err := f.f.DeletePrefix(f.ctx, prefix); err != nil { |
| return nil, fmt.Errorf("error deleting prefix after op finalization: %w", err) |
| } |
| |
| return ret, nil |
| } |
| |
| func (f *FSMChunkStorage) chunksForOpNum(opNum uint64) ([]*raftchunking.ChunkInfo, error) { |
| prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum}) |
| |
| opChunkKeys, err := f.f.List(f.ctx, prefix) |
| if err != nil { |
| return nil, fmt.Errorf("error fetching op chunk keys: %w", err) |
| } |
| |
| if len(opChunkKeys) == 0 { |
| return nil, nil |
| } |
| |
| var ret []*raftchunking.ChunkInfo |
| |
| for _, v := range opChunkKeys { |
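| // Each listed key is the chunk's sequence number, so the chunk can be |
| // placed directly at its index in the result slice. |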
| seqNum, err := strconv.ParseInt(v, 10, 64) |
| if err != nil { |
| return nil, fmt.Errorf("error converting seqnum to integer: %w", err) |
| } |
| |
| entry, err := f.f.Get(f.ctx, prefix+v) |
| if err != nil { |
| return nil, fmt.Errorf("error fetching chunkinfo: %w", err) |
| } |
| |
| var ci raftchunking.ChunkInfo |
| if err := jsonutil.DecodeJSON(entry.Value, &ci); err != nil { |
| return nil, fmt.Errorf("error decoding chunkinfo json: %w", err) |
| } |
| |
| if ret == nil { |
| ret = make([]*raftchunking.ChunkInfo, ci.NumChunks) |
| } |
| |
| ret[seqNum] = &ci |
| } |
| |
| return ret, nil |
| } |
| |
| func (f *FSMChunkStorage) GetChunks() (raftchunking.ChunkMap, error) { |
| opNums, err := f.f.List(f.ctx, chunkingPrefix) |
| if err != nil { |
| return nil, fmt.Errorf("error doing recursive list for chunk saving: %w", err) |
| } |
| |
| if len(opNums) == 0 { |
| return nil, nil |
| } |
| |
| ret := make(raftchunking.ChunkMap, len(opNums)) |
| for _, opNumStr := range opNums { |
| opNum, err := strconv.ParseInt(opNumStr, 10, 64) |
| if err != nil { |
| return nil, fmt.Errorf("error parsing op num during chunk saving: %w", err) |
| } |
| |
| opChunks, err := f.chunksForOpNum(uint64(opNum)) |
| if err != nil { |
| return nil, fmt.Errorf("error getting chunks for op keys during chunk saving: %w", err) |
| } |
| |
| ret[uint64(opNum)] = opChunks |
| } |
| |
| return ret, nil |
| } |
| |
| func (f *FSMChunkStorage) RestoreChunks(chunks raftchunking.ChunkMap) error { |
| if err := f.f.DeletePrefix(f.ctx, chunkingPrefix); err != nil { |
| return fmt.Errorf("error deleting prefix for chunk restoration: %w", err) |
| } |
| if len(chunks) == 0 { |
| return nil |
| } |
| |
| for opNum, opChunks := range chunks { |
| for _, chunk := range opChunks { |
| if chunk == nil { |
| continue |
| } |
| if chunk.OpNum != opNum { |
| return errors.New("unexpected op number in chunk") |
| } |
| if _, err := f.StoreChunk(chunk); err != nil { |
| return fmt.Errorf("error storing chunk during restoration: %w", err) |
| } |
| } |
| } |
| |
| return nil |
| } |