temporal

package
v1.19.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2022 License: MIT Imports: 51 Imported by: 15

Documentation

Index

Constants

This section is empty.

Variables

View Source
// FxLogAdapter is an fx option that makes fx emit its own events through the
// injected log.Logger, by wrapping that logger in an fxLogAdapter.
var FxLogAdapter = fx.WithLogger(func(l log.Logger) fxevent.Logger {
	adapter := &fxLogAdapter{logger: l}
	return adapter
})
View Source
// ServerFxImplModule provides the concrete *ServerImpl built by
// NewServerFxImpl and exposes that same value under the Server interface.
var ServerFxImplModule = fx.Options(
	fx.Provide(NewServerFxImpl),
	fx.Provide(func(impl *ServerImpl) Server {
		return impl
	}),
)
View Source
// ServiceTracingModule wires up the OpenTelemetry tracing objects used by a
// service: batch span processors, the resource description, the tracer
// provider options, the tracer provider itself, the text-map propagator and
// the server/client trace interceptors.
var ServiceTracingModule = fx.Options(
	// By default no extra options are passed to the batch span processors.
	fx.Supply([]otelsdktrace.BatchSpanProcessorOption{}),
	fx.Provide(
		fx.Annotate(
			// Wrap every span exporter in a batch span processor.
			func(exps []otelsdktrace.SpanExporter, opts []otelsdktrace.BatchSpanProcessorOption) []otelsdktrace.SpanProcessor {
				sps := make([]otelsdktrace.SpanProcessor, 0, len(exps))
				for _, exp := range exps {
					sps = append(sps, otelsdktrace.NewBatchSpanProcessor(exp, opts...))
				}
				return sps
			},
			// The exporter slice is tagged optional; the options slice is required.
			fx.ParamTags(`optional:"true"`, ``),
		),
	),
	fx.Provide(
		fx.Annotate(
			// Build the resource that describes this service.
			func(rsn resource.ServiceName, rsi resource.InstanceID) (*otelresource.Resource, error) {
				// Ensure the reported service name carries the "io.temporal." prefix.
				serviceName := string(rsn)
				if !strings.HasPrefix(serviceName, "io.temporal.") {
					serviceName = fmt.Sprintf("io.temporal.%s", serviceName)
				}
				attrs := []attribute.KeyValue{
					semconv.ServiceNameKey.String(serviceName),
					semconv.ServiceVersionKey.String(headers.ServerVersion),
				}
				// The instance ID attribute is only added when one was supplied.
				if rsi != "" {
					attrs = append(attrs, semconv.ServiceInstanceIDKey.String(string(rsi)))
				}
				return otelresource.New(context.Background(),
					otelresource.WithProcess(),
					otelresource.WithOS(),
					otelresource.WithHost(),
					otelresource.WithContainer(),
					otelresource.WithAttributes(attrs...),
				)
			},
			// The service name is required; the instance ID is tagged optional.
			fx.ParamTags(``, `optional:"true"`),
		),
	),
	fx.Provide(
		// Combine the resource and every span processor into tracer provider options.
		func(r *otelresource.Resource, sps []otelsdktrace.SpanProcessor) []otelsdktrace.TracerProviderOption {
			opts := make([]otelsdktrace.TracerProviderOption, 0, len(sps)+1)
			opts = append(opts, otelsdktrace.WithResource(r))
			for _, sp := range sps {
				opts = append(opts, otelsdktrace.WithSpanProcessor(sp))
			}
			return opts
		},
	),
	// Create the tracer provider and shut it down when the fx app stops.
	fx.Provide(func(lc fx.Lifecycle, opts []otelsdktrace.TracerProviderOption) trace.TracerProvider {
		tp := otelsdktrace.NewTracerProvider(opts...)
		lc.Append(fx.Hook{OnStop: func(ctx context.Context) error {
			// Return the shutdown error instead of discarding it, so a failed
			// tracer provider shutdown is reported to fx.
			return tp.Shutdown(ctx)
		}})
		return tp
	}),

	fx.Provide(func() propagation.TextMapPropagator { return propagation.TraceContext{} }),
	fx.Provide(telemetry.NewServerTraceInterceptor),
	fx.Provide(telemetry.NewClientTraceInterceptor),
)

ServiceTracingModule holds per-service (i.e. frontend/history/matching/worker) fx state. The following types can be overridden with fx.Replace/fx.Decorate:

  • []go.opentelemetry.io/otel/sdk/trace.BatchSpanProcessorOption default: empty slice
  • []go.opentelemetry.io/otel/sdk/trace.SpanProcessor default: wrap each otelsdktrace.SpanExporter with otelsdktrace.NewBatchSpanProcessor
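  • *go.opentelemetry.io/otel/sdk/resource.Resource default: built with otelresource.New from the process, OS, host and container detectors, augmented with the supplied serviceName (prefixed with "io.temporal." if needed), the server version and, when provided, the instance ID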
  • []go.opentelemetry.io/otel/sdk/trace.TracerProviderOption default: the provided resource.Resource and each of the otelsdktrace.SpanProcessor
  • go.opentelemetry.io/otel/trace.TracerProvider default: otelsdktrace.NewTracerProvider with each of the otelsdktrace.TracerProviderOption
  • go.opentelemetry.io/otel/propagation.TextMapPropagator default: propagation.TraceContext{}
  • telemetry.ServerTraceInterceptor
  • telemetry.ClientTraceInterceptor

Services is the list of all valid temporal services

View Source
// TraceExportModule installs an OTEL error handler that logs through the
// injected logger, and provides the span exporters built from the config,
// registering their start and shutdown with the fx lifecycle.
var TraceExportModule = fx.Options(
	fx.Invoke(func(logger log.Logger) {
		// Report OTEL-internal errors as warnings on the server logger.
		handler := otel.ErrorHandlerFunc(func(err error) {
			logger.Warn("OTEL error", tag.Error(err), tag.ErrorType(err))
		})
		otel.SetErrorHandler(handler)
	}),

	fx.Provide(func(lc fx.Lifecycle, cfg *config.Config) ([]otelsdktrace.SpanExporter, error) {
		spanExporters, err := cfg.ExporterConfig.SpanExporters()
		if err != nil {
			return nil, err
		}
		// Start and shut down all exporters together with the fx app.
		hook := fx.Hook{
			OnStart: startAll(spanExporters),
			OnStop:  shutdownAll(spanExporters),
		}
		lc.Append(hook)
		return spanExporters, nil
	}),
)

TraceExportModule holds process-global telemetry fx state defining the set of OTEL trace/span exporters used by tracing instrumentation. The following types can be overridden/augmented with fx.Replace/fx.Decorate:

- []go.opentelemetry.io/otel/sdk/trace.SpanExporter

Functions

func ApplyClusterMetadataConfigProvider added in v1.14.0

func ApplyClusterMetadataConfigProvider(
	logger log.Logger,
	config *config.Config,
	persistenceServiceResolver resolver.ServiceResolver,
	persistenceFactoryProvider persistenceClient.FactoryProviderFn,
	customDataStoreFactory persistenceClient.AbstractDataStoreFactory,
) (*cluster.Config, config.Persistence, error)

ApplyClusterMetadataConfigProvider performs a config check against the configured persistence store for cluster metadata. If there is a mismatch, the persisted values take precedence and will be written over in the config objects. This is to keep this check hidden from downstream calls. TODO: move this to cluster.fx

func InterruptCh

func InterruptCh() <-chan interface{}

func PersistenceFactoryProvider added in v1.16.0

func PersistenceFactoryProvider() persistenceClient.FactoryProviderFn

func ServerLifetimeHooks added in v1.14.0

func ServerLifetimeHooks(
	lc fx.Lifecycle,
	svr Server,
)

func ServerOptionsProvider added in v1.14.0

func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error)

func StopService added in v1.16.0

func StopService(logger log.Logger, app *fx.App, svcName string, stopChan chan struct{})

Types

type Server

// Server is the interface returned by NewServer; it can be started and
// stopped.
type Server interface {
	// Start starts the server and returns an error if it fails to do so.
	Start() error
	// Stop stops the server.
	Stop()
}

func NewServer

func NewServer(opts ...ServerOption) (Server, error)

NewServer returns a new instance of server that serves one or many services.

type ServerFx added in v1.14.0

type ServerFx struct {
	// contains filtered or unexported fields
}

func NewServerFx added in v1.14.0

func NewServerFx(opts ...ServerOption) (*ServerFx, error)

func (ServerFx) Start added in v1.14.0

func (s ServerFx) Start() error

func (ServerFx) Stop added in v1.14.0

func (s ServerFx) Stop()

type ServerImpl added in v1.14.0

type ServerImpl struct {
	// contains filtered or unexported fields
}

ServerImpl is temporal server.

func NewServerFxImpl added in v1.14.0

func NewServerFxImpl(
	opts *serverOptions,
	logger log.Logger,
	namespaceLogger resource.NamespaceLogger,
	stoppedCh chan interface{},
	dcCollection *dynamicconfig.Collection,
	servicesGroup ServicesGroupIn,
	persistenceConfig config.Persistence,
	clusterMetadata *cluster.Config,
	persistenceFactoryProvider persistenceClient.FactoryProviderFn,
) *ServerImpl

NewServerFxImpl returns a new instance of server that serves one or many services.

func (*ServerImpl) Start added in v1.14.0

func (s *ServerImpl) Start() error

Start temporal server. This function should be called only once, Server doesn't support multiple restarts.

func (*ServerImpl) Stop added in v1.14.0

func (s *ServerImpl) Stop()

Stop stops the server.

type ServerOption

type ServerOption interface {
	// contains filtered or unexported methods
}

func ForServices

func ForServices(names []string) ServerOption

ForServices indicates which supplied services (e.g. frontend, history, matching, worker) within the server to start

func InterruptOn

func InterruptOn(interruptCh <-chan interface{}) ServerOption

InterruptOn interrupts the server when a signal is received on the supplied channel. If the channel is nil, Start() will block forever.

func WithAudienceGetter added in v1.10.3

func WithAudienceGetter(audienceGetter func(cfg *config.Config) authorization.JWTAudienceMapper) ServerOption

WithAudienceGetter configures JWT audience getter for authorization

func WithAuthorizer

func WithAuthorizer(authorizer authorization.Authorizer) ServerOption

WithAuthorizer sets a low level authorizer to allow/deny all API calls

func WithChainedFrontendGrpcInterceptors added in v1.14.0

func WithChainedFrontendGrpcInterceptors(
	interceptors ...grpc.UnaryServerInterceptor,
) ServerOption

WithChainedFrontendGrpcInterceptors sets a chain of ordered custom grpc interceptors that will be invoked for all Frontend gRPC API calls. The list of custom interceptors will be appended to the end of the internal ServerInterceptors. The custom interceptors will be invoked in the order as they appear in the supplied list, after the internal ServerInterceptors.

func WithClaimMapper added in v1.4.0

func WithClaimMapper(claimMapper func(cfg *config.Config) authorization.ClaimMapper) ServerOption

WithClaimMapper configures a role mapper for authorization

func WithClientFactoryProvider added in v1.11.0

func WithClientFactoryProvider(clientFactoryProvider client.FactoryProvider) ServerOption

WithClientFactoryProvider sets a custom ClientFactoryProvider. NOTE: this option is experimental and may be changed or removed in a future release.

func WithConfig

func WithConfig(cfg *config.Config) ServerOption

WithConfig sets a custom configuration

func WithConfigLoader

func WithConfigLoader(configDir string, env string, zone string) ServerOption

WithConfigLoader sets a custom configuration loader

func WithCustomDataStoreFactory added in v1.11.0

func WithCustomDataStoreFactory(customFactory persistenceclient.AbstractDataStoreFactory) ServerOption

WithCustomDataStoreFactory sets a custom AbstractDataStoreFactory. NOTE: this option is experimental and may be changed or removed in a future release.

func WithCustomMetricsHandler added in v1.17.0

func WithCustomMetricsHandler(provider metrics.MetricsHandler) ServerOption

WithCustomMetricsHandler sets a custom implementation of the metrics.MetricsHandler interface. metrics.MetricsHandler is the base interface for publishing metric events.

func WithDynamicConfigClient added in v1.5.7

func WithDynamicConfigClient(c dynamicconfig.Client) ServerOption

WithDynamicConfigClient sets custom client for reading dynamic configuration.

func WithElasticsearchHttpClient added in v1.5.7

func WithElasticsearchHttpClient(c *http.Client) ServerOption

WithElasticsearchHttpClient sets a custom HTTP client which is used to make requests to Elasticsearch

func WithLogger added in v1.5.7

func WithLogger(logger log.Logger) ServerOption

WithLogger sets a custom logger

func WithNamespaceLogger added in v1.11.0

func WithNamespaceLogger(namespaceLogger log.Logger) ServerOption

WithNamespaceLogger sets an optional logger for all frontend operations

func WithPersistenceServiceResolver added in v1.5.7

func WithPersistenceServiceResolver(r resolver.ServiceResolver) ServerOption

WithPersistenceServiceResolver sets a custom persistence service resolver which will convert service name or address value from config to another address

func WithSearchAttributesMapper added in v1.12.1

func WithSearchAttributesMapper(m searchattribute.Mapper) ServerOption

WithSearchAttributesMapper sets a custom search attributes mapper which converts search attributes aliases to field names and vice versa.

func WithTLSConfigFactory

func WithTLSConfigFactory(tlsConfigProvider encryption.TLSConfigProvider) ServerOption

WithTLSConfigFactory overrides default provider of TLS configuration

type ServiceProviderParamsCommon added in v1.14.0

// ServiceProviderParamsCommon is the fx.In parameter struct shared by
// FrontendServiceProvider, HistoryServiceProvider, MatchingServiceProvider
// and WorkerServiceProvider. Each field is populated by fx from the value of
// the matching type.
type ServiceProviderParamsCommon struct {
	fx.In

	Cfg                        *config.Config
	ServiceNames               resource.ServiceNames
	Logger                     log.Logger
	NamespaceLogger            resource.NamespaceLogger
	DynamicConfigClient        dynamicconfig.Client
	MetricsHandler             metrics.MetricsHandler
	EsConfig                   *esclient.Config
	EsClient                   esclient.Client
	TlsConfigProvider          encryption.TLSConfigProvider
	PersistenceConfig          config.Persistence
	ClusterMetadata            *cluster.Config
	ClientFactoryProvider      client.FactoryProvider
	AudienceGetter             authorization.JWTAudienceMapper
	PersistenceServiceResolver resolver.ServiceResolver
	PersistenceFactoryProvider persistenceClient.FactoryProviderFn
	SearchAttributesMapper     searchattribute.Mapper
	CustomInterceptors         []grpc.UnaryServerInterceptor
	Authorizer                 authorization.Authorizer
	ClaimMapper                authorization.ClaimMapper
	DataStoreFactory           persistenceClient.AbstractDataStoreFactory
	SpanExporters              []otelsdktrace.SpanExporter
	// InstanceID is the only field tagged optional.
	InstanceID                 resource.InstanceID `optional:"true"`
}

type ServiceStopFn added in v1.14.0

// ServiceStopFn is a callback that takes no arguments and returns nothing;
// ServicesMetadata carries one per service.
type ServiceStopFn func()

type ServicesGroupIn added in v1.14.0

// ServicesGroupIn is an fx.In struct that receives every *ServicesMetadata
// contributed to the "services" value group.
type ServicesGroupIn struct {
	fx.In
	Services []*ServicesMetadata `group:"services"`
}

type ServicesGroupOut added in v1.14.0

// ServicesGroupOut is an fx.Out struct that contributes a single
// *ServicesMetadata to the "services" value group. It is the result type of
// the Frontend/History/Matching/Worker service providers.
type ServicesGroupOut struct {
	fx.Out

	Services *ServicesMetadata `group:"services"`
}

func FrontendServiceProvider added in v1.14.0

func FrontendServiceProvider(
	params ServiceProviderParamsCommon,
) (ServicesGroupOut, error)

func HistoryServiceProvider added in v1.14.0

func HistoryServiceProvider(
	params ServiceProviderParamsCommon,
) (ServicesGroupOut, error)

func MatchingServiceProvider added in v1.14.0

func MatchingServiceProvider(
	params ServiceProviderParamsCommon,
) (ServicesGroupOut, error)

func WorkerServiceProvider added in v1.14.0

func WorkerServiceProvider(
	params ServiceProviderParamsCommon,
) (ServicesGroupOut, error)

type ServicesMetadata added in v1.14.0

// ServicesMetadata describes one service as exchanged through the "services"
// fx value group: its fx app, its name and its ServiceStopFn.
type ServicesMetadata struct {
	App           *fx.App // Added for info. ServiceStopFn is enough.
	ServiceName   string
	ServiceStopFn ServiceStopFn
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL