blob: 3665813d04792dbe8a756190cfa71e144ad50d29 [file] [log] [blame]
// Copyright © 2019, Oracle and/or its affiliates.
package oci
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/sdk/physical"
"github.com/oracle/oci-go-sdk/common"
"github.com/oracle/oci-go-sdk/common/auth"
"github.com/oracle/oci-go-sdk/objectstorage"
"golang.org/x/net/context"
)
// Verify Backend satisfies the correct interfaces
var _ physical.Backend = (*Backend)(nil)
const (
// Limits maximum outstanding requests
MaxNumberOfPermits = 256
)
var (
metricDelete = []string{"oci", "delete"}
metricGet = []string{"oci", "get"}
metricList = []string{"oci", "list"}
metricPut = []string{"oci", "put"}
metricDeleteFull = []string{"oci", "deleteFull"}
metricGetFull = []string{"oci", "getFull"}
metricListFull = []string{"oci", "listFull"}
metricPutFull = []string{"oci", "putFull"}
metricDeleteHa = []string{"oci", "deleteHa"}
metricGetHa = []string{"oci", "getHa"}
metricPutHa = []string{"oci", "putHa"}
metricDeleteAcquirePool = []string{"oci", "deleteAcquirePool"}
metricGetAcquirePool = []string{"oci", "getAcquirePool"}
metricListAcquirePool = []string{"oci", "listAcquirePool"}
metricPutAcquirePool = []string{"oci", "putAcquirePool"}
metricDeleteFailed = []string{"oci", "deleteFailed"}
metricGetFailed = []string{"oci", "getFailed"}
metricListFailed = []string{"oci", "listFailed"}
metricPutFailed = []string{"oci", "putFailed"}
metricHaWatchLockRetriable = []string{"oci", "haWatchLockRetriable"}
metricPermitsUsed = []string{"oci", "permitsUsed"}
metric5xx = []string{"oci", "5xx"}
)
type Backend struct {
client *objectstorage.ObjectStorageClient
bucketName string
logger log.Logger
permitPool *physical.PermitPool
namespaceName string
haEnabled bool
lockBucketName string
}
func NewBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
bucketName := conf["bucket_name"]
if bucketName == "" {
return nil, errors.New("missing bucket name")
}
namespaceName := conf["namespace_name"]
if bucketName == "" {
return nil, errors.New("missing namespace name")
}
lockBucketName := ""
haEnabled := false
var err error
haEnabledStr := conf["ha_enabled"]
if haEnabledStr != "" {
haEnabled, err = strconv.ParseBool(haEnabledStr)
if err != nil {
return nil, fmt.Errorf("failed to parse HA enabled: %w", err)
}
if haEnabled {
lockBucketName = conf["lock_bucket_name"]
if lockBucketName == "" {
return nil, errors.New("missing lock bucket name")
}
}
}
authTypeAPIKeyBool := false
authTypeAPIKeyStr := conf["auth_type_api_key"]
if authTypeAPIKeyStr != "" {
authTypeAPIKeyBool, err = strconv.ParseBool(authTypeAPIKeyStr)
if err != nil {
return nil, fmt.Errorf("failed parsing auth_type_api_key parameter: %w", err)
}
}
var cp common.ConfigurationProvider
if authTypeAPIKeyBool {
cp = common.DefaultConfigProvider()
} else {
cp, err = auth.InstancePrincipalConfigurationProvider()
if err != nil {
return nil, fmt.Errorf("failed creating InstancePrincipalConfigurationProvider: %w", err)
}
}
objectStorageClient, err := objectstorage.NewObjectStorageClientWithConfigurationProvider(cp)
if err != nil {
return nil, fmt.Errorf("failed creating NewObjectStorageClientWithConfigurationProvider: %w", err)
}
region := conf["region"]
if region != "" {
objectStorageClient.SetRegion(region)
}
logger.Debug("configuration",
"bucket_name", bucketName,
"region", region,
"namespace_name", namespaceName,
"ha_enabled", haEnabled,
"lock_bucket_name", lockBucketName,
"auth_type_api_key", authTypeAPIKeyBool,
)
return &Backend{
client: &objectStorageClient,
bucketName: bucketName,
logger: logger,
permitPool: physical.NewPermitPool(MaxNumberOfPermits),
namespaceName: namespaceName,
haEnabled: haEnabled,
lockBucketName: lockBucketName,
}, nil
}
func (o *Backend) Put(ctx context.Context, entry *physical.Entry) error {
o.logger.Debug("PUT started")
defer metrics.MeasureSince(metricPutFull, time.Now())
startAcquirePool := time.Now()
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
o.permitPool.Acquire()
defer o.permitPool.Release()
metrics.MeasureSince(metricPutAcquirePool, startAcquirePool)
defer metrics.MeasureSince(metricPut, time.Now())
size := int64(len(entry.Value))
opcClientRequestId, err := uuid.GenerateUUID()
if err != nil {
metrics.IncrCounter(metricPutFailed, 1)
o.logger.Error("failed to generate UUID")
return fmt.Errorf("failed to generate UUID: %w", err)
}
o.logger.Debug("PUT", "opc-client-request-id", opcClientRequestId)
request := objectstorage.PutObjectRequest{
NamespaceName: &o.namespaceName,
BucketName: &o.bucketName,
ObjectName: &entry.Key,
ContentLength: &size,
PutObjectBody: ioutil.NopCloser(bytes.NewReader(entry.Value)),
OpcMeta: nil,
OpcClientRequestId: &opcClientRequestId,
}
resp, err := o.client.PutObject(ctx, request)
if resp.RawResponse != nil && resp.RawResponse.Body != nil {
defer resp.RawResponse.Body.Close()
}
if err != nil {
metrics.IncrCounter(metricPutFailed, 1)
return fmt.Errorf("failed to put data: %w", err)
}
o.logRequest("PUT", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err)
o.logger.Debug("PUT completed")
return nil
}
func (o *Backend) Get(ctx context.Context, key string) (*physical.Entry, error) {
o.logger.Debug("GET started")
defer metrics.MeasureSince(metricGetFull, time.Now())
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
startAcquirePool := time.Now()
o.permitPool.Acquire()
defer o.permitPool.Release()
metrics.MeasureSince(metricGetAcquirePool, startAcquirePool)
defer metrics.MeasureSince(metricGet, time.Now())
opcClientRequestId, err := uuid.GenerateUUID()
if err != nil {
o.logger.Error("failed to generate UUID")
return nil, fmt.Errorf("failed to generate UUID: %w", err)
}
o.logger.Debug("GET", "opc-client-request-id", opcClientRequestId)
request := objectstorage.GetObjectRequest{
NamespaceName: &o.namespaceName,
BucketName: &o.bucketName,
ObjectName: &key,
OpcClientRequestId: &opcClientRequestId,
}
resp, err := o.client.GetObject(ctx, request)
if resp.RawResponse != nil && resp.RawResponse.Body != nil {
defer resp.RawResponse.Body.Close()
}
o.logRequest("GET", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err)
if err != nil {
if resp.RawResponse != nil && resp.RawResponse.StatusCode == http.StatusNotFound {
return nil, nil
}
metrics.IncrCounter(metricGetFailed, 1)
return nil, fmt.Errorf("failed to read Value: %w", err)
}
body, err := ioutil.ReadAll(resp.Content)
if err != nil {
metrics.IncrCounter(metricGetFailed, 1)
return nil, fmt.Errorf("failed to decode Value into bytes: %w", err)
}
o.logger.Debug("GET completed")
return &physical.Entry{
Key: key,
Value: body,
}, nil
}
func (o *Backend) Delete(ctx context.Context, key string) error {
o.logger.Debug("DELETE started")
defer metrics.MeasureSince(metricDeleteFull, time.Now())
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
startAcquirePool := time.Now()
o.permitPool.Acquire()
defer o.permitPool.Release()
metrics.MeasureSince(metricDeleteAcquirePool, startAcquirePool)
defer metrics.MeasureSince(metricDelete, time.Now())
opcClientRequestId, err := uuid.GenerateUUID()
if err != nil {
o.logger.Error("Delete: error generating UUID")
return fmt.Errorf("failed to generate UUID: %w", err)
}
o.logger.Debug("Delete", "opc-client-request-id", opcClientRequestId)
request := objectstorage.DeleteObjectRequest{
NamespaceName: &o.namespaceName,
BucketName: &o.bucketName,
ObjectName: &key,
OpcClientRequestId: &opcClientRequestId,
}
resp, err := o.client.DeleteObject(ctx, request)
if resp.RawResponse != nil && resp.RawResponse.Body != nil {
defer resp.RawResponse.Body.Close()
}
o.logRequest("DELETE", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err)
if err != nil {
if resp.RawResponse != nil && resp.RawResponse.StatusCode == http.StatusNotFound {
return nil
}
metrics.IncrCounter(metricDeleteFailed, 1)
return fmt.Errorf("failed to delete Key: %w", err)
}
o.logger.Debug("DELETE completed")
return nil
}
func (o *Backend) List(ctx context.Context, prefix string) ([]string, error) {
o.logger.Debug("LIST started")
defer metrics.MeasureSince(metricListFull, time.Now())
metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits()))
startAcquirePool := time.Now()
o.permitPool.Acquire()
defer o.permitPool.Release()
metrics.MeasureSince(metricListAcquirePool, startAcquirePool)
defer metrics.MeasureSince(metricList, time.Now())
var keys []string
delimiter := "/"
var start *string
for {
opcClientRequestId, err := uuid.GenerateUUID()
if err != nil {
o.logger.Error("List: error generating UUID")
return nil, fmt.Errorf("failed to generate UUID %w", err)
}
o.logger.Debug("LIST", "opc-client-request-id", opcClientRequestId)
request := objectstorage.ListObjectsRequest{
NamespaceName: &o.namespaceName,
BucketName: &o.bucketName,
Prefix: &prefix,
Delimiter: &delimiter,
Start: start,
OpcClientRequestId: &opcClientRequestId,
}
resp, err := o.client.ListObjects(ctx, request)
o.logRequest("LIST", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err)
if err != nil {
metrics.IncrCounter(metricListFailed, 1)
return nil, fmt.Errorf("failed to list using prefix: %w", err)
}
for _, commonPrefix := range resp.Prefixes {
commonPrefix := strings.TrimPrefix(commonPrefix, prefix)
keys = append(keys, commonPrefix)
}
for _, object := range resp.Objects {
key := strings.TrimPrefix(*object.Name, prefix)
keys = append(keys, key)
}
// Duplicate keys are not expected
keys = strutil.RemoveDuplicates(keys, false)
if resp.NextStartWith == nil {
resp.RawResponse.Body.Close()
break
}
start = resp.NextStartWith
resp.RawResponse.Body.Close()
}
sort.Strings(keys)
o.logger.Debug("LIST completed")
return keys, nil
}
func (o *Backend) logRequest(operation string, response *http.Response, clientOpcRequestIdPtr *string, opcRequestIdPtr *string, err error) {
statusCode := 0
clientOpcRequestId := " "
opcRequestId := " "
if response != nil {
statusCode = response.StatusCode
if statusCode/100 == 5 {
metrics.IncrCounter(metric5xx, 1)
}
}
if clientOpcRequestIdPtr != nil {
clientOpcRequestId = *clientOpcRequestIdPtr
}
if opcRequestIdPtr != nil {
opcRequestId = *opcRequestIdPtr
}
statusCodeStr := "No response"
if statusCode != 0 {
statusCodeStr = strconv.Itoa(statusCode)
}
logLine := fmt.Sprintf("%s client:opc-request-id %s opc-request-id: %s status-code: %s",
operation, clientOpcRequestId, opcRequestId, statusCodeStr)
if err != nil && statusCode/100 == 5 {
o.logger.Error(logLine, "error", err)
}
}