blob: 0d1c569f7ec2b1d77e4be0c5549daa6e3a47d58c [file] [log] [blame] [edit]
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package transport
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/hashicorp/errwrap"
)
const DefaultBatchSendIntervalSec = 3
// RequestBatcher keeps track of batched requests globally.
// It should be created at a provider level. In general, one
// should be created per service that requires batching to:
// - prevent blocking batching for one service due to another,
// - minimize the possibility of overlap in batchKey formats (see SendRequestWithTimeout)
type RequestBatcher struct {
sync.Mutex
*BatchingConfig
parentCtx context.Context
batches map[string]*startedBatch
debugId string
}
// These types are meant to be the public interface to batchers. They define
// batch data format and logic to send/combine batches, i.e. they require
// specific implementations per type of request.
type (
// BatchRequest represents a single request to a global batcher.
BatchRequest struct {
// ResourceName represents the underlying resource for which
// a request is made. Its format is determined by what SendF expects, but
// typically should be the name of the parent GCP resource being changed.
ResourceName string
// Body is this request's data to be passed to SendF, and may be combined
// with other bodies using CombineF.
Body interface{}
// CombineF function determines how to combine bodies from two batches.
CombineF BatcherCombineFunc
// SendF function determines how to actually send a batched request to a
// third party service. The arguments given to this function are
// (ResourceName, Body) where Body may have been combined with other request
// Bodies.
SendF BatcherSendFunc
// ID for debugging request. This should be specific to a single request
// (i.e. per Terraform resource)
DebugId string
}
// BatcherCombineFunc is a function type for combine existing batches and additional batch data
BatcherCombineFunc func(body interface{}, toAdd interface{}) (interface{}, error)
// BatcherSendFunc is a function type for sending a batch request
BatcherSendFunc func(resourceName string, body interface{}) (interface{}, error)
)
// batchResponse bundles an API response (data, error) tuple.
type batchResponse struct {
body interface{}
err error
}
func (br *batchResponse) IsError() bool {
return br.err != nil
}
// startedBatch refers to a registered batch to group batch requests coming in.
// The timer manages the time after which a given batch is sent.
type startedBatch struct {
batchKey string
// Combined Batch Request
*BatchRequest
// subscribers is a registry of the requests (batchSubscriber) combined into this batcher.
subscribers []batchSubscriber
timer *time.Timer
}
// batchSubscriber contains information required for a single request for a startedBatch.
type batchSubscriber struct {
// singleRequest is the original request this subscriber represents
singleRequest *BatchRequest
// respCh is the channel created to communicate the result to a waiting goroutine.s
respCh chan batchResponse
}
// BatchingConfig contains user configuration for controlling batch requests.
type BatchingConfig struct {
SendAfter time.Duration
EnableBatching bool
}
// Initializes a new batcher.
func NewRequestBatcher(debugId string, ctx context.Context, config *BatchingConfig) *RequestBatcher {
batcher := &RequestBatcher{
debugId: debugId,
parentCtx: ctx,
BatchingConfig: config,
batches: make(map[string]*startedBatch),
}
// Start goroutine to managing stopping the batcher if the provider-level parent context is closed.
go func(b *RequestBatcher) {
// Block until parent context is closed
<-b.parentCtx.Done()
log.Printf("[DEBUG] parent context canceled, cleaning up batcher batches")
b.stop()
}(batcher)
return batcher
}
func (b *RequestBatcher) stop() {
b.Lock()
defer b.Unlock()
log.Printf("[DEBUG] Stopping batcher %q", b.debugId)
for batchKey, batch := range b.batches {
log.Printf("[DEBUG] Cancelling started batch for batchKey %q", batchKey)
batch.timer.Stop()
for _, l := range batch.subscribers {
close(l.respCh)
}
}
}
// SendRequestWithTimeout is a blocking call for making a single request, run alone or as part of a batch.
// It manages registering the single request with the batcher and waiting on the result.
//
// Params:
// batchKey: A string to group batchable requests. It should be unique to the API request being sent, similar to
// the HTTP request URL with GCP resource ID included in the URL (the caller
// may choose to use a key with method if needed to diff GET/read and
// POST/create)
//
// As an example, for google_project_service, the
// batcher is called to batch services.batchEnable() calls for a project
// $PROJECT. The calling code uses the template
// "serviceusage:projects/$PROJECT/services:batchEnable", which mirrors the HTTP request:
// POST https://serviceusage.googleapis.com/v1/projects/$PROJECT/services:batchEnable
func (b *RequestBatcher) SendRequestWithTimeout(batchKey string, request *BatchRequest, timeout time.Duration) (interface{}, error) {
if request == nil {
return nil, fmt.Errorf("error, cannot request batching for nil BatchRequest")
}
if request.CombineF == nil {
return nil, fmt.Errorf("error, cannot request batching for BatchRequest with nil CombineF")
}
if request.SendF == nil {
return nil, fmt.Errorf("error, cannot request batching for BatchRequest with nil SendF")
}
if !b.EnableBatching {
log.Printf("[DEBUG] Batching is disabled, sending single request for %q", request.DebugId)
return request.SendF(request.ResourceName, request.Body)
}
respCh, err := b.registerBatchRequest(batchKey, request)
if err != nil {
return nil, fmt.Errorf("error adding request to batch: %s", err)
}
ctx, cancel := context.WithTimeout(b.parentCtx, timeout)
defer cancel()
select {
case resp := <-respCh:
if resp.err != nil {
return nil, errwrap.Wrapf(
fmt.Sprintf("Request `%s` returned error: {{err}}", request.DebugId),
resp.err)
}
return resp.body, nil
case <-ctx.Done():
break
}
if b.parentCtx.Err() != nil {
switch b.parentCtx.Err() {
case context.Canceled:
return nil, fmt.Errorf("Parent context of request %s canceled", batchKey)
case context.DeadlineExceeded:
return nil, fmt.Errorf("Parent context of request %s timed out", batchKey)
default:
return nil, fmt.Errorf("Parent context of request %s encountered an error: %v", batchKey, ctx.Err())
}
}
switch ctx.Err() {
case context.Canceled:
return nil, fmt.Errorf("Request %s canceled", batchKey)
case context.DeadlineExceeded:
return nil, fmt.Errorf("Request %s timed out after %v", batchKey, timeout)
default:
return nil, fmt.Errorf("Error making request %s: %v", batchKey, ctx.Err())
}
}
// registerBatchRequest safely sees if an existing batch has been started
// with the given batchKey. If a batch exists, this will combine the new
// request into this existing batch. Else, this method manages starting a new
// batch and adding it to the RequestBatcher's started batches.
func (b *RequestBatcher) registerBatchRequest(batchKey string, newRequest *BatchRequest) (<-chan batchResponse, error) {
b.Lock()
defer b.Unlock()
// If batch already exists, combine this request into existing request.
if batch, ok := b.batches[batchKey]; ok {
return batch.addRequest(newRequest)
}
// Batch doesn't exist for given batch key - create a new batch.
log.Printf("[DEBUG] Creating new batch %q from request %q", newRequest.DebugId, batchKey)
// The calling goroutine will need a channel to wait on for a response.
respCh := make(chan batchResponse, 1)
sub := batchSubscriber{
singleRequest: newRequest,
respCh: respCh,
}
// Create a new batch with copy of the given batch request.
b.batches[batchKey] = &startedBatch{
BatchRequest: &BatchRequest{
ResourceName: newRequest.ResourceName,
Body: newRequest.Body,
CombineF: newRequest.CombineF,
SendF: newRequest.SendF,
DebugId: fmt.Sprintf("Combined batch for started batch %q", batchKey),
},
batchKey: batchKey,
subscribers: []batchSubscriber{sub},
}
// Start a timer to send the request
b.batches[batchKey].timer = time.AfterFunc(b.SendAfter, func() {
batch := b.popBatch(batchKey)
if batch == nil {
log.Printf("[ERROR] batch should have been added to saved batches - just run as single request %q", newRequest.DebugId)
respCh <- newRequest.send()
close(respCh)
} else {
b.sendBatchWithSingleRetry(batchKey, batch)
}
})
return respCh, nil
}
func (b *RequestBatcher) sendBatchWithSingleRetry(batchKey string, batch *startedBatch) {
log.Printf("[DEBUG] Sending batch %q combining %d requests)", batchKey, len(batch.subscribers))
resp := batch.send()
// If the batch failed and combines more than one request, retry each single request.
if resp.IsError() && len(batch.subscribers) > 1 {
log.Printf("[DEBUG] Batch failed with error: %v", resp.err)
log.Printf("[DEBUG] Sending each request in batch separately")
for _, sub := range batch.subscribers {
log.Printf("[DEBUG] Retrying single request %q", sub.singleRequest.DebugId)
singleResp := sub.singleRequest.send()
log.Printf("[DEBUG] Retried single request %q returned response: %v", sub.singleRequest.DebugId, singleResp)
if singleResp.IsError() {
singleResp.err = errwrap.Wrapf(
fmt.Sprintf("Batch request and retried single request %q both failed. Final error: {{err}}", sub.singleRequest.DebugId),
singleResp.err)
}
sub.respCh <- singleResp
close(sub.respCh)
}
} else {
// Send result to all subscribers
for _, sub := range batch.subscribers {
sub.respCh <- resp
close(sub.respCh)
}
}
}
// popBatch safely gets and removes a batch with given batchkey from the
// RequestBatcher's started batches.
func (b *RequestBatcher) popBatch(batchKey string) *startedBatch {
b.Lock()
defer b.Unlock()
batch, ok := b.batches[batchKey]
if !ok {
log.Printf("[DEBUG] Batch with ID %q not found in batcher", batchKey)
return nil
}
delete(b.batches, batchKey)
return batch
}
func (batch *startedBatch) addRequest(newRequest *BatchRequest) (<-chan batchResponse, error) {
log.Printf("[DEBUG] Adding batch request %q to existing batch %q", newRequest.DebugId, batch.batchKey)
if batch.CombineF == nil {
return nil, fmt.Errorf("Provider Error: unable to add request %q to batch %q with no CombineF", newRequest.DebugId, batch.batchKey)
}
newBody, err := batch.CombineF(batch.Body, newRequest.Body)
if err != nil {
return nil, fmt.Errorf("Provider Error: Unable to combine request %q data into existing batch %q: %v", newRequest.DebugId, batch.batchKey, err)
}
batch.Body = newBody
log.Printf("[DEBUG] Added batch request %q to batch. New batch body: %v", newRequest.DebugId, batch.Body)
respCh := make(chan batchResponse, 1)
sub := batchSubscriber{
singleRequest: newRequest,
respCh: respCh,
}
batch.subscribers = append(batch.subscribers, sub)
return respCh, nil
}
func (req *BatchRequest) send() batchResponse {
if req.SendF == nil {
return batchResponse{
err: fmt.Errorf("provider error: Batch request has no SendBatch function"),
}
}
v, err := req.SendF(req.ResourceName, req.Body)
return batchResponse{v, err}
}