| // Copyright (c) HashiCorp, Inc. |
| // SPDX-License-Identifier: MPL-2.0 |
| |
| package kubernetes |
| |
| import ( |
| "fmt" |
| "strconv" |
| "sync" |
| "time" |
| |
| "github.com/hashicorp/go-hclog" |
| sr "github.com/hashicorp/vault/serviceregistration" |
| "github.com/hashicorp/vault/serviceregistration/kubernetes/client" |
| "github.com/oklog/run" |
| ) |
| |
| // How often to retry sending a state update if it fails. |
| var retryFreq = 5 * time.Second |
| |
| // retryHandler executes retries. |
| // It is thread-safe. |
| type retryHandler struct { |
| // These don't need a mutex because they're never mutated. |
| logger hclog.Logger |
| namespace, podName string |
| |
| // lock synchronizes access to the mutable fields below: initialStateSet, |
| // initialState, and patchesToRetry. |
| lock sync.Mutex |
| |
| // initialStateSet records whether the initial state has been successfully |
| // set, so subsequent attempts become no-ops. |
| initialStateSet bool |
| |
| // initialState stores the initial state to be set. |
| initialState sr.State |
| |
| // patchesToRetry maps the path of each label being updated to its pending |
| // patch. A given path is either absent or holds _the last_ state we were |
| // aware of. Entries should only be applied after the initial state has |
| // been set. |
| patchesToRetry map[string]*client.Patch |
| |
| // client is the Client to use when making API calls against Kubernetes. |
| client *client.Client |
| } |
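| |
| // A typical lifecycle, as a sketch (construction is inferred from the |
| // fields above, not a confirmed constructor): |
| // |
| //	r := &retryHandler{ |
| //		logger:         logger, |
| //		namespace:      namespace, |
| //		podName:        podName, |
| //		initialState:   state, |
| //		patchesToRetry: make(map[string]*client.Patch), |
| //		client:         c, |
| //	} |
| //	r.Run(shutdownCh, &wg) |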
| |
| // Run must be called for retries to be started. |
| func (r *retryHandler) Run(shutdownCh <-chan struct{}, wait *sync.WaitGroup) { |
| r.setInitialState(shutdownCh) |
| |
| // Run this in a go func so this call doesn't block. |
| wait.Add(1) |
| go func() { |
| // Make sure Vault will give us time to finish up here. |
| defer wait.Done() |
| |
| var g run.Group |
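| // Per oklog/run semantics, g.Run blocks until the first actor returns, |
| // then invokes every actor's interrupt function; a shutdown signal here |
| // therefore also stops the periodic updater below, and vice versa. |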
| |
| // This actor watches for the shutdownCh |
| shutdownActorStop := make(chan struct{}) |
| g.Add(func() error { |
| select { |
| case <-shutdownCh: |
| case <-shutdownActorStop: |
| } |
| return nil |
| }, func(error) { |
| close(shutdownActorStop) |
| }) |
| |
| checkUpdateStateStop := make(chan struct{}) |
| g.Add(func() error { |
| r.periodicUpdateState(checkUpdateStateStop) |
| return nil |
| }, func(error) { |
| close(checkUpdateStateStop) |
| r.client.Shutdown() |
| }) |
| |
| if err := g.Run(); err != nil { |
| r.logger.Error("error encountered during periodic state update", "error", err) |
| } |
| }() |
| } |
| |
| func (r *retryHandler) setInitialState(shutdownCh <-chan struct{}) { |
| r.lock.Lock() |
| defer r.lock.Unlock() |
| |
| doneCh := make(chan struct{}) |
| |
| go func() { |
| if err := r.setInitialStateInternal(); err != nil { |
| if r.logger.IsWarn() { |
| r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error())) |
| } |
| } |
| close(doneCh) |
| }() |
| |
| // Wait until the state is set or shutdown happens. Running the attempt in |
| // a separate goroutine lets a shutdown unblock this call even if the API |
| // request is still in flight. |
| select { |
| case <-doneCh: |
| case <-shutdownCh: |
| } |
| } |
| |
| // Notify adds a patch to be retried until it's either completed without |
| // error, or no longer needed. |
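| // |
| // For example, a caller reporting a sealed-status change might invoke it |
| // like this (a sketch; the value is illustrative): |
| // |
| //	r.Notify(&client.Patch{ |
| //		Operation: client.Replace, |
| //		Path:      pathToLabels + labelSealed, |
| //		Value:     strconv.FormatBool(true), |
| //	}) |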
| func (r *retryHandler) Notify(patch *client.Patch) { |
| r.lock.Lock() |
| defer r.lock.Unlock() |
| |
| // Initial state must be set first, or subsequent notifications we've |
| // received could get smashed by a late-arriving initial state. |
| // We will store this to retry it when appropriate. |
| if !r.initialStateSet { |
| if r.logger.IsWarn() { |
| r.logger.Warn(fmt.Sprintf("cannot notify of present state for %s because initial state is unset", patch.Path)) |
| } |
| r.patchesToRetry[patch.Path] = patch |
| return |
| } |
| |
| // Initial state has been sent, so it's OK to attempt a patch immediately. |
| if err := r.client.PatchPod(r.namespace, r.podName, patch); err != nil { |
| if r.logger.IsWarn() { |
| r.logger.Warn(fmt.Sprintf("unable to update state for %s due to %s, will retry", patch.Path, err.Error())) |
| } |
| r.patchesToRetry[patch.Path] = patch |
| } |
| } |
| |
| // setInitialStateInternal sets the initial state remotely. This should be |
| // called with the lock held. |
| func (r *retryHandler) setInitialStateInternal() error { |
| // If the initial state has already been set, this is a no-op |
| if r.initialStateSet { |
| return nil |
| } |
| |
| // Verify that the pod exists and our configuration looks good. |
| pod, err := r.client.GetPod(r.namespace, r.podName) |
| if err != nil { |
| return err |
| } |
| |
| // Now to initially label our pod. |
| if pod.Metadata == nil { |
| // This should never happen IRL, just being defensive. |
| return fmt.Errorf("no pod metadata on %+v", pod) |
| } |
| if pod.Metadata.Labels == nil { |
| // Add the labels field, and the labels themselves, in that one call. |
| // The reason we must take a different approach to adding them is discussed here: |
| // https://stackoverflow.com/questions/57480205/error-while-applying-json-patch-to-kubernetes-custom-resource |
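| // With a single add operation, the patch serializes to roughly |
| // |
| //	[{"op": "add", "path": "/metadata/labels", "value": {"<label>": "<value>", ...}}] |
| // |
| // creating the labels map and its entries in one step. |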
| if err := r.client.PatchPod(r.namespace, r.podName, &client.Patch{ |
| Operation: client.Add, |
| Path: "/metadata/labels", |
| Value: map[string]string{ |
| labelVaultVersion: r.initialState.VaultVersion, |
| labelActive: strconv.FormatBool(r.initialState.IsActive), |
| labelSealed: strconv.FormatBool(r.initialState.IsSealed), |
| labelPerfStandby: strconv.FormatBool(r.initialState.IsPerformanceStandby), |
| labelInitialized: strconv.FormatBool(r.initialState.IsInitialized), |
| }, |
| }); err != nil { |
| return err |
| } |
| } else { |
| // The labels map already exists, so set each label with an individual |
| // replace patch. |
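| // Each patch serializes to roughly |
| // |
| //	{"op": "replace", "path": "/metadata/labels/<label>", "value": "<value>"} |
| // |
| // assuming pathToLabels is the "/metadata/labels/" prefix. |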
| patches := []*client.Patch{ |
| { |
| Operation: client.Replace, |
| Path: pathToLabels + labelVaultVersion, |
| Value: r.initialState.VaultVersion, |
| }, |
| { |
| Operation: client.Replace, |
| Path: pathToLabels + labelActive, |
| Value: strconv.FormatBool(r.initialState.IsActive), |
| }, |
| { |
| Operation: client.Replace, |
| Path: pathToLabels + labelSealed, |
| Value: strconv.FormatBool(r.initialState.IsSealed), |
| }, |
| { |
| Operation: client.Replace, |
| Path: pathToLabels + labelPerfStandby, |
| Value: strconv.FormatBool(r.initialState.IsPerformanceStandby), |
| }, |
| { |
| Operation: client.Replace, |
| Path: pathToLabels + labelInitialized, |
| Value: strconv.FormatBool(r.initialState.IsInitialized), |
| }, |
| } |
| if err := r.client.PatchPod(r.namespace, r.podName, patches...); err != nil { |
| return err |
| } |
| } |
| r.initialStateSet = true |
| return nil |
| } |
| |
| func (r *retryHandler) periodicUpdateState(stopCh chan struct{}) { |
| retry := time.NewTicker(retryFreq) |
| defer retry.Stop() |
| |
| for { |
| // Call updateState immediately so that, on startup, we don't wait a full |
| // tick before attempting to set the initial state. |
| r.updateState() |
| |
| select { |
| case <-stopCh: |
| return |
| case <-retry.C: |
| } |
| } |
| } |
| |
| func (r *retryHandler) updateState() { |
| r.lock.Lock() |
| defer r.lock.Unlock() |
| |
| // Initial state must be set first, or subsequent notifications we've |
| // received could get smashed by a late-arriving initial state. |
| // If the state is already set, this is a no-op. |
| if err := r.setInitialStateInternal(); err != nil { |
| if r.logger.IsWarn() { |
| r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error())) |
| } |
| // On failure, initialStateSet remains false, so the next tick |
| // will retry. |
| return |
| } |
| |
| if len(r.patchesToRetry) == 0 { |
| // Nothing further to do here. |
| return |
| } |
| |
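| // Flush all pending patches in one call. Map iteration order is random, |
| // but each patch targets a distinct label path, so order doesn't matter. |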
| patches := make([]*client.Patch, len(r.patchesToRetry)) |
| i := 0 |
| for _, patch := range r.patchesToRetry { |
| patches[i] = patch |
| i++ |
| } |
| |
| if err := r.client.PatchPod(r.namespace, r.podName, patches...); err != nil { |
| if r.logger.IsWarn() { |
| r.logger.Warn(fmt.Sprintf("unable to update state for due to %s, will retry", err.Error())) |
| } |
| return |
| } |
| r.patchesToRetry = make(map[string]*client.Patch) |
| } |