blob: d4b8588afaf423ddf86107ea56a07d079ff694df [file] [log] [blame]
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package rpcapi
import (
"context"
"fmt"
"io"
"log"
"sync"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/terraform-svchost/disco"
"github.com/hashicorp/terraform/internal/pluginshared"
"github.com/hashicorp/terraform/internal/rpcapi/dynrpcserver"
"github.com/hashicorp/terraform/internal/rpcapi/terraform1/dependencies"
"github.com/hashicorp/terraform/internal/rpcapi/terraform1/packages"
"github.com/hashicorp/terraform/internal/rpcapi/terraform1/stacks"
"github.com/hashicorp/terraform/internal/stacksplugin/stacksproto1"
"google.golang.org/grpc"
)
// GRPCStacksClient is the client interface for interacting with terraform-stacksplugin
type GRPCStacksClient struct {
Client stacksproto1.CommandServiceClient
Broker *plugin.GRPCBroker
Services *disco.Disco
Context context.Context
ShutdownCh <-chan struct{}
}
// Proof that GRPCStacksClient fulfills the go-plugin interface
var _ pluginshared.CustomPluginClient = GRPCStacksClient{}
type brokerIDs struct {
packagesBrokerID uint32
dependenciesBrokerID uint32
stacksBrokerID uint32
}
// registerBrokers starts the GRPC servers for the dependencies, packages, and stacks
// services and returns the broker IDs for each.
func (c GRPCStacksClient) registerBrokers(stdout, stderr io.Writer) brokerIDs {
handles := newHandleTable()
dependenciesServer := dynrpcserver.NewDependenciesStub()
packagesServer := dynrpcserver.NewPackagesStub()
stacksServer := dynrpcserver.NewStacksStub()
var serverWG sync.WaitGroup
// wait for all 3 servers to start
serverWG.Add(3)
dependenciesServerFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
dependencies.RegisterDependenciesServer(s, dependenciesServer)
dependenciesServer.ActivateRPCServer(newDependenciesServer(handles, c.Services))
serverWG.Done()
return s
}
dependenciesBrokerID := c.Broker.NextId()
go c.Broker.AcceptAndServe(dependenciesBrokerID, dependenciesServerFunc)
packagesServerFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
packages.RegisterPackagesServer(s, packagesServer)
packagesServer.ActivateRPCServer(newPackagesServer(c.Services))
serverWG.Done()
return s
}
packagesBrokerID := c.Broker.NextId()
go c.Broker.AcceptAndServe(packagesBrokerID, packagesServerFunc)
stacksServerFunc := func(opts []grpc.ServerOption) *grpc.Server {
s := grpc.NewServer(opts...)
stacks.RegisterStacksServer(s, stacksServer)
stacksServer.ActivateRPCServer(newStacksServer(
newStopper(), handles, c.Services, &serviceOpts{experimentsAllowed: true}))
serverWG.Done()
return s
}
stacksBrokerID := c.Broker.NextId()
go c.Broker.AcceptAndServe(stacksBrokerID, stacksServerFunc)
// block till all 3 servers have signaled readiness
serverWG.Wait()
return brokerIDs{
dependenciesBrokerID: dependenciesBrokerID,
packagesBrokerID: packagesBrokerID,
stacksBrokerID: stacksBrokerID,
}
}
// Execute sends the client Execute request and waits for the plugin to return
// an exit code response before returning
func (c GRPCStacksClient) executeWithBrokers(brokerIDs brokerIDs, args []string, stdout, stderr io.Writer) int {
ctx, cancel := context.WithCancel(c.Context)
// Monitor for interrupt and cancel the context if received
go func() {
sig := <-c.ShutdownCh
fmt.Print("\n\nOperation Interrupted, any remote operations started will continue\n\n")
log.Printf("[INFO] Received signal: %s, cancelling operation", sig)
cancel()
}()
client, err := c.Client.Execute(ctx, &stacksproto1.CommandRequest{
DependenciesServer: brokerIDs.dependenciesBrokerID,
PackagesServer: brokerIDs.packagesBrokerID,
StacksServer: brokerIDs.stacksBrokerID,
Args: args,
})
if err != nil {
fmt.Fprint(stderr, err.Error())
return 1
}
for {
// stacksplugin streams output as multiple CommandResponse value. Each
// value will either contain stdout bytes, stderr bytes, or an exit code.
response, err := client.Recv()
if err == io.EOF {
log.Print("[DEBUG] received EOF from stacksplugin")
break
} else if err != nil {
fmt.Fprintf(stderr, "Failed to receive command response from stacksplugin: %s", err)
return 1
}
if bytes := response.GetStdout(); len(bytes) > 0 {
written, err := fmt.Fprint(stdout, string(bytes))
if err != nil {
log.Printf("[ERROR] Failed to write stacksplugin output to stdout: %s", err)
return 1
}
if written != len(bytes) {
log.Printf("[ERROR] Wrote %d bytes to stdout but expected to write %d", written, len(bytes))
}
} else if bytes := response.GetStderr(); len(bytes) > 0 {
written, err := fmt.Fprint(stderr, string(bytes))
if err != nil {
log.Printf("[ERROR] Failed to write stacksplugin output to stderr: %s", err)
return 1
}
if written != len(bytes) {
log.Printf("[ERROR] Wrote %d bytes to stdout but expected to write %d", written, len(bytes))
}
} else {
exitCode := response.GetExitCode()
log.Printf("[TRACE] received exit code: %d", exitCode)
if exitCode < 0 || exitCode > 255 {
log.Printf("[ERROR] stacksplugin returned an invalid error code %d", exitCode)
return 255
}
return int(exitCode)
}
}
// This should indicate a bug in the plugin
fmt.Fprint(stderr, "stacksplugin exited without responding with an error code")
return 1
}
// Execute sends the client Execute request and waits for the plugin to return
// an exit code response before returning
func (c GRPCStacksClient) Execute(args []string, stdout, stderr io.Writer) int {
brokerIDs := c.registerBrokers(stdout, stderr)
return c.executeWithBrokers(brokerIDs, args, stdout, stderr)
}