Documentation
¶
Index ¶
- Constants
- Variables
- func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte) (*hsm.Node, error)
- func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
- func CompletionHandler(ctx context.Context, env hsm.Environment, ref hsm.Ref, requestID string, ...) error
- func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
- func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
- func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
- func EndpointRegistryProvider(matchingClient resource.MatchingClient, ...) commonnexus.EndpointRegistry
- func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
- func RegisterEventDefinitions(reg *hsm.Registry) error
- func RegisterExecutor(registry *hsm.Registry, options TaskExecutorOptions) error
- func RegisterStateMachines(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type BackoffTask
- type BackoffTaskSerializer
- type CancelRequestCompletedEventDefinition
- func (d CancelRequestCompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestCompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestCompletedEventDefinition) Type() enumspb.EventType
- type CancelRequestFailedEventDefinition
- func (d CancelRequestFailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestFailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestFailedEventDefinition) Type() enumspb.EventType
- type CancelRequestedEventDefinition
- func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestedEventDefinition) Type() enumspb.EventType
- type Cancelation
- type CancelationBackoffTask
- type CancelationBackoffTaskSerializer
- type CancelationTask
- type CancelationTaskSerializer
- type CanceledEventDefinition
- func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CanceledEventDefinition) Type() enumspb.EventType
- type ClientProvider
- type CompletedEventDefinition
- func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CompletedEventDefinition) Type() enumspb.EventType
- type Config
- type EventAttemptFailed
- type EventCancelationAttemptFailed
- type EventCancelationFailed
- type EventCancelationRescheduled
- type EventCancelationScheduled
- type EventCancelationSucceeded
- type EventCanceled
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventStarted
- type EventSucceeded
- type EventTimedOut
- type FailedEventDefinition
- func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d FailedEventDefinition) Type() enumspb.EventType
- type InvocationTask
- type InvocationTaskSerializer
- type LimitedReadCloser
- type NexusHeaderTagMapping
- type NexusMetricTagConfig
- type NexusTransportProvider
- type Operation
- func (o Operation) Cancel(node *hsm.Node, t time.Time, requestedEventID int64) (hsm.TransitionOutput, error)
- func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
- func (o Operation) CancelationNode(node *hsm.Node) (*hsm.Node, error)
- func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)
- func (o Operation) SetState(state enumsspb.NexusOperationState)
- func (o Operation) State() enumsspb.NexusOperationState
- type ResponseSizeLimiter
- type ScheduledEventDefinition
- func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d ScheduledEventDefinition) Type() enumspb.EventType
- type StartedEventDefinition
- func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d StartedEventDefinition) Type() enumspb.EventType
- type TaskExecutorOptions
- type TimedOutEventDefinition
- func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
- func (d TimedOutEventDefinition) Type() enumspb.EventType
- type TimeoutTask
- type TimeoutTaskSerializer
Constants ¶
const ( // OperationMachineType is a unique type identifier for the Operation state machine. OperationMachineType = "nexusoperations.Operation" // CancelationMachineType is a unique type identifier for the Cancelation state machine. CancelationMachineType = "nexusoperations.Cancelation" )
const ( TaskTypeTimeout = "nexusoperations.Timeout" TaskTypeInvocation = "nexusoperations.Invocation" TaskTypeBackoff = "nexusoperations.Backoff" TaskTypeCancelation = "nexusoperations.Cancelation" TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff" )
const NexusCallbackSourceHeader = "Nexus-Callback-Source"
Variables ¶
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
"component.nexusoperations.callback.endpoint.template",
"unset",
`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver asynchronous completion.
The template can be used to interpolate the {{.NamepaceName}} and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set in order to use Nexus Operations.`,
)
var CancelationMachineKey = hsm.Key{Type: CancelationMachineType, ID: ""}
CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.
var DisallowedOperationHeaders = dynamicconfig.NewGlobalTypedSettingWithConverter( "component.nexusoperations.disallowedHeaders", func(in any) ([]string, error) { keys, err := dynamicconfig.ConvertStructure[[]string](nil)(in) if err != nil { return nil, err } for i, k := range keys { keys[i] = strings.ToLower(k) } return keys, nil }, []string{ "request-timeout", interceptor.DCRedirectionApiHeaderName, interceptor.DCRedirectionContextHeaderName, headers.CallerNameHeaderName, headers.CallerTypeHeaderName, headers.CallOriginHeaderName, }, `Case insensitive list of disallowed header keys for Nexus Operations. ScheduleNexusOperation commands with a "nexus_header" field that contains any of these disallowed keys will be rejected.`, )
var ErrInvalidOperationToken = errors.New("invalid operation token")
var ErrOperationTimeoutBelowMin = errors.New("remaining operation timeout is less than required minimum")
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.concurrency",
30,
`MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution.
Once the limit is reached, ScheduleNexusOperation commands will be rejected.`,
)
var MaxOperationHeaderSize = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.header.size",
8192,
`The maximum allowed header size for a Nexus Operation.
ScheduleNexusOperation commands with a "nexus_header" field that exceeds this limit will be rejected.
Uses Go's len() function on header keys and values to determine the total size.`,
)
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.name.length",
1000,
`MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name.
ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetting(
"component.nexusoperations.limit.scheduleToCloseTimeout",
0,
`MaxOperationScheduleToCloseTimeout limits the maximum allowed duration of a Nexus Operation. ScheduleOperation
commands that specify no schedule-to-close timeout or a longer timeout than permitted will have their
schedule-to-close timeout capped to this value. 0 implies no limit.`,
)
var MaxOperationTokenLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.token.length",
4096,
`Limits the maximum allowed length for a Nexus Operation token. Tokens returned via start responses or via async
completions that exceed this limit will be rejected. Uses Go's len() function to determine the length.
Leave this limit long enough to fit a workflow ID and namespace name plus padding at minimum since that's what the SDKs
use as the token.`,
)
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.service.name.length",
1000,
`MaxServiceNameLength limits the maximum allowed length for a Nexus Service name.
ScheduleNexusOperation commands with a service name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting( "component.nexusoperations.metrics.tags", NexusMetricTagConfig{}, `Controls which metric tags are included with Nexus operation metrics. This configuration supports: 1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag) 2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag) 3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings) Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration. Adding high-cardinality tags (like unique operation names) can significantly increase metric storage requirements and query complexity. Consider the cardinality impact when enabling these tags.`, )
var MinDispatchTaskTimeout = dynamicconfig.NewNamespaceDurationSetting( "component.nexusoperations.limit.dispatch.task.timeout.min", time.Second, `MinDispatchTaskTimeout is the minimum time remaining for a request to be dispatched to the handler worker. If the remaining request timeout is less than this value, a timeout error will be returned. Working in conjunction with MinRequestTimeout, both configs help ensure that the server has enough time to complete a Nexus request.`, )
var MinRequestTimeout = dynamicconfig.NewNamespaceDurationSetting( "component.nexusoperations.limit.request.timeout.min", time.Millisecond*1500, `MinRequestTimeout is the minimum time remaining for a request to complete for the server to make RPCs. If the remaining request timeout is less than this value, a non-retryable timeout error will be returned.`, )
var Module = fx.Module( "component.nexusoperations", fx.Provide(ConfigProvider), fx.Provide(ClientProviderFactory), fx.Provide(DefaultNexusTransportProvider), fx.Provide(CallbackTokenGeneratorProvider), fx.Provide(EndpointRegistryProvider), fx.Invoke(EndpointRegistryLifetimeHooks), fx.Invoke(RegisterStateMachines), fx.Invoke(RegisterTaskSerializers), fx.Invoke(RegisterEventDefinitions), fx.Invoke(RegisterExecutor), )
var OutboundRequestCounter = metrics.NewCounterDef( "nexus_outbound_requests", metrics.WithDescription("The number of Nexus outbound requests made by the history service."), )
var OutboundRequestLatency = metrics.NewTimerDef( "nexus_outbound_latency", metrics.WithDescription("Latency of outbound Nexus requests made by the history service."), )
var RecordCancelRequestCompletionEvents = dynamicconfig.NewGlobalBoolSetting( "component.nexusoperations.recordCancelRequestCompletionEvents", true, `Boolean flag to control whether to record NexusOperationCancelRequestCompleted and NexusOperationCancelRequestFailed events. Default true.`, )
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting( "component.nexusoperations.request.timeout", time.Second*10, `RequestTimeout is the timeout for making a single nexus start or cancel request.`, )
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.initialInterval", time.Second, `The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.maxInterval", time.Hour, `The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var TransitionAttemptFailed = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED}, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(op.Attempt), nil) nextAttemptScheduleTime := event.Time.Add(nextDelay) op.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) op.LastAttemptFailure = event.Failure return op.output() }, )
var TransitionCancelationAttemptFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(c.Attempt), nil) nextAttemptScheduleTime := event.Time.Add(nextDelay) c.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) c.LastAttemptFailure = event.Failure return c.output(event.Node) }, )
var TransitionCancelationFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{ enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, }, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) c.LastAttemptFailure = event.Failure return c.output(event.Node) }, )
var TransitionCancelationRescheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(c Cancelation, event EventCancelationRescheduled) (hsm.TransitionOutput, error) { c.NextAttemptScheduleTime = nil return c.output(event.Node) }, )
var TransitionCancelationScheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(op Cancelation, event EventCancelationScheduled) (hsm.TransitionOutput, error) { op.RequestedTime = timestamppb.New(event.Time) return op.output(event.Node) }, )
var TransitionCancelationSucceeded = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SUCCEEDED, func(c Cancelation, event EventCancelationSucceeded) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) return c.output(event.Node) }, )
var TransitionCanceled = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_CANCELED, func(op Operation, event EventCanceled) (hsm.TransitionOutput, error) { return op.output() }, )
var TransitionFailed = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_FAILED, func(op Operation, event EventFailed) (hsm.TransitionOutput, error) { return op.output() }, )
var TransitionRescheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) { op.NextAttemptScheduleTime = nil return op.output() }, )
var TransitionScheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventScheduled) (hsm.TransitionOutput, error) { return op.output() }, )
var TransitionStarted = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_STARTED, func(op Operation, event EventStarted) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) if event.Attributes.OperationToken != "" { op.OperationToken = event.Attributes.OperationToken } else if event.Attributes.OperationId != "" { op.OperationToken = event.Attributes.OperationId } child, err := op.CancelationNode(event.Node) if err != nil { return hsm.TransitionOutput{}, err } if child != nil { return hsm.TransitionOutput{}, hsm.MachineTransition(child, func(c Cancelation) (hsm.TransitionOutput, error) { return TransitionCancelationScheduled.Apply(c, EventCancelationScheduled{ Time: event.Time, Node: child, }) }) } return op.output() }, )
var TransitionSucceeded = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_SUCCEEDED, func(op Operation, event EventSucceeded) (hsm.TransitionOutput, error) { return op.output() }, )
var TransitionTimedOut = hsm.NewTransition( []enumsspb.NexusOperationState{ enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, enumsspb.NEXUS_OPERATION_STATE_STARTED, }, enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT, func(op Operation, event EventTimedOut) (hsm.TransitionOutput, error) { return op.output() }, )
var UseSystemCallbackURL = dynamicconfig.NewGlobalBoolSetting( "component.nexusoperations.useSystemCallbackURL", false, `UseSystemCallbackURL is a global feature toggle that controls how the executor generates callback URLs for worker targets in Nexus Operations.When set to true, the executor will use the fixed system callback URL ("temporal://system") for all worker targets, instead of generating URLs from the callback URL template. This simplifies configuration and improves reliability for worker callbacks. - false (default): The executor uses the callback URL template to generate callback URLs for worker targets. - true: The executor uses the fixed system callback URL ("temporal://system") for worker targets. Note: The default will switch to true in future releases.`, )
Functions ¶
func AddChild ¶
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte) (*hsm.Node, error)
AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.
func CallbackTokenGeneratorProvider ¶
func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
func CompletionHandler ¶
func ConvertLinkWorkflowEventToNexusLink ¶ added in v1.25.1
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.
NOTE: Experimental
func ConvertNexusLinkToLinkWorkflowEvent ¶ added in v1.25.1
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
NOTE: Experimental
func EndpointRegistryLifetimeHooks ¶ added in v1.25.0
func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
func EndpointRegistryProvider ¶ added in v1.25.0
func EndpointRegistryProvider( matchingClient resource.MatchingClient, endpointManager persistence.NexusEndpointManager, dc *dynamicconfig.Collection, logger log.Logger, metricsHandler metrics.Handler, ) commonnexus.EndpointRegistry
func MachineCollection ¶
func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
MachineCollection creates a new typed [statemachines.Collection] for operations.
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, options TaskExecutorOptions, ) error
func RegisterStateMachines ¶
func RegisterTaskSerializers ¶
Types ¶
type BackoffTask ¶
type BackoffTask struct {
// contains filtered or unexported fields
}
func (BackoffTask) Deadline ¶
func (t BackoffTask) Deadline() time.Time
func (BackoffTask) Destination ¶ added in v1.26.2
func (t BackoffTask) Destination() string
func (BackoffTask) Type ¶
func (BackoffTask) Type() string
func (BackoffTask) Validate ¶ added in v1.25.2
func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error
type BackoffTaskSerializer ¶
type BackoffTaskSerializer struct{}
func (BackoffTaskSerializer) Deserialize ¶
func (BackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelRequestCompletedEventDefinition ¶ added in v1.28.0
type CancelRequestCompletedEventDefinition struct{}
func (CancelRequestCompletedEventDefinition) Apply ¶ added in v1.28.0
func (d CancelRequestCompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestCompletedEventDefinition) CherryPick ¶ added in v1.28.0
func (d CancelRequestCompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger ¶ added in v1.28.0
func (d CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestCompletedEventDefinition) Type ¶ added in v1.28.0
func (d CancelRequestCompletedEventDefinition) Type() enumspb.EventType
type CancelRequestFailedEventDefinition ¶ added in v1.28.0
type CancelRequestFailedEventDefinition struct{}
func (CancelRequestFailedEventDefinition) Apply ¶ added in v1.28.0
func (d CancelRequestFailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestFailedEventDefinition) CherryPick ¶ added in v1.28.0
func (d CancelRequestFailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger ¶ added in v1.28.0
func (d CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestFailedEventDefinition) Type ¶ added in v1.28.0
func (d CancelRequestFailedEventDefinition) Type() enumspb.EventType
type CancelRequestedEventDefinition ¶
type CancelRequestedEventDefinition struct{}
func (CancelRequestedEventDefinition) Apply ¶ added in v1.25.0
func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestedEventDefinition) CherryPick ¶ added in v1.25.0
func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestedEventDefinition) Type ¶
func (d CancelRequestedEventDefinition) Type() enumspb.EventType
type Cancelation ¶
type Cancelation struct {
*persistencespb.NexusOperationCancellationInfo
}
Cancelation state machine for canceling an operation.
func (Cancelation) RegenerateTasks ¶
func (Cancelation) SetState ¶
func (c Cancelation) SetState(state enumspb.NexusOperationCancellationState)
func (Cancelation) State ¶
func (c Cancelation) State() enumspb.NexusOperationCancellationState
type CancelationBackoffTask ¶
type CancelationBackoffTask struct {
// contains filtered or unexported fields
}
func (CancelationBackoffTask) Deadline ¶
func (t CancelationBackoffTask) Deadline() time.Time
func (CancelationBackoffTask) Destination ¶ added in v1.26.2
func (CancelationBackoffTask) Destination() string
func (CancelationBackoffTask) Type ¶
func (CancelationBackoffTask) Type() string
func (CancelationBackoffTask) Validate ¶ added in v1.26.2
func (CancelationBackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationBackoffTaskSerializer ¶
type CancelationBackoffTaskSerializer struct{}
func (CancelationBackoffTaskSerializer) Deserialize ¶
func (CancelationBackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelationTask ¶
func (CancelationTask) Deadline ¶ added in v1.26.2
func (CancelationTask) Deadline() time.Time
func (CancelationTask) Destination ¶
func (t CancelationTask) Destination() string
func (CancelationTask) Type ¶
func (CancelationTask) Type() string
func (CancelationTask) Validate ¶ added in v1.26.2
func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationTaskSerializer ¶
type CancelationTaskSerializer struct{}
func (CancelationTaskSerializer) Deserialize ¶
func (CancelationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CanceledEventDefinition ¶
type CanceledEventDefinition struct{}
func (CanceledEventDefinition) Apply ¶ added in v1.25.0
func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CanceledEventDefinition) CherryPick ¶ added in v1.25.0
func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CanceledEventDefinition) IsWorkflowTaskTrigger ¶
func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
func (CanceledEventDefinition) Type ¶
func (d CanceledEventDefinition) Type() enumspb.EventType
type ClientProvider ¶
type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexusrpc.HTTPClient, error)
ClientProvider provides a nexus client for a given endpoint.
func ClientProviderFactory ¶
func ClientProviderFactory( namespaceRegistry namespace.Registry, endpointRegistry commonnexus.EndpointRegistry, httpTransportProvider NexusTransportProvider, clusterMetadata cluster.Metadata, rpcFactory common.RPCFactory, ) (ClientProvider, error)
type CompletedEventDefinition ¶
type CompletedEventDefinition struct{}
func (CompletedEventDefinition) Apply ¶ added in v1.25.0
func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CompletedEventDefinition) CherryPick ¶ added in v1.25.0
func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CompletedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
func (CompletedEventDefinition) Type ¶
func (d CompletedEventDefinition) Type() enumspb.EventType
type Config ¶
type Config struct {
Enabled dynamicconfig.BoolPropertyFn
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxServiceNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationTokenLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationHeaderSize dynamicconfig.IntPropertyFnWithNamespaceFilter
DisallowedOperationHeaders dynamicconfig.TypedPropertyFn[[]string]
MaxOperationScheduleToCloseTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
CallbackURLTemplate dynamicconfig.StringPropertyFn
UseSystemCallbackURL dynamicconfig.BoolPropertyFn
RecordCancelRequestCompletionEvents dynamicconfig.BoolPropertyFn
RetryPolicy func() backoff.RetryPolicy
}
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type EventAttemptFailed ¶
type EventAttemptFailed struct {
Time time.Time
Failure *failurepb.Failure
Node *hsm.Node
RetryPolicy backoff.RetryPolicy
}
EventAttemptFailed is triggered when an invocation attempt is failed with a retryable error.
type EventCancelationAttemptFailed ¶
type EventCancelationAttemptFailed struct {
Time time.Time
Failure *failurepb.Failure
Node *hsm.Node
RetryPolicy backoff.RetryPolicy
}
EventCancelationAttemptFailed is triggered when a cancelation attempt is failed with a retryable error.
type EventCancelationFailed ¶
EventCancelationFailed is triggered when a cancelation attempt is failed with a non retryable error.
type EventCancelationRescheduled ¶
EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.
type EventCancelationScheduled ¶
EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.
type EventCancelationSucceeded ¶
EventCancelationSucceeded is triggered when a cancelation attempt succeeds.
type EventCanceled ¶
EventCanceled is triggered when an invocation attempt succeeds.
type EventFailed ¶
type EventFailed struct {
Time time.Time
Node *hsm.Node
Attributes *historypb.NexusOperationFailedEventAttributes
}
EventFailed is triggered when an invocation attempt is failed with a non retryable error.
type EventRescheduled ¶
EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.
type EventStarted ¶
type EventStarted struct {
Time time.Time
Node *hsm.Node
Attributes *historypb.NexusOperationStartedEventAttributes
}
EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.
type EventSucceeded ¶
type EventSucceeded struct {
// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
Time time.Time
Node *hsm.Node
}
EventSucceeded is triggered when an invocation attempt succeeds.
type EventTimedOut ¶
EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
type FailedEventDefinition ¶
type FailedEventDefinition struct{}
func (FailedEventDefinition) Apply ¶ added in v1.25.0
func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (FailedEventDefinition) CherryPick ¶ added in v1.25.0
func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (FailedEventDefinition) IsWorkflowTaskTrigger ¶
func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
func (FailedEventDefinition) Type ¶
func (d FailedEventDefinition) Type() enumspb.EventType
type InvocationTask ¶
func (InvocationTask) Deadline ¶ added in v1.26.2
func (InvocationTask) Deadline() time.Time
func (InvocationTask) Destination ¶
func (t InvocationTask) Destination() string
func (InvocationTask) Type ¶
func (InvocationTask) Type() string
func (InvocationTask) Validate ¶ added in v1.25.2
func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type InvocationTaskSerializer ¶
type InvocationTaskSerializer struct{}
func (InvocationTaskSerializer) Deserialize ¶
func (InvocationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type LimitedReadCloser ¶
type LimitedReadCloser struct {
R io.ReadCloser
N int64
}
A LimitedReaderCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.
func NewLimitedReadCloser ¶
func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser
func (*LimitedReadCloser) Close ¶
func (l *LimitedReadCloser) Close() error
type NexusHeaderTagMapping ¶ added in v1.27.0
type NexusMetricTagConfig ¶ added in v1.27.0
type NexusMetricTagConfig struct {
// Include service name as a metric tag
IncludeServiceTag bool
// Include operation name as a metric tag
IncludeOperationTag bool
// Configuration for mapping request headers to metric tags
HeaderTagMappings []NexusHeaderTagMapping
}
type NexusTransportProvider ¶
type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper
NexusTransportProvider type alias allows a provider to customize the default implementation specifically for Nexus.
func DefaultNexusTransportProvider ¶
func DefaultNexusTransportProvider() NexusTransportProvider
type Operation ¶
type Operation struct {
*persistencespb.NexusOperationInfo
}
Operation state machine.
func (Operation) Cancel ¶
func (o Operation) Cancel(node *hsm.Node, t time.Time, requestedEventID int64) (hsm.TransitionOutput, error)
Cancel marks the Operation machine as canceled by spawning a child Cancelation machine. If the Operation already completed, then the Operation cannot be canceled anymore, and the Cancelation machine will stay in UNSPECIFIED state. If the Operation is in STARTED state, then transition the Cancelation machine to the SCHEDULED state. Otherwise, the Cancelation machine will wait the Operation machine transition to the STARTED state.
func (Operation) Cancelation ¶
func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
func (Operation) CancelationNode ¶ added in v1.25.2
func (Operation) RegenerateTasks ¶
func (Operation) SetState ¶
func (o Operation) SetState(state enumsspb.NexusOperationState)
func (Operation) State ¶
func (o Operation) State() enumsspb.NexusOperationState
type ResponseSizeLimiter ¶
type ResponseSizeLimiter struct {
// contains filtered or unexported fields
}
type ScheduledEventDefinition ¶
type ScheduledEventDefinition struct{}
func (ScheduledEventDefinition) Apply ¶ added in v1.25.0
func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (ScheduledEventDefinition) CherryPick ¶ added in v1.25.0
func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (ScheduledEventDefinition) IsWorkflowTaskTrigger ¶
func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
func (ScheduledEventDefinition) Type ¶
func (d ScheduledEventDefinition) Type() enumspb.EventType
type StartedEventDefinition ¶
type StartedEventDefinition struct{}
func (StartedEventDefinition) Apply ¶ added in v1.25.0
func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (StartedEventDefinition) CherryPick ¶ added in v1.25.0
func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (StartedEventDefinition) IsWorkflowTaskTrigger ¶
func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
func (StartedEventDefinition) Type ¶
func (d StartedEventDefinition) Type() enumspb.EventType
type TaskExecutorOptions ¶ added in v1.25.0
type TaskExecutorOptions struct {
fx.In
Config *Config
NamespaceRegistry namespace.Registry
MetricsHandler metrics.Handler
Logger log.Logger
CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
ClientProvider ClientProvider
EndpointRegistry commonnexus.EndpointRegistry
HTTPTraceProvider commonnexus.HTTPClientTraceProvider
}
type TimedOutEventDefinition ¶
type TimedOutEventDefinition struct{}
func (TimedOutEventDefinition) Apply ¶ added in v1.25.0
func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (TimedOutEventDefinition) CherryPick ¶ added in v1.25.0
func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (TimedOutEventDefinition) IsWorkflowTaskTrigger ¶
func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
func (TimedOutEventDefinition) Type ¶
func (d TimedOutEventDefinition) Type() enumspb.EventType
type TimeoutTask ¶
type TimeoutTask struct {
// contains filtered or unexported fields
}
func (TimeoutTask) Deadline ¶
func (t TimeoutTask) Deadline() time.Time
func (TimeoutTask) Destination ¶ added in v1.26.2
func (TimeoutTask) Destination() string
func (TimeoutTask) Type ¶
func (TimeoutTask) Type() string
func (TimeoutTask) Validate ¶
func (t TimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
Validate checks if the timeout task is still valid to execute for the given node state.
type TimeoutTaskSerializer ¶
type TimeoutTaskSerializer struct{}
func (TimeoutTaskSerializer) Deserialize ¶
func (TimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)