Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeString(t require.TestingT, pls *commonpb.Payloads) string
- func EventBatchesToVersionHistory(versionHistory *historyspb.VersionHistory, eventBatches []*historypb.History) (*historyspb.VersionHistory, error)
- func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages
- func NewContext() context.Context
- func NewTestDataConverter() converter.DataConverter
- func RandomizeStr(id string) string
- func RandomizedNexusEndpoint(name string) string
- func UseCassandraPersistence() bool
- func UseSQLVisibility() bool
- func WithDropTask(o *PollAndProcessWorkflowTaskOptions)
- func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)
- func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)
- func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)
- func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)
- func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)
- func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)
- type ActivityTaskHandler
- type ArchiverBase
- type CapturedReplicationMessage
- type FrontendConfig
- type FunctionalTestBase
- func (s *FunctionalTestBase) AdminClient() adminservice.AdminServiceClient
- func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)
- func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int
- func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string
- func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)
- func (s *FunctionalTestBase) ExternalNamespace() namespace.Name
- func (s *FunctionalTestBase) FrontendClient() workflowservice.WorkflowServiceClient
- func (s *FunctionalTestBase) FrontendGRPCAddress() string
- func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent
- func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent
- func (s *FunctionalTestBase) GetNamespaceID(namespace string) string
- func (s *FunctionalTestBase) GetTestCluster() *TestCluster
- func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig
- func (s *FunctionalTestBase) HttpAPIAddress() string
- func (s *FunctionalTestBase) InjectHook(key testhooks.Key, value any) (cleanup func())
- func (s *FunctionalTestBase) MarkNamespaceAsDeleted(nsName namespace.Name) error
- func (s *FunctionalTestBase) Namespace() namespace.Name
- func (s *FunctionalTestBase) NamespaceID() namespace.ID
- func (s *FunctionalTestBase) OperatorClient() operatorservice.OperatorServiceClient
- func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
- func (s *FunctionalTestBase) RegisterNamespace(nsName namespace.Name, retentionDays int32, ...) (namespace.ID, error)
- func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())
- func (s *FunctionalTestBase) SdkClient() sdkclient.Client
- func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string, ...) error
- func (s *FunctionalTestBase) SetupSubTest()
- func (s *FunctionalTestBase) SetupSuite()
- func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)
- func (s *FunctionalTestBase) SetupTest()
- func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller
- func (s *FunctionalTestBase) TaskQueue() string
- func (s *FunctionalTestBase) TearDownCluster()
- func (s *FunctionalTestBase) TearDownSubTest()
- func (s *FunctionalTestBase) TearDownSuite()
- func (s *FunctionalTestBase) TearDownTest()
- func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})
- func (s *FunctionalTestBase) Worker() sdkworker.Worker
- type HistoryConfig
- type MatchingConfig
- type MessageHandler
- type PersistenceTestBaseFactory
- type PollAndProcessWorkflowTaskOptionFunc
- type PollAndProcessWorkflowTaskOptions
- type PollAndProcessWorkflowTaskResponse
- type QueryHandler
- type RecordedTask
- type ReplicationStreamRecorder
- func (r *ReplicationStreamRecorder) Clear()
- func (r *ReplicationStreamRecorder) GetMessages() []CapturedReplicationMessage
- func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)
- func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor
- func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor
- func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor
- func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor
- func (r *ReplicationStreamRecorder) WriteToLog() error
- type TaskFilter
- type TaskMatcher
- type TaskPoller (deprecated)
- func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, ...) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)
- func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
- func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
- func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)
- func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)
- type TaskQueueRecorder
- func (r *TaskQueueRecorder) AddHistoryTasks(ctx context.Context, request *persistence.AddHistoryTasksRequest) error
- func (r *TaskQueueRecorder) AppendHistoryNodes(ctx context.Context, request *persistence.AppendHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error)
- func (r *TaskQueueRecorder) AppendRawHistoryNodes(ctx context.Context, request *persistence.AppendRawHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error)
- func (r *TaskQueueRecorder) Close()
- func (r *TaskQueueRecorder) CompleteHistoryTask(ctx context.Context, request *persistence.CompleteHistoryTaskRequest) error
- func (r *TaskQueueRecorder) ConflictResolveWorkflowExecution(ctx context.Context, ...) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int
- func (r *TaskQueueRecorder) CountTasksForNamespace(category tasks.Category, namespaceID string, matcher TaskMatcher) int
- func (r *TaskQueueRecorder) CountTasksForWorkflow(category tasks.Category, namespaceID string, workflowID string, runID string, ...) int
- func (r *TaskQueueRecorder) CreateWorkflowExecution(ctx context.Context, request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution(ctx context.Context, ...) error
- func (r *TaskQueueRecorder) DeleteHistoryBranch(ctx context.Context, request *persistence.DeleteHistoryBranchRequest) error
- func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ(ctx context.Context, request *persistence.DeleteReplicationTaskFromDLQRequest) error
- func (r *TaskQueueRecorder) DeleteWorkflowExecution(ctx context.Context, request *persistence.DeleteWorkflowExecutionRequest) error
- func (r *TaskQueueRecorder) ForkHistoryBranch(ctx context.Context, request *persistence.ForkHistoryBranchRequest) (*persistence.ForkHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) GetAllHistoryTreeBranches(ctx context.Context, request *persistence.GetAllHistoryTreeBranchesRequest) (*persistence.GetAllHistoryTreeBranchesResponse, error)
- func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask
- func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task
- func (r *TaskQueueRecorder) GetCurrentExecution(ctx context.Context, request *persistence.GetCurrentExecutionRequest) (*persistence.GetCurrentExecutionResponse, error)
- func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil
- func (r *TaskQueueRecorder) GetHistoryTasks(ctx context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error)
- func (r *TaskQueueRecorder) GetName() string
- func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask
- func (r *TaskQueueRecorder) GetReplicationTasksFromDLQ(ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest) (*persistence.GetHistoryTasksResponse, error)
- func (r *TaskQueueRecorder) GetWorkflowExecution(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) IsReplicationDLQEmpty(ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest) (bool, error)
- func (r *TaskQueueRecorder) ListConcreteExecutions(ctx context.Context, request *persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error)
- func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask
- func (r *TaskQueueRecorder) MatchTasksForNamespace(category tasks.Category, namespaceID string, matcher TaskMatcher) []RecordedTask
- func (r *TaskQueueRecorder) MatchTasksForWorkflow(category tasks.Category, namespaceID string, workflowID string, runID string, ...) []RecordedTask
- func (r *TaskQueueRecorder) PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error
- func (r *TaskQueueRecorder) RangeCompleteHistoryTasks(ctx context.Context, request *persistence.RangeCompleteHistoryTasksRequest) error
- func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ(ctx context.Context, ...) error
- func (r *TaskQueueRecorder) ReadHistoryBranch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) ReadHistoryBranchByBatch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchByBatchResponse, error)
- func (r *TaskQueueRecorder) ReadHistoryBranchReverse(ctx context.Context, request *persistence.ReadHistoryBranchReverseRequest) (*persistence.ReadHistoryBranchReverseResponse, error)
- func (r *TaskQueueRecorder) ReadRawHistoryBranch(ctx context.Context, request *persistence.ReadHistoryBranchRequest) (*persistence.ReadRawHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) SetWorkflowExecution(ctx context.Context, request *persistence.SetWorkflowExecutionRequest) (*persistence.SetWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) TrimHistoryBranch(ctx context.Context, request *persistence.TrimHistoryBranchRequest) (*persistence.TrimHistoryBranchResponse, error)
- func (r *TaskQueueRecorder) UpdateWorkflowExecution(ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error)
- func (r *TaskQueueRecorder) WriteToLog(filePath string) error
- type TemporalImpl
- func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient
- func (c *TemporalImpl) Authorize(ctx context.Context, caller *authorization.Claims, ...) (authorization.Result, error)
- func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler
- func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)
- func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager
- func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient
- func (c *TemporalImpl) FrontendClient() workflowservice.WorkflowServiceClient
- func (c *TemporalImpl) FrontendGRPCAddress() string
- func (c *TemporalImpl) FrontendHTTPAddress() string
- func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry
- func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)
- func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager
- func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor
- func (c *TemporalImpl) GetMetricsHandler() metrics.Handler
- func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider
- func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry
- func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder
- func (c *TemporalImpl) HistoryClient() historyservice.HistoryServiceClient
- func (c *TemporalImpl) MatchingClient() matchingservice.MatchingServiceClient
- func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry
- func (c *TemporalImpl) OperatorClient() operatorservice.OperatorServiceClient
- func (c *TemporalImpl) RemoteFrontendGRPCAddress() string
- func (c *TemporalImpl) SetOnAuthorize(...)
- func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))
- func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)
- func (c *TemporalImpl) Start() error
- func (c *TemporalImpl) Stop() error
- func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider
- type TemporalParams
- type TestCluster
- func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient
- func (tc *TestCluster) ArchiverBase() *ArchiverBase
- func (tc *TestCluster) ClusterName() string
- func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager
- func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient
- func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder
- func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder
- func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient
- func (tc *TestCluster) Host() *TemporalImpl
- func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient
- func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient
- func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())
- func (tc *TestCluster) TearDownCluster() error
- func (tc *TestCluster) TestBase() *persistencetests.TestBase
- type TestClusterConfig
- type TestClusterFactory
- type TestClusterOption
- func WithArchivalEnabled() TestClusterOption
- func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption
- func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption
- func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption
- func WithMTLS() TestClusterOption
- func WithNumHistoryShards(n int32) TestClusterOption
- type TestClusterParams
- type TestDataConverter
- func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error
- func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error
- func (tdc *TestDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error)
- func (tdc *TestDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error)
- func (tdc *TestDataConverter) ToString(payload *commonpb.Payload) string
- func (tdc *TestDataConverter) ToStrings(payloads *commonpb.Payloads) []string
- type WorkerConfig
- type WorkflowTaskHandler
Constants ¶
const ( DefaultPageSize = 5 PprofTestPort = 7000 TlsCertCommonName = "my-common-name" ClientSuiteLimit = 10 // TODO (alex): replace all sleeps with WaitForESToSettle with s.Eventually() WaitForESToSettle = 4 * time.Second // wait some time for ES shards to settle so that data is consistent )
const ( DirectionSend = "send" DirectionRecv = "recv" DirectionServerSend = "server_send" DirectionServerRecv = "server_recv" )
Message direction constants
const NamespaceCacheRefreshInterval = time.Second
Variables ¶
var ( ErrEncodingIsNotSet = errors.New("payload encoding metadata is not set") ErrEncodingIsNotSupported = errors.New("payload encoding is not supported") )
var (
ErrNoTasks = errors.New("no tasks")
)
Functions ¶
func EventBatchesToVersionHistory ¶
func EventBatchesToVersionHistory( versionHistory *historyspb.VersionHistory, eventBatches []*historypb.History, ) (*historyspb.VersionHistory, error)
func ExtractReplicationMessages ¶ added in v1.30.0
func ExtractReplicationMessages(msg proto.Message) *replicationspb.WorkflowReplicationMessages
ExtractReplicationMessages extracts WorkflowReplicationMessages from a proto message. This is a helper for tests that need to inspect replication message contents.
func NewContext ¶
NewContext creates a new context with a default timeout of 90 seconds.
func NewTestDataConverter ¶
func NewTestDataConverter() converter.DataConverter
TODO (alex): use it by default in SdkClient everywhere?
func RandomizeStr ¶
func RandomizedNexusEndpoint ¶
func UseCassandraPersistence ¶ added in v1.29.0
func UseCassandraPersistence() bool
func UseSQLVisibility ¶ added in v1.27.0
func UseSQLVisibility() bool
func WithDropTask ¶
func WithDropTask(o *PollAndProcessWorkflowTaskOptions)
func WithDumpHistory ¶
func WithDumpHistory(o *PollAndProcessWorkflowTaskOptions)
func WithForceNewWorkflowTask ¶
func WithForceNewWorkflowTask(o *PollAndProcessWorkflowTaskOptions)
func WithNoDumpCommands ¶
func WithNoDumpCommands(o *PollAndProcessWorkflowTaskOptions)
func WithPollSticky ¶
func WithPollSticky(o *PollAndProcessWorkflowTaskOptions)
func WithRespondSticky ¶
func WithRespondSticky(o *PollAndProcessWorkflowTaskOptions)
func WithoutRetries ¶
func WithoutRetries(o *PollAndProcessWorkflowTaskOptions)
Types ¶
type ActivityTaskHandler ¶
type ActivityTaskHandler func(task *workflowservice.PollActivityTaskQueueResponse) (*commonpb.Payloads, bool, error)
type ArchiverBase ¶
type ArchiverBase struct {
// contains filtered or unexported fields
}
ArchiverBase is a testcore struct for the archiver provider used in functional tests.
func (*ArchiverBase) HistoryURI ¶
func (a *ArchiverBase) HistoryURI() string
func (*ArchiverBase) Metadata ¶
func (a *ArchiverBase) Metadata() archiver.ArchivalMetadata
func (*ArchiverBase) Provider ¶
func (a *ArchiverBase) Provider() provider.ArchiverProvider
func (*ArchiverBase) VisibilityURI ¶
func (a *ArchiverBase) VisibilityURI() string
type CapturedReplicationMessage ¶ added in v1.30.0
type CapturedReplicationMessage struct {
Timestamp string `json:"timestamp"`
Method string `json:"method"`
Direction string `json:"direction"`
ClusterName string `json:"clusterName"`
TargetAddress string `json:"targetAddress"`
MessageType string `json:"messageType"`
IsStreamCall bool `json:"isStreamCall"`
Request proto.Message `json:"-"` // Don't marshal directly
Response proto.Message `json:"-"` // Don't marshal directly
Message json.RawMessage `json:"message,omitempty"`
}
CapturedReplicationMessage represents a captured replication message
type FrontendConfig ¶
type FrontendConfig struct {
NumFrontendHosts int
}
FrontendConfig is the config for the frontend service
type FunctionalTestBase ¶
type FunctionalTestBase struct {
suite.Suite
// `suite.Suite` embeds `*assert.Assertions` which, by default, makes all asserts (like `s.NoError(err)`)
// only log the error, continue test execution, and only then fail the test.
// This is not desired behavior in most cases. The idiomatic way to change this behavior
// is to replace `*assert.Assertions` with `*require.Assertions` by embedding it in every test suite
// (or base struct of every test suite).
*require.Assertions
protorequire.ProtoAssertions
historyrequire.HistoryRequire
updateutils.UpdateUtils
Logger log.Logger
// contains filtered or unexported fields
}
func (*FunctionalTestBase) AdminClient ¶
func (s *FunctionalTestBase) AdminClient() adminservice.AdminServiceClient
func (*FunctionalTestBase) DecodePayloadsByteSliceInt32 ¶
func (s *FunctionalTestBase) DecodePayloadsByteSliceInt32(ps *commonpb.Payloads) (r int32)
func (*FunctionalTestBase) DecodePayloadsInt ¶
func (s *FunctionalTestBase) DecodePayloadsInt(ps *commonpb.Payloads) int
func (*FunctionalTestBase) DecodePayloadsString ¶
func (s *FunctionalTestBase) DecodePayloadsString(ps *commonpb.Payloads) string
func (*FunctionalTestBase) DurationNear ¶
func (s *FunctionalTestBase) DurationNear(value, target, tolerance time.Duration)
func (*FunctionalTestBase) ExternalNamespace ¶ added in v1.29.0
func (s *FunctionalTestBase) ExternalNamespace() namespace.Name
func (*FunctionalTestBase) FrontendClient ¶
func (s *FunctionalTestBase) FrontendClient() workflowservice.WorkflowServiceClient
func (*FunctionalTestBase) FrontendGRPCAddress ¶
func (s *FunctionalTestBase) FrontendGRPCAddress() string
func (*FunctionalTestBase) GetHistory ¶
func (s *FunctionalTestBase) GetHistory(namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent
func (*FunctionalTestBase) GetHistoryFunc ¶
func (s *FunctionalTestBase) GetHistoryFunc(namespace string, execution *commonpb.WorkflowExecution) func() []*historypb.HistoryEvent
func (*FunctionalTestBase) GetNamespaceID ¶
func (s *FunctionalTestBase) GetNamespaceID(namespace string) string
func (*FunctionalTestBase) GetTestCluster ¶
func (s *FunctionalTestBase) GetTestCluster() *TestCluster
func (*FunctionalTestBase) GetTestClusterConfig ¶
func (s *FunctionalTestBase) GetTestClusterConfig() *TestClusterConfig
func (*FunctionalTestBase) HttpAPIAddress ¶
func (s *FunctionalTestBase) HttpAPIAddress() string
func (*FunctionalTestBase) InjectHook ¶ added in v1.27.0
func (s *FunctionalTestBase) InjectHook(key testhooks.Key, value any) (cleanup func())
func (*FunctionalTestBase) MarkNamespaceAsDeleted ¶ added in v1.27.0
func (s *FunctionalTestBase) MarkNamespaceAsDeleted( nsName namespace.Name, ) error
func (*FunctionalTestBase) Namespace ¶
func (s *FunctionalTestBase) Namespace() namespace.Name
func (*FunctionalTestBase) NamespaceID ¶ added in v1.27.0
func (s *FunctionalTestBase) NamespaceID() namespace.ID
func (*FunctionalTestBase) OperatorClient ¶
func (s *FunctionalTestBase) OperatorClient() operatorservice.OperatorServiceClient
func (*FunctionalTestBase) OverrideDynamicConfig ¶
func (s *FunctionalTestBase) OverrideDynamicConfig(setting dynamicconfig.GenericSetting, value any) (cleanup func())
Overrides one dynamic config setting for the duration of this test (or sub-test). The change will automatically be reverted at the end of the test (using t.Cleanup). The cleanup function is also returned if you want to revert the change before the end of the test.
func (*FunctionalTestBase) RegisterNamespace ¶ added in v1.27.0
func (s *FunctionalTestBase) RegisterNamespace( nsName namespace.Name, retentionDays int32, archivalState enumspb.ArchivalState, historyArchivalURI string, visibilityArchivalURI string, ) (namespace.ID, error)
Register namespace using persistence API because:
- The Retention period is set to 0 for archival tests, and this can't be done through FE,
- Update search attributes would require an extra API call,
- One more extra API call would be necessary to get namespace.ID.
func (*FunctionalTestBase) RunTestWithMatchingBehavior ¶
func (s *FunctionalTestBase) RunTestWithMatchingBehavior(subtest func())
func (*FunctionalTestBase) SdkClient ¶ added in v1.28.0
func (s *FunctionalTestBase) SdkClient() sdkclient.Client
func (*FunctionalTestBase) SendSignal ¶ added in v1.27.0
func (s *FunctionalTestBase) SendSignal(nsName string, execution *commonpb.WorkflowExecution, signalName string, input *commonpb.Payloads, identity string) error
TODO (alex): change to nsName namespace.Name
func (*FunctionalTestBase) SetupSubTest ¶ added in v1.27.0
func (s *FunctionalTestBase) SetupSubTest()
func (*FunctionalTestBase) SetupSuite ¶
func (s *FunctionalTestBase) SetupSuite()
func (*FunctionalTestBase) SetupSuiteWithCluster ¶ added in v1.27.0
func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption)
func (*FunctionalTestBase) SetupTest ¶
func (s *FunctionalTestBase) SetupTest()
All test suites that embed FunctionalTestBase and override SetupTest must call this testcore FunctionalTestBase.SetupTest function to distribute the tests into partitions. Otherwise, the test suite will be executed multiple times, once in each partition.
func (*FunctionalTestBase) TaskPoller ¶ added in v1.28.0
func (s *FunctionalTestBase) TaskPoller() *taskpoller.TaskPoller
func (*FunctionalTestBase) TaskQueue ¶ added in v1.28.0
func (s *FunctionalTestBase) TaskQueue() string
func (*FunctionalTestBase) TearDownCluster ¶ added in v1.27.0
func (s *FunctionalTestBase) TearDownCluster()
func (*FunctionalTestBase) TearDownSubTest ¶ added in v1.28.0
func (s *FunctionalTestBase) TearDownSubTest()
**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownSubTest()`.
func (*FunctionalTestBase) TearDownSuite ¶
func (s *FunctionalTestBase) TearDownSuite()
func (*FunctionalTestBase) TearDownTest ¶ added in v1.28.0
func (s *FunctionalTestBase) TearDownTest()
**IMPORTANT**: When overriding this, make sure to invoke `s.FunctionalTestBase.TearDownTest()`.
func (*FunctionalTestBase) WaitForChannel ¶ added in v1.27.0
func (s *FunctionalTestBase) WaitForChannel(ctx context.Context, ch chan struct{})
func (*FunctionalTestBase) Worker ¶ added in v1.28.0
func (s *FunctionalTestBase) Worker() sdkworker.Worker
type HistoryConfig ¶
HistoryConfig contains configs for history service
type MatchingConfig ¶
type MatchingConfig struct {
NumMatchingHosts int
}
MatchingConfig is the config for the matching service
type MessageHandler ¶
type MessageHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error)
type PersistenceTestBaseFactory ¶
type PersistenceTestBaseFactory interface {
NewTestBase(options *persistencetests.TestBaseOptions) *persistencetests.TestBase
}
type PollAndProcessWorkflowTaskOptionFunc ¶
type PollAndProcessWorkflowTaskOptionFunc func(*PollAndProcessWorkflowTaskOptions)
func WithExpectedAttemptCount ¶
func WithExpectedAttemptCount(c int) PollAndProcessWorkflowTaskOptionFunc
func WithRetries ¶
func WithRetries(c int) PollAndProcessWorkflowTaskOptionFunc
type PollAndProcessWorkflowTaskResponse ¶
type PollAndProcessWorkflowTaskResponse struct {
IsQueryTask bool
NewTask *workflowservice.RespondWorkflowTaskCompletedResponse
}
type QueryHandler ¶
type QueryHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*commonpb.Payloads, error)
type RecordedTask ¶ added in v1.30.0
type RecordedTask struct {
Timestamp time.Time `json:"timestamp"`
TaskType string `json:"taskType"` // The specific task type (e.g., "TASK_TYPE_ACTIVITY_RETRY_TIMER")
ShardID int32 `json:"shardId"`
RangeID int64 `json:"rangeId,omitempty"`
NamespaceID string `json:"namespaceId"`
WorkflowID string `json:"workflowId"`
RunID string `json:"runId"`
Task tasks.Task `json:"task"` // The actual task object
}
RecordedTask wraps a task with metadata about when and where it was written
type ReplicationStreamRecorder ¶ added in v1.30.0
type ReplicationStreamRecorder struct {
// contains filtered or unexported fields
}
ReplicationStreamRecorder captures replication stream messages for testing
func NewReplicationStreamRecorder ¶ added in v1.30.0
func NewReplicationStreamRecorder() *ReplicationStreamRecorder
func (*ReplicationStreamRecorder) Clear ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) Clear()
func (*ReplicationStreamRecorder) GetMessages ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) GetMessages() []CapturedReplicationMessage
func (*ReplicationStreamRecorder) SetOutputFile ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) SetOutputFile(filePath string)
SetOutputFile sets the file path for writing captured messages on-demand
func (*ReplicationStreamRecorder) StreamInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor
StreamInterceptor returns a gRPC stream client interceptor that captures stream messages
func (*ReplicationStreamRecorder) StreamServerInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor
StreamServerInterceptor returns a gRPC stream server interceptor that captures stream messages
func (*ReplicationStreamRecorder) UnaryInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor
UnaryInterceptor returns a gRPC unary client interceptor that captures messages
func (*ReplicationStreamRecorder) UnaryServerInterceptor ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a gRPC unary server interceptor that captures messages
func (*ReplicationStreamRecorder) WriteToLog ¶ added in v1.30.0
func (r *ReplicationStreamRecorder) WriteToLog() error
WriteToLog writes all captured messages to the configured output file
type TaskFilter ¶ added in v1.30.0
type TaskFilter struct {
NamespaceID string // Required: namespace ID to filter by
WorkflowID string // Optional: workflow ID to filter by (empty string means no filter)
RunID string // Optional: run ID to filter by (empty string means no filter)
}
TaskFilter specifies criteria for filtering recorded tasks
type TaskMatcher ¶ added in v1.30.0
type TaskMatcher func(RecordedTask) bool
TaskMatcher is a function that tests whether a RecordedTask matches some criteria
type TaskPoller
deprecated
type TaskPoller struct {
Client workflowservice.WorkflowServiceClient
Namespace string
TaskQueue *taskqueuepb.TaskQueue
StickyTaskQueue *taskqueuepb.TaskQueue
StickyScheduleToStartTimeout time.Duration
Identity string
WorkflowTaskHandler WorkflowTaskHandler
ActivityTaskHandler ActivityTaskHandler
QueryHandler QueryHandler
MessageHandler MessageHandler
Logger log.Logger
T *testing.T
}
Deprecated: TaskPoller is deprecated. Use taskpoller.TaskPoller instead. TaskPoller is used in functional tests to poll workflow or activity task queues.
func (*TaskPoller) HandlePartialWorkflowTask ¶
func (p *TaskPoller) HandlePartialWorkflowTask(response *workflowservice.PollWorkflowTaskQueueResponse, forceCreateNewWorkflowTask bool) (*workflowservice.RespondWorkflowTaskCompletedResponse, error)
HandlePartialWorkflowTask for workflow task
func (*TaskPoller) PollAndProcessActivityTask ¶
func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error
PollAndProcessActivityTask for activity tasks
func (*TaskPoller) PollAndProcessActivityTaskWithID ¶
func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error
PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but uses RespondActivityTask...ByID.
func (*TaskPoller) PollAndProcessWorkflowTask ¶
func (p *TaskPoller) PollAndProcessWorkflowTask(funcs ...PollAndProcessWorkflowTaskOptionFunc) (res PollAndProcessWorkflowTaskResponse, err error)
func (*TaskPoller) PollAndProcessWorkflowTaskWithOptions ¶
func (p *TaskPoller) PollAndProcessWorkflowTaskWithOptions(opts *PollAndProcessWorkflowTaskOptions) (res PollAndProcessWorkflowTaskResponse, err error)
type TaskQueueRecorder ¶ added in v1.30.0
type TaskQueueRecorder struct {
// contains filtered or unexported fields
}
TaskQueueRecorder wraps an ExecutionManager to record ALL task writes to the history task queues (transfer, timer, replication, visibility, archival, etc.). This is useful for integration tests where you want to assert on what tasks were generated and in what order. Tasks are stored flattened by category - all tasks of the same type are in a single list, with each task wrapped with metadata about when/where it was written.
func NewTaskQueueRecorder ¶ added in v1.30.0
func NewTaskQueueRecorder(delegate persistence.ExecutionManager, logger log.Logger) *TaskQueueRecorder
NewTaskQueueRecorder creates a recorder that wraps the given ExecutionManager
func (*TaskQueueRecorder) AddHistoryTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) AddHistoryTasks( ctx context.Context, request *persistence.AddHistoryTasksRequest, ) error
AddHistoryTasks records the task write and then delegates to the underlying manager
func (*TaskQueueRecorder) AppendHistoryNodes ¶ added in v1.30.0
func (r *TaskQueueRecorder) AppendHistoryNodes( ctx context.Context, request *persistence.AppendHistoryNodesRequest, ) (*persistence.AppendHistoryNodesResponse, error)
func (*TaskQueueRecorder) AppendRawHistoryNodes ¶ added in v1.30.0
func (r *TaskQueueRecorder) AppendRawHistoryNodes( ctx context.Context, request *persistence.AppendRawHistoryNodesRequest, ) (*persistence.AppendHistoryNodesResponse, error)
func (*TaskQueueRecorder) Close ¶ added in v1.30.0
func (r *TaskQueueRecorder) Close()
func (*TaskQueueRecorder) CompleteHistoryTask ¶ added in v1.30.0
func (r *TaskQueueRecorder) CompleteHistoryTask( ctx context.Context, request *persistence.CompleteHistoryTaskRequest, ) error
func (*TaskQueueRecorder) ConflictResolveWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) ConflictResolveWorkflowExecution( ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest, ) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) CountMatchingTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) CountMatchingTasks(category tasks.Category, matcher TaskMatcher) int
CountMatchingTasks returns the count of tasks in a category that match the given matcher
func (*TaskQueueRecorder) CountTasksForNamespace ¶ added in v1.30.0
func (r *TaskQueueRecorder) CountTasksForNamespace( category tasks.Category, namespaceID string, matcher TaskMatcher, ) int
CountTasksForNamespace returns the count of tasks in a category for a specific namespace
func (*TaskQueueRecorder) CountTasksForWorkflow ¶ added in v1.30.0
func (r *TaskQueueRecorder) CountTasksForWorkflow( category tasks.Category, namespaceID string, workflowID string, runID string, matcher TaskMatcher, ) int
CountTasksForWorkflow returns the count of tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.
func (*TaskQueueRecorder) CreateWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) CreateWorkflowExecution( ctx context.Context, request *persistence.CreateWorkflowExecutionRequest, ) (*persistence.CreateWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) DeleteCurrentWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteCurrentWorkflowExecution( ctx context.Context, request *persistence.DeleteCurrentWorkflowExecutionRequest, ) error
func (*TaskQueueRecorder) DeleteHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteHistoryBranch( ctx context.Context, request *persistence.DeleteHistoryBranchRequest, ) error
func (*TaskQueueRecorder) DeleteReplicationTaskFromDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteReplicationTaskFromDLQ( ctx context.Context, request *persistence.DeleteReplicationTaskFromDLQRequest, ) error
func (*TaskQueueRecorder) DeleteWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) DeleteWorkflowExecution( ctx context.Context, request *persistence.DeleteWorkflowExecutionRequest, ) error
func (*TaskQueueRecorder) ForkHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ForkHistoryBranch( ctx context.Context, request *persistence.ForkHistoryBranchRequest, ) (*persistence.ForkHistoryBranchResponse, error)
func (*TaskQueueRecorder) GetAllHistoryTreeBranches ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetAllHistoryTreeBranches( ctx context.Context, request *persistence.GetAllHistoryTreeBranchesRequest, ) (*persistence.GetAllHistoryTreeBranchesResponse, error)
func (*TaskQueueRecorder) GetAllRecordedTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetAllRecordedTasks() map[tasks.Category][]RecordedTask
GetAllRecordedTasks returns all recorded tasks WITH metadata, grouped by category
func (*TaskQueueRecorder) GetAllTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetAllTasks() map[tasks.Category][]tasks.Task
GetAllTasks returns all tasks grouped by category (unwrapped, without metadata)
func (*TaskQueueRecorder) GetCurrentExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetCurrentExecution( ctx context.Context, request *persistence.GetCurrentExecutionRequest, ) (*persistence.GetCurrentExecutionResponse, error)
func (*TaskQueueRecorder) GetHistoryBranchUtil ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetHistoryBranchUtil() persistence.HistoryBranchUtil
func (*TaskQueueRecorder) GetHistoryTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetHistoryTasks( ctx context.Context, request *persistence.GetHistoryTasksRequest, ) (*persistence.GetHistoryTasksResponse, error)
func (*TaskQueueRecorder) GetName ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetName() string
func (*TaskQueueRecorder) GetRecordedTasksByCategoryFiltered ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetRecordedTasksByCategoryFiltered(category tasks.Category, filter TaskFilter) []RecordedTask
GetRecordedTasksByCategoryFiltered returns recorded tasks WITH metadata for a specific category, filtered by namespace (required) and optionally by workflow ID and run ID. This is the preferred API for tests to ensure tasks are properly scoped.
func (*TaskQueueRecorder) GetReplicationTasksFromDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetReplicationTasksFromDLQ( ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest, ) (*persistence.GetHistoryTasksResponse, error)
func (*TaskQueueRecorder) GetWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) GetWorkflowExecution( ctx context.Context, request *persistence.GetWorkflowExecutionRequest, ) (*persistence.GetWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) IsReplicationDLQEmpty ¶ added in v1.30.0
func (r *TaskQueueRecorder) IsReplicationDLQEmpty( ctx context.Context, request *persistence.GetReplicationTasksFromDLQRequest, ) (bool, error)
func (*TaskQueueRecorder) ListConcreteExecutions ¶ added in v1.30.0
func (r *TaskQueueRecorder) ListConcreteExecutions( ctx context.Context, request *persistence.ListConcreteExecutionsRequest, ) (*persistence.ListConcreteExecutionsResponse, error)
func (*TaskQueueRecorder) MatchTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) MatchTasks(category tasks.Category, matcher TaskMatcher) []RecordedTask
MatchTasks returns all tasks in a category that match the given matcher function
func (*TaskQueueRecorder) MatchTasksForNamespace ¶ added in v1.30.0
func (r *TaskQueueRecorder) MatchTasksForNamespace( category tasks.Category, namespaceID string, matcher TaskMatcher, ) []RecordedTask
MatchTasksForNamespace returns all tasks in a category for a specific namespace
func (*TaskQueueRecorder) MatchTasksForWorkflow ¶ added in v1.30.0
func (r *TaskQueueRecorder) MatchTasksForWorkflow( category tasks.Category, namespaceID string, workflowID string, runID string, matcher TaskMatcher, ) []RecordedTask
MatchTasksForWorkflow returns all tasks in a category for a specific workflow. If namespaceID is empty, it matches any namespace. If runID is empty, it matches any runID for the given workflowID.
func (*TaskQueueRecorder) PutReplicationTaskToDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) PutReplicationTaskToDLQ( ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest, ) error
func (*TaskQueueRecorder) RangeCompleteHistoryTasks ¶ added in v1.30.0
func (r *TaskQueueRecorder) RangeCompleteHistoryTasks( ctx context.Context, request *persistence.RangeCompleteHistoryTasksRequest, ) error
func (*TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ ¶ added in v1.30.0
func (r *TaskQueueRecorder) RangeDeleteReplicationTaskFromDLQ( ctx context.Context, request *persistence.RangeDeleteReplicationTaskFromDLQRequest, ) error
func (*TaskQueueRecorder) ReadHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadHistoryBranch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadHistoryBranchResponse, error)
func (*TaskQueueRecorder) ReadHistoryBranchByBatch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadHistoryBranchByBatch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadHistoryBranchByBatchResponse, error)
func (*TaskQueueRecorder) ReadHistoryBranchReverse ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadHistoryBranchReverse( ctx context.Context, request *persistence.ReadHistoryBranchReverseRequest, ) (*persistence.ReadHistoryBranchReverseResponse, error)
func (*TaskQueueRecorder) ReadRawHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) ReadRawHistoryBranch( ctx context.Context, request *persistence.ReadHistoryBranchRequest, ) (*persistence.ReadRawHistoryBranchResponse, error)
func (*TaskQueueRecorder) SetWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) SetWorkflowExecution( ctx context.Context, request *persistence.SetWorkflowExecutionRequest, ) (*persistence.SetWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) TrimHistoryBranch ¶ added in v1.30.0
func (r *TaskQueueRecorder) TrimHistoryBranch( ctx context.Context, request *persistence.TrimHistoryBranchRequest, ) (*persistence.TrimHistoryBranchResponse, error)
func (*TaskQueueRecorder) UpdateWorkflowExecution ¶ added in v1.30.0
func (r *TaskQueueRecorder) UpdateWorkflowExecution( ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest, ) (*persistence.UpdateWorkflowExecutionResponse, error)
func (*TaskQueueRecorder) WriteToLog ¶ added in v1.30.0
func (r *TaskQueueRecorder) WriteToLog(filePath string) error
WriteToLog writes all captured tasks to a file in JSON format
type TemporalImpl ¶
type TemporalImpl struct {
// contains filtered or unexported fields
}
func (*TemporalImpl) AdminClient ¶
func (c *TemporalImpl) AdminClient() adminservice.AdminServiceClient
func (*TemporalImpl) Authorize ¶
func (c *TemporalImpl) Authorize( ctx context.Context, caller *authorization.Claims, target *authorization.CallTarget, ) (authorization.Result, error)
func (*TemporalImpl) CaptureMetricsHandler ¶
func (c *TemporalImpl) CaptureMetricsHandler() *metricstest.CaptureHandler
func (*TemporalImpl) ChasmEngine ¶ added in v1.29.0
func (c *TemporalImpl) ChasmEngine() (chasm.Engine, error)
func (*TemporalImpl) ChasmVisibilityManager ¶ added in v1.30.0
func (c *TemporalImpl) ChasmVisibilityManager() chasm.VisibilityManager
func (*TemporalImpl) DcClient ¶
func (c *TemporalImpl) DcClient() *dynamicconfig.MemoryClient
func (*TemporalImpl) FrontendClient ¶
func (c *TemporalImpl) FrontendClient() workflowservice.WorkflowServiceClient
func (*TemporalImpl) FrontendGRPCAddress ¶
func (c *TemporalImpl) FrontendGRPCAddress() string
func (*TemporalImpl) FrontendHTTPAddress ¶
func (c *TemporalImpl) FrontendHTTPAddress() string
func (*TemporalImpl) GetCHASMRegistry ¶ added in v1.29.0
func (c *TemporalImpl) GetCHASMRegistry() *chasm.Registry
func (*TemporalImpl) GetClaims ¶
func (c *TemporalImpl) GetClaims(authInfo *authorization.AuthInfo) (*authorization.Claims, error)
func (*TemporalImpl) GetExecutionManager ¶
func (c *TemporalImpl) GetExecutionManager() persistence.ExecutionManager
func (*TemporalImpl) GetGrpcClientInterceptor ¶ added in v1.29.0
func (c *TemporalImpl) GetGrpcClientInterceptor() *grpcinject.Interceptor
func (*TemporalImpl) GetMetricsHandler ¶
func (c *TemporalImpl) GetMetricsHandler() metrics.Handler
func (*TemporalImpl) GetTLSConfigProvider ¶
func (c *TemporalImpl) GetTLSConfigProvider() encryption.TLSConfigProvider
func (*TemporalImpl) GetTaskCategoryRegistry ¶
func (c *TemporalImpl) GetTaskCategoryRegistry() tasks.TaskCategoryRegistry
func (*TemporalImpl) GetTaskQueueRecorder ¶ added in v1.30.0
func (c *TemporalImpl) GetTaskQueueRecorder() *TaskQueueRecorder
func (*TemporalImpl) HistoryClient ¶
func (c *TemporalImpl) HistoryClient() historyservice.HistoryServiceClient
func (*TemporalImpl) MatchingClient ¶
func (c *TemporalImpl) MatchingClient() matchingservice.MatchingServiceClient
func (*TemporalImpl) NamespaceRegistries ¶ added in v1.27.0
func (c *TemporalImpl) NamespaceRegistries() []namespace.Registry
func (*TemporalImpl) OperatorClient ¶
func (c *TemporalImpl) OperatorClient() operatorservice.OperatorServiceClient
func (*TemporalImpl) RemoteFrontendGRPCAddress ¶
func (c *TemporalImpl) RemoteFrontendGRPCAddress() string
Use this to get an address for a remote cluster to connect to.
func (*TemporalImpl) SetOnAuthorize ¶
func (c *TemporalImpl) SetOnAuthorize( fn func(context.Context, *authorization.Claims, *authorization.CallTarget) (authorization.Result, error), )
func (*TemporalImpl) SetOnGetClaims ¶
func (c *TemporalImpl) SetOnGetClaims(fn func(*authorization.AuthInfo) (*authorization.Claims, error))
func (*TemporalImpl) SetTaskQueueRecorder ¶ added in v1.30.0
func (c *TemporalImpl) SetTaskQueueRecorder(recorder *TaskQueueRecorder)
func (*TemporalImpl) Start ¶
func (c *TemporalImpl) Start() error
func (*TemporalImpl) Stop ¶
func (c *TemporalImpl) Stop() error
func (*TemporalImpl) TlsConfigProvider ¶
func (c *TemporalImpl) TlsConfigProvider() *encryption.FixedTLSConfigProvider
type TemporalParams ¶
type TemporalParams struct {
ClusterMetadataConfig *cluster.Config
PersistenceConfig config.Persistence
MetadataMgr persistence.MetadataManager
ClusterMetadataManager persistence.ClusterMetadataManager
ShardMgr persistence.ShardManager
ExecutionManager persistence.ExecutionManager
TaskMgr persistence.TaskManager
NamespaceReplicationQueue persistence.NamespaceReplicationQueue
AbstractDataStoreFactory persistenceClient.AbstractDataStoreFactory
VisibilityStoreFactory visibility.VisibilityStoreFactory
Logger log.Logger
ArchiverMetadata carchiver.ArchivalMetadata
ArchiverProvider provider.ArchiverProvider
EnableReadHistoryFromArchival bool
FrontendConfig FrontendConfig
HistoryConfig HistoryConfig
MatchingConfig MatchingConfig
WorkerConfig WorkerConfig
ESConfig *esclient.Config
ESClient esclient.Client
MockAdminClient map[string]adminservice.AdminServiceClient
NamespaceReplicationTaskExecutor nsreplication.TaskExecutor
DynamicConfigOverrides map[dynamicconfig.Key]interface{}
TLSConfigProvider *encryption.FixedTLSConfigProvider
CaptureMetricsHandler *metricstest.CaptureHandler
// ServiceFxOptions is populated by WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
TaskCategoryRegistry tasks.TaskCategoryRegistry
HostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts
SpanExporters map[telemetry.SpanExporterType]sdktrace.SpanExporter
}
TemporalParams contains everything needed to bootstrap Temporal
type TestCluster ¶
type TestCluster struct {
// contains filtered or unexported fields
}
TestCluster is a testcore struct for functional tests
func (*TestCluster) AdminClient ¶
func (tc *TestCluster) AdminClient() adminservice.AdminServiceClient
func (*TestCluster) ArchiverBase ¶ added in v1.27.0
func (tc *TestCluster) ArchiverBase() *ArchiverBase
func (*TestCluster) ClusterName ¶ added in v1.27.0
func (tc *TestCluster) ClusterName() string
func (*TestCluster) ExecutionManager ¶
func (tc *TestCluster) ExecutionManager() persistence.ExecutionManager
ExecutionManager returns the execution manager from the test cluster
func (*TestCluster) FrontendClient ¶
func (tc *TestCluster) FrontendClient() workflowservice.WorkflowServiceClient
func (*TestCluster) GetReplicationStreamRecorder ¶ added in v1.30.0
func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder
func (*TestCluster) GetTaskQueueRecorder ¶ added in v1.30.0
func (tc *TestCluster) GetTaskQueueRecorder() *TaskQueueRecorder
func (*TestCluster) HistoryClient ¶
func (tc *TestCluster) HistoryClient() historyservice.HistoryServiceClient
HistoryClient returns a history client from the test cluster
func (*TestCluster) Host ¶
func (tc *TestCluster) Host() *TemporalImpl
TODO (alex): expose only needed objects from TemporalImpl.
func (*TestCluster) MatchingClient ¶
func (tc *TestCluster) MatchingClient() matchingservice.MatchingServiceClient
MatchingClient returns a matching client from the test cluster
func (*TestCluster) OperatorClient ¶
func (tc *TestCluster) OperatorClient() operatorservice.OperatorServiceClient
func (*TestCluster) OverrideDynamicConfig ¶
func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func())
func (*TestCluster) TearDownCluster ¶
func (tc *TestCluster) TearDownCluster() error
TearDownCluster tears down the test cluster
func (*TestCluster) TestBase ¶
func (tc *TestCluster) TestBase() *persistencetests.TestBase
TODO (alex): remove this method. Replace usages with concrete methods.
type TestClusterConfig ¶
type TestClusterConfig struct {
EnableArchival bool
IsMasterCluster bool
ClusterMetadata cluster.Config
Persistence persistencetests.TestBaseOptions
FrontendConfig FrontendConfig
HistoryConfig HistoryConfig
MatchingConfig MatchingConfig
WorkerConfig WorkerConfig
ESConfig *esclient.Config
MockAdminClient map[string]adminservice.AdminServiceClient
FaultInjection *config.FaultInjection
DynamicConfigOverrides map[dynamicconfig.Key]any
EnableMTLS bool
EnableMetricsCapture bool
SpanExporters map[telemetry.SpanExporterType]sdktrace.SpanExporter
// ServiceFxOptions can be populated using WithFxOptionsForService.
ServiceFxOptions map[primitives.ServiceName][]fx.Option
}
TestClusterConfig is the config for a test cluster
type TestClusterFactory ¶
type TestClusterFactory interface {
NewCluster(t *testing.T, clusterConfig *TestClusterConfig, logger log.Logger) (*TestCluster, error)
}
func NewTestClusterFactory ¶
func NewTestClusterFactory() TestClusterFactory
func NewTestClusterFactoryWithCustomTestBaseFactory ¶
func NewTestClusterFactoryWithCustomTestBaseFactory(tbFactory PersistenceTestBaseFactory) TestClusterFactory
type TestClusterOption ¶ added in v1.27.0
type TestClusterOption func(params *TestClusterParams)
func WithArchivalEnabled ¶ added in v1.27.0
func WithArchivalEnabled() TestClusterOption
func WithDynamicConfigOverrides ¶ added in v1.27.0
func WithDynamicConfigOverrides(overrides map[dynamicconfig.Key]any) TestClusterOption
func WithFaultInjectionConfig ¶ added in v1.28.0
func WithFaultInjectionConfig(cfg *config.FaultInjection) TestClusterOption
func WithFxOptionsForService ¶
func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) TestClusterOption
WithFxOptionsForService returns a TestClusterOption which, when passed as an argument to setupSuite, will append the given list of fx options to the end of the arguments to the fx.New call for the given service. For example, if you want to obtain the shard controller for the history service, you can do this:
var shardController shard.Controller
s.setupSuite(t, tests.WithFxOptionsForService(primitives.HistoryService, fx.Populate(&shardController)))
// now you can use shardController during your test
This is similar to the pattern of plumbing dependencies through the TestClusterConfig, but it's much more convenient, scalable and flexible. The reason we need to do this on a per-service basis is that there are separate fx apps for each one.
func WithMTLS ¶ added in v1.28.0
func WithMTLS() TestClusterOption
func WithNumHistoryShards ¶ added in v1.28.0
func WithNumHistoryShards(n int32) TestClusterOption
type TestClusterParams ¶
type TestClusterParams struct {
ServiceOptions map[primitives.ServiceName][]fx.Option
DynamicConfigOverrides map[dynamicconfig.Key]any
ArchivalEnabled bool
EnableMTLS bool
FaultInjectionConfig *config.FaultInjection
NumHistoryShards int32
}
TestClusterParams contains the variables which are used to configure a test cluster via the TestClusterOption type.
func ApplyTestClusterOptions ¶ added in v1.27.0
func ApplyTestClusterOptions(options []TestClusterOption) TestClusterParams
type TestDataConverter ¶
type TestDataConverter struct {
NumOfCallToPayloads int // for testing to know testDataConverter is called as expected
NumOfCallFromPayloads int
}
TestDataConverter implements encoded.DataConverter using gob
func (*TestDataConverter) FromPayload ¶
func (tdc *TestDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error
func (*TestDataConverter) FromPayloads ¶
func (tdc *TestDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error
func (*TestDataConverter) ToPayload ¶
func (tdc *TestDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error)
func (*TestDataConverter) ToPayloads ¶
func (tdc *TestDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error)
type WorkerConfig ¶
WorkerConfig is the config for the worker service
type WorkflowTaskHandler ¶
type WorkflowTaskHandler func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error)