| // Copyright (c) HashiCorp, Inc. |
| // SPDX-License-Identifier: MPL-2.0 |
| |
| package metricsutil |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "math/rand" |
| "reflect" |
| "sync/atomic" |
| "testing" |
| "time" |
| |
| "github.com/armon/go-metrics" |
| log "github.com/hashicorp/go-hclog" |
| "github.com/hashicorp/vault/helper/timeutil" |
| ) |
| |
| // SimulatedTime maintains a virtual clock so the test isn't |
| // dependent upon real time. |
| // Unfortunately there is no way to run these tests in parallel |
| // since they rely on the same global timeNow function. |
| type SimulatedTime struct { |
| now time.Time |
| tickerBarrier chan *SimulatedTicker |
| timeutil.DefaultClock |
| } |
| |
| var _ timeutil.Clock = &SimulatedTime{} |
| |
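| // SimulatedTicker pairs a real time.Ticker (whose channel has been |
| // replaced) with the duration the caller requested and the sender |
| // channel a test uses to inject tick times directly. |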
| type SimulatedTicker struct { |
| ticker *time.Ticker |
| duration time.Duration |
| sender chan time.Time |
| } |
| |
| func (s *SimulatedTime) Now() time.Time { |
| return s.now |
| } |
| |
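| // NewTicker satisfies timeutil.Clock, but hands control of tick |
| // delivery to the test via the ticker barrier. |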
| func (s *SimulatedTime) NewTicker(d time.Duration) *time.Ticker { |
| // Create a real ticker, but with a duration so long that it will never |
| // fire during a test. We'll inject times into the channel directly. |
| replacementChannel := make(chan time.Time) |
| t := time.NewTicker(1000 * time.Hour) |
| t.C = replacementChannel |
| s.tickerBarrier <- &SimulatedTicker{t, d, replacementChannel} |
| return t |
| } |
| |
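| // waitForTicker blocks until the system under test creates a ticker, |
| // failing the test if none shows up within 100ms. |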
| func (s *SimulatedTime) waitForTicker(t *testing.T) *SimulatedTicker { |
| t.Helper() |
| // System under test should create a ticker within 100ms, |
| // wait for it to show up or else fail the test. |
| timeout := time.After(100 * time.Millisecond) |
| select { |
| case <-timeout: |
| t.Fatal("Timeout waiting for ticker creation.") |
| return nil |
| case st := <-s.tickerBarrier: |
| return st |
| } |
| } |
| |
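| // allowTickers swaps in a barrier buffered for n tickers, so the system |
| // under test can create several without the test draining each one first. |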
| func (s *SimulatedTime) allowTickers(n int) { |
| s.tickerBarrier = make(chan *SimulatedTicker, n) |
| } |
| |
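| // startSimulatedTime returns a SimulatedTime anchored at the current wall |
| // clock, with room for one pending ticker. |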
| func startSimulatedTime() *SimulatedTime { |
| s := &SimulatedTime{ |
| now: time.Now(), |
| tickerBarrier: make(chan *SimulatedTicker, 1), |
| } |
| return s |
| } |
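| |
| // A minimal usage sketch (p stands in for any gauge collection process |
| // built against this clock): |
| // |
| // s := startSimulatedTime() |
| // go p.Run() |
| // tick := s.waitForTicker(t) // the process created its ticker |
| // tick.sender <- time.Now()  // fire it under test control |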
| |
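| // SimulatedCollector counts invocations of a gauge collection function |
| // and exposes a barrier channel so tests can synchronize on each call. |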
| type SimulatedCollector struct { |
| numCalls uint32 |
| callBarrier chan uint32 |
| } |
| |
| func newSimulatedCollector() *SimulatedCollector { |
| return &SimulatedCollector{ |
| numCalls: 0, |
| callBarrier: make(chan uint32, 1), |
| } |
| } |
| |
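| // waitForCall blocks until the collection function has been invoked, |
| // failing the test if that takes longer than 100ms. |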
| func (s *SimulatedCollector) waitForCall(t *testing.T) { |
| t.Helper() |
| timeout := time.After(100 * time.Millisecond) |
| select { |
| case <-timeout: |
| t.Fatal("Timeout waiting for call to collection function.") |
| return |
| case <-s.callBarrier: |
| return |
| } |
| } |
| |
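| // EmptyCollectionFunction is a GaugeCollector that reports no values; |
| // it just counts and signals each invocation. |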
| func (s *SimulatedCollector) EmptyCollectionFunction(ctx context.Context) ([]GaugeLabelValues, error) { |
| // Use the value returned by the atomic add; re-reading numCalls would race. |
| calls := atomic.AddUint32(&s.numCalls, 1) |
| s.callBarrier <- calls |
| return []GaugeLabelValues{}, nil |
| } |
| |
| func TestGauge_Creation(t *testing.T) { |
| c := newSimulatedCollector() |
| sink := BlackholeSink() |
| sink.GaugeInterval = 33 * time.Minute |
| |
| key := []string{"example", "count"} |
| labels := []Label{{"gauge", "test"}} |
| |
| p, err := sink.NewGaugeCollectionProcess( |
| key, |
| labels, |
| c.EmptyCollectionFunction, |
| log.Default(), |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| |
| if _, ok := p.clock.(timeutil.DefaultClock); !ok { |
| t.Error("Default clock not installed.") |
| } |
| |
| if !reflect.DeepEqual(p.key, key) { |
| t.Errorf("Key not initialized, got %v but expected %v", |
| p.key, key) |
| } |
| |
| if !reflect.DeepEqual(p.labels, labels) { |
| t.Errorf("Labels not initialized, got %v but expected %v", |
| p.labels, labels) |
| } |
| |
| if p.originalInterval != sink.GaugeInterval || p.currentInterval != sink.GaugeInterval { |
| t.Errorf("Intervals not initialized, got %v and %v, expected %v", |
| p.originalInterval, p.currentInterval, sink.GaugeInterval) |
| } |
| } |
| |
| func TestGauge_StartDelay(t *testing.T) { |
| // Work through an entire startup sequence, up to collecting |
| // the first batch of gauges. |
| s := startSimulatedTime() |
| c := newSimulatedCollector() |
| |
| sink := BlackholeSink() |
| sink.GaugeInterval = 2 * time.Hour |
| |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| c.EmptyCollectionFunction, |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| go p.Run() |
| |
| delayTicker := s.waitForTicker(t) |
| if delayTicker.duration > sink.GaugeInterval { |
| t.Errorf("Delayed start %v is more than interval %v.", |
| delayTicker.duration, sink.GaugeInterval) |
| } |
| if atomic.LoadUint32(&c.numCalls) > 0 { |
| t.Error("Collection function has been called") |
| } |
| |
| // Signal the end of delay, then another ticker should start |
| delayTicker.sender <- time.Now() |
| |
| intervalTicker := s.waitForTicker(t) |
| if intervalTicker.duration != sink.GaugeInterval { |
| t.Errorf("Ticker duration is %v, expected %v", |
| intervalTicker.duration, sink.GaugeInterval) |
| } |
| if atomic.LoadUint32(&c.numCalls) > 0 { |
| t.Error("Collection function has been called") |
| } |
| |
| // Time's up, ensure the collection function is executed. |
| intervalTicker.sender <- time.Now() |
| c.waitForCall(t) |
| if calls := atomic.LoadUint32(&c.numCalls); calls != 1 { |
| t.Errorf("Collection function called %v times, expected %v.", calls, 1) |
| } |
| |
| p.Stop() |
| } |
| |
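| // waitForStopped fails the test unless the process signals that it has |
| // stopped within 100ms. |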
| func waitForStopped(t *testing.T, p *GaugeCollectionProcess) { |
| t.Helper() |
| timeout := time.After(100 * time.Millisecond) |
| select { |
| case <-timeout: |
| t.Fatal("Timeout waiting for process to stop.") |
| case <-p.stopped: |
| return |
| } |
| } |
| |
| func TestGauge_StoppedDuringInitialDelay(t *testing.T) { |
| // Stop the process before it gets into its main loop |
| s := startSimulatedTime() |
| c := newSimulatedCollector() |
| |
| sink := BlackholeSink() |
| sink.GaugeInterval = 2 * time.Hour |
| |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| c.EmptyCollectionFunction, |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| go p.Run() |
| |
| // Stop during the initial delay, check that goroutine exits |
| s.waitForTicker(t) |
| p.Stop() |
| waitForStopped(t, p) |
| } |
| |
| func TestGauge_StoppedAfterInitialDelay(t *testing.T) { |
| // Stop the process during its main loop |
| s := startSimulatedTime() |
| c := newSimulatedCollector() |
| |
| sink := BlackholeSink() |
| sink.GaugeInterval = 2 * time.Hour |
| |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| c.EmptyCollectionFunction, |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| go p.Run() |
| |
| // Get through initial delay, wait for interval ticker |
| delayTicker := s.waitForTicker(t) |
| delayTicker.sender <- time.Now() |
| |
| s.waitForTicker(t) |
| p.Stop() |
| waitForStopped(t, p) |
| } |
| |
| func TestGauge_Backoff(t *testing.T) { |
| s := startSimulatedTime() |
| s.allowTickers(100) |
| |
| c := newSimulatedCollector() |
| |
| sink := BlackholeSink() |
| sink.GaugeInterval = 2 * time.Hour |
| |
| threshold := sink.GaugeInterval / 100 |
| f := func(ctx context.Context) ([]GaugeLabelValues, error) { |
| calls := atomic.AddUint32(&c.numCalls, 1) |
| // Move time forward by more than 1% of the gauge interval |
| s.now = s.now.Add(threshold).Add(time.Second) |
| c.callBarrier <- calls |
| return []GaugeLabelValues{}, nil |
| } |
| |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| f, |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| // Do not call Run; we're just going to invoke an internal function directly. |
| p.collectAndFilterGauges() |
| |
| if p.currentInterval != 2*p.originalInterval { |
| t.Errorf("Current interval is %v, should be 2x%v.", |
| p.currentInterval, |
| p.originalInterval) |
| } |
| } |
| |
| func TestGauge_RestartTimer(t *testing.T) { |
| s := startSimulatedTime() |
| c := newSimulatedCollector() |
| sink := BlackholeSink() |
| sink.GaugeInterval = 2 * time.Hour |
| |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| c.EmptyCollectionFunction, |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| |
| p.resetTicker() |
| t1 := s.waitForTicker(t) |
| if t1.duration != p.currentInterval { |
| t.Fatalf("Bad ticker interval, got %v expected %v", |
| t1.duration, p.currentInterval) |
| } |
| |
| p.currentInterval = 4 * p.originalInterval |
| p.resetTicker() |
| t2 := s.waitForTicker(t) |
| if t2.duration != p.currentInterval { |
| t.Fatalf("Bad ticker interval, got %v expected %v", |
| t2.duration, p.currentInterval) |
| } |
| } |
| |
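| // waitForDone keeps offering ticks on tick until done is closed, |
| // returning how many ticks were consumed; it fails the test after 500ms. |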
| func waitForDone(t *testing.T, |
| tick chan<- time.Time, |
| done <-chan struct{}, |
| ) int { |
| t.Helper() |
| timeout := time.After(500 * time.Millisecond) |
| |
| numTicks := 0 |
| for { |
| select { |
| case <-timeout: |
| t.Fatal("Timeout waiting for metrics to be sent.") |
| case tick <- time.Now(): |
| numTicks++ |
| case <-done: |
| return numTicks |
| } |
| } |
| } |
| |
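| // makeLabels builds numLabels gauges with distinct label sets and values |
| // 1..numLabels, so cardinality filtering is observable in the results. |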
| func makeLabels(numLabels int) []GaugeLabelValues { |
| values := make([]GaugeLabelValues, numLabels) |
| for i := range values { |
| values[i].Labels = []Label{ |
| {"test", "true"}, |
| {"which", fmt.Sprintf("%v", i)}, |
| } |
| values[i].Value = float32(i + 1) |
| } |
| return values |
| } |
| |
| func TestGauge_InterruptedStreaming(t *testing.T) { |
| s := startSimulatedTime() |
| // Long bucket time == low chance of crossing interval |
| inmemSink := metrics.NewInmemSink( |
| 1000000*time.Hour, |
| 2000000*time.Hour) |
| |
| sink := NewClusterMetricSink("test", inmemSink) |
| sink.MaxGaugeCardinality = 500 |
| sink.GaugeInterval = 2 * time.Hour |
| |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| nil, // shouldn't be called |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| |
| // Queue up at least two batches; only the first will be sent, since |
| // we never deliver a tick to let the streaming continue. |
| values := makeLabels(75) |
| done := make(chan struct{}) |
| go func() { |
| p.streamGaugesToSink(values) |
| close(done) |
| }() |
| |
| p.Stop() |
| // A nil tick channel is never writable, so waitForDone only waits for done. |
| waitForDone(t, nil, done) |
| |
| // If we start close to the end of an interval, metrics will |
| // be split across two buckets. |
| intervals := inmemSink.Data() |
| if len(intervals) > 1 { |
| t.Skip("Detected interval crossing.") |
| } |
| |
| if len(intervals[0].Gauges) == len(values) { |
| t.Errorf("Found %v gauges, expected fewer.", |
| len(intervals[0].Gauges)) |
| } |
| } |
| |
| // makeFunctionForValues returns a GaugeCollector closure that reports a |
| // fixed list of values, counting each call and advancing the simulated clock. |
| func (c *SimulatedCollector) makeFunctionForValues( |
| values []GaugeLabelValues, |
| s *SimulatedTime, |
| advanceTime time.Duration, |
| ) GaugeCollector { |
| // A function that returns a static list |
| return func(ctx context.Context) ([]GaugeLabelValues, error) { |
| calls := atomic.AddUint32(&c.numCalls, 1) |
| // TODO: this seems like a data race? |
| s.now = s.now.Add(advanceTime) |
| c.callBarrier <- calls |
| return values, nil |
| } |
| } |
| |
| func TestGauge_MaximumMeasurements(t *testing.T) { |
| s := startSimulatedTime() |
| c := newSimulatedCollector() |
| |
| // Long bucket time == low chance of crossing interval |
| inmemSink := metrics.NewInmemSink( |
| 1000000*time.Hour, |
| 2000000*time.Hour) |
| |
| sink := NewClusterMetricSink("test", inmemSink) |
| sink.MaxGaugeCardinality = 100 |
| sink.GaugeInterval = 2 * time.Hour |
| |
| // Create a report larger than the default limit |
| excessGauges := 20 |
| values := makeLabels(sink.MaxGaugeCardinality + excessGauges) |
| rand.Shuffle(len(values), func(i, j int) { |
| values[i], values[j] = values[j], values[i] |
| }) |
| |
| // Advance time by 0.5% of the gauge interval, staying below the 1% |
| // threshold that would trigger interval backoff. |
| advance := time.Duration(int(0.005 * float32(sink.GaugeInterval))) |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| c.makeFunctionForValues(values, s, advance), |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| |
| // This needs a ticker in order to do its thing, |
| // so run it in the background and we'll send the ticks |
| // from here. |
| done := make(chan struct{}, 1) |
| go func() { |
| p.collectAndFilterGauges() |
| close(done) |
| }() |
| |
| sendTicker := s.waitForTicker(t) |
| numTicksSent := waitForDone(t, sendTicker.sender, done) |
| |
| // 100 items, with one tick consumed after each batch of batchSize (25) |
| // except the last, means 3 ticks are consumed; the sender may slip in |
| // one extra before done closes, so 3 or 4 are sent. |
| expectedTicks := sink.MaxGaugeCardinality/batchSize - 1 |
| if numTicksSent < expectedTicks || numTicksSent > expectedTicks+1 { |
| t.Errorf("Number of ticks = %v, expected %v.", numTicksSent, expectedTicks) |
| } |
| |
| // If we start close to the end of an interval, metrics will |
| // be split across two buckets. |
| intervals := inmemSink.Data() |
| if len(intervals) > 1 { |
| t.Skip("Detected interval crossing.") |
| } |
| |
| if len(intervals[0].Gauges) != sink.MaxGaugeCardinality { |
| t.Errorf("Found %v gauges, expected %v.", |
| len(intervals[0].Gauges), |
| sink.MaxGaugeCardinality) |
| } |
| |
| minVal := float32(excessGauges) |
| for _, v := range intervals[0].Gauges { |
| if v.Value < minVal { |
| t.Errorf("Gauge %v with value %v should not have been included.", v.Labels, v.Value) |
| break |
| } |
| } |
| } |
| |
| func TestGauge_MeasurementError(t *testing.T) { |
| s := startSimulatedTime() |
| c := newSimulatedCollector() |
| inmemSink := metrics.NewInmemSink( |
| 1000000*time.Hour, |
| 2000000*time.Hour) |
| sink := NewClusterMetricSink("test", inmemSink) |
| sink.MaxGaugeCardinality = 500 |
| sink.GaugeInterval = 2 * time.Hour |
| |
| // Create a small report so we don't have to deal with batching. |
| numGauges := 10 |
| values := makeLabels(numGauges) |
| |
| f := func(ctx context.Context) ([]GaugeLabelValues, error) { |
| calls := atomic.AddUint32(&c.numCalls, 1) |
| c.callBarrier <- calls |
| return values, errors.New("test error") |
| } |
| |
| p, err := newGaugeCollectionProcessWithClock( |
| []string{"example", "count"}, |
| []Label{{"gauge", "test"}}, |
| f, |
| sink, |
| sink.GaugeInterval, |
| sink.MaxGaugeCardinality, |
| log.Default(), |
| s, |
| ) |
| if err != nil { |
| t.Fatalf("Error creating collection process: %v", err) |
| } |
| |
| p.collectAndFilterGauges() |
| |
| // We should see no data in the sink |
| intervals := inmemSink.Data() |
| if len(intervals) > 1 { |
| t.Skip("Detected interval crossing.") |
| } |
| |
| if len(intervals[0].Gauges) != 0 { |
| t.Errorf("Found %v gauges, expected %v.", |
| len(intervals[0].Gauges), 0) |
| } |
| } |