blob: 3afec745805b94144eb1a57a44957c92e136d0b4 [file] [log] [blame]
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package rpcapi
import (
"bytes"
"context"
"fmt"
"io"
"time"
"github.com/hashicorp/go-slug/sourceaddrs"
"github.com/hashicorp/go-slug/sourcebundle"
"github.com/hashicorp/terraform-svchost/disco"
"go.opentelemetry.io/otel/attribute"
otelCodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/terraform/internal/addrs"
"github.com/hashicorp/terraform/internal/depsfile"
"github.com/hashicorp/terraform/internal/plans"
"github.com/hashicorp/terraform/internal/providercache"
"github.com/hashicorp/terraform/internal/providers"
"github.com/hashicorp/terraform/internal/rpcapi/terraform1"
"github.com/hashicorp/terraform/internal/rpcapi/terraform1/stacks"
"github.com/hashicorp/terraform/internal/stacks/stackaddrs"
"github.com/hashicorp/terraform/internal/stacks/stackconfig"
"github.com/hashicorp/terraform/internal/stacks/stackmigrate"
"github.com/hashicorp/terraform/internal/stacks/stackplan"
"github.com/hashicorp/terraform/internal/stacks/stackruntime"
"github.com/hashicorp/terraform/internal/stacks/stackruntime/hooks"
"github.com/hashicorp/terraform/internal/stacks/stackstate"
"github.com/hashicorp/terraform/internal/states"
"github.com/hashicorp/terraform/internal/states/statefile"
"github.com/hashicorp/terraform/internal/tfdiags"
)
// stacksServer is this package's implementation of the Stacks gRPC service.
// It embeds stacks.UnimplementedStacksServer and overrides the RPC methods
// defined in this file.
type stacksServer struct {
stacks.UnimplementedStacksServer
// stopper hands out stop channels to the long-running plan and apply
// RPCs so that those operations can be cancelled on request.
stopper *stopper
// services is the service discovery object, passed to the state loader
// used by OpenTerraformState.
services *disco.Disco
// handles is the table through which the opaque handles in requests are
// resolved to in-memory objects, and through which new handles are
// allocated and closed.
handles *handleTable
// experimentsAllowed is forwarded to the stack runtime requests and to
// the stack inspector.
experimentsAllowed bool
// providerCacheOverride is a map of provider names to provider factories
// that should be used instead of the default provider cache. This is used
// within tests to side load providers without needing a real provider
// cache.
providerCacheOverride map[addrs.Provider]providers.Factory
// providerDependencyLockOverride is an in-memory override of the provider
// lockfile used for testing when the real provider is side-loaded.
providerDependencyLockOverride *depsfile.Locks
// planTimestampOverride is an in-memory override of the plan timestamp used
// for testing. This just ensures our tests aren't flaky as we can use a
// constant timestamp for the plan.
planTimestampOverride *time.Time
}
var (
// Compile-time assertion that *stacksServer implements the generated
// stacks.StacksServer interface.
_ stacks.StacksServer = (*stacksServer)(nil)
// WorkspaceNameEnvVar holds the environment variable name "TF_WORKSPACE".
// NOTE(review): it is not referenced in the visible part of this file;
// presumably it is consumed elsewhere to select a workspace -- confirm.
WorkspaceNameEnvVar = "TF_WORKSPACE"
)
// newStacksServer builds the Stacks service implementation from the shared
// stopper, handle table and service discovery object. Whether experiments
// are allowed is copied from opts; the test-only override fields are left at
// their zero values.
func newStacksServer(stopper *stopper, handles *handleTable, services *disco.Disco, opts *serviceOpts) *stacksServer {
	srv := new(stacksServer)
	srv.stopper = stopper
	srv.services = services
	srv.handles = handles
	srv.experimentsAllowed = opts.experimentsAllowed
	return srv
}
// OpenStackConfiguration loads a stack configuration from a previously
// opened source bundle and returns a handle to it.
//
// An invalid bundle handle or source address yields an InvalidArgument
// error. Errors found in the configuration itself are not RPC errors: they
// are reported as diagnostics in an otherwise successful response that
// carries no configuration handle.
func (s *stacksServer) OpenStackConfiguration(ctx context.Context, req *stacks.OpenStackConfiguration_Request) (*stacks.OpenStackConfiguration_Response, error) {
	bundleHnd := handle[*sourcebundle.Bundle](req.SourceBundleHandle)
	bundle := s.handles.SourceBundle(bundleHnd)
	if bundle == nil {
		return nil, status.Error(codes.InvalidArgument, "the given source bundle handle is invalid")
	}

	rootAddr, err := resolveFinalSourceAddr(req.SourceAddress, bundle)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "invalid source address: %s", err)
	}

	loaded, diags := stackconfig.LoadConfigDir(rootAddr, bundle)
	if diags.HasErrors() {
		// A broken configuration is still a "successful" call from the
		// protocol's point of view: the caller gets the diagnostics and
		// simply no handle.
		resp := &stacks.OpenStackConfiguration_Response{
			Diagnostics: diagnosticsToProto(diags),
		}
		return resp, nil
	}

	loadedHnd, err := s.handles.NewStackConfig(loaded, bundleHnd)
	if err != nil {
		// Realistically this only happens if the caller raced us with a
		// Dependencies.CloseSourceBundle call after the validity check
		// above. The loaded config is discarded, because the files backing
		// it may be about to disappear, and the failure is reported.
		return nil, status.Errorf(codes.Unknown, "allocating config handle: %s", err)
	}

	// From here on [handleTable]'s dependency tracking keeps the source
	// bundle handle open for at least as long as the config handle, so it is
	// safe to hand the config handle out. (Keeping the bundle's files on
	// disk unmodified while the bundle handle is open remains the caller's
	// responsibility.)
	resp := &stacks.OpenStackConfiguration_Response{
		StackConfigHandle: loadedHnd.ForProtobuf(),
		Diagnostics:       diagnosticsToProto(diags),
	}
	return resp, nil
}
// CloseStackConfiguration releases the stack configuration handle named in
// the request. Any failure reported by the handle table is returned to the
// caller as an InvalidArgument error.
func (s *stacksServer) CloseStackConfiguration(ctx context.Context, req *stacks.CloseStackConfiguration_Request) (*stacks.CloseStackConfiguration_Response, error) {
	cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
	if err := s.handles.CloseStackConfig(cfgHnd); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	return &stacks.CloseStackConfiguration_Response{}, nil
}
// ValidateStackConfiguration runs stack runtime validation against an open
// stack configuration and returns the resulting diagnostics.
//
// The dependency locks and provider cache handles are both optional: with no
// locks handle an empty set of locks is used, and with no cache handle a nil
// provider cache is passed through. Invalid handles or inconsistent provider
// dependencies produce an InvalidArgument error.
func (s *stacksServer) ValidateStackConfiguration(ctx context.Context, req *stacks.ValidateStackConfiguration_Request) (*stacks.ValidateStackConfiguration_Response, error) {
	cfg := s.handles.StackConfig(handle[*stackconfig.Config](req.StackConfigHandle))
	if cfg == nil {
		return nil, status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
	}

	var locks *depsfile.Locks
	if locksHnd := handle[*depsfile.Locks](req.DependencyLocksHandle); locksHnd.IsNil() {
		locks = depsfile.NewLocks()
	} else if locks = s.handles.DependencyLocks(locksHnd); locks == nil {
		return nil, status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
	}

	var cacheDir *providercache.Dir
	if cacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle); !cacheHnd.IsNil() {
		if cacheDir = s.handles.ProviderPluginCache(cacheHnd); cacheDir == nil {
			return nil, status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
		}
	}

	// providerFactoriesForLocks explicitly tolerates a nil provider cache.
	factories, err := providerFactoriesForLocks(locks, cacheDir)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
	}

	validateReq := &stackruntime.ValidateRequest{
		Config:             cfg,
		ExperimentsAllowed: s.experimentsAllowed,
		ProviderFactories:  factories,
		DependencyLocks:    *locks,
	}
	diags := stackruntime.Validate(ctx, validateReq)

	return &stacks.ValidateStackConfiguration_Response{
		Diagnostics: diagnosticsToProto(diags),
	}, nil
}
// FindStackConfigurationComponents describes the contents of an open stack
// configuration, starting at its root stack. An unknown configuration handle
// produces an InvalidArgument error.
func (s *stacksServer) FindStackConfigurationComponents(ctx context.Context, req *stacks.FindStackConfigurationComponents_Request) (*stacks.FindStackConfigurationComponents_Response, error) {
	cfg := s.handles.StackConfig(handle[*stackconfig.Config](req.StackConfigHandle))
	if cfg == nil {
		return nil, status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
	}

	resp := new(stacks.FindStackConfigurationComponents_Response)
	resp.Config = stackConfigMetaforProto(cfg.Root, stackaddrs.RootStack)
	return resp, nil
}
// stackConfigMetaforProto converts one node of the stack configuration tree
// into its protocol representation, recursing into embedded stacks.
//
// cfgNode is the configuration node to describe and stackAddr is the address
// of the stack it represents; stackAddr is used to build the absolute
// configuration addresses reported for components and removed blocks.
func stackConfigMetaforProto(cfgNode *stackconfig.ConfigNode, stackAddr stackaddrs.Stack) *stacks.FindStackConfigurationComponents_StackConfig {
	// instancesFor translates "has a for_each expression" into the
	// corresponding protocol enum value.
	instancesFor := func(hasForEach bool) stacks.FindStackConfigurationComponents_Instances {
		if hasForEach {
			return stacks.FindStackConfigurationComponents_FOR_EACH
		}
		return stacks.FindStackConfigurationComponents_SINGLE
	}

	components := make(map[string]*stacks.FindStackConfigurationComponents_Component)
	for name, cc := range cfgNode.Stack.Components {
		components[name] = &stacks.FindStackConfigurationComponents_Component{
			SourceAddr:    cc.FinalSourceAddr.String(),
			ComponentAddr: stackaddrs.Config(stackAddr, stackaddrs.Component{Name: cc.Name}).String(),
			Instances:     instancesFor(cc.ForEach != nil),
		}
	}

	embedded := make(map[string]*stacks.FindStackConfigurationComponents_EmbeddedStack)
	for name, child := range cfgNode.Children {
		// The child node carries the nested configuration, while the
		// declaration in the parent carries the for_each information.
		decl := cfgNode.Stack.EmbeddedStacks[name]
		embedded[name] = &stacks.FindStackConfigurationComponents_EmbeddedStack{
			SourceAddr: child.Stack.SourceAddr.String(),
			Config:     stackConfigMetaforProto(child, stackAddr.Child(name)),
			Instances:  instancesFor(decl.ForEach != nil),
		}
	}

	inputs := make(map[string]*stacks.FindStackConfigurationComponents_InputVariable)
	for name, vc := range cfgNode.Stack.InputVariables {
		inputs[name] = &stacks.FindStackConfigurationComponents_InputVariable{
			// A variable is optional exactly when it has a non-null default.
			Optional:  !vc.DefaultValue.IsNull(),
			Sensitive: vc.Sensitive,
			Ephemeral: vc.Ephemeral,
		}
	}

	outputs := make(map[string]*stacks.FindStackConfigurationComponents_OutputValue)
	for name, oc := range cfgNode.Stack.OutputValues {
		outputs[name] = &stacks.FindStackConfigurationComponents_OutputValue{
			Sensitive: oc.Sensitive,
			Ephemeral: oc.Ephemeral,
		}
	}

	// Components are currently the only kind of object that can be removed.
	removed := make(map[string]*stacks.FindStackConfigurationComponents_Removed)
	for name, removedBlocks := range cfgNode.Stack.RemovedComponents.All() {
		blocks := make([]*stacks.FindStackConfigurationComponents_Removed_Block, 0, len(removedBlocks))
		for _, rb := range removedBlocks {
			target := rb.From.TargetConfigComponent()
			blocks = append(blocks, &stacks.FindStackConfigurationComponents_Removed_Block{
				SourceAddr:    rb.FinalSourceAddr.String(),
				ComponentAddr: stackaddrs.Config(append(stackAddr, target.Stack...), target.Item).String(),
				Destroy:       rb.Destroy,
				Instances:     instancesFor(rb.ForEach != nil),
			})
		}

		// For backwards and forwards compatibility the deprecated
		// single-block fields are still populated, using the first removed
		// block; the complete list is reported in Blocks.
		first := removedBlocks[0]
		firstTarget := first.From.TargetConfigComponent()
		removed[name.String()] = &stacks.FindStackConfigurationComponents_Removed{
			SourceAddr:    first.FinalSourceAddr.String(),
			Instances:     instancesFor(first.ForEach != nil),
			ComponentAddr: stackaddrs.Config(append(stackAddr, firstTarget.Stack...), firstTarget.Item).String(),
			Destroy:       first.Destroy,
			Blocks:        blocks,
		}
	}

	return &stacks.FindStackConfigurationComponents_StackConfig{
		Components:     components,
		EmbeddedStacks: embedded,
		InputVariables: inputs,
		OutputValues:   outputs,
		Removed:        removed,
	}
}
// OpenState receives a client stream of raw state elements, assembles them
// into a stack state, and replies with a handle to that state once the
// client closes its side of the stream.
//
// A stream error other than io.EOF is returned as-is. An item with no raw
// state object, or one the loader rejects, ends the call with an
// InvalidArgument error.
func (s *stacksServer) OpenState(stream stacks.Stacks_OpenStateServer) error {
	loader := stackstate.NewLoader()
	for {
		item, err := stream.Recv()
		if err == io.EOF {
			break // All done!
		} else if err != nil {
			return err
		}
		// The raw object is a message-typed field supplied by the client,
		// so it may be absent. Reject that case explicitly rather than
		// dereferencing a nil pointer and panicking inside the handler.
		if item.Raw == nil {
			return status.Error(codes.InvalidArgument, "invalid raw state element: missing raw state object")
		}
		err = loader.AddRaw(item.Raw.Key, item.Raw.Value)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "invalid raw state element: %s", err)
		}
	}

	hnd := s.handles.NewStackState(loader.State())
	return stream.SendAndClose(&stacks.OpenStackState_Response{
		StateHandle: hnd.ForProtobuf(),
	})
}
// CloseState releases the stack state handle named in the request. Any
// failure reported by the handle table is returned to the caller as an
// InvalidArgument error.
func (s *stacksServer) CloseState(ctx context.Context, req *stacks.CloseStackState_Request) (*stacks.CloseStackState_Response, error) {
	stateHnd := handle[*stackstate.State](req.StateHandle)
	if err := s.handles.CloseStackState(stateHnd); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	return &stacks.CloseStackState_Response{}, nil
}
// PlanStackChanges plans changes for an open stack configuration and streams
// the planned changes and diagnostics back to the client as events.
//
// The request must name a valid stack configuration handle. The dependency
// locks and provider cache handles are optional, and the previous state may
// be given either as a handle or (deprecated) inline, but not both. Problems
// with the request itself are reported as InvalidArgument errors before
// planning starts; problems found while planning are streamed as diagnostic
// events and the call then returns nil.
//
// The operation registers with the server's stopper so that a stop request
// cancels the context passed to the stack runtime.
func (s *stacksServer) PlanStackChanges(req *stacks.PlanStackChanges_Request, evts stacks.Stacks_PlanStackChangesServer) error {
	ctx := evts.Context()
	syncEvts := newSyncStreamingRPCSender(evts)
	evts = nil // Prevent accidental unsynchronized usage of this server

	cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
	cfg := s.handles.StackConfig(cfgHnd)
	if cfg == nil {
		return status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
	}

	depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
	var deps *depsfile.Locks
	if s.providerDependencyLockOverride != nil {
		// Test-only override of the lock file; takes precedence over
		// whatever handle the request carries.
		deps = s.providerDependencyLockOverride
	} else if !depsHnd.IsNil() {
		deps = s.handles.DependencyLocks(depsHnd)
		if deps == nil {
			return status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
		}
	} else {
		deps = depsfile.NewLocks()
	}

	providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
	var providerCache *providercache.Dir
	if !providerCacheHnd.IsNil() {
		providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
		if providerCache == nil {
			return status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
		}
	}
	// NOTE: providerCache can be nil if no handle was provided, in which
	// case the call can only use built-in providers. All code below
	// must avoid panicking when providerCache is nil, but is allowed to
	// return an InvalidArgument error in that case.

	if req.PreviousStateHandle != 0 && len(req.PreviousState) != 0 {
		return status.Error(codes.InvalidArgument, "must not set both previous_state_handle and previous_state")
	}

	inputValues, err := externalInputValuesFromProto(req.InputValues)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid input values: %s", err)
	}

	var providerFactories map[addrs.Provider]providers.Factory
	if s.providerCacheOverride != nil {
		// This is only used in tests to side load providers without needing a
		// real provider cache.
		providerFactories = s.providerCacheOverride
	} else {
		// (providerFactoriesForLocks explicitly supports a nil providerCache)
		providerFactories, err = providerFactoriesForLocks(deps, providerCache)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
		}
	}

	// We'll hook some internal events in the planning process both to generate
	// tracing information if we're in an OpenTelemetry-aware context and
	// to propagate a subset of the events to our client.
	hooks := stackPlanHooks(syncEvts, cfg.Root.Stack.SourceAddr)
	ctx = stackruntime.ContextWithHooks(ctx, hooks)

	var planMode plans.Mode
	switch req.PlanMode {
	case stacks.PlanMode_NORMAL:
		planMode = plans.NormalMode
	case stacks.PlanMode_REFRESH_ONLY:
		planMode = plans.RefreshOnlyMode
	case stacks.PlanMode_DESTROY:
		planMode = plans.DestroyMode
	default:
		return status.Errorf(codes.InvalidArgument, "unsupported planning mode %d", req.PlanMode)
	}

	var prevState *stackstate.State
	if req.PreviousStateHandle != 0 {
		stateHnd := handle[*stackstate.State](req.PreviousStateHandle)
		prevState = s.handles.StackState(stateHnd)
		if prevState == nil {
			return status.Error(codes.InvalidArgument, "the given previous state handle is invalid")
		}
	} else {
		// Deprecated: The previous state is provided inline as a map.
		// FIXME: Remove this old field once our existing clients are updated.
		prevState, err = stackstate.LoadFromProto(req.PreviousState)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "can't load previous state: %s", err)
		}
	}

	changesCh := make(chan stackplan.PlannedChange, 8)
	diagsCh := make(chan tfdiags.Diagnostic, 2)
	rtReq := stackruntime.PlanRequest{
		PlanMode:           planMode,
		Config:             cfg,
		PrevState:          prevState,
		ProviderFactories:  providerFactories,
		InputValues:        inputValues,
		ExperimentsAllowed: s.experimentsAllowed,
		DependencyLocks:    *deps,

		// planTimestampOverride will be null if not set, so it's fine for
		// us to just set this all the time. In practice, this will only have
		// a value in tests.
		ForcePlanTimestamp: s.planTimestampOverride,
	}
	rtResp := stackruntime.PlanResponse{
		PlannedChanges: changesCh,
		Diagnostics:    diagsCh,
	}

	// As a long-running operation, the plan RPC must be able to be stopped. We
	// do this by requesting a stop channel from the stopper, and using it to
	// cancel the planning process.
	stopCh := s.stopper.add()
	defer s.stopper.remove(stopCh)

	// We create a new cancellable context for the stack plan operation to
	// allow us to respond to stop requests.
	planCtx, cancelPlan := context.WithCancel(ctx)
	defer cancelPlan()

	// The actual plan operation runs in the background, and emits events
	// to us via the channels in rtResp before finally closing changesCh
	// to signal that the process is complete.
	go stackruntime.Plan(planCtx, &rtReq, &rtResp)

	// emitDiag converts a single diagnostic to its protocol form and sends
	// it to the client as a diagnostic event.
	emitDiag := func(diag tfdiags.Diagnostic) {
		diags := tfdiags.Diagnostics{diag}
		protoDiags := diagnosticsToProto(diags)
		for _, protoDiag := range protoDiags {
			syncEvts.Send(&stacks.PlanStackChanges_Event{
				Event: &stacks.PlanStackChanges_Event_Diagnostic{
					Diagnostic: protoDiag,
				},
			})
		}
	}

	// There is no strong ordering between the planned changes and the
	// diagnostics, so we need to be prepared for them to arrive in any
	// order. However, stackruntime.Plan does guarantee that it will
	// close changesCh only after it's finished writing to and closing
	// everything else, and so we can assume that once changesCh is
	// closed we only need to worry about whatever's left in the
	// diagsCh buffer.
Events:
	for {
		select {

		case change, ok := <-changesCh:
			if !ok {
				if diagsCh != nil {
					// Async work is done! We do still need to consume the rest
					// of diagsCh before we stop, though, because there might
					// be some extras in the channel's buffer that we didn't
					// get to yet.
					for diag := range diagsCh {
						emitDiag(diag)
					}
				}
				break Events
			}

			protoChange, err := change.PlannedChangeProto()
			if err != nil {
				// Should not get here: it always indicates a bug in
				// PlannedChangeProto or in the code which constructed
				// the change over in package stackeval.
				//
				// The message names the type of the change that failed to
				// serialize. The failed conversion result would be no use
				// here: its static type is the same for every change.
				emitDiag(tfdiags.Sourceless(
					tfdiags.Error,
					"Incorrectly-constructed change",
					fmt.Sprintf(
						"Failed to serialize a %T value for recording in the saved plan: %s.\n\nThis is a bug in Terraform; please report it!",
						change, err,
					),
				))
				continue
			}

			syncEvts.Send(&stacks.PlanStackChanges_Event{
				Event: &stacks.PlanStackChanges_Event_PlannedChange{
					PlannedChange: protoChange,
				},
			})

		case diag, ok := <-diagsCh:
			if !ok {
				// The diagnostics channel has closed, so we'll just stop
				// trying to read from it and wait for changesCh to close,
				// which will be our final signal that everything is done.
				diagsCh = nil
				continue
			}
			emitDiag(diag)

		case <-stopCh:
			// If our stop channel is signalled, we need to cancel the plan.
			// This may result in remaining changes or diagnostics being
			// emitted, so we continue to monitor those channels if they're
			// still active.
			cancelPlan()
		}
	}

	return nil
}
// OpenPlan receives a client stream of raw plan elements, assembles them
// into a stack plan, and replies with a handle to that plan once the client
// closes its side of the stream.
//
// A stream error other than io.EOF is returned as-is. An element the loader
// rejects, or a set of elements that does not form a valid plan, ends the
// call with an InvalidArgument error.
func (s *stacksServer) OpenPlan(stream stacks.Stacks_OpenPlanServer) error {
	planLoader := stackplan.NewLoader()

	for {
		item, recvErr := stream.Recv()
		if recvErr != nil {
			if recvErr == io.EOF {
				break // The client has sent everything.
			}
			return recvErr
		}
		if addErr := planLoader.AddRaw(item.Raw); addErr != nil {
			return status.Errorf(codes.InvalidArgument, "invalid raw plan element: %s", addErr)
		}
	}

	loaded, err := planLoader.Plan()
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid raw plan: %s", err)
	}

	resp := &stacks.OpenStackPlan_Response{
		PlanHandle: s.handles.NewStackPlan(loaded).ForProtobuf(),
	}
	return stream.SendAndClose(resp)
}
// ClosePlan releases the stack plan handle named in the request. Any failure
// reported by the handle table is returned to the caller as an
// InvalidArgument error.
func (s *stacksServer) ClosePlan(ctx context.Context, req *stacks.CloseStackPlan_Request) (*stacks.CloseStackPlan_Response, error) {
	planHnd := handle[*stackplan.Plan](req.PlanHandle)
	if err := s.handles.CloseStackPlan(planHnd); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	return &stacks.CloseStackPlan_Response{}, nil
}
// ApplyStackChanges applies a previously created stack plan and streams the
// applied changes and diagnostics back to the client as events.
//
// The plan may be given either as a handle or (deprecated) inline, but not
// both. A plan handle is closed as soon as it has been resolved, so it cannot
// be applied twice. The request must also name a valid stack configuration
// handle; the dependency locks and provider cache handles are optional.
// Problems with the request itself are reported as InvalidArgument errors
// before the apply starts; problems found while applying are streamed as
// diagnostic events and the call then returns nil.
//
// The operation registers with the server's stopper so that a stop request
// cancels the context passed to the stack runtime.
func (s *stacksServer) ApplyStackChanges(req *stacks.ApplyStackChanges_Request, evts stacks.Stacks_ApplyStackChangesServer) error {
	ctx := evts.Context()
	syncEvts := newSyncStreamingRPCSender(evts)
	evts = nil // Prevent accidental unsynchronized usage of this server

	if req.PlanHandle != 0 && len(req.PlannedChanges) != 0 {
		return status.Error(codes.InvalidArgument, "must not set both plan_handle and planned_changes")
	}
	var plan *stackplan.Plan
	if req.PlanHandle != 0 {
		planHnd := handle[*stackplan.Plan](req.PlanHandle)
		plan = s.handles.StackPlan(planHnd)
		if plan == nil {
			return status.Error(codes.InvalidArgument, "the given plan handle is invalid")
		}
		// The plan handle is immediately invalidated by trying to apply it;
		// plans are not reusable because they are valid only against the
		// exact prior state they were generated for.
		if err := s.handles.CloseStackPlan(planHnd); err != nil {
			// It would be very strange to get here!
			return status.Error(codes.Internal, "failed to close the plan handle")
		}
	} else {
		// Deprecated: whole plan specified inline
		// FIXME: Remove this old field once our existing clients are updated.
		var err error
		plan, err = stackplan.LoadFromProto(req.PlannedChanges)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "invalid planned_changes: %s", err)
		}
	}

	cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
	cfg := s.handles.StackConfig(cfgHnd)
	if cfg == nil {
		return status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
	}

	depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
	var deps *depsfile.Locks
	if !depsHnd.IsNil() {
		deps = s.handles.DependencyLocks(depsHnd)
		if deps == nil {
			return status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
		}
	} else {
		deps = depsfile.NewLocks()
	}

	providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
	var providerCache *providercache.Dir
	if !providerCacheHnd.IsNil() {
		providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
		if providerCache == nil {
			return status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
		}
	}
	// NOTE: providerCache can be nil if no handle was provided, in which
	// case the call can only use built-in providers. All code below
	// must avoid panicking when providerCache is nil, but is allowed to
	// return an InvalidArgument error in that case.

	// (providerFactoriesForLocks explicitly supports a nil providerCache)
	providerFactories, err := providerFactoriesForLocks(deps, providerCache)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
	}

	inputValues, err := externalInputValuesFromProto(req.InputValues)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid input values: %s", err)
	}

	// We'll hook some internal events in the apply process both to generate
	// tracing information if we're in an OpenTelemetry-aware context and
	// to propagate a subset of the events to our client.
	hooks := stackApplyHooks(syncEvts, cfg.Root.Stack.SourceAddr)
	ctx = stackruntime.ContextWithHooks(ctx, hooks)

	changesCh := make(chan stackstate.AppliedChange, 8)
	diagsCh := make(chan tfdiags.Diagnostic, 2)
	rtReq := stackruntime.ApplyRequest{
		Config:             cfg,
		InputValues:        inputValues,
		ProviderFactories:  providerFactories,
		Plan:               plan,
		ExperimentsAllowed: s.experimentsAllowed,
		DependencyLocks:    *deps,
	}
	rtResp := stackruntime.ApplyResponse{
		AppliedChanges: changesCh,
		Diagnostics:    diagsCh,
	}

	// As a long-running operation, the apply RPC must be able to be stopped.
	// We do this by requesting a stop channel from the stopper, and using it
	// to cancel the apply process.
	stopCh := s.stopper.add()
	defer s.stopper.remove(stopCh)

	// We create a new cancellable context for the stack apply operation to
	// allow us to respond to stop requests.
	applyCtx, cancelApply := context.WithCancel(ctx)
	defer cancelApply()

	// The actual apply operation runs in the background, and emits events
	// to us via the channels in rtResp before finally closing changesCh
	// to signal that the process is complete.
	go stackruntime.Apply(applyCtx, &rtReq, &rtResp)

	// emitDiag converts a single diagnostic to its protocol form and sends
	// it to the client as a diagnostic event.
	emitDiag := func(diag tfdiags.Diagnostic) {
		diags := tfdiags.Diagnostics{diag}
		protoDiags := diagnosticsToProto(diags)
		for _, protoDiag := range protoDiags {
			syncEvts.Send(&stacks.ApplyStackChanges_Event{
				Event: &stacks.ApplyStackChanges_Event_Diagnostic{
					Diagnostic: protoDiag,
				},
			})
		}
	}

	// There is no strong ordering between the applied changes and the
	// diagnostics, so we need to be prepared for them to arrive in any
	// order. However, stackruntime.Apply does guarantee that it will
	// close changesCh only after it's finished writing to and closing
	// everything else, and so we can assume that once changesCh is
	// closed we only need to worry about whatever's left in the
	// diagsCh buffer.
Events:
	for {
		select {

		case change, ok := <-changesCh:
			if !ok {
				if diagsCh != nil {
					// Async work is done! We do still need to consume the rest
					// of diagsCh before we stop, though, because there might
					// be some extras in the channel's buffer that we didn't
					// get to yet.
					for diag := range diagsCh {
						emitDiag(diag)
					}
				}
				break Events
			}

			protoChange, err := change.AppliedChangeProto()
			if err != nil {
				// Should not get here: it always indicates a bug in
				// AppliedChangeProto or in the code which constructed
				// the change over in package stackeval.
				// If we get here then it's likely that something will be
				// left stale in the final stack state, so we should really
				// avoid ever getting here.
				//
				// The message names the type of the change that failed to
				// serialize. The failed conversion result would be no use
				// here: its static type is the same for every change.
				emitDiag(tfdiags.Sourceless(
					tfdiags.Error,
					"Incorrectly-constructed apply result",
					fmt.Sprintf(
						"Failed to serialize a %T value for recording in the updated state: %s.\n\nThis is a bug in Terraform; please report it!",
						change, err,
					),
				))
				continue
			}

			syncEvts.Send(&stacks.ApplyStackChanges_Event{
				Event: &stacks.ApplyStackChanges_Event_AppliedChange{
					AppliedChange: protoChange,
				},
			})

		case diag, ok := <-diagsCh:
			if !ok {
				// The diagnostics channel has closed, so we'll just stop
				// trying to read from it and wait for changesCh to close,
				// which will be our final signal that everything is done.
				diagsCh = nil
				continue
			}
			emitDiag(diag)

		case <-stopCh:
			// If our stop channel is signalled, we need to cancel the apply.
			// This may result in remaining changes or diagnostics being
			// emitted, so we continue to monitor those channels if they're
			// still active.
			cancelApply()
		}
	}

	return nil
}
// OpenStackInspector creates a stack inspector from an open stack
// configuration, a state snapshot and a set of input values, and returns a
// handle to it.
//
// The dependency locks and provider cache handles are optional: with no
// locks handle an empty set of locks is used, and with no cache handle a nil
// provider cache is passed through. Any invalid handle, inconsistent
// provider dependencies, invalid input values or unloadable state snapshot
// produces an InvalidArgument error.
func (s *stacksServer) OpenStackInspector(ctx context.Context, req *stacks.OpenStackInspector_Request) (*stacks.OpenStackInspector_Response, error) {
	cfg := s.handles.StackConfig(handle[*stackconfig.Config](req.StackConfigHandle))
	if cfg == nil {
		return nil, status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
	}

	var locks *depsfile.Locks
	if locksHnd := handle[*depsfile.Locks](req.DependencyLocksHandle); locksHnd.IsNil() {
		locks = depsfile.NewLocks()
	} else if locks = s.handles.DependencyLocks(locksHnd); locks == nil {
		return nil, status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
	}

	// cacheDir stays nil when the request carries no provider cache handle;
	// in that case only built-in providers are usable, and everything below
	// has to cope with a nil cache (returning InvalidArgument is acceptable,
	// panicking is not).
	var cacheDir *providercache.Dir
	if cacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle); !cacheHnd.IsNil() {
		if cacheDir = s.handles.ProviderPluginCache(cacheHnd); cacheDir == nil {
			return nil, status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
		}
	}

	// providerFactoriesForLocks explicitly tolerates a nil provider cache.
	factories, err := providerFactoriesForLocks(locks, cacheDir)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
	}

	inputs, err := externalInputValuesFromProto(req.InputValues)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "invalid input values: %s", err)
	}

	snapshot, err := stackstate.LoadFromProto(req.State)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "can't load state snapshot: %s", err)
	}

	inspector := &stacksInspector{
		Config:             cfg,
		State:              snapshot,
		ProviderFactories:  factories,
		InputValues:        inputs,
		ExperimentsAllowed: s.experimentsAllowed,
	}
	inspectorHnd := s.handles.NewStackInspector(inspector)

	// No situation currently produces diagnostics here, but the response
	// type leaves room to add some later.
	return &stacks.OpenStackInspector_Response{
		StackInspectorHandle: inspectorHnd.ForProtobuf(),
	}, nil
}
// ListResourceIdentities lists the resource identities recorded in an open
// stack state, using the identity schemas of the providers selected by the
// dependency locks.
//
// The request must name a valid stack state handle. The dependency locks and
// provider cache handles are optional. Each provider is instantiated only
// long enough to fetch its resource identity schemas and is closed again
// before the next one is started. An invalid handle, inconsistent provider
// dependencies, a provider that fails to start, or a provider that returns
// any diagnostics with its schema produces an InvalidArgument error; an
// error from building the identity list itself is returned unchanged.
func (s *stacksServer) ListResourceIdentities(ctx context.Context, req *stacks.ListResourceIdentities_Request) (*stacks.ListResourceIdentities_Response, error) {
	hnd := handle[*stackstate.State](req.StateHandle)
	stackState := s.handles.StackState(hnd)
	if stackState == nil {
		return nil, status.Error(codes.InvalidArgument, "the given stack state handle is invalid")
	}

	depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
	var deps *depsfile.Locks
	if !depsHnd.IsNil() {
		deps = s.handles.DependencyLocks(depsHnd)
		if deps == nil {
			return nil, status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
		}
	} else {
		deps = depsfile.NewLocks()
	}

	providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
	var providerCache *providercache.Dir
	if !providerCacheHnd.IsNil() {
		providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
		if providerCache == nil {
			return nil, status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
		}
	}
	// NOTE: providerCache can be nil if no handle was provided, in which
	// case the call can only use built-in providers. All code below
	// must avoid panicking when providerCache is nil, but is allowed to
	// return an InvalidArgument error in that case.

	// (providerFactoriesForLocks explicitly supports a nil providerCache)
	providerFactories, err := providerFactoriesForLocks(deps, providerCache)
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
	}

	identitySchemas := make(map[addrs.Provider]map[string]providers.IdentitySchema, len(providerFactories))
	for name, factory := range providerFactories {
		provider, err := factory()
		if err != nil {
			return nil, status.Errorf(codes.InvalidArgument, "provider %s failed to initialize: %s", name, err)
		}

		schema := provider.GetResourceIdentitySchemas()

		// The instance was started solely to fetch its schemas and is not
		// retained anywhere, so release it right away instead of leaking
		// whatever it holds (presumably a plugin process for non-built-in
		// providers). This is done inline rather than with defer so that
		// instances don't pile up until the loop finishes. A close failure
		// doesn't invalidate the schema we already have and isn't actionable
		// for the caller, so it is deliberately ignored.
		_ = provider.Close()

		if len(schema.Diagnostics) > 0 {
			return nil, status.Errorf(codes.InvalidArgument, "provider %s failed to retrieve schema: %s", name, schema.Diagnostics.Err())
		}
		identitySchemas[name] = schema.IdentityTypes
	}

	resourceIdentities, err := listResourceIdentities(stackState, identitySchemas)
	if err != nil {
		return nil, err
	}

	return &stacks.ListResourceIdentities_Response{
		Resource: resourceIdentities,
	}, nil
}
// InspectExpressionResult forwards the request to the stack inspector
// identified by the request's inspector handle. An unknown handle produces an
// InvalidArgument error.
func (s *stacksServer) InspectExpressionResult(ctx context.Context, req *stacks.InspectExpressionResult_Request) (*stacks.InspectExpressionResult_Response, error) {
	inspector := s.handles.StackInspector(handle[*stacksInspector](req.StackInspectorHandle))
	if inspector != nil {
		return inspector.InspectExpressionResult(ctx, req)
	}
	return nil, status.Error(codes.InvalidArgument, "the given stack inspector handle is invalid")
}
// OpenTerraformState opens a Terraform state and returns a handle to it. The
// state is either read from raw state file bytes carried in the request, or
// loaded via the configuration path given in the request.
//
// Unparseable raw data, or a request with neither source set, produces an
// InvalidArgument error. When loading by configuration path a handle is
// always returned, with any problems reported as diagnostics alongside it.
func (s *stacksServer) OpenTerraformState(ctx context.Context, request *stacks.OpenTerraformState_Request) (*stacks.OpenTerraformState_Response, error) {
	switch src := request.State.(type) {

	case *stacks.OpenTerraformState_Request_Raw:
		// The request carries the serialized state file itself.
		parsed, err := statefile.Read(bytes.NewReader(src.Raw))
		if err != nil {
			return nil, status.Errorf(codes.InvalidArgument, "invalid raw state data: %s", err)
		}
		stateHnd := s.handles.NewTerraformState(parsed.State)
		resp := &stacks.OpenTerraformState_Response{
			StateHandle: stateHnd.ForProtobuf(),
		}
		return resp, nil

	case *stacks.OpenTerraformState_Request_ConfigPath:
		// The state has to be fetched from the backend. LoadState is
		// expected to hand back an empty state even when the diagnostics
		// contain errors, which keeps things simple for callers: they
		// always get a handle and always have to close it.
		stateLoader := stackmigrate.Loader{Discovery: s.services}
		loaded, diags := stateLoader.LoadState(src.ConfigPath)
		stateHnd := s.handles.NewTerraformState(loaded)
		resp := &stacks.OpenTerraformState_Response{
			StateHandle: stateHnd.ForProtobuf(),
			Diagnostics: diagnosticsToProto(diags),
		}
		return resp, nil
	}

	// Neither supported state source was set.
	return nil, status.Error(codes.InvalidArgument, "invalid state source")
}
// CloseTerraformState releases the Terraform state identified by the
// request's handle from the handle table.
//
// Any error reported by the handle table is surfaced to the client as an
// InvalidArgument status error carrying the same message.
func (s *stacksServer) CloseTerraformState(ctx context.Context, request *stacks.CloseTerraformState_Request) (*stacks.CloseTerraformState_Response, error) {
	stateHandle := handle[*states.State](request.StateHandle)
	if err := s.handles.CloseTerraformState(stateHandle); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}
	return &stacks.CloseTerraformState_Response{}, nil
}
// MigrateTerraformState migrates a previously opened Terraform state into
// stack state, streaming each resulting applied change and diagnostic to the
// client as a MigrateTerraformState_Event.
//
// The request must carry valid handles for the previous state, the stack
// configuration and the dependency locks, plus a provider cache handle unless
// the server has a provider cache override installed. It must also carry a
// migration mapping; only the "simple" mapping is supported. Any violation is
// reported as an InvalidArgument status error before migration starts.
//
// Errors returned by server.Send while streaming events are not checked.
func (s *stacksServer) MigrateTerraformState(request *stacks.MigrateTerraformState_Request, server stacks.Stacks_MigrateTerraformStateServer) error {
	prevState := s.handles.TerraformState(handle[*states.State](request.StateHandle))
	if prevState == nil {
		return status.Error(codes.InvalidArgument, "the given state handle is invalid")
	}

	stackCfg := s.handles.StackConfig(handle[*stackconfig.Config](request.ConfigHandle))
	if stackCfg == nil {
		return status.Error(codes.InvalidArgument, "the given config handle is invalid")
	}

	locks := s.handles.DependencyLocks(handle[*depsfile.Locks](request.DependencyLocksHandle))
	if locks == nil {
		return status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
	}

	// The override is only populated by tests, which side load providers
	// without needing a real provider cache. When it is absent the factories
	// are derived from the lock file and the provider cache on the request.
	factories := s.providerCacheOverride
	if factories == nil {
		cacheDir := s.handles.ProviderPluginCache(handle[*providercache.Dir](request.ProviderCacheHandle))
		if cacheDir == nil {
			return status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
		}

		var err error
		factories, err = providerFactoriesForLocks(locks, cacheDir)
		if err != nil {
			return status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
		}
	}

	migration := &stackmigrate.Migration{
		Providers:     factories,
		PreviousState: prevState,
		Config:        stackCfg,
	}

	// sendDiagnostic streams a single, already-encoded diagnostic event.
	sendDiagnostic := func(diag *terraform1.Diagnostic) {
		server.Send(&stacks.MigrateTerraformState_Event{
			Result: &stacks.MigrateTerraformState_Event_Diagnostic{
				Diagnostic: diag,
			},
		})
	}

	// emitDiag forwards a diagnostic raised by the migration itself.
	emitDiag := func(diagnostic tfdiags.Diagnostic) {
		sendDiagnostic(diagnosticToProto(diagnostic))
	}

	// emit forwards an applied change, or an error diagnostic in its place
	// if the change cannot be serialized.
	emit := func(change stackstate.AppliedChange) {
		encoded, err := change.AppliedChangeProto()
		if err != nil {
			sendDiagnostic(&terraform1.Diagnostic{
				Severity: terraform1.Diagnostic_ERROR,
				Summary:  "Failed to serialize change",
				Detail:   fmt.Sprintf("Failed to serialize state change for recording in the migration plan: %s", err),
			})
			return
		}
		server.Send(&stacks.MigrateTerraformState_Event{
			Result: &stacks.MigrateTerraformState_Event_AppliedChange{
				AppliedChange: encoded,
			},
		})
	}

	switch mapping := request.GetMapping().(type) {
	case nil:
		return status.Error(codes.InvalidArgument, "missing migration mapping")
	case *stacks.MigrateTerraformState_Request_Simple:
		migration.Migrate(
			mapping.Simple.ResourceAddressMap,
			mapping.Simple.ModuleAddressMap,
			emit, emitDiag)
	default:
		return status.Error(codes.InvalidArgument, "unsupported migration mapping")
	}

	return nil
}
// stackPlanHooks builds the runtime hooks for a plan operation. Each progress
// report produced by the shared [stackChangeHooks] logic is wrapped in a
// PlanStackChanges_Event and sent through evts.
func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
	forward := func(progress *stacks.StackChangeProgress) error {
		evt := &stacks.PlanStackChanges_Event{
			Event: &stacks.PlanStackChanges_Event_Progress{
				Progress: progress,
			},
		}
		return evts.Send(evt)
	}
	return stackChangeHooks(forward, mainStackSource)
}
// stackApplyHooks builds the runtime hooks for an apply operation. Each
// progress report produced by the shared [stackChangeHooks] logic is wrapped
// in an ApplyStackChanges_Event and sent through evts.
func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
	forward := func(progress *stacks.StackChangeProgress) error {
		evt := &stacks.ApplyStackChanges_Event{
			Event: &stacks.ApplyStackChanges_Event_Progress{
				Progress: progress,
			},
		}
		return evts.Send(evt)
	}
	return stackChangeHooks(forward, mainStackSource)
}
// stackChangeHooks is the shared hook-handling logic for both [stackPlanHooks]
// and [stackApplyHooks]. Each phase emits a different subset of the events
// handled here.
//
// Every progress event is handed to send. The error returned by send is
// ignored, because none of the hook signatures used here has an error return.
// mainStackSource is only used to label the root "planning" and "applying"
// tracing spans.
//
// Hooks that receive a tracking value expect it to be the [trace.Span]
// returned by the matching Begin hook and will panic on any other type.
func stackChangeHooks(send func(*stacks.StackChangeProgress) error, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
	// finishSpan records the final status on the tracked span and ends it.
	finishSpan := func(tracking any, code otelCodes.Code, description string) {
		span := tracking.(trace.Span)
		span.SetStatus(code, description)
		span.End()
	}

	// changeCounts builds the roll-up event of resource action counts for a
	// component instance. The plan and apply phases report it identically.
	changeCounts := func(cic *hooks.ComponentInstanceChange) *stacks.StackChangeProgress {
		return &stacks.StackChangeProgress{
			Event: &stacks.StackChangeProgress_ComponentInstanceChanges_{
				ComponentInstanceChanges: &stacks.StackChangeProgress_ComponentInstanceChanges{
					Addr: &stacks.ComponentInstanceInStackAddr{
						ComponentAddr:         stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(),
						ComponentInstanceAddr: cic.Addr.String(),
					},
					Total:  int32(cic.Total()),
					Add:    int32(cic.Add),
					Change: int32(cic.Change),
					Import: int32(cic.Import),
					Remove: int32(cic.Remove),
					Defer:  int32(cic.Defer),
					Move:   int32(cic.Move),
					Forget: int32(cic.Forget),
				},
			},
		}
	}

	return &stackruntime.Hooks{
		// For any BeginFunc-shaped hook that returns an OpenTelemetry tracing
		// span, we'll wrap it in a context so that the runtime's downstream
		// operations will appear as children of it.
		ContextAttach: func(parent context.Context, tracking any) context.Context {
			span, ok := tracking.(trace.Span)
			if !ok {
				return parent
			}
			return trace.ContextWithSpan(parent, span)
		},

		// For the overall plan operation we don't emit any events to the client,
		// since it already knows it has asked us to plan, but we do establish
		// a root tracing span for all of the downstream planning operations to
		// attach themselves to.
		BeginPlan: func(ctx context.Context, _ struct{}) any {
			_, span := tracer.Start(ctx, "planning", trace.WithAttributes(
				attribute.String("main_stack_source", mainStackSource.String()),
			))
			return span
		},
		EndPlan: func(ctx context.Context, span any, _ struct{}) any {
			span.(trace.Span).End()
			return nil
		},

		// For the overall apply operation we don't emit any events to the client,
		// since it already knows it has asked us to apply, but we do establish
		// a root tracing span for all of the downstream apply operations to
		// attach themselves to.
		BeginApply: func(ctx context.Context, _ struct{}) any {
			_, span := tracer.Start(ctx, "applying", trace.WithAttributes(
				attribute.String("main_stack_source", mainStackSource.String()),
			))
			return span
		},
		EndApply: func(ctx context.Context, span any, _ struct{}) any {
			span.(trace.Span).End()
			return nil
		},

		// After expanding a component, we emit an event to the client to
		// list all of the resulting instances. In the common case of an
		// unexpanded component, this will be a single address.
		ComponentExpanded: func(ctx context.Context, ce *hooks.ComponentInstances) {
			ias := make([]string, 0, len(ce.InstanceAddrs))
			for _, ia := range ce.InstanceAddrs {
				ias = append(ias, ia.String())
			}
			send(&stacks.StackChangeProgress{
				Event: &stacks.StackChangeProgress_ComponentInstances_{
					ComponentInstances: &stacks.StackChangeProgress_ComponentInstances{
						ComponentAddr: ce.ComponentAddr.String(),
						InstanceAddrs: ias,
					},
				},
			})
		},

		// For each component instance, we emit a series of events to the
		// client, reporting the status of the plan operation. We also create a
		// nested tracing span for the component instance.
		PendingComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending))
		},
		BeginComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning))
			_, span := tracer.Start(ctx, "planning", trace.WithAttributes(
				attribute.String("component_instance", ci.String()),
			))
			return span
		},
		EndComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned))
			finishSpan(span, otelCodes.Ok, "planning succeeded")
			return nil
		},
		ErrorComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored))
			finishSpan(span, otelCodes.Error, "planning failed")
			return nil
		},
		DeferComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceDeferred))
			finishSpan(span, otelCodes.Error, "planning succeeded, but deferred")
			return nil
		},

		// The apply phase mirrors the plan phase: a status event for each
		// step, plus a nested tracing span for the component instance.
		PendingComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending))
		},
		BeginComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplying))
			_, span := tracer.Start(ctx, "applying", trace.WithAttributes(
				attribute.String("component_instance", ci.String()),
			))
			return span
		},
		EndComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplied))
			finishSpan(span, otelCodes.Ok, "applying succeeded")
			return nil
		},
		ErrorComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
			send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored))
			finishSpan(span, otelCodes.Error, "applying failed")
			return nil
		},

		// When Terraform core reports a resource instance plan status, we
		// forward it to the events client.
		ReportResourceInstanceStatus: func(ctx context.Context, span any, rihd *hooks.ResourceInstanceStatusHookData) any {
			// addrs.Provider.String() will panic on the zero value. In this
			// case, holding a zero provider would mean a bug in our event
			// logging code rather than in core logic, so avoid exploding, but
			// send a blank string to expose the error later.
			providerAddr := ""
			if !rihd.ProviderAddr.IsZero() {
				providerAddr = rihd.ProviderAddr.String()
			}

			send(&stacks.StackChangeProgress{
				Event: &stacks.StackChangeProgress_ResourceInstanceStatus_{
					ResourceInstanceStatus: &stacks.StackChangeProgress_ResourceInstanceStatus{
						Addr:         stacks.NewResourceInstanceObjectInStackAddr(rihd.Addr),
						Status:       rihd.Status.ForProtobuf(),
						ProviderAddr: providerAddr,
					},
				},
			})
			return span
		},

		// Upon completion of a component instance plan, we emit a planned
		// change summary event to the client for each resource instance.
		ReportResourceInstancePlanned: func(ctx context.Context, span any, ric *hooks.ResourceInstanceChange) any {
			span.(trace.Span).AddEvent("planned resource instance", trace.WithAttributes(
				attribute.String("component_instance", ric.Addr.Component.String()),
				attribute.String("resource_instance", ric.Addr.Item.String()),
			))

			ripc, err := resourceInstancePlanned(ric)
			if err != nil {
				// The event cannot be built, so nothing is sent to the
				// client. Record the reason on the span so the omission is
				// at least visible in traces.
				span.(trace.Span).RecordError(err)
				return span
			}

			send(&stacks.StackChangeProgress{
				Event: &stacks.StackChangeProgress_ResourceInstancePlannedChange_{
					ResourceInstancePlannedChange: ripc,
				},
			})
			return span
		},

		// Deferred resource instance changes are reported like planned ones,
		// with the encoded deferral reason attached.
		ReportResourceInstanceDeferred: func(ctx context.Context, span any, change *hooks.DeferredResourceInstanceChange) any {
			span.(trace.Span).AddEvent("deferred resource instance", trace.WithAttributes(
				attribute.String("component_instance", change.Change.Addr.Component.String()),
				attribute.String("resource_instance", change.Change.Addr.Item.String()),
			))

			ripc, err := resourceInstancePlanned(change.Change)
			if err != nil {
				// As above: no event can be sent, so leave a record of the
				// failure on the span.
				span.(trace.Span).RecordError(err)
				return span
			}

			deferred := stackplan.EncodeDeferred(change.Reason)

			send(&stacks.StackChangeProgress{
				Event: &stacks.StackChangeProgress_DeferredResourceInstancePlannedChange_{
					DeferredResourceInstancePlannedChange: &stacks.StackChangeProgress_DeferredResourceInstancePlannedChange{
						Change:   ripc,
						Deferred: deferred,
					},
				},
			})
			return span
		},

		// We also report a roll-up of planned resource action counts after each
		// component instance plan or apply completes.
		ReportComponentInstancePlanned: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any {
			send(changeCounts(cic))
			return span
		},

		// The apply rollup should typically report the same information as
		// the plan one did earlier, but could vary in some situations if
		// e.g. a planned update turned out to be a no-op once some unknown
		// values were known, or if the apply phase is handled by a different
		// version of the agent than the plan phase which has support for
		// a different set of possible change types.
		ReportComponentInstanceApplied: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any {
			send(changeCounts(cic))
			return span
		},
	}
}
// resourceInstancePlanned converts a planned resource instance change into
// the protobuf message sent to clients as part of a progress event.
//
// The result carries the change's action types, the previous address if the
// resource instance was moved since the previous run, the import details if
// the change is an import, and the provider address. It returns an error if
// the planned action cannot be translated into protobuf change types.
func resourceInstancePlanned(ric *hooks.ResourceInstanceChange) (*stacks.StackChangeProgress_ResourceInstancePlannedChange, error) {
	actions, err := stacks.ChangeTypesForPlanAction(ric.Change.Action)
	if err != nil {
		return nil, err
	}

	// Moved is only populated when the address differs from the one recorded
	// in the previous run.
	var moved *stacks.StackChangeProgress_ResourceInstancePlannedChange_Moved
	if !ric.Change.PrevRunAddr.Equal(ric.Change.Addr) {
		moved = &stacks.StackChangeProgress_ResourceInstancePlannedChange_Moved{
			PrevAddr: &stacks.ResourceInstanceInStackAddr{
				ComponentInstanceAddr: ric.Addr.Component.String(),
				ResourceInstanceAddr:  ric.Change.PrevRunAddr.String(),
			},
		}
	}

	var imported *stacks.StackChangeProgress_ResourceInstancePlannedChange_Imported
	if ric.Change.Importing != nil {
		imported = &stacks.StackChangeProgress_ResourceInstancePlannedChange_Imported{
			ImportId: ric.Change.Importing.ID,
			Unknown:  ric.Change.Importing.Unknown,
		}
	}

	// addrs.Provider.String() panics on the zero value. A zero provider here
	// would indicate a bug elsewhere, so rather than crashing while reporting
	// progress we send a blank string, matching how resource instance status
	// events are reported.
	providerAddr := ""
	if provider := ric.Change.ProviderAddr.Provider; !provider.IsZero() {
		providerAddr = provider.String()
	}

	return &stacks.StackChangeProgress_ResourceInstancePlannedChange{
		Addr:         stacks.NewResourceInstanceObjectInStackAddr(ric.Addr),
		Actions:      actions,
		Moved:        moved,
		Imported:     imported,
		ProviderAddr: providerAddr,
	}, nil
}
// evtComponentInstanceStatus builds the progress event announcing that the
// component instance ci has reached the given status. The event identifies
// the instance by both its component address and its full instance address.
func evtComponentInstanceStatus(ci stackaddrs.AbsComponentInstance, status hooks.ComponentInstanceStatus) *stacks.StackChangeProgress {
	addr := &stacks.ComponentInstanceInStackAddr{
		ComponentAddr:         stackaddrs.ConfigComponentForAbsInstance(ci).String(),
		ComponentInstanceAddr: ci.String(),
	}
	payload := &stacks.StackChangeProgress_ComponentInstanceStatus{
		Addr:   addr,
		Status: status.ForProtobuf(),
	}
	return &stacks.StackChangeProgress{
		Event: &stacks.StackChangeProgress_ComponentInstanceStatus_{
			ComponentInstanceStatus: payload,
		},
	}
}
// syncPlanStackChangesServer is a wrapper around a
// stacks.Stacks_PlanStackChangesServer implementation that makes the
// Send method concurrency-safe by holding a mutex throughout the underlying
// call.
//
// It is a type alias for the generic syncStreamingRPCSender, instantiated
// with the plan stream server type and its *stacks.PlanStackChanges_Event
// message type.
type syncPlanStackChangesServer = syncStreamingRPCSender[stacks.Stacks_PlanStackChangesServer, *stacks.PlanStackChanges_Event]
// syncApplyStackChangesServer is a wrapper around a
// stacks.Stacks_ApplyStackChangesServer implementation that makes the
// Send method concurrency-safe by holding a mutex throughout the underlying
// call.
//
// It is a type alias for the generic syncStreamingRPCSender, instantiated
// with the apply stream server type and its *stacks.ApplyStackChanges_Event
// message type.
type syncApplyStackChangesServer = syncStreamingRPCSender[stacks.Stacks_ApplyStackChangesServer, *stacks.ApplyStackChanges_Event]