| // Copyright (c) HashiCorp, Inc. |
| // SPDX-License-Identifier: MPL-2.0 |
| |
| package raft |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "math" |
| "os" |
| "path/filepath" |
| "runtime" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| log "github.com/hashicorp/go-hclog" |
| "github.com/hashicorp/vault/sdk/plugin/pb" |
| "github.com/rboyer/safeio" |
| bolt "go.etcd.io/bbolt" |
| "go.uber.org/atomic" |
| |
| "github.com/hashicorp/raft" |
| ) |
| |
// boltSnapshotID is the fixed ID reported for every BoltDB-backed snapshot.
// Because the ID never changes, only a single bolt snapshot is ever exposed by
// the system.
const boltSnapshotID = "bolt-snapshot"

// tmpSuffix is appended to the directory name of a snapshot that is still
// being written; it is trimmed off when the snapshot is moved into place.
const tmpSuffix = ".tmp"

// snapPath is the sub-directory of the store's base directory in which
// snapshot directories are created.
const snapPath = "snapshots"
| |
| // BoltSnapshotStore implements the SnapshotStore interface and allows snapshots |
| // to be stored in BoltDB files on local disk. Since we always have an up to |
| // date FSM we use a special snapshot ID to indicate that the snapshot can be |
| // pulled from the BoltDB file that is currently backing the FSM. This allows us |
| // to provide just-in-time snapshots without doing incremental data dumps. |
| // |
| // When a snapshot is being installed on the node we will Create and Write data |
| // to it. This will cause the snapshot store to create a new BoltDB file and |
| // write the snapshot data to it. Then, we can simply rename the snapshot to the |
| // FSM's filename. This allows us to atomically install the snapshot and |
| // reduces the amount of disk i/o. Older snapshots are reaped on startup and |
| // before each subsequent snapshot write. This ensures we only have one snapshot |
| // on disk at a time. |
| type BoltSnapshotStore struct { |
| // path is the directory in which to store file based snapshots |
| path string |
| |
| // We hold a copy of the FSM so we can stream snapshots straight out of the |
| // database. |
| fsm *FSM |
| |
| logger log.Logger |
| } |
| |
| // BoltSnapshotSink implements SnapshotSink optionally choosing to write to a |
| // file. |
| type BoltSnapshotSink struct { |
| store *BoltSnapshotStore |
| logger log.Logger |
| meta raft.SnapshotMeta |
| trans raft.Transport |
| |
| // These fields will be used if we are writing a snapshot (vs. reading |
| // one) |
| written atomic.Bool |
| writer io.WriteCloser |
| writeError error |
| dir string |
| parentDir string |
| doneWritingCh chan struct{} |
| |
| l sync.Mutex |
| closed bool |
| } |
| |
| // NewBoltSnapshotStore creates a new BoltSnapshotStore based |
| // on a base directory. |
| func NewBoltSnapshotStore(base string, logger log.Logger, fsm *FSM) (*BoltSnapshotStore, error) { |
| if logger == nil { |
| return nil, fmt.Errorf("no logger provided") |
| } |
| |
| // Ensure our path exists |
| path := filepath.Join(base, snapPath) |
| if err := os.MkdirAll(path, 0o700); err != nil && !os.IsExist(err) { |
| return nil, fmt.Errorf("snapshot path not accessible: %v", err) |
| } |
| |
| // Setup the store |
| store := &BoltSnapshotStore{ |
| logger: logger, |
| fsm: fsm, |
| path: path, |
| } |
| |
| // Cleanup any old or failed snapshots on startup. |
| if err := store.ReapSnapshots(); err != nil { |
| return nil, err |
| } |
| |
| return store, nil |
| } |
| |
| // Create is used to start a new snapshot |
| func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) { |
| // We only support version 1 snapshots at this time. |
| if version != 1 { |
| return nil, fmt.Errorf("unsupported snapshot version %d", version) |
| } |
| |
| // Create the sink |
| sink := &BoltSnapshotSink{ |
| store: f, |
| logger: f.logger, |
| meta: raft.SnapshotMeta{ |
| Version: version, |
| ID: boltSnapshotID, |
| Index: index, |
| Term: term, |
| Configuration: configuration, |
| ConfigurationIndex: configurationIndex, |
| }, |
| trans: trans, |
| } |
| |
| return sink, nil |
| } |
| |
| // List returns available snapshots in the store. It only returns bolt |
| // snapshots. No snapshot will be returned if there are no indexes in the |
| // FSM. |
| func (f *BoltSnapshotStore) List() ([]*raft.SnapshotMeta, error) { |
| meta, err := f.getMetaFromFSM() |
| if err != nil { |
| return nil, err |
| } |
| |
| // If we haven't seen any data yet do not return a snapshot |
| if meta.Index == 0 { |
| return nil, nil |
| } |
| |
| return []*raft.SnapshotMeta{meta}, nil |
| } |
| |
| // getBoltSnapshotMeta returns the fsm's latest state and configuration. |
| func (f *BoltSnapshotStore) getMetaFromFSM() (*raft.SnapshotMeta, error) { |
| latestIndex, latestConfig := f.fsm.LatestState() |
| meta := &raft.SnapshotMeta{ |
| Version: 1, |
| ID: boltSnapshotID, |
| Index: latestIndex.Index, |
| Term: latestIndex.Term, |
| } |
| |
| if latestConfig != nil { |
| meta.ConfigurationIndex, meta.Configuration = protoConfigurationToRaftConfiguration(latestConfig) |
| } |
| |
| return meta, nil |
| } |
| |
| // Open takes a snapshot ID and returns a ReadCloser for that snapshot. |
| func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) { |
| if id == boltSnapshotID { |
| return f.openFromFSM() |
| } |
| |
| return f.openFromFile(id) |
| } |
| |
| func (f *BoltSnapshotStore) openFromFSM() (*raft.SnapshotMeta, io.ReadCloser, error) { |
| meta, err := f.getMetaFromFSM() |
| if err != nil { |
| return nil, nil, err |
| } |
| // If we don't have any data return an error |
| if meta.Index == 0 { |
| return nil, nil, errors.New("no snapshot data") |
| } |
| |
| // Stream data out of the FSM to calculate the size |
| readCloser, writeCloser := io.Pipe() |
| metaReadCloser, metaWriteCloser := io.Pipe() |
| go func() { |
| f.fsm.writeTo(context.Background(), metaWriteCloser, writeCloser) |
| }() |
| |
| // Compute the size |
| n, err := io.Copy(ioutil.Discard, metaReadCloser) |
| if err != nil { |
| f.logger.Error("failed to read state file", "error", err) |
| metaReadCloser.Close() |
| readCloser.Close() |
| return nil, nil, err |
| } |
| |
| meta.Size = n |
| metaReadCloser.Close() |
| |
| return meta, readCloser, nil |
| } |
| |
| func (f *BoltSnapshotStore) getMetaFromDB(id string) (*raft.SnapshotMeta, error) { |
| if len(id) == 0 { |
| return nil, errors.New("can not open empty snapshot ID") |
| } |
| |
| filename := filepath.Join(f.path, id, databaseFilename) |
| boltDB, err := bolt.Open(filename, 0o600, &bolt.Options{Timeout: 1 * time.Second}) |
| if err != nil { |
| return nil, err |
| } |
| defer boltDB.Close() |
| |
| meta := &raft.SnapshotMeta{ |
| Version: 1, |
| ID: id, |
| } |
| |
| err = boltDB.View(func(tx *bolt.Tx) error { |
| b := tx.Bucket(configBucketName) |
| val := b.Get(latestIndexKey) |
| if val != nil { |
| var snapshotIndexes IndexValue |
| err := proto.Unmarshal(val, &snapshotIndexes) |
| if err != nil { |
| return err |
| } |
| |
| meta.Index = snapshotIndexes.Index |
| meta.Term = snapshotIndexes.Term |
| } |
| |
| // Read in our latest config and populate it inmemory |
| val = b.Get(latestConfigKey) |
| if val != nil { |
| var config ConfigurationValue |
| err := proto.Unmarshal(val, &config) |
| if err != nil { |
| return err |
| } |
| |
| meta.ConfigurationIndex, meta.Configuration = protoConfigurationToRaftConfiguration(&config) |
| } |
| return nil |
| }) |
| if err != nil { |
| return nil, err |
| } |
| |
| return meta, nil |
| } |
| |
| func (f *BoltSnapshotStore) openFromFile(id string) (*raft.SnapshotMeta, io.ReadCloser, error) { |
| meta, err := f.getMetaFromDB(id) |
| if err != nil { |
| return nil, nil, err |
| } |
| |
| filename := filepath.Join(f.path, id, databaseFilename) |
| installer := &boltSnapshotInstaller{ |
| meta: meta, |
| ReadCloser: ioutil.NopCloser(strings.NewReader(filename)), |
| filename: filename, |
| } |
| |
| return meta, installer, nil |
| } |
| |
| // ReapSnapshots reaps all snapshots. |
| func (f *BoltSnapshotStore) ReapSnapshots() error { |
| snapshots, err := ioutil.ReadDir(f.path) |
| switch { |
| case err == nil: |
| case os.IsNotExist(err): |
| return nil |
| default: |
| f.logger.Error("failed to scan snapshot directory", "error", err) |
| return err |
| } |
| |
| for _, snap := range snapshots { |
| // Ignore any files |
| if !snap.IsDir() { |
| continue |
| } |
| |
| // Warn about temporary snapshots, this indicates a previously failed |
| // snapshot attempt. We still want to clean these up. |
| dirName := snap.Name() |
| if strings.HasSuffix(dirName, tmpSuffix) { |
| f.logger.Warn("found temporary snapshot", "name", dirName) |
| } |
| |
| path := filepath.Join(f.path, dirName) |
| f.logger.Info("reaping snapshot", "path", path) |
| if err := os.RemoveAll(path); err != nil { |
| f.logger.Error("failed to reap snapshot", "path", snap.Name(), "error", err) |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| // ID returns the ID of the snapshot, can be used with Open() |
| // after the snapshot is finalized. |
| func (s *BoltSnapshotSink) ID() string { |
| s.l.Lock() |
| defer s.l.Unlock() |
| |
| return s.meta.ID |
| } |
| |
| func (s *BoltSnapshotSink) writeBoltDBFile() error { |
| // Create a new path |
| name := snapshotName(s.meta.Term, s.meta.Index) |
| path := filepath.Join(s.store.path, name+tmpSuffix) |
| s.logger.Info("creating new snapshot", "path", path) |
| |
| // Make the directory |
| if err := os.MkdirAll(path, 0o700); err != nil { |
| s.logger.Error("failed to make snapshot directory", "error", err) |
| return err |
| } |
| |
| // Create the BoltDB file |
| dbPath := filepath.Join(path, databaseFilename) |
| boltDB, err := bolt.Open(dbPath, 0o600, &bolt.Options{Timeout: 1 * time.Second}) |
| if err != nil { |
| return err |
| } |
| |
| // Write the snapshot metadata |
| if err := writeSnapshotMetaToDB(&s.meta, boltDB); err != nil { |
| return err |
| } |
| |
| // Set the snapshot ID to the generated name. |
| s.meta.ID = name |
| |
| // Create the done channel |
| s.doneWritingCh = make(chan struct{}) |
| |
| // Store the directories so we can commit the changes on success or abort |
| // them on failure. |
| s.dir = path |
| s.parentDir = s.store.path |
| |
| // Create a pipe so we pipe writes into the go routine below. |
| reader, writer := io.Pipe() |
| s.writer = writer |
| |
| // Start a go routine in charge of piping data from the snapshot's Write |
| // call to the delimtedreader and the BoltDB file. |
| go func() { |
| defer close(s.doneWritingCh) |
| defer boltDB.Close() |
| |
| // The delimted reader will parse full proto messages from the snapshot |
| // data. |
| protoReader := NewDelimitedReader(reader, math.MaxInt32) |
| defer protoReader.Close() |
| |
| var done bool |
| var keys int |
| entry := new(pb.StorageEntry) |
| for !done { |
| err := boltDB.Update(func(tx *bolt.Tx) error { |
| b, err := tx.CreateBucketIfNotExists(dataBucketName) |
| if err != nil { |
| return err |
| } |
| |
| // Commit in batches of 50k. Bolt holds all the data in memory and |
| // doesn't split the pages until commit so we do incremental writes. |
| for i := 0; i < 50000; i++ { |
| err := protoReader.ReadMsg(entry) |
| if err != nil { |
| if err == io.EOF { |
| done = true |
| return nil |
| } |
| return err |
| } |
| |
| err = b.Put([]byte(entry.Key), entry.Value) |
| if err != nil { |
| return err |
| } |
| keys += 1 |
| } |
| |
| return nil |
| }) |
| if err != nil { |
| s.logger.Error("snapshot write: failed to write transaction", "error", err) |
| s.writeError = err |
| return |
| } |
| |
| s.logger.Trace("snapshot write: writing keys", "num_written", keys) |
| } |
| }() |
| |
| return nil |
| } |
| |
| // Write is used to append to the bolt file. The first call to write ensures we |
| // have the file created. |
| func (s *BoltSnapshotSink) Write(b []byte) (int, error) { |
| s.l.Lock() |
| defer s.l.Unlock() |
| |
| // If this is the first call to Write we need to setup the boltDB file and |
| // kickoff the pipeline write |
| if previouslyWritten := s.written.Swap(true); !previouslyWritten { |
| // Reap any old snapshots |
| if err := s.store.ReapSnapshots(); err != nil { |
| return 0, err |
| } |
| |
| if err := s.writeBoltDBFile(); err != nil { |
| return 0, err |
| } |
| } |
| |
| return s.writer.Write(b) |
| } |
| |
| // Close is used to indicate a successful end. |
| func (s *BoltSnapshotSink) Close() error { |
| s.l.Lock() |
| defer s.l.Unlock() |
| |
| // Make sure close is idempotent |
| if s.closed { |
| return nil |
| } |
| s.closed = true |
| |
| if s.writer != nil { |
| s.writer.Close() |
| <-s.doneWritingCh |
| |
| if s.writeError != nil { |
| // If we encountered an error while writing then we should remove |
| // the directory and return the error |
| _ = os.RemoveAll(s.dir) |
| return s.writeError |
| } |
| |
| // Move the directory into place |
| newPath := strings.TrimSuffix(s.dir, tmpSuffix) |
| |
| var err error |
| if runtime.GOOS != "windows" { |
| err = safeio.Rename(s.dir, newPath) |
| } else { |
| err = os.Rename(s.dir, newPath) |
| } |
| |
| if err != nil { |
| s.logger.Error("failed to move snapshot into place", "error", err) |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| // Cancel is used to indicate an unsuccessful end. |
| func (s *BoltSnapshotSink) Cancel() error { |
| s.l.Lock() |
| defer s.l.Unlock() |
| |
| // Make sure close is idempotent |
| if s.closed { |
| return nil |
| } |
| s.closed = true |
| |
| if s.writer != nil { |
| s.writer.Close() |
| <-s.doneWritingCh |
| |
| // Attempt to remove all artifacts |
| return os.RemoveAll(s.dir) |
| } |
| |
| return nil |
| } |
| |
| type boltSnapshotInstaller struct { |
| io.ReadCloser |
| meta *raft.SnapshotMeta |
| filename string |
| } |
| |
| func (i *boltSnapshotInstaller) Filename() string { |
| return i.filename |
| } |
| |
| func (i *boltSnapshotInstaller) Metadata() *raft.SnapshotMeta { |
| return i.meta |
| } |
| |
| func (i *boltSnapshotInstaller) Install(filename string) error { |
| if len(i.filename) == 0 { |
| return errors.New("snapshot filename empty") |
| } |
| |
| if len(filename) == 0 { |
| return errors.New("fsm filename empty") |
| } |
| |
| // Rename the snapshot to the FSM location |
| if runtime.GOOS != "windows" { |
| return safeio.Rename(i.filename, filename) |
| } else { |
| return os.Rename(i.filename, filename) |
| } |
| } |
| |
// snapshotName builds a snapshot name of the form "<term>-<index>-<millis>",
// where millis is the current Unix time in milliseconds.
func snapshotName(term, index uint64) string {
	millis := time.Now().UnixNano() / time.Millisecond.Nanoseconds()
	return fmt.Sprintf("%d-%d-%d", term, index, millis)
}