// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package dataproc_test
import (
"fmt"
"io/ioutil"
"log"
"strconv"
"strings"
"testing"
"time"
// "regexp"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/terraform"
"github.com/hashicorp/terraform-provider-google-beta/google-beta/acctest"
tpgdataproc "github.com/hashicorp/terraform-provider-google-beta/google-beta/services/dataproc"
"github.com/hashicorp/terraform-provider-google-beta/google-beta/tpgresource"
"google.golang.org/api/dataproc/v1"
"google.golang.org/api/googleapi"
)
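
// jobTestField pairs a Terraform attribute path with the corresponding value
// returned by the Dataproc API, so the two can be compared by the check helpers below.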
type jobTestField struct {
tf_attr string
gcp_attr interface{}
}
// TODO (mbang): Test `ExactlyOneOf` here
// func TestAccDataprocJob_failForMissingJobConfig(t *testing.T) {
// t.Parallel()
// acctest.VcrTest(t, resource.TestCase{
// PreCheck: func() { acctest.AccTestPreCheck(t) },
// ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
// CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
// Steps: []resource.TestStep{
// {
// Config: testAccDataprocJob_missingJobConf(),
// ExpectError: regexp.MustCompile("You must define and configure exactly one xxx_config block"),
// },
// },
// })
// }
func TestAccDataprocJob_updatable(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
jobId := fmt.Sprintf("dproc-update-job-id-%s", rnd)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_updatable(rnd, subnetworkName, jobId, "false"),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.updatable", &job),
resource.TestCheckResourceAttr("google_dataproc_job.updatable", "force_delete", "false"),
),
},
{
Config: testAccDataprocJob_updatable(rnd, subnetworkName, jobId, "true"),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.updatable", &job),
resource.TestCheckResourceAttr("google_dataproc_job.updatable", "force_delete", "true"),
),
},
},
})
}
func TestAccDataprocJob_PySpark(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
jobId := fmt.Sprintf("dproc-custom-job-id-%s", rnd)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_pySpark(rnd, subnetworkName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.pyspark", &job),
// Custom supplied job_id
resource.TestCheckResourceAttr("google_dataproc_job.pyspark", "reference.0.job_id", jobId),
// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.pyspark", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.pyspark", "status.0.state_start_time"),
resource.TestCheckResourceAttr("google_dataproc_job.pyspark", "scheduling.0.max_failures_per_hour", "1"),
resource.TestCheckResourceAttr("google_dataproc_job.pyspark", "scheduling.0.max_failures_total", "20"),
resource.TestCheckResourceAttr("google_dataproc_job.pyspark", "labels.one", "1"),
// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.pyspark", "pyspark_config", &job),
// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.pyspark", &job),
),
},
},
})
}
func TestAccDataprocJob_Spark(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_spark(rnd, subnetworkName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.spark", &job),
// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.spark", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.spark", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.spark", "status.0.state_start_time"),
// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.spark", "spark_config", &job),
// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.spark", &job),
),
},
},
})
}
func TestAccDataprocJob_Hadoop(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_hadoop(rnd, subnetworkName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.hadoop", &job),
// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.hadoop", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.hadoop", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.hadoop", "status.0.state_start_time"),
// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.hadoop", "hadoop_config", &job),
// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.hadoop", &job),
),
},
},
})
}
func TestAccDataprocJob_Hive(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_hive(rnd, subnetworkName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.hive", &job),
// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.hive", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.hive", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.hive", "status.0.state_start_time"),
// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.hive", "hive_config", &job),
// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.hive", &job),
),
},
},
})
}
func TestAccDataprocJob_Pig(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_pig(rnd, subnetworkName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.pig", &job),
// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.pig", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.pig", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.pig", "status.0.state_start_time"),
// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.pig", "pig_config", &job),
// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.pig", &job),
),
},
},
})
}
func TestAccDataprocJob_SparkSql(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_sparksql(rnd, subnetworkName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.sparksql", &job),
// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.sparksql", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.sparksql", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.sparksql", "status.0.state_start_time"),
// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.sparksql", "sparksql_config", &job),
// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.sparksql", &job),
),
},
},
})
}
func TestAccDataprocJob_Presto(t *testing.T) {
t.Parallel()
var job dataproc.Job
rnd := acctest.RandString(t, 10)
networkName := acctest.BootstrapSharedTestNetwork(t, "dataproc-cluster")
subnetworkName := acctest.BootstrapSubnet(t, "dataproc-cluster", networkName)
acctest.BootstrapFirewallForDataprocSharedNetwork(t, "dataproc-cluster", networkName)
acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_presto(rnd, subnetworkName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.presto", &job),
// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state_start_time"),
// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.presto", "presto_config", &job),
// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.presto", &job),
),
},
},
})
}
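
// testAccCheckDataprocJobDestroyProducer returns a CheckDestroy function that verifies every
// google_dataproc_job in state has been removed, treating a 404 from the Dataproc API as a
// successful delete.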
func testAccCheckDataprocJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
config := acctest.GoogleProviderConfig(t)
for _, rs := range s.RootModule().Resources {
if rs.Type != "google_dataproc_job" {
continue
}
if rs.Primary.ID == "" {
				return fmt.Errorf("Unable to verify deletion of Dataproc job: resource ID is empty")
}
attributes := rs.Primary.Attributes
project, err := acctest.GetTestProject(rs.Primary, config)
if err != nil {
return err
}
parts := strings.Split(rs.Primary.ID, "/")
			jobId := parts[len(parts)-1]
			_, err = config.NewDataprocClient(config.UserAgent).Projects.Regions.Jobs.Get(
				project, attributes["region"], jobId).Do()
if err != nil {
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == 404 {
return nil
} else if ok {
					return fmt.Errorf("Error making GCP platform call: HTTP error code %d, message: %s", gerr.Code, gerr.Message)
}
return fmt.Errorf("Error making GCP platform call: %s", err.Error())
}
return fmt.Errorf("Dataproc job still exists")
}
return nil
}
}
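
// testAccCheckDataprocJobCompletesSuccessfully waits for the given job to finish and fails the
// check if it ends in an ERROR state, dumping the driver output logs from GCS to aid debugging.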
func testAccCheckDataprocJobCompletesSuccessfully(t *testing.T, n string, job *dataproc.Job) resource.TestCheckFunc {
return func(s *terraform.State) error {
config := acctest.GoogleProviderConfig(t)
attributes := s.RootModule().Resources[n].Primary.Attributes
region := attributes["region"]
project, err := acctest.GetTestProject(s.RootModule().Resources[n].Primary, config)
if err != nil {
return err
}
		jobCompleteTimeout := 5 * time.Minute
		waitErr := tpgdataproc.DataprocJobOperationWait(config, region, project, job.Reference.JobId,
			"Awaiting Dataproc job completion", config.UserAgent, jobCompleteTimeout)
if waitErr != nil {
return waitErr
}
completeJob, err := config.NewDataprocClient(config.UserAgent).Projects.Regions.Jobs.Get(
project, region, job.Reference.JobId).Do()
if err != nil {
return err
}
if completeJob.Status.State == "ERROR" {
if !strings.HasPrefix(completeJob.DriverOutputResourceUri, "gs://") {
return fmt.Errorf("Job completed in ERROR state but no valid log URI found")
}
u := strings.SplitN(strings.TrimPrefix(completeJob.DriverOutputResourceUri, "gs://"), "/", 2)
if len(u) != 2 {
return fmt.Errorf("Job completed in ERROR state but no valid log URI found")
}
l, err := config.NewStorageClient(config.UserAgent).Objects.List(u[0]).Prefix(u[1]).Do()
if err != nil {
return errwrap.Wrapf("Job completed in ERROR state, found error when trying to list logs: {{err}}", err)
}
for _, item := range l.Items {
resp, err := config.NewStorageClient(config.UserAgent).Objects.Get(item.Bucket, item.Name).Download()
if err != nil {
return errwrap.Wrapf("Job completed in ERROR state, found error when trying to read logs: {{err}}", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return errwrap.Wrapf("Job completed in ERROR state, found error when trying to read logs: {{err}}", err)
}
log.Printf("[ERROR] Job failed, driver logs:\n%s", body)
}
return fmt.Errorf("Job completed in ERROR state, check logs for details")
} else if completeJob.Status.State != "DONE" && completeJob.Status.State != "RUNNING" {
			return fmt.Errorf("Job did not complete successfully; current status: %s", completeJob.Status.State)
}
return nil
}
}
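
// testAccCheckDataprocJobExists verifies that the named resource is present in state and
// retrievable via the Dataproc API, storing the API response in job for use by later checks.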
func testAccCheckDataprocJobExists(t *testing.T, n string, job *dataproc.Job) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
			return fmt.Errorf("Terraform resource not found: %s", n)
}
if rs.Primary.ID == "" {
return fmt.Errorf("No ID is set for Dataproc job")
}
config := acctest.GoogleProviderConfig(t)
parts := strings.Split(s.RootModule().Resources[n].Primary.ID, "/")
jobId := parts[len(parts)-1]
project, err := acctest.GetTestProject(s.RootModule().Resources[n].Primary, config)
if err != nil {
return err
}
found, err := config.NewDataprocClient(config.UserAgent).Projects.Regions.Jobs.Get(
project, rs.Primary.Attributes["region"], jobId).Do()
if err != nil {
return err
}
*job = *found
return nil
}
}
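
// testAccCheckDataprocJobAttrMatch compares the Terraform state attributes of the named resource
// against the corresponding fields of the job returned by the Dataproc API for the given job
// config type.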
func testAccCheckDataprocJobAttrMatch(n, jobType string, job *dataproc.Job) resource.TestCheckFunc {
return func(s *terraform.State) error {
attributes, err := tpgresource.GetResourceAttributes(n, s)
if err != nil {
return err
}
jobTests := []jobTestField{}
if jobType == "pyspark_config" {
jobTests = append(jobTests, jobTestField{"pyspark_config.0.main_python_file_uri", job.PysparkJob.MainPythonFileUri})
jobTests = append(jobTests, jobTestField{"pyspark_config.0.args", job.PysparkJob.Args})
jobTests = append(jobTests, jobTestField{"pyspark_config.0.python_file_uris", job.PysparkJob.PythonFileUris})
jobTests = append(jobTests, jobTestField{"pyspark_config.0.jar_file_uris", job.PysparkJob.JarFileUris})
jobTests = append(jobTests, jobTestField{"pyspark_config.0.file_uris", job.PysparkJob.FileUris})
jobTests = append(jobTests, jobTestField{"pyspark_config.0.archive_uris", job.PysparkJob.ArchiveUris})
jobTests = append(jobTests, jobTestField{"pyspark_config.0.properties", job.PysparkJob.Properties})
jobTests = append(jobTests, jobTestField{"pyspark_config.0.logging_config.0.driver_log_levels", job.PysparkJob.LoggingConfig.DriverLogLevels})
}
if jobType == "spark_config" {
jobTests = append(jobTests, jobTestField{"spark_config.0.main_class", job.SparkJob.MainClass})
jobTests = append(jobTests, jobTestField{"spark_config.0.main_jar_file_uri", job.SparkJob.MainJarFileUri})
jobTests = append(jobTests, jobTestField{"spark_config.0.args", job.SparkJob.Args})
jobTests = append(jobTests, jobTestField{"spark_config.0.jar_file_uris", job.SparkJob.JarFileUris})
jobTests = append(jobTests, jobTestField{"spark_config.0.file_uris", job.SparkJob.FileUris})
jobTests = append(jobTests, jobTestField{"spark_config.0.archive_uris", job.SparkJob.ArchiveUris})
jobTests = append(jobTests, jobTestField{"spark_config.0.properties", job.SparkJob.Properties})
jobTests = append(jobTests, jobTestField{"spark_config.0.logging_config.0.driver_log_levels", job.SparkJob.LoggingConfig.DriverLogLevels})
}
if jobType == "hadoop_config" {
jobTests = append(jobTests, jobTestField{"hadoop_config.0.main_class", job.HadoopJob.MainClass})
jobTests = append(jobTests, jobTestField{"hadoop_config.0.main_jar_file_uri", job.HadoopJob.MainJarFileUri})
jobTests = append(jobTests, jobTestField{"hadoop_config.0.args", job.HadoopJob.Args})
jobTests = append(jobTests, jobTestField{"hadoop_config.0.jar_file_uris", job.HadoopJob.JarFileUris})
jobTests = append(jobTests, jobTestField{"hadoop_config.0.file_uris", job.HadoopJob.FileUris})
jobTests = append(jobTests, jobTestField{"hadoop_config.0.archive_uris", job.HadoopJob.ArchiveUris})
jobTests = append(jobTests, jobTestField{"hadoop_config.0.properties", job.HadoopJob.Properties})
jobTests = append(jobTests, jobTestField{"hadoop_config.0.logging_config.0.driver_log_levels", job.HadoopJob.LoggingConfig.DriverLogLevels})
}
if jobType == "hive_config" {
queries := []string{}
if job.HiveJob.QueryList != nil {
queries = job.HiveJob.QueryList.Queries
}
jobTests = append(jobTests, jobTestField{"hive_config.0.query_list", queries})
jobTests = append(jobTests, jobTestField{"hive_config.0.query_file_uri", job.HiveJob.QueryFileUri})
jobTests = append(jobTests, jobTestField{"hive_config.0.continue_on_failure", job.HiveJob.ContinueOnFailure})
jobTests = append(jobTests, jobTestField{"hive_config.0.script_variables", job.HiveJob.ScriptVariables})
jobTests = append(jobTests, jobTestField{"hive_config.0.properties", job.HiveJob.Properties})
jobTests = append(jobTests, jobTestField{"hive_config.0.jar_file_uris", job.HiveJob.JarFileUris})
}
if jobType == "pig_config" {
queries := []string{}
if job.PigJob.QueryList != nil {
queries = job.PigJob.QueryList.Queries
}
jobTests = append(jobTests, jobTestField{"pig_config.0.query_list", queries})
jobTests = append(jobTests, jobTestField{"pig_config.0.query_file_uri", job.PigJob.QueryFileUri})
jobTests = append(jobTests, jobTestField{"pig_config.0.continue_on_failure", job.PigJob.ContinueOnFailure})
jobTests = append(jobTests, jobTestField{"pig_config.0.script_variables", job.PigJob.ScriptVariables})
jobTests = append(jobTests, jobTestField{"pig_config.0.properties", job.PigJob.Properties})
jobTests = append(jobTests, jobTestField{"pig_config.0.jar_file_uris", job.PigJob.JarFileUris})
}
if jobType == "sparksql_config" {
queries := []string{}
if job.SparkSqlJob.QueryList != nil {
queries = job.SparkSqlJob.QueryList.Queries
}
jobTests = append(jobTests, jobTestField{"sparksql_config.0.query_list", queries})
jobTests = append(jobTests, jobTestField{"sparksql_config.0.query_file_uri", job.SparkSqlJob.QueryFileUri})
jobTests = append(jobTests, jobTestField{"sparksql_config.0.script_variables", job.SparkSqlJob.ScriptVariables})
jobTests = append(jobTests, jobTestField{"sparksql_config.0.properties", job.SparkSqlJob.Properties})
jobTests = append(jobTests, jobTestField{"sparksql_config.0.jar_file_uris", job.SparkSqlJob.JarFileUris})
}
for _, attrs := range jobTests {
if c := checkMatch(attributes, attrs.tf_attr, attrs.gcp_attr); c != "" {
				return fmt.Errorf("%s", c)
}
}
return nil
}
}
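
// checkMatch compares a single Terraform attribute against its GCP counterpart, dispatching to
// the list, map, or bool helpers based on the GCP value's type. It returns a non-empty error
// message on mismatch.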
func checkMatch(attributes map[string]string, attr string, gcp interface{}) string {
if gcpList, ok := gcp.([]string); ok {
return checkListMatch(attributes, attr, gcpList)
}
if gcpMap, ok := gcp.(map[string]string); ok {
return checkMapMatch(attributes, attr, gcpMap)
}
if gcpBool, ok := gcp.(bool); ok {
return checkBoolMatch(attributes, attr, gcpBool)
}
tf := attributes[attr]
if tf != gcp {
return matchError(attr, tf, gcp)
}
return ""
}
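
// checkListMatch compares a Terraform list attribute (attr.# plus its indexed elements) against a
// list of strings from the API.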
func checkListMatch(attributes map[string]string, attr string, gcpList []string) string {
	// A number of the TestAccDataprocJob_* tests fail without this workaround. When the terraform-json
	// representation of state is shimmed back to the old framework's representation, an empty list that
	// used to appear as x.# = "0" now appears as x.# = "". This is likely unintentional but harmless;
	// if it becomes a problem, the SDK team can address it.
if attributes[attr+".#"] == "" {
attributes[attr+".#"] = "0"
}
num, err := strconv.Atoi(attributes[attr+".#"])
if err != nil {
return fmt.Sprintf("Error in number conversion for attribute %s: %s", attr, err)
}
if num != len(gcpList) {
		return fmt.Sprintf("Job has mismatched %s size.\nTF Size: %d\nGCP Size: %d", attr, num, len(gcpList))
}
for i, gcp := range gcpList {
if tf := attributes[fmt.Sprintf("%s.%d", attr, i)]; tf != gcp {
return matchError(fmt.Sprintf("%s[%d]", attr, i), tf, gcp)
}
}
return ""
}
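
// checkMapMatch compares a Terraform map attribute (attr.% plus its keyed entries) against a map
// of strings from the API.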
func checkMapMatch(attributes map[string]string, attr string, gcpMap map[string]string) string {
	// As in checkListMatch above, the shim from the terraform-json representation of state back to the
	// old framework's representation reports an empty map as x.% = "" rather than x.% = "0", so
	// normalize it here.
if attributes[attr+".%"] == "" {
attributes[attr+".%"] = "0"
}
num, err := strconv.Atoi(attributes[attr+".%"])
if err != nil {
return fmt.Sprintf("Error in number conversion for attribute %s: %s", attr, err)
}
if num != len(gcpMap) {
		return fmt.Sprintf("Job has mismatched %s size.\nTF Size: %d\nGCP Size: %d", attr, num, len(gcpMap))
}
for k, gcp := range gcpMap {
if tf := attributes[fmt.Sprintf("%s.%s", attr, k)]; tf != gcp {
return matchError(fmt.Sprintf("%s[%s]", attr, k), tf, gcp)
}
}
return ""
}
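
// checkBoolMatch compares a Terraform boolean attribute against a boolean from the API, treating
// an unset attribute as false.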
func checkBoolMatch(attributes map[string]string, attr string, gcpBool bool) string {
// Handle the case where an unset value defaults to false
var tf bool
var err error
if attributes[attr] == "" {
tf = false
} else {
tf, err = strconv.ParseBool(attributes[attr])
if err != nil {
return fmt.Sprintf("Error converting attribute %s to boolean: value is %s", attr, attributes[attr])
}
}
if tf != gcpBool {
return matchError(attr, tf, gcpBool)
}
return ""
}
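
// matchError formats a human-readable mismatch message for a single attribute.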
func matchError(attr, tf interface{}, gcp interface{}) string {
	return fmt.Sprintf("Job has mismatched %s.\nTF State: %+v\nGCP State: %+v", attr, tf, gcp)
}
// TODO (mbang): Test `ExactlyOneOf` here
// func testAccDataprocJob_missingJobConf() string {
// return `
// resource "google_dataproc_job" "missing_config" {
// placement {
// cluster_name = "na"
// }
// force_delete = true
// }`
// }
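
// singleNodeClusterConfig is a minimal single-node Dataproc cluster shared by the job test
// configs below; it expects two format verbs: a random suffix for the cluster name and a
// subnetwork name.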
var singleNodeClusterConfig = `
resource "google_dataproc_cluster" "basic" {
name = "dproc-job-test-%s"
region = "us-central1"
cluster_config {
# Keep the costs down with smallest config we can get away with
software_config {
override_properties = {
"dataproc:dataproc.allow.zero.workers" = "true"
}
}
master_config {
num_instances = 1
machine_type = "e2-standard-2"
disk_config {
boot_disk_size_gb = 35
}
}
gce_cluster_config {
subnetwork = "%s"
}
}
}
`
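
// testAccDataprocJob_updatable renders a job config with a user-supplied job_id and a
// parameterized force_delete value so the update test can toggle it between steps.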
func testAccDataprocJob_updatable(rnd, subnetworkName, jobId, del string) string {
return fmt.Sprintf(
singleNodeClusterConfig+`
resource "google_dataproc_job" "updatable" {
placement {
cluster_name = google_dataproc_cluster.basic.name
}
reference {
job_id = "%s"
}
region = google_dataproc_cluster.basic.region
force_delete = %s
pyspark_config {
main_python_file_uri = "gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py"
}
}
`, rnd, subnetworkName, jobId, del)
}
func testAccDataprocJob_pySpark(rnd, subnetworkName string) string {
return fmt.Sprintf(
singleNodeClusterConfig+`
resource "google_dataproc_job" "pyspark" {
placement {
cluster_name = google_dataproc_cluster.basic.name
}
reference {
job_id = "dproc-custom-job-id-%s"
}
region = google_dataproc_cluster.basic.region
force_delete = true
pyspark_config {
main_python_file_uri = "gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py"
properties = {
"spark.logConf" = "true"
}
logging_config {
driver_log_levels = {
"root" = "INFO"
}
}
}
scheduling {
max_failures_per_hour = 1
		max_failures_total    = 20
}
labels = {
one = "1"
}
}
`, rnd, subnetworkName, rnd)
}
func testAccDataprocJob_spark(rnd, subnetworkName string) string {
return fmt.Sprintf(
singleNodeClusterConfig+`
resource "google_dataproc_job" "spark" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}
spark_config {
main_class = "org.apache.spark.examples.SparkPi"
jar_file_uris = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
args = ["1000"]
properties = {
"spark.logConf" = "true"
}
logging_config {
driver_log_levels = {
}
}
}
}
`, rnd, subnetworkName)
}
func testAccDataprocJob_hadoop(rnd, subnetworkName string) string {
return fmt.Sprintf(
singleNodeClusterConfig+`
resource "google_dataproc_job" "hadoop" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}
hadoop_config {
main_jar_file_uri = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
args = [
"wordcount",
"file:///usr/lib/spark/NOTICE",
"gs://${google_dataproc_cluster.basic.cluster_config[0].bucket}/hadoopjob_output_%s",
]
}
}
`, rnd, subnetworkName, rnd)
}
func testAccDataprocJob_hive(rnd, subnetworkName string) string {
return fmt.Sprintf(
singleNodeClusterConfig+`
resource "google_dataproc_job" "hive" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}
hive_config {
query_list = [
"DROP TABLE IF EXISTS dprocjob_test",
"CREATE EXTERNAL TABLE dprocjob_test(bar int) LOCATION 'gs://${google_dataproc_cluster.basic.cluster_config[0].bucket}/hive_dprocjob_test/'",
"SELECT * FROM dprocjob_test WHERE bar > 2",
]
}
}
`, rnd, subnetworkName)
}
func testAccDataprocJob_pig(rnd, subnetworkName string) string {
return fmt.Sprintf(
singleNodeClusterConfig+`
resource "google_dataproc_job" "pig" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}
pig_config {
		query_list = [
			"LNS = LOAD 'file:///usr/lib/pig/LICENSE.txt' AS (line)",
"WORDS = FOREACH LNS GENERATE FLATTEN(TOKENIZE(line)) AS word",
"GROUPS = GROUP WORDS BY word",
"WORD_COUNTS = FOREACH GROUPS GENERATE group, COUNT(WORDS)",
"DUMP WORD_COUNTS",
]
}
}
`, rnd, subnetworkName)
}
func testAccDataprocJob_sparksql(rnd, subnetworkName string) string {
return fmt.Sprintf(
singleNodeClusterConfig+`
resource "google_dataproc_job" "sparksql" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}
sparksql_config {
query_list = [
"DROP TABLE IF EXISTS dprocjob_test",
"CREATE TABLE dprocjob_test(bar int)",
"SELECT * FROM dprocjob_test WHERE bar > 2",
]
}
}
`, rnd, subnetworkName)
}
func testAccDataprocJob_presto(rnd, subnetworkName string) string {
return fmt.Sprintf(`
resource "google_dataproc_cluster" "basic" {
name = "dproc-job-test-%s"
region = "us-central1"
cluster_config {
# Keep the costs down with smallest config we can get away with
software_config {
override_properties = {
"dataproc:dataproc.allow.zero.workers" = "true"
}
optional_components = ["PRESTO"]
}
master_config {
num_instances = 1
machine_type = "e2-standard-2"
disk_config {
boot_disk_size_gb = 35
}
}
gce_cluster_config {
subnetwork = "%s"
}
}
}
resource "google_dataproc_job" "presto" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}
presto_config {
query_list = [
"SELECT * FROM system.metadata.schema_properties"
]
}
}
`, rnd, subnetworkName)
}