blob: c6fcd56639a2ad123cfe59d1aaa666b090289398 [file] [log] [blame] [edit]
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package metricsutil
import (
"context"
"math/rand"
"sort"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/timeutil"
)
// GaugeLabelValues is one gauge in a set sharing a single key, that
// are measured in a batch.
type GaugeLabelValues struct {
Labels []Label
Value float32
}
// GaugeCollector is a callback function that returns an unfiltered
// set of label-value pairs. It may be cancelled if it takes too long.
type GaugeCollector = func(context.Context) ([]GaugeLabelValues, error)
// collectionBound is a hard limit on how long a collection process
// may take, as a fraction of the current interval.
const collectionBound = 0.02
// collectionTarget is a soft limit; if exceeded, the collection interval
// will be doubled.
const collectionTarget = 0.01
// A GaugeCollectionProcess is responsible for one particular gauge metric.
// It handles a delay on initial startup; limiting the cardinality; and
// exponential backoff on the requested interval.
type GaugeCollectionProcess struct {
stop chan struct{}
stopped chan struct{}
// gauge name
key []string
// labels to use when reporting
labels []Label
// callback function
collector GaugeCollector
// destination for metrics
sink Metrics
logger log.Logger
// time between collections
originalInterval time.Duration
currentInterval time.Duration
ticker *time.Ticker
// used to help limit cardinality
maxGaugeCardinality int
// time source
clock timeutil.Clock
}
// NewGaugeCollectionProcess creates a new collection process for the callback
// function given as an argument, and starts it running.
// A label should be provided for metrics *about* this collection process.
//
// The Run() method must be called to start the process.
func NewGaugeCollectionProcess(
key []string,
id []Label,
collector GaugeCollector,
m metrics.MetricSink,
gaugeInterval time.Duration,
maxGaugeCardinality int,
logger log.Logger,
) (*GaugeCollectionProcess, error) {
return newGaugeCollectionProcessWithClock(
key,
id,
collector,
SinkWrapper{MetricSink: m},
gaugeInterval,
maxGaugeCardinality,
logger,
timeutil.DefaultClock{},
)
}
// NewGaugeCollectionProcess creates a new collection process for the callback
// function given as an argument, and starts it running.
// A label should be provided for metrics *about* this collection process.
//
// The Run() method must be called to start the process.
func (m *ClusterMetricSink) NewGaugeCollectionProcess(
key []string,
id []Label,
collector GaugeCollector,
logger log.Logger,
) (*GaugeCollectionProcess, error) {
return newGaugeCollectionProcessWithClock(
key,
id,
collector,
m,
m.GaugeInterval,
m.MaxGaugeCardinality,
logger,
timeutil.DefaultClock{},
)
}
// test version allows an alternative clock implementation
func newGaugeCollectionProcessWithClock(
key []string,
id []Label,
collector GaugeCollector,
sink Metrics,
gaugeInterval time.Duration,
maxGaugeCardinality int,
logger log.Logger,
clock timeutil.Clock,
) (*GaugeCollectionProcess, error) {
process := &GaugeCollectionProcess{
stop: make(chan struct{}, 1),
stopped: make(chan struct{}, 1),
key: key,
labels: id,
collector: collector,
sink: sink,
originalInterval: gaugeInterval,
currentInterval: gaugeInterval,
maxGaugeCardinality: maxGaugeCardinality,
logger: logger,
clock: clock,
}
return process, nil
}
// delayStart randomly delays by up to one extra interval
// so that collection processes do not all run at the time.
// If we knew all the processes in advance, we could just schedule them
// evenly, but a new one could be added per secret engine.
func (p *GaugeCollectionProcess) delayStart() bool {
randomDelay := time.Duration(rand.Int63n(int64(p.currentInterval)))
// A Timer might be better, but then we'd have to simulate
// one of those too?
delayTick := p.clock.NewTicker(randomDelay)
defer delayTick.Stop()
select {
case <-p.stop:
return true
case <-delayTick.C:
break
}
return false
}
// resetTicker stops the old ticker and starts a new one at the current
// interval setting.
func (p *GaugeCollectionProcess) resetTicker() {
if p.ticker != nil {
p.ticker.Stop()
}
p.ticker = p.clock.NewTicker(p.currentInterval)
}
// collectAndFilterGauges executes the callback function,
// limits the cardinality, and streams the results to the metrics sink.
func (p *GaugeCollectionProcess) collectAndFilterGauges() {
// Run for only an allotted amount of time.
timeout := time.Duration(collectionBound * float64(p.currentInterval))
ctx, cancel := context.WithTimeout(context.Background(),
timeout)
defer cancel()
p.sink.AddDurationWithLabels([]string{"metrics", "collection", "interval"},
p.currentInterval,
p.labels)
start := p.clock.Now()
values, err := p.collector(ctx)
end := p.clock.Now()
duration := end.Sub(start)
// Report how long it took to perform the operation.
p.sink.AddDurationWithLabels([]string{"metrics", "collection"},
duration,
p.labels)
// If over threshold, back off by doubling the measurement interval.
// Currently a restart is the only way to bring it back down.
threshold := time.Duration(collectionTarget * float64(p.currentInterval))
if duration > threshold {
p.logger.Warn("gauge collection time exceeded target", "target", threshold, "actual", duration, "id", p.labels)
p.currentInterval *= 2
p.resetTicker()
}
if err != nil {
p.logger.Error("error collecting gauge", "id", p.labels, "error", err)
p.sink.IncrCounterWithLabels([]string{"metrics", "collection", "error"},
1,
p.labels)
return
}
// Filter to top N.
// This does not guarantee total cardinality is <= N, but it does slow things down
// a little if the cardinality *is* too high and the gauge needs to be disabled.
if len(values) > p.maxGaugeCardinality {
sort.Slice(values, func(a, b int) bool {
return values[a].Value > values[b].Value
})
values = values[:p.maxGaugeCardinality]
}
p.streamGaugesToSink(values)
}
// batchSize is the number of metrics to be sent per tick duration.
const batchSize = 25
func (p *GaugeCollectionProcess) streamGaugesToSink(values []GaugeLabelValues) {
// Dumping 500 metrics in one big chunk is somewhat unfriendly to UDP-based
// transport, and to the rest of the metrics trying to get through.
// Let's smooth things out over the course of a second.
// 1 second / 500 = 2 ms each, so we can send 25 (batchSize) per 50 milliseconds.
// That should be one or two packets.
sendTick := p.clock.NewTicker(50 * time.Millisecond)
defer sendTick.Stop()
for i, lv := range values {
if i > 0 && i%batchSize == 0 {
select {
case <-p.stop:
// because the channel is closed,
// the main loop will successfully
// read from p.stop too, and exit.
return
case <-sendTick.C:
break
}
}
p.sink.SetGaugeWithLabels(p.key, lv.Value, lv.Labels)
}
}
// Run should be called as a goroutine.
func (p *GaugeCollectionProcess) Run() {
defer close(p.stopped)
// Wait a random amount of time
stopReceived := p.delayStart()
if stopReceived {
return
}
// Create a ticker to start each cycle
p.resetTicker()
// Loop until we get a signal to stop
for {
select {
case <-p.ticker.C:
p.collectAndFilterGauges()
case <-p.stop:
// Can't use defer because this might
// not be the original ticker.
p.ticker.Stop()
return
}
}
}
// Stop the collection process
func (p *GaugeCollectionProcess) Stop() {
close(p.stop)
}