| // Copyright © 2019, Oracle and/or its affiliates. |
| package oci |
| |
| import ( |
| "bytes" |
| "errors" |
| "fmt" |
| "io/ioutil" |
| "net/http" |
| "sort" |
| "strconv" |
| "strings" |
| "time" |
| |
| "github.com/armon/go-metrics" |
| log "github.com/hashicorp/go-hclog" |
| "github.com/hashicorp/go-secure-stdlib/strutil" |
| "github.com/hashicorp/go-uuid" |
| "github.com/hashicorp/vault/sdk/physical" |
| "github.com/oracle/oci-go-sdk/common" |
| "github.com/oracle/oci-go-sdk/common/auth" |
| "github.com/oracle/oci-go-sdk/objectstorage" |
| "golang.org/x/net/context" |
| ) |
| |
| // Verify Backend satisfies the correct interfaces |
| var _ physical.Backend = (*Backend)(nil) |
| |
| const ( |
| // Limits maximum outstanding requests |
| MaxNumberOfPermits = 256 |
| ) |
| |
| var ( |
| metricDelete = []string{"oci", "delete"} |
| metricGet = []string{"oci", "get"} |
| metricList = []string{"oci", "list"} |
| metricPut = []string{"oci", "put"} |
| metricDeleteFull = []string{"oci", "deleteFull"} |
| metricGetFull = []string{"oci", "getFull"} |
| metricListFull = []string{"oci", "listFull"} |
| metricPutFull = []string{"oci", "putFull"} |
| |
| metricDeleteHa = []string{"oci", "deleteHa"} |
| metricGetHa = []string{"oci", "getHa"} |
| metricPutHa = []string{"oci", "putHa"} |
| |
| metricDeleteAcquirePool = []string{"oci", "deleteAcquirePool"} |
| metricGetAcquirePool = []string{"oci", "getAcquirePool"} |
| metricListAcquirePool = []string{"oci", "listAcquirePool"} |
| metricPutAcquirePool = []string{"oci", "putAcquirePool"} |
| |
| metricDeleteFailed = []string{"oci", "deleteFailed"} |
| metricGetFailed = []string{"oci", "getFailed"} |
| metricListFailed = []string{"oci", "listFailed"} |
| metricPutFailed = []string{"oci", "putFailed"} |
| metricHaWatchLockRetriable = []string{"oci", "haWatchLockRetriable"} |
| metricPermitsUsed = []string{"oci", "permitsUsed"} |
| |
| metric5xx = []string{"oci", "5xx"} |
| ) |
| |
| type Backend struct { |
| client *objectstorage.ObjectStorageClient |
| bucketName string |
| logger log.Logger |
| permitPool *physical.PermitPool |
| namespaceName string |
| haEnabled bool |
| lockBucketName string |
| } |
| |
| func NewBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { |
| bucketName := conf["bucket_name"] |
| if bucketName == "" { |
| return nil, errors.New("missing bucket name") |
| } |
| |
| namespaceName := conf["namespace_name"] |
| if bucketName == "" { |
| return nil, errors.New("missing namespace name") |
| } |
| |
| lockBucketName := "" |
| haEnabled := false |
| var err error |
| haEnabledStr := conf["ha_enabled"] |
| if haEnabledStr != "" { |
| haEnabled, err = strconv.ParseBool(haEnabledStr) |
| if err != nil { |
| return nil, fmt.Errorf("failed to parse HA enabled: %w", err) |
| } |
| |
| if haEnabled { |
| lockBucketName = conf["lock_bucket_name"] |
| if lockBucketName == "" { |
| return nil, errors.New("missing lock bucket name") |
| } |
| } |
| } |
| |
| authTypeAPIKeyBool := false |
| authTypeAPIKeyStr := conf["auth_type_api_key"] |
| if authTypeAPIKeyStr != "" { |
| authTypeAPIKeyBool, err = strconv.ParseBool(authTypeAPIKeyStr) |
| if err != nil { |
| return nil, fmt.Errorf("failed parsing auth_type_api_key parameter: %w", err) |
| } |
| } |
| |
| var cp common.ConfigurationProvider |
| if authTypeAPIKeyBool { |
| cp = common.DefaultConfigProvider() |
| } else { |
| cp, err = auth.InstancePrincipalConfigurationProvider() |
| if err != nil { |
| return nil, fmt.Errorf("failed creating InstancePrincipalConfigurationProvider: %w", err) |
| } |
| } |
| |
| objectStorageClient, err := objectstorage.NewObjectStorageClientWithConfigurationProvider(cp) |
| if err != nil { |
| return nil, fmt.Errorf("failed creating NewObjectStorageClientWithConfigurationProvider: %w", err) |
| } |
| |
| region := conf["region"] |
| if region != "" { |
| objectStorageClient.SetRegion(region) |
| } |
| |
| logger.Debug("configuration", |
| "bucket_name", bucketName, |
| "region", region, |
| "namespace_name", namespaceName, |
| "ha_enabled", haEnabled, |
| "lock_bucket_name", lockBucketName, |
| "auth_type_api_key", authTypeAPIKeyBool, |
| ) |
| |
| return &Backend{ |
| client: &objectStorageClient, |
| bucketName: bucketName, |
| logger: logger, |
| permitPool: physical.NewPermitPool(MaxNumberOfPermits), |
| namespaceName: namespaceName, |
| haEnabled: haEnabled, |
| lockBucketName: lockBucketName, |
| }, nil |
| } |
| |
| func (o *Backend) Put(ctx context.Context, entry *physical.Entry) error { |
| o.logger.Debug("PUT started") |
| defer metrics.MeasureSince(metricPutFull, time.Now()) |
| startAcquirePool := time.Now() |
| metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits())) |
| o.permitPool.Acquire() |
| defer o.permitPool.Release() |
| metrics.MeasureSince(metricPutAcquirePool, startAcquirePool) |
| |
| defer metrics.MeasureSince(metricPut, time.Now()) |
| size := int64(len(entry.Value)) |
| opcClientRequestId, err := uuid.GenerateUUID() |
| if err != nil { |
| metrics.IncrCounter(metricPutFailed, 1) |
| o.logger.Error("failed to generate UUID") |
| return fmt.Errorf("failed to generate UUID: %w", err) |
| } |
| |
| o.logger.Debug("PUT", "opc-client-request-id", opcClientRequestId) |
| request := objectstorage.PutObjectRequest{ |
| NamespaceName: &o.namespaceName, |
| BucketName: &o.bucketName, |
| ObjectName: &entry.Key, |
| ContentLength: &size, |
| PutObjectBody: ioutil.NopCloser(bytes.NewReader(entry.Value)), |
| OpcMeta: nil, |
| OpcClientRequestId: &opcClientRequestId, |
| } |
| |
| resp, err := o.client.PutObject(ctx, request) |
| if resp.RawResponse != nil && resp.RawResponse.Body != nil { |
| defer resp.RawResponse.Body.Close() |
| } |
| |
| if err != nil { |
| metrics.IncrCounter(metricPutFailed, 1) |
| return fmt.Errorf("failed to put data: %w", err) |
| } |
| |
| o.logRequest("PUT", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err) |
| o.logger.Debug("PUT completed") |
| |
| return nil |
| } |
| |
| func (o *Backend) Get(ctx context.Context, key string) (*physical.Entry, error) { |
| o.logger.Debug("GET started") |
| defer metrics.MeasureSince(metricGetFull, time.Now()) |
| metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits())) |
| startAcquirePool := time.Now() |
| o.permitPool.Acquire() |
| defer o.permitPool.Release() |
| metrics.MeasureSince(metricGetAcquirePool, startAcquirePool) |
| |
| defer metrics.MeasureSince(metricGet, time.Now()) |
| opcClientRequestId, err := uuid.GenerateUUID() |
| if err != nil { |
| o.logger.Error("failed to generate UUID") |
| return nil, fmt.Errorf("failed to generate UUID: %w", err) |
| } |
| o.logger.Debug("GET", "opc-client-request-id", opcClientRequestId) |
| request := objectstorage.GetObjectRequest{ |
| NamespaceName: &o.namespaceName, |
| BucketName: &o.bucketName, |
| ObjectName: &key, |
| OpcClientRequestId: &opcClientRequestId, |
| } |
| |
| resp, err := o.client.GetObject(ctx, request) |
| if resp.RawResponse != nil && resp.RawResponse.Body != nil { |
| defer resp.RawResponse.Body.Close() |
| } |
| o.logRequest("GET", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err) |
| |
| if err != nil { |
| if resp.RawResponse != nil && resp.RawResponse.StatusCode == http.StatusNotFound { |
| return nil, nil |
| } |
| metrics.IncrCounter(metricGetFailed, 1) |
| return nil, fmt.Errorf("failed to read Value: %w", err) |
| } |
| |
| body, err := ioutil.ReadAll(resp.Content) |
| if err != nil { |
| metrics.IncrCounter(metricGetFailed, 1) |
| return nil, fmt.Errorf("failed to decode Value into bytes: %w", err) |
| } |
| |
| o.logger.Debug("GET completed") |
| |
| return &physical.Entry{ |
| Key: key, |
| Value: body, |
| }, nil |
| } |
| |
| func (o *Backend) Delete(ctx context.Context, key string) error { |
| o.logger.Debug("DELETE started") |
| defer metrics.MeasureSince(metricDeleteFull, time.Now()) |
| metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits())) |
| startAcquirePool := time.Now() |
| o.permitPool.Acquire() |
| defer o.permitPool.Release() |
| metrics.MeasureSince(metricDeleteAcquirePool, startAcquirePool) |
| |
| defer metrics.MeasureSince(metricDelete, time.Now()) |
| opcClientRequestId, err := uuid.GenerateUUID() |
| if err != nil { |
| o.logger.Error("Delete: error generating UUID") |
| return fmt.Errorf("failed to generate UUID: %w", err) |
| } |
| o.logger.Debug("Delete", "opc-client-request-id", opcClientRequestId) |
| request := objectstorage.DeleteObjectRequest{ |
| NamespaceName: &o.namespaceName, |
| BucketName: &o.bucketName, |
| ObjectName: &key, |
| OpcClientRequestId: &opcClientRequestId, |
| } |
| |
| resp, err := o.client.DeleteObject(ctx, request) |
| if resp.RawResponse != nil && resp.RawResponse.Body != nil { |
| defer resp.RawResponse.Body.Close() |
| } |
| |
| o.logRequest("DELETE", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err) |
| |
| if err != nil { |
| if resp.RawResponse != nil && resp.RawResponse.StatusCode == http.StatusNotFound { |
| return nil |
| } |
| metrics.IncrCounter(metricDeleteFailed, 1) |
| return fmt.Errorf("failed to delete Key: %w", err) |
| } |
| o.logger.Debug("DELETE completed") |
| |
| return nil |
| } |
| |
| func (o *Backend) List(ctx context.Context, prefix string) ([]string, error) { |
| o.logger.Debug("LIST started") |
| defer metrics.MeasureSince(metricListFull, time.Now()) |
| metrics.SetGauge(metricPermitsUsed, float32(o.permitPool.CurrentPermits())) |
| startAcquirePool := time.Now() |
| o.permitPool.Acquire() |
| defer o.permitPool.Release() |
| |
| metrics.MeasureSince(metricListAcquirePool, startAcquirePool) |
| defer metrics.MeasureSince(metricList, time.Now()) |
| var keys []string |
| delimiter := "/" |
| var start *string |
| |
| for { |
| opcClientRequestId, err := uuid.GenerateUUID() |
| if err != nil { |
| o.logger.Error("List: error generating UUID") |
| return nil, fmt.Errorf("failed to generate UUID %w", err) |
| } |
| o.logger.Debug("LIST", "opc-client-request-id", opcClientRequestId) |
| request := objectstorage.ListObjectsRequest{ |
| NamespaceName: &o.namespaceName, |
| BucketName: &o.bucketName, |
| Prefix: &prefix, |
| Delimiter: &delimiter, |
| Start: start, |
| OpcClientRequestId: &opcClientRequestId, |
| } |
| |
| resp, err := o.client.ListObjects(ctx, request) |
| o.logRequest("LIST", resp.RawResponse, resp.OpcClientRequestId, resp.OpcRequestId, err) |
| |
| if err != nil { |
| metrics.IncrCounter(metricListFailed, 1) |
| return nil, fmt.Errorf("failed to list using prefix: %w", err) |
| } |
| |
| for _, commonPrefix := range resp.Prefixes { |
| commonPrefix := strings.TrimPrefix(commonPrefix, prefix) |
| keys = append(keys, commonPrefix) |
| } |
| |
| for _, object := range resp.Objects { |
| key := strings.TrimPrefix(*object.Name, prefix) |
| keys = append(keys, key) |
| } |
| |
| // Duplicate keys are not expected |
| keys = strutil.RemoveDuplicates(keys, false) |
| |
| if resp.NextStartWith == nil { |
| resp.RawResponse.Body.Close() |
| break |
| } |
| |
| start = resp.NextStartWith |
| resp.RawResponse.Body.Close() |
| } |
| |
| sort.Strings(keys) |
| o.logger.Debug("LIST completed") |
| return keys, nil |
| } |
| |
| func (o *Backend) logRequest(operation string, response *http.Response, clientOpcRequestIdPtr *string, opcRequestIdPtr *string, err error) { |
| statusCode := 0 |
| clientOpcRequestId := " " |
| opcRequestId := " " |
| |
| if response != nil { |
| statusCode = response.StatusCode |
| if statusCode/100 == 5 { |
| metrics.IncrCounter(metric5xx, 1) |
| } |
| } |
| |
| if clientOpcRequestIdPtr != nil { |
| clientOpcRequestId = *clientOpcRequestIdPtr |
| } |
| |
| if opcRequestIdPtr != nil { |
| opcRequestId = *opcRequestIdPtr |
| } |
| |
| statusCodeStr := "No response" |
| if statusCode != 0 { |
| statusCodeStr = strconv.Itoa(statusCode) |
| } |
| |
| logLine := fmt.Sprintf("%s client:opc-request-id %s opc-request-id: %s status-code: %s", |
| operation, clientOpcRequestId, opcRequestId, statusCodeStr) |
| if err != nil && statusCode/100 == 5 { |
| o.logger.Error(logLine, "error", err) |
| } |
| } |