| // Copyright (c) HashiCorp, Inc. |
| // SPDX-License-Identifier: MPL-2.0 |
| package dataflow |
| |
| import ( |
| "context" |
| "fmt" |
| "log" |
| "strings" |
| "time" |
| |
| "github.com/hashicorp/terraform-provider-google-beta/google-beta/tpgresource" |
| transport_tpg "github.com/hashicorp/terraform-provider-google-beta/google-beta/transport" |
| |
| "github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff" |
| "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" |
| "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" |
| |
| "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" |
| dataflow "google.golang.org/api/dataflow/v1b3" |
| "google.golang.org/api/googleapi" |
| ) |
| |
| const resourceDataflowJobGoogleLabelPrefix = "goog-dataflow-provided" |
| const resourceDataflowJobGoogleProvidedLabelPrefix = "labels." + resourceDataflowJobGoogleLabelPrefix |
| |
| var DataflowTerminatingStatesMap = map[string]struct{}{ |
| "JOB_STATE_CANCELLING": {}, |
| "JOB_STATE_DRAINING": {}, |
| } |
| |
| var DataflowTerminalStatesMap = map[string]struct{}{ |
| "JOB_STATE_DONE": {}, |
| "JOB_STATE_FAILED": {}, |
| "JOB_STATE_CANCELLED": {}, |
| "JOB_STATE_UPDATED": {}, |
| "JOB_STATE_DRAINED": {}, |
| } |
| |
| func resourceDataflowJobLabelDiffSuppress(k, old, new string, d *schema.ResourceData) bool { |
| // Example Diff: "labels.goog-dataflow-provided-template-version": "word_count" => "" |
| if strings.HasPrefix(k, resourceDataflowJobGoogleProvidedLabelPrefix) && new == "" { |
| // Suppress diff if field is a Google Dataflow-provided label key and has no explicitly set value in Config. |
| return true |
| } |
| |
| // Let diff be determined by labels (above) |
| if strings.HasPrefix(k, "labels.%") { |
| return true |
| } |
| |
| // For other keys, don't suppress diff. |
| return false |
| } |
| |
| func ResourceDataflowJob() *schema.Resource { |
| return &schema.Resource{ |
| Create: resourceDataflowJobCreate, |
| Read: resourceDataflowJobRead, |
| Update: resourceDataflowJobUpdateByReplacement, |
| Delete: resourceDataflowJobDelete, |
| Timeouts: &schema.ResourceTimeout{ |
| Update: schema.DefaultTimeout(10 * time.Minute), |
| }, |
| CustomizeDiff: customdiff.All( |
| tpgresource.SetLabelsDiff, |
| resourceDataflowJobTypeCustomizeDiff, |
| ), |
| Importer: &schema.ResourceImporter{ |
| State: schema.ImportStatePassthrough, |
| }, |
| SchemaVersion: 1, |
| StateUpgraders: []schema.StateUpgrader{ |
| { |
| Type: resourceDataflowJobResourceV0().CoreConfigSchema().ImpliedType(), |
| Upgrade: ResourceDataflowJobStateUpgradeV0, |
| Version: 0, |
| }, |
| }, |
| Schema: map[string]*schema.Schema{ |
| "name": { |
| Type: schema.TypeString, |
| Required: true, |
| // ForceNew applies to both stream and batch jobs |
| ForceNew: true, |
| Description: `A unique name for the resource, required by Dataflow.`, |
| }, |
| |
| "template_gcs_path": { |
| Type: schema.TypeString, |
| Required: true, |
| Description: `The Google Cloud Storage path to the Dataflow job template.`, |
| }, |
| |
| "temp_gcs_location": { |
| Type: schema.TypeString, |
| Required: true, |
| Description: `A writeable location on Google Cloud Storage for the Dataflow job to dump its temporary data.`, |
| }, |
| |
| "zone": { |
| Type: schema.TypeString, |
| Optional: true, |
| // ForceNew applies to both stream and batch jobs |
| ForceNew: true, |
| Description: `The zone in which the created job should run. If it is not provided, the provider zone is used.`, |
| }, |
| |
| "region": { |
| Type: schema.TypeString, |
| Optional: true, |
| // ForceNew applies to both stream and batch jobs |
| ForceNew: true, |
| Description: `The region in which the created job should run.`, |
| }, |
| |
| "max_workers": { |
| Type: schema.TypeInt, |
| Optional: true, |
| // ForceNew applies to both stream and batch jobs |
| ForceNew: true, |
| Description: `The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.`, |
| }, |
| |
| "parameters": { |
| Type: schema.TypeMap, |
| Optional: true, |
| Description: `Key/Value pairs to be passed to the Dataflow job (as used in the template).`, |
| }, |
| |
| "labels": { |
| Type: schema.TypeMap, |
| Optional: true, |
| Description: `User labels to be specified for the job. Keys and values should follow the restrictions specified in the labeling restrictions page. NOTE: This field is non-authoritative, and will only manage the labels present in your configuration. |
| Please refer to the field 'effective_labels' for all of the labels present on the resource.`, |
| }, |
| |
| "terraform_labels": { |
| Type: schema.TypeMap, |
| Computed: true, |
| Description: `The combination of labels configured directly on the resource and default labels configured on the provider.`, |
| Elem: &schema.Schema{Type: schema.TypeString}, |
| }, |
| |
| "effective_labels": { |
| Type: schema.TypeMap, |
| Computed: true, |
| Description: `All of labels (key/value pairs) present on the resource in GCP, including the labels configured through Terraform, other clients and services.`, |
| Elem: &schema.Schema{Type: schema.TypeString}, |
| }, |
| |
| "transform_name_mapping": { |
| Type: schema.TypeMap, |
| Optional: true, |
| Description: `Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job.`, |
| }, |
| |
| "on_delete": { |
| Type: schema.TypeString, |
| ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false), |
| Optional: true, |
| Default: "drain", |
| Description: `One of "drain" or "cancel". Specifies behavior of deletion during terraform destroy.`, |
| }, |
| |
| "project": { |
| Type: schema.TypeString, |
| Optional: true, |
| Computed: true, |
| // ForceNew applies to both stream and batch jobs |
| ForceNew: true, |
| Description: `The project in which the resource belongs.`, |
| }, |
| |
| "state": { |
| Type: schema.TypeString, |
| Computed: true, |
| Description: `The current state of the resource, selected from the JobState enum.`, |
| }, |
| "type": { |
| Type: schema.TypeString, |
| Computed: true, |
| Description: `The type of this job, selected from the JobType enum.`, |
| }, |
| "service_account_email": { |
| Type: schema.TypeString, |
| Optional: true, |
| Description: `The Service Account email used to create the job.`, |
| }, |
| |
| "network": { |
| Type: schema.TypeString, |
| Optional: true, |
| DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName, |
| Description: `The network to which VMs will be assigned. If it is not provided, "default" will be used.`, |
| }, |
| |
| "subnetwork": { |
| Type: schema.TypeString, |
| Optional: true, |
| DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName, |
| Description: `The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK".`, |
| }, |
| |
| "machine_type": { |
| Type: schema.TypeString, |
| Optional: true, |
| Description: `The machine type to use for the job.`, |
| }, |
| |
| "kms_key_name": { |
| Type: schema.TypeString, |
| Optional: true, |
| Description: `The name for the Cloud KMS key for the job. Key format is: projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY`, |
| }, |
| |
| "ip_configuration": { |
| Type: schema.TypeString, |
| Optional: true, |
| Description: `The configuration for VM IPs. Options are "WORKER_IP_PUBLIC" or "WORKER_IP_PRIVATE".`, |
| ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false), |
| }, |
| |
| "additional_experiments": { |
| Type: schema.TypeSet, |
| Optional: true, |
| Computed: true, |
| Description: `List of experiments that should be used by the job. An example value is ["enable_stackdriver_agent_metrics"].`, |
| Elem: &schema.Schema{ |
| Type: schema.TypeString, |
| }, |
| }, |
| |
| "job_id": { |
| Type: schema.TypeString, |
| Computed: true, |
| Description: `The unique ID of this job.`, |
| }, |
| |
| "enable_streaming_engine": { |
| Type: schema.TypeBool, |
| Optional: true, |
| Description: `Indicates if the job should use the streaming engine feature.`, |
| }, |
| |
| "skip_wait_on_job_termination": { |
| Type: schema.TypeBool, |
| Optional: true, |
| Default: false, |
| Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not ensure that the job names are different, e.g. by embedding a release ID or by using a random_id.`, |
| }, |
| }, |
| UseJSONNumber: true, |
| } |
| } |
| |
| func resourceDataflowJobTypeCustomizeDiff(_ context.Context, d *schema.ResourceDiff, meta interface{}) error { |
| // All non-virtual fields are ForceNew for batch jobs |
| if d.Get("type") == "JOB_TYPE_BATCH" { |
| resourceSchema := ResourceDataflowJob().Schema |
| for field := range resourceSchema { |
| if field == "on_delete" { |
| continue |
| } |
| |
| if field != "terraform_labels" && d.HasChange(field) { |
| if err := d.ForceNew(field); err != nil { |
| return err |
| } |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| // return true if a job is in a terminal state, OR if a job is in a |
| // terminating state and skipWait is true |
| func shouldStopDataflowJobDeleteQuery(state string, skipWait bool) bool { |
| _, stopQuery := DataflowTerminalStatesMap[state] |
| if !stopQuery && skipWait { |
| _, stopQuery = DataflowTerminatingStatesMap[state] |
| } |
| return stopQuery |
| } |
| |
| func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error { |
| config := meta.(*transport_tpg.Config) |
| userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) |
| if err != nil { |
| return err |
| } |
| |
| project, err := tpgresource.GetProject(d, config) |
| if err != nil { |
| return err |
| } |
| |
| region, err := tpgresource.GetRegion(d, config) |
| if err != nil { |
| return err |
| } |
| |
| params := tpgresource.ExpandStringMap(d, "parameters") |
| |
| env, err := resourceDataflowJobSetupEnv(d, config) |
| if err != nil { |
| return err |
| } |
| |
| request := dataflow.CreateJobFromTemplateRequest{ |
| JobName: d.Get("name").(string), |
| GcsPath: d.Get("template_gcs_path").(string), |
| Parameters: params, |
| Environment: &env, |
| } |
| |
| job, err := resourceDataflowJobCreateJob(config, project, region, userAgent, &request) |
| if err != nil { |
| return err |
| } |
| d.SetId(job.Id) |
| |
| return resourceDataflowJobRead(d, meta) |
| } |
| |
| func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error { |
| config := meta.(*transport_tpg.Config) |
| userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) |
| if err != nil { |
| return err |
| } |
| |
| project, err := tpgresource.GetProject(d, config) |
| if err != nil { |
| return err |
| } |
| |
| region, err := tpgresource.GetRegion(d, config) |
| if err != nil { |
| return err |
| } |
| |
| id := d.Id() |
| |
| job, err := resourceDataflowJobGetJob(config, project, region, userAgent, id) |
| if err != nil { |
| return transport_tpg.HandleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", id)) |
| } |
| |
| if err := d.Set("job_id", job.Id); err != nil { |
| return fmt.Errorf("Error setting job_id: %s", err) |
| } |
| if err := d.Set("state", job.CurrentState); err != nil { |
| return fmt.Errorf("Error setting state: %s", err) |
| } |
| if err := d.Set("name", job.Name); err != nil { |
| return fmt.Errorf("Error setting name: %s", err) |
| } |
| if err := d.Set("type", job.Type); err != nil { |
| return fmt.Errorf("Error setting type: %s", err) |
| } |
| if err := d.Set("project", project); err != nil { |
| return fmt.Errorf("Error setting project: %s", err) |
| } |
| if err := tpgresource.SetLabels(job.Labels, d, "labels"); err != nil { |
| return fmt.Errorf("Error setting labels: %s", err) |
| } |
| if err := tpgresource.SetLabels(job.Labels, d, "terraform_labels"); err != nil { |
| return fmt.Errorf("Error setting terraform_labels: %s", err) |
| } |
| if err := d.Set("effective_labels", job.Labels); err != nil { |
| return fmt.Errorf("Error setting effective_labels: %s", err) |
| } |
| if job.Environment == nil { |
| return fmt.Errorf("Error accessing Environment proto: proto is nil") |
| } |
| if err := d.Set("kms_key_name", job.Environment.ServiceKmsKeyName); err != nil { |
| return fmt.Errorf("Error setting kms_key_name: %s", err) |
| } |
| |
| // This map isn't provided on all responses. It's not clear why but we only want to set these fields |
| // if the API returns. Otherwise this execution will crash for the user. |
| // https://github.com/hashicorp/terraform-provider-google/issues/7449 |
| sdkPipelineOptions, err := tpgresource.ConvertToMap(job.Environment.SdkPipelineOptions) |
| if err == nil { |
| optionsMap := sdkPipelineOptions["options"].(map[string]interface{}) |
| if err := d.Set("template_gcs_path", optionsMap["templateLocation"]); err != nil { |
| return fmt.Errorf("Error setting template_gcs_path: %s", err) |
| } |
| if err := d.Set("temp_gcs_location", optionsMap["tempLocation"]); err != nil { |
| return fmt.Errorf("Error setting temp_gcs_location: %s", err) |
| } |
| if err := d.Set("machine_type", optionsMap["machineType"]); err != nil { |
| return fmt.Errorf("Error setting machine_type: %s", err) |
| } |
| if err := d.Set("network", optionsMap["network"]); err != nil { |
| return fmt.Errorf("Error setting network: %s", err) |
| } |
| if err := d.Set("service_account_email", optionsMap["serviceAccountEmail"]); err != nil { |
| return fmt.Errorf("Error setting service_account_email: %s", err) |
| } |
| } |
| |
| if ok := shouldStopDataflowJobDeleteQuery(job.CurrentState, d.Get("skip_wait_on_job_termination").(bool)); ok { |
| log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState) |
| d.SetId("") |
| return nil |
| } |
| d.SetId(job.Id) |
| |
| return nil |
| } |
| |
| // Stream update method. Batch job changes should have been set to ForceNew via custom diff |
| func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error { |
| // Don't send an update request if only virtual fields have changes |
| if resourceDataflowJobIsVirtualUpdate(d, ResourceDataflowJob().Schema) { |
| return nil |
| } |
| |
| if jobHasUpdate(d, ResourceDataflowJob().Schema) { |
| config := meta.(*transport_tpg.Config) |
| userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) |
| if err != nil { |
| return err |
| } |
| |
| project, err := tpgresource.GetProject(d, config) |
| if err != nil { |
| return err |
| } |
| |
| region, err := tpgresource.GetRegion(d, config) |
| if err != nil { |
| return err |
| } |
| |
| params := tpgresource.ExpandStringMap(d, "parameters") |
| tnamemapping := tpgresource.ExpandStringMap(d, "transform_name_mapping") |
| |
| env, err := resourceDataflowJobSetupEnv(d, config) |
| if err != nil { |
| return err |
| } |
| |
| request := dataflow.LaunchTemplateParameters{ |
| JobName: d.Get("name").(string), |
| Parameters: params, |
| TransformNameMapping: tnamemapping, |
| Environment: &env, |
| Update: true, |
| } |
| |
| var response *dataflow.LaunchTemplateResponse |
| err = transport_tpg.Retry(transport_tpg.RetryOptions{ |
| RetryFunc: func() (updateErr error) { |
| response, updateErr = resourceDataflowJobLaunchTemplate(config, project, region, userAgent, d.Get("template_gcs_path").(string), &request) |
| return updateErr |
| }, |
| Timeout: time.Minute * time.Duration(5), |
| ErrorRetryPredicates: []transport_tpg.RetryErrorPredicateFunc{transport_tpg.IsDataflowJobUpdateRetryableError}, |
| }) |
| if err != nil { |
| return err |
| } |
| |
| if err := waitForDataflowJobToBeUpdated(d, config, response.Job.Id, userAgent, d.Timeout(schema.TimeoutUpdate)); err != nil { |
| return fmt.Errorf("Error updating job with job ID %q: %v", d.Id(), err) |
| } |
| |
| d.SetId(response.Job.Id) |
| } |
| return resourceDataflowJobRead(d, meta) |
| } |
| |
| func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error { |
| config := meta.(*transport_tpg.Config) |
| userAgent, err := tpgresource.GenerateUserAgentString(d, config.UserAgent) |
| if err != nil { |
| return err |
| } |
| |
| project, err := tpgresource.GetProject(d, config) |
| if err != nil { |
| return err |
| } |
| |
| region, err := tpgresource.GetRegion(d, config) |
| if err != nil { |
| return err |
| } |
| |
| id := d.Id() |
| |
| requestedState, err := resourceDataflowJobMapRequestedState(d.Get("on_delete").(string)) |
| if err != nil { |
| return err |
| } |
| |
| // Retry updating the state while the job is not ready to be canceled/drained. |
| err = resource.Retry(time.Minute*time.Duration(15), func() *resource.RetryError { |
| // To terminate a dataflow job, we update the job with a requested |
| // terminal state. |
| job := &dataflow.Job{ |
| RequestedState: requestedState, |
| } |
| |
| _, updateErr := resourceDataflowJobUpdateJob(config, project, region, userAgent, id, job) |
| if updateErr != nil { |
| gerr, isGoogleErr := updateErr.(*googleapi.Error) |
| if !isGoogleErr { |
| // If we have an error and it's not a google-specific error, we should go ahead and return. |
| return resource.NonRetryableError(updateErr) |
| } |
| |
| if strings.Contains(gerr.Message, "not yet ready for canceling") { |
| // Retry cancelling job if it's not ready. |
| // Sleep to avoid hitting update quota with repeated attempts. |
| time.Sleep(5 * time.Second) |
| return resource.RetryableError(updateErr) |
| } |
| |
| if strings.Contains(gerr.Message, "Job has terminated") { |
| // Job has already been terminated, skip. |
| return nil |
| } |
| } |
| |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| // Wait for state to reach terminal state (canceled/drained/done plus cancelling/draining if skipWait) |
| skipWait := d.Get("skip_wait_on_job_termination").(bool) |
| ok := shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait) |
| for !ok { |
| log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string)) |
| time.Sleep(5 * time.Second) |
| |
| err = resourceDataflowJobRead(d, meta) |
| if err != nil { |
| return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err) |
| } |
| ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait) |
| } |
| |
| // Only remove the job from state if it's actually successfully hit a final state. |
| if ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait); ok { |
| log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string)) |
| d.SetId("") |
| return nil |
| } |
| return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string)) |
| } |
| |
| func resourceDataflowJobMapRequestedState(policy string) (string, error) { |
| switch policy { |
| case "cancel": |
| return "JOB_STATE_CANCELLED", nil |
| case "drain": |
| return "JOB_STATE_DRAINING", nil |
| default: |
| return "", fmt.Errorf("Invalid `on_delete` policy: %s", policy) |
| } |
| } |
| |
| func resourceDataflowJobCreateJob(config *transport_tpg.Config, project, region, userAgent string, request *dataflow.CreateJobFromTemplateRequest) (*dataflow.Job, error) { |
| if region == "" { |
| return config.NewDataflowClient(userAgent).Projects.Templates.Create(project, request).Do() |
| } |
| return config.NewDataflowClient(userAgent).Projects.Locations.Templates.Create(project, region, request).Do() |
| } |
| |
| func resourceDataflowJobGetJob(config *transport_tpg.Config, project, region, userAgent string, id string) (*dataflow.Job, error) { |
| if region == "" { |
| return config.NewDataflowClient(userAgent).Projects.Jobs.Get(project, id).View("JOB_VIEW_ALL").Do() |
| } |
| return config.NewDataflowClient(userAgent).Projects.Locations.Jobs.Get(project, region, id).View("JOB_VIEW_ALL").Do() |
| } |
| |
| func resourceDataflowJobUpdateJob(config *transport_tpg.Config, project, region, userAgent string, id string, job *dataflow.Job) (*dataflow.Job, error) { |
| if region == "" { |
| return config.NewDataflowClient(userAgent).Projects.Jobs.Update(project, id, job).Do() |
| } |
| return config.NewDataflowClient(userAgent).Projects.Locations.Jobs.Update(project, region, id, job).Do() |
| } |
| |
| func resourceDataflowJobLaunchTemplate(config *transport_tpg.Config, project, region, userAgent string, gcsPath string, request *dataflow.LaunchTemplateParameters) (*dataflow.LaunchTemplateResponse, error) { |
| if region == "" { |
| return config.NewDataflowClient(userAgent).Projects.Templates.Launch(project, request).GcsPath(gcsPath).Do() |
| } |
| return config.NewDataflowClient(userAgent).Projects.Locations.Templates.Launch(project, region, request).GcsPath(gcsPath).Do() |
| } |
| |
| func resourceDataflowJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.RuntimeEnvironment, error) { |
| zone, _ := tpgresource.GetZone(d, config) |
| |
| labels := tpgresource.ExpandStringMap(d, "effective_labels") |
| |
| additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set)) |
| |
| env := dataflow.RuntimeEnvironment{ |
| MaxWorkers: int64(d.Get("max_workers").(int)), |
| Network: d.Get("network").(string), |
| ServiceAccountEmail: d.Get("service_account_email").(string), |
| Subnetwork: d.Get("subnetwork").(string), |
| TempLocation: d.Get("temp_gcs_location").(string), |
| MachineType: d.Get("machine_type").(string), |
| KmsKeyName: d.Get("kms_key_name").(string), |
| IpConfiguration: d.Get("ip_configuration").(string), |
| EnableStreamingEngine: d.Get("enable_streaming_engine").(bool), |
| AdditionalUserLabels: labels, |
| Zone: zone, |
| AdditionalExperiments: additionalExperiments, |
| } |
| return env, nil |
| } |
| |
| func resourceDataflowJobIterateMapForceNew(mapKey string, d *schema.ResourceDiff) error { |
| obj := d.Get(mapKey).(map[string]interface{}) |
| for k := range obj { |
| entrySchemaKey := mapKey + "." + k |
| if d.HasChange(entrySchemaKey) { |
| // ForceNew must be called on the parent map to trigger |
| if err := d.ForceNew(mapKey); err != nil { |
| return err |
| } |
| break |
| } |
| } |
| return nil |
| } |
| |
| func resourceDataflowJobIterateMapHasChange(mapKey string, d *schema.ResourceData) bool { |
| obj := d.Get(mapKey).(map[string]interface{}) |
| for k := range obj { |
| entrySchemaKey := mapKey + "." + k |
| if d.HasChange(entrySchemaKey) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData, resourceSchema map[string]*schema.Schema) bool { |
| // on_delete is the only virtual field |
| if d.HasChange("on_delete") { |
| for field := range resourceSchema { |
| if field == "on_delete" { |
| continue |
| } |
| |
| if d.HasChange(field) { |
| return false |
| } |
| } |
| // on_delete is changing, but nothing else |
| return true |
| } |
| |
| return false |
| } |
| |
| // If only fields on_delete, terraform_labels are changing, no update request is needed |
| func jobHasUpdate(d *schema.ResourceData, resourceSchema map[string]*schema.Schema) bool { |
| if d.HasChange("on_delete") || d.HasChange("labels") || d.HasChange("terraform_labels") { |
| for field := range resourceSchema { |
| if field == "on_delete" || field == "labels" || field == "terraform_labels" { |
| continue |
| } |
| |
| if d.HasChange(field) { |
| return true |
| } |
| } |
| // on_delete, or terraform_labels are changing, but nothing else |
| return false |
| } |
| |
| return true |
| } |
| |
| func waitForDataflowJobToBeUpdated(d *schema.ResourceData, config *transport_tpg.Config, replacementJobID, userAgent string, timeout time.Duration) error { |
| return resource.Retry(timeout, func() *resource.RetryError { |
| project, err := tpgresource.GetProject(d, config) |
| if err != nil { |
| return resource.NonRetryableError(err) |
| } |
| |
| region, err := tpgresource.GetRegion(d, config) |
| if err != nil { |
| return resource.NonRetryableError(err) |
| } |
| |
| replacementJob, err := resourceDataflowJobGetJob(config, project, region, userAgent, replacementJobID) |
| if err != nil { |
| if transport_tpg.IsRetryableError(err, nil, nil) { |
| return resource.RetryableError(err) |
| } |
| return resource.NonRetryableError(err) |
| } |
| |
| state := replacementJob.CurrentState |
| switch state { |
| case "", "JOB_STATE_PENDING": |
| return resource.RetryableError(fmt.Errorf("the replacement job with ID %q has pending state %q.", replacementJobID, state)) |
| case "JOB_STATE_FAILED": |
| return resource.NonRetryableError(fmt.Errorf("the replacement job with ID %q failed with state %q.", replacementJobID, state)) |
| default: |
| log.Printf("[DEBUG] the replacement job with ID %q has state %q.", replacementJobID, state) |
| return nil |
| } |
| }) |
| } |