blob: 77ff31212ef96fdbab23dcff7d8301a6cb4152ba [file] [log] [blame]
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package pki
import (
"crypto/x509"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/logical"
)
// minUnifiedTransferDelay bounds how often runUnifiedTransfer may perform
// another transfer pass after its first one; it is compared against the time
// elapsed since the previous pass finished.
const minUnifiedTransferDelay = 30 * time.Minute
// unifiedTransferStatus holds the coordination state for the background
// unified transfer job driven by runUnifiedTransfer.
type unifiedTransferStatus struct {
	// isRunning is claimed with CompareAndSwap(false, true) so that at most
	// one transfer is in progress at any time.
	isRunning atomic.Bool
	// forceRerun records that another transfer pass has been requested.
	forceRerun atomic.Bool
	// lastRun is when the most recent transfer pass finished. It is a plain
	// (non-atomic) value, so it should only be read or written while
	// isRunning is held.
	lastRun time.Time
}
// forceRun sets the forceRerun flag, asking runUnifiedTransfer to perform
// another transfer pass. runUnifiedTransfer still applies its own gating
// (replication state, feature config, the isRunning guard and
// minUnifiedTransferDelay) before honouring the request.
func (uts *unifiedTransferStatus) forceRun() {
	uts.forceRerun.Store(true)
}
// newUnifiedTransferStatus returns a zero-valued status: not running, no
// rerun requested, and no previous run recorded.
func newUnifiedTransferStatus() *unifiedTransferStatus {
	return new(unifiedTransferStatus)
}
// runUnifiedTransfer is meant to run in the background. When the unified CRL
// feature is enabled, it sends all local revocation entries (and, when delta
// CRLs are enabled, local delta WAL entries) that are missing from the
// unified space.
//
// It returns immediately on DR secondaries, performance standbys and local
// mounts, when unified CRLs are disabled, and when another transfer is
// already in progress. After the first completed pass, later calls only
// perform another pass when a rerun was requested (forceRun) AND at least
// minUnifiedTransferDelay has elapsed since the previous pass finished.
func runUnifiedTransfer(sc *storageContext) {
	b := sc.Backend
	status := b.unifiedTransferStatus

	isPerfStandby := b.System().ReplicationState().HasState(consts.ReplicationDRSecondary | consts.ReplicationPerformanceStandby)
	if isPerfStandby || b.System().LocalMount() {
		// We only do this on active enterprise nodes, when we aren't a local mount
		return
	}

	config, err := b.crlBuilder.getConfigWithUpdate(sc)
	if err != nil {
		b.Logger().Error("failed to retrieve crl config from storage for unified transfer background process",
			"error", err)
		return
	}
	if !config.UnifiedCRL {
		// Feature is disabled, no need to run
		return
	}

	clusterId, err := b.System().ClusterID(sc.Context)
	if err != nil {
		b.Logger().Error("failed to fetch cluster id for unified transfer background process",
			"error", err)
		return
	}

	// isRunning acts as a non-blocking lock: only one transfer runs at once.
	if !status.isRunning.CompareAndSwap(false, true) {
		b.Logger().Debug("an existing unified transfer process is already running")
		return
	}
	defer status.isRunning.Store(false)

	// Because access to lastRun is not locked, we need to delay this check
	// until after we grab the isRunning CAS lock.
	if !status.lastRun.IsZero() {
		// We have run before: only run again if a rerun was requested AND at
		// least minUnifiedTransferDelay has passed since the last run. The
		// minimum delay must hold back *early* reruns; it must not block
		// reruns forever once the delay has elapsed.
		if !status.forceRerun.Load() || time.Since(status.lastRun) < minUnifiedTransferDelay {
			return
		}
	}

	// Reset our flag before we begin, we do this before we start as
	// we can't guarantee that we can properly parse/fix the error from an
	// error that comes in from the revoke API after that. This will
	// force another run, which worst case, we will fix it on the next
	// periodic function call that passes our min delay.
	status.forceRerun.Store(false)

	if err := doUnifiedTransferMissingLocalSerials(sc, clusterId); err != nil {
		b.Logger().Error("an error occurred running unified transfer", "error", err.Error())
		status.forceRerun.Store(true)
	} else if config.EnableDelta {
		// Delta WAL entries are only synced when the local-serial transfer
		// returned no error; the delta transfer skips WAL entries whose
		// unified revocation entry is still missing.
		if err := doUnifiedTransferMissingDeltaWALSerials(sc, clusterId); err != nil {
			b.Logger().Error("an error occurred running unified transfer", "error", err.Error())
			status.forceRerun.Store(true)
		}
	}

	// Recorded before the deferred isRunning.Store(false) releases the guard.
	status.lastRun = time.Now()
}
// doUnifiedTransferMissingLocalSerials copies every locally revoked serial
// that is absent from this cluster's unified revoked listing into the
// unified space.
//
// Listing failures are returned as errors, as is unified CRL being disabled
// while the loop runs. Failures to copy individual serials are logged and
// counted, but nil is still returned.
func doUnifiedTransferMissingLocalSerials(sc *storageContext, clusterId string) error {
	localSerials, err := sc.listRevokedCerts()
	if err != nil {
		return err
	}
	if len(localSerials) == 0 {
		// Nothing revoked locally, so there is nothing to copy.
		return nil
	}

	unifiedList, err := listClusterSpecificUnifiedRevokedCerts(sc, clusterId)
	if err != nil {
		return err
	}
	alreadyUnified := sliceToMapKey(unifiedList)

	failures := 0
	for idx := range localSerials {
		serial := localSerials[idx]

		// Every 25 entries, re-read the config so that disabling unified
		// CRLs mid-transfer halts the loop. A failed config read does not
		// stop the loop.
		if idx%25 == 0 {
			if cfg, _ := sc.Backend.crlBuilder.getConfigWithUpdate(sc); cfg != nil && !cfg.UnifiedCRL {
				return errors.New("unified crl has been disabled after we started, stopping")
			}
		}

		if _, present := alreadyUnified[serial]; present {
			continue
		}

		if transferErr := readRevocationEntryAndTransfer(sc, serial); transferErr != nil {
			failures++
			sc.Backend.Logger().Error("Failed transferring local revocation to unified space",
				"serial", serial, "error", transferErr)
		}
	}

	if failures > 0 {
		sc.Backend.Logger().Warn(fmt.Sprintf("Failed transfering %d local serials to unified storage", failures))
	}
	return nil
}
// doUnifiedTransferMissingDeltaWALSerials copies local delta WAL entries that
// are missing from the cross-cluster unified delta WAL location.
//
// It skips the special last-build / last-revoked marker entries and entries
// already present in the unified WAL. It defers any entry whose full
// revocation entry is not yet in this cluster's unified revoked listing,
// counting that entry as a failure. Only when every entry was handled
// without failure is the unified last-revoked WAL marker overwritten with
// the local marker value read at the start of the call.
//
// An error is returned if reading the WAL markers or listing storage fails,
// if unified or delta CRLs are disabled mid-run, or if writing the final
// marker fails. Per-entry failures are logged and counted, and nil is
// returned in that case.
func doUnifiedTransferMissingDeltaWALSerials(sc *storageContext, clusterId string) error {
	// We need to do a similar thing for Delta WAL entry certificates.
	// When the delta WAL failed to write for one or more entries,
	// we'll need to replicate these up to the primary cluster. When it
	// has performed a new delta WAL build, it will empty storage and
	// update to a last written WAL entry that exceeds what we've seen
	// locally.
	thisUnifiedWALEntryPath := unifiedDeltaWALPath + deltaWALLastRevokedSerialName
	lastUnifiedWALEntry, err := getLastWALSerial(sc, thisUnifiedWALEntryPath)
	if err != nil {
		return fmt.Errorf("failed to fetch last cross-cluster unified revoked delta WAL serial number: %w", err)
	}

	lastLocalWALEntry, err := getLastWALSerial(sc, localDeltaWALLastRevokedSerial)
	if err != nil {
		return fmt.Errorf("failed to fetch last locally revoked delta WAL serial number: %w", err)
	}

	// We now need to transfer all the entries and then write the last WAL
	// entry at the end. Start by listing all certificates; any missing
	// certificates will be copied over and then the WAL entry will be
	// updated once.
	//
	// We do not delete entries either locally or remotely, as either
	// cluster could've rebuilt delta CRLs with out-of-sync information,
	// removing some entries (and, we cannot differentiate between these
	// two cases). On next full CRL rebuild (on either cluster), the state
	// should get synchronized, and future delta CRLs after this function
	// returns without issue will see the remaining entries.
	//
	// Lastly, we need to ensure we don't accidentally write any unified
	// delta WAL entries that aren't present in the main cross-cluster
	// revoked storage location. That would mean the local-serial transfer
	// (doUnifiedTransferMissingLocalSerials) failed to copy them for some
	// reason, despite them presumably appearing locally.
	_unifiedWALEntries, err := sc.Storage.List(sc.Context, unifiedDeltaWALPath)
	if err != nil {
		return fmt.Errorf("failed to list cross-cluster unified delta WAL storage: %w", err)
	}
	// Lookup map of serials already present in the unified delta WAL.
	unifiedWALEntries := sliceToMapKey(_unifiedWALEntries)

	_unifiedRevokedSerials, err := listClusterSpecificUnifiedRevokedCerts(sc, clusterId)
	if err != nil {
		return fmt.Errorf("failed to list cross-cluster revoked certificates: %w", err)
	}
	// Lookup map of serials whose full revocation entry is already unified.
	unifiedRevokedSerials := sliceToMapKey(_unifiedRevokedSerials)

	localWALEntries, err := sc.Storage.List(sc.Context, localDeltaWALPath)
	if err != nil {
		return fmt.Errorf("failed to list local delta WAL storage: %w", err)
	}

	if lastUnifiedWALEntry == lastLocalWALEntry && len(_unifiedWALEntries) == len(localWALEntries) {
		// Writing the last revoked WAL entry is the last thing that we do.
		// Because these entries match (across clusters) and we have the same
		// number of entries, assume we don't have anything to sync and exit
		// early.
		//
		// We need both checks as, in the event of PBPWF failing and then
		// returning while more revocations are happening, we could have
		// been scheduled to run, but then skip running (if only the first
		// condition was checked) because a later revocation succeeded
		// in writing a unified WAL entry, before we started replicating
		// the rest back up.
		//
		// The downside of this approach is that, if the main cluster
		// does a full rebuild in the meantime, we could re-sync more
		// entries back up to the primary cluster that are already
		// included in the complete CRL. Users can manually rebuild the
		// full CRL (clearing these duplicate delta CRL entries) if this
		// affects them.
		return nil
	}

	errCount := 0
	for index, serial := range localWALEntries {
		// Every 25 entries, re-read the config and stop if unified or delta
		// CRLs were disabled mid-run. An error reading the config is ignored
		// here and does not stop the loop.
		if index%25 == 0 {
			config, _ := sc.Backend.crlBuilder.getConfigWithUpdate(sc)
			if config != nil && (!config.UnifiedCRL || !config.EnableDelta) {
				return errors.New("unified or delta CRLs have been disabled after we started, stopping")
			}
		}

		if serial == deltaWALLastBuildSerialName || serial == deltaWALLastRevokedSerialName {
			// Skip our special serial numbers.
			continue
		}

		_, isAlreadyPresent := unifiedWALEntries[serial]
		if isAlreadyPresent {
			// Serial exists on both local and unified cluster. We're
			// presuming we don't need to read and re-write these entries
			// and that only missing entries need to be updated.
			continue
		}

		_, isRevokedCopied := unifiedRevokedSerials[serial]
		if !isRevokedCopied {
			// We need to wait here to copy over: the full revocation entry
			// must reach the unified revoked location first. Counting this
			// as a failure keeps the last-revoked marker from being updated.
			errCount += 1
			sc.Backend.Logger().Debug("Delta WAL exists locally, but corresponding cross-cluster full revocation entry is missing; skipping", "serial", serial)
			continue
		}

		// All good: read the local entry and write to the remote variant.
		localPath := localDeltaWALPath + serial
		unifiedPath := unifiedDeltaWALPath + serial

		entry, err := sc.Storage.Get(sc.Context, localPath)
		if err != nil || entry == nil {
			errCount += 1
			sc.Backend.Logger().Error("Failed reading local delta WAL entry to copy to cross-cluster", "serial", serial, "err", err)
			continue
		}

		// Re-key the local entry so the same value is stored under the
		// unified path.
		entry.Key = unifiedPath
		err = sc.Storage.Put(sc.Context, entry)
		if err != nil {
			errCount += 1
			sc.Backend.Logger().Error("Failed sync local delta WAL entry to cross-cluster unified delta WAL location", "serial", serial, "err", err)
			continue
		}
	}

	if errCount > 0 {
		// Not returning an error: the failures are only logged, and the
		// unified last-revoked WAL marker is deliberately left un-updated, so
		// the early-exit check above is less likely to skip a later run.
		// NOTE(review): because nil is returned, runUnifiedTransfer will not
		// set forceRerun for these failures — confirm that is intended.
		sc.Backend.Logger().Warn(fmt.Sprintf("Failed transfering %d local delta WAL serials to unified storage", errCount))
		return nil
	}

	// Everything worked. Here, we can write over the delta WAL last revoked
	// value. By using the earlier value, even if new revocations have
	// occurred, we ensure any further missing entries can be handled in the
	// next round.
	lastRevSerial := lastWALInfo{Serial: lastLocalWALEntry}
	lastWALEntry, err := logical.StorageEntryJSON(thisUnifiedWALEntryPath, lastRevSerial)
	if err != nil {
		return fmt.Errorf("unable to create cross-cluster unified last delta CRL WAL entry: %w", err)
	}
	if err = sc.Storage.Put(sc.Context, lastWALEntry); err != nil {
		return fmt.Errorf("error saving cross-cluster unified last delta CRL WAL entry: %w", err)
	}

	return nil
}
// readRevocationEntryAndTransfer loads the local revocation entry for serial
// and writes a matching unified revocation entry.
//
// The entry is skipped (nil is returned) in four cases: it is missing, its
// stored certificate cannot be parsed, no issuer has been assigned yet, or
// the certificate has already expired. Otherwise the result is either an
// error from loading the entry or the result of writeUnifiedRevocationEntry.
func readRevocationEntryAndTransfer(sc *storageContext, serial string) error {
	normalized := normalizeSerial(serial)

	info, err := sc.fetchRevocationInfo(normalized)
	if err != nil {
		return fmt.Errorf("failed loading revocation entry for serial: %s: %w", serial, err)
	}
	if info == nil {
		sc.Backend.Logger().Debug("no certificate revocation entry for serial", "serial", serial)
		return nil
	}

	parsed, parseErr := x509.ParseCertificate(info.CertificateBytes)
	if parseErr != nil {
		sc.Backend.Logger().Debug("failed parsing certificate stored in revocation entry for serial",
			"serial", serial, "error", parseErr)
		return nil
	}

	switch {
	case info.CertificateIssuer == "":
		// No issuer assigned to this serial yet; a CRL rebuild or tidy has
		// to happen first, so skip it for now.
		return nil
	case time.Now().After(parsed.NotAfter):
		// Already expired; there is no point transferring it.
		return nil
	}

	// Legacy revocation entries only carry the Unix-seconds RevocationTime
	// field, so fall back to it when RevocationTimeUTC is unset.
	revokedAt := info.RevocationTimeUTC
	if revokedAt.IsZero() {
		revokedAt = time.Unix(info.RevocationTime, 0)
	}

	return writeUnifiedRevocationEntry(sc, &unifiedRevocationEntry{
		SerialNumber:      normalized,
		CertExpiration:    parsed.NotAfter,
		RevocationTimeUTC: revokedAt,
		CertificateIssuer: info.CertificateIssuer,
	})
}