| package etcd |
| |
| import ( |
| "context" |
| "crypto/md5" |
| "encoding/json" |
| "fmt" |
| "sync" |
| "time" |
| |
| "github.com/hashicorp/go-multierror" |
| "github.com/hashicorp/terraform/internal/states/remote" |
| "github.com/hashicorp/terraform/internal/states/statemgr" |
| etcdv3 "go.etcd.io/etcd/clientv3" |
| etcdv3sync "go.etcd.io/etcd/clientv3/concurrency" |
| ) |
| |
| const ( |
| lockAcquireTimeout = 2 * time.Second |
| lockInfoSuffix = ".lockinfo" |
| ) |
| |
| // RemoteClient is a remote client that will store data in etcd. |
| type RemoteClient struct { |
| Client *etcdv3.Client |
| DoLock bool |
| Key string |
| |
| etcdMutex *etcdv3sync.Mutex |
| etcdSession *etcdv3sync.Session |
| info *statemgr.LockInfo |
| mu sync.Mutex |
| modRevision int64 |
| } |
| |
| func (c *RemoteClient) Get() (*remote.Payload, error) { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| |
| res, err := c.Client.KV.Get(context.TODO(), c.Key) |
| if err != nil { |
| return nil, err |
| } |
| if res.Count == 0 { |
| return nil, nil |
| } |
| if res.Count >= 2 { |
| return nil, fmt.Errorf("Expected a single result but got %d.", res.Count) |
| } |
| |
| c.modRevision = res.Kvs[0].ModRevision |
| |
| payload := res.Kvs[0].Value |
| md5 := md5.Sum(payload) |
| |
| return &remote.Payload{ |
| Data: payload, |
| MD5: md5[:], |
| }, nil |
| } |
| |
| func (c *RemoteClient) Put(data []byte) error { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| |
| res, err := etcdv3.NewKV(c.Client).Txn(context.TODO()).If( |
| etcdv3.Compare(etcdv3.ModRevision(c.Key), "=", c.modRevision), |
| ).Then( |
| etcdv3.OpPut(c.Key, string(data)), |
| etcdv3.OpGet(c.Key), |
| ).Commit() |
| |
| if err != nil { |
| return err |
| } |
| if !res.Succeeded { |
| return fmt.Errorf("The transaction did not succeed.") |
| } |
| if len(res.Responses) != 2 { |
| return fmt.Errorf("Expected two responses but got %d.", len(res.Responses)) |
| } |
| |
| c.modRevision = res.Responses[1].GetResponseRange().Kvs[0].ModRevision |
| return nil |
| } |
| |
| func (c *RemoteClient) Delete() error { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| |
| _, err := c.Client.KV.Delete(context.TODO(), c.Key) |
| return err |
| } |
| |
| func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| |
| if !c.DoLock { |
| return "", nil |
| } |
| if c.etcdSession != nil { |
| return "", fmt.Errorf("state %q already locked", c.Key) |
| } |
| |
| c.info = info |
| return c.lock() |
| } |
| |
| func (c *RemoteClient) Unlock(id string) error { |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| |
| if !c.DoLock { |
| return nil |
| } |
| |
| return c.unlock(id) |
| } |
| |
| func (c *RemoteClient) deleteLockInfo(info *statemgr.LockInfo) error { |
| res, err := c.Client.KV.Delete(context.TODO(), c.Key+lockInfoSuffix) |
| if err != nil { |
| return err |
| } |
| if res.Deleted == 0 { |
| return fmt.Errorf("No keys deleted for %s when deleting lock info.", c.Key+lockInfoSuffix) |
| } |
| return nil |
| } |
| |
| func (c *RemoteClient) getLockInfo() (*statemgr.LockInfo, error) { |
| res, err := c.Client.KV.Get(context.TODO(), c.Key+lockInfoSuffix) |
| if err != nil { |
| return nil, err |
| } |
| if res.Count == 0 { |
| return nil, nil |
| } |
| |
| li := &statemgr.LockInfo{} |
| err = json.Unmarshal(res.Kvs[0].Value, li) |
| if err != nil { |
| return nil, fmt.Errorf("Error unmarshaling lock info: %s.", err) |
| } |
| |
| return li, nil |
| } |
| |
| func (c *RemoteClient) putLockInfo(info *statemgr.LockInfo) error { |
| c.info.Path = c.etcdMutex.Key() |
| c.info.Created = time.Now().UTC() |
| |
| _, err := c.Client.KV.Put(context.TODO(), c.Key+lockInfoSuffix, string(c.info.Marshal())) |
| return err |
| } |
| |
| func (c *RemoteClient) lock() (string, error) { |
| session, err := etcdv3sync.NewSession(c.Client) |
| if err != nil { |
| return "", nil |
| } |
| |
| ctx, cancel := context.WithTimeout(context.TODO(), lockAcquireTimeout) |
| defer cancel() |
| |
| mutex := etcdv3sync.NewMutex(session, c.Key) |
| if err1 := mutex.Lock(ctx); err1 != nil { |
| lockInfo, err2 := c.getLockInfo() |
| if err2 != nil { |
| return "", &statemgr.LockError{Err: err2} |
| } |
| return "", &statemgr.LockError{Info: lockInfo, Err: err1} |
| } |
| |
| c.etcdMutex = mutex |
| c.etcdSession = session |
| |
| err = c.putLockInfo(c.info) |
| if err != nil { |
| if unlockErr := c.unlock(c.info.ID); unlockErr != nil { |
| err = multierror.Append(err, unlockErr) |
| } |
| return "", err |
| } |
| |
| return c.info.ID, nil |
| } |
| |
| func (c *RemoteClient) unlock(id string) error { |
| if c.etcdMutex == nil { |
| return nil |
| } |
| |
| var errs error |
| |
| if err := c.deleteLockInfo(c.info); err != nil { |
| errs = multierror.Append(errs, err) |
| } |
| if err := c.etcdMutex.Unlock(context.TODO()); err != nil { |
| errs = multierror.Append(errs, err) |
| } |
| if err := c.etcdSession.Close(); err != nil { |
| errs = multierror.Append(errs, err) |
| } |
| |
| c.etcdMutex = nil |
| c.etcdSession = nil |
| |
| return errs |
| } |