pubsub

package
v1.34.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2024 License: MPL-2.0 Imports: 3 Imported by: 4

Documentation

Overview

Package pubsub provides Encore applications with the ability to create Pub/Sub Topics and multiple Subscriptions on those topics in a cloud-agnostic manner.

For more information see https://encore.dev/docs/develop/pubsub

Index

Constants

View Source
const (
	// NoRetries is used to control deadletter queuing logic, when set as the MaxRetires within the RetryPolicy
	// it will attempt to immediately forward a message to the dead letter queue if the subscription Handler
	// returns any error or panics.
	//
	// Note: With some cloud providers, having no retries may not be supported, in which case the minimum number of
	// retries permitted by the provider will be used.
	NoRetries = -2

	// InfiniteRetries is used to control deadletter queuing logic, when set as the MaxRetires within the RetryPolicy
	// it will attempt to always retry a message without ever sending it to the dead letter queue.
	//
	// Note: With some cloud providers, infinite retries may not be supported, in which case the maximum number of
	// retries permitted by the provider will be used.
	InfiniteRetries = -1
)

Variables

This section is empty.

Functions

func MethodHandler added in v1.20.0

func MethodHandler[T, SvcStruct any](handler func(s SvcStruct, ctx context.Context, msg T) error) (_ func(ctx context.Context, msg T) error)

MethodHandler is used to define a subscription Handler that references a service struct method.

Example Usage:

//encore:service
type Service struct {}

func (s *Service) Method(ctx context.Context, msg *Event) error { /* ... */ }

var _ = pubsub.NewSubscription(Topic, "subscription-name", pubsub.SubscriptionConfig[*Event]{
	Handler: pubsub.MethodHandler((*MyService).MyMethod),
	// ...
})

func TopicRef added in v1.16.1

func TopicRef[P TopicPerms[T], T any](topic *Topic[T]) (_ P)

TopicRef returns an interface reference to a topic, that can be freely passed around within a service without being subject to Encore's typical static analysis restrictions that normally apply to *Topic objects.

This works because using TopicRef effectively declares which operations you want to be able to perform since the type argument P must be a permission-declaring interface (TopicPerms[T]).

The returned reference is scoped down to those permissions.

For example:

var MyTopic = pubsub.NewTopic[Msg](...)
var ref = pubsub.TopicRef[pubsub.Publisher[Msg]](MyTopic)
// ref.Publish(...) can now be used to publish messages to MyTopic.

Types

type DeliveryGuarantee

type DeliveryGuarantee int

DeliveryGuarantee is used to configure the delivery contract for a topic

const (
	// AtLeastOnce guarantees that a message for a subscription is delivered to
	// a consumer at least once.
	//
	// On AWS and GCP there is no limit to the throughput for a topic.
	AtLeastOnce DeliveryGuarantee = iota + 1

	// ExactlyOnce guarantees that a message for a subscription is delivered to
	// a consumer exactly once, to the best of the system's ability.
	//
	// However, there are edge cases when a message might be redelivered.
	// For example, if a networking issue causes the acknowledgement of success
	// processing the message to be lost before the cloud provider receives it.
	//
	// It is also important to note that the ExactlyOnce delivery guarantee only
	// applies to the delivery of the message to the consumer, and not to the
	// original publishing of the message, such that if a message is published twice,
	// such as due to an retry within the application logic, it will be delivered twice.
	// (i.e. ExactlyOnce delivery does not imply message deduplication on publish)
	//
	// As such it's recommended that the subscription handler function is idempotent
	// and is able to handle duplicate messages.
	//
	// Subscriptions attached to ExactlyOnce topics have higher message delivery latency compared to AtLeastOnce.
	//
	// By using ExactlyOnce semantics on a topic, the throughput will be limited depending on the cloud provider:
	//
	// - AWS: 300 messages per second for the topic (see [AWS SQS Quotas]).
	// - GCP: At least 3,000 messages per second across all topics in the region
	// 		  (can be higher on the region see [GCP PubSub Quotas]).
	//
	// [AWS SQS Quotas]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
	// [GCP PubSub Quotas]: https://cloud.google.com/pubsub/quotas#quotas
	ExactlyOnce
)

type Publisher added in v1.16.1

type Publisher[T any] interface {
	// Publish publishes a message to the topic.
	Publish(ctx context.Context, msg T) (id string, err error)

	// Meta returns metadata about the topic.
	Meta() TopicMeta
}

Publisher is the interface for publishing messages to a topic. It can be used in conjunction with TopicRef to declare a reference that can publish messages to the topic.

For example:

var MyTopic = pubsub.NewTopic[Msg](...)
var ref = pubsub.TopicRef[pubsub.Publisher[Msg]](MyTopic)

The ref object can then be used to publish messages and can be passed around freely within the service, without being subject to Encore's static analysis restrictions that apply to MyTopic.

type RetryPolicy

type RetryPolicy struct {
	// The minimum time to wait between retries. Defaults to 10 seconds.
	MinBackoff time.Duration

	// The maximum time to wait between retries. Defaults to 10 minutes.
	MaxBackoff time.Duration

	// MaxRetries is used to control deadletter queuing logic, when:
	//   n == 0: A default value of 100 retries will be used
	//   n > 0:  Encore will forward a message to a dead letter queue after n retries
	//   n == pubsub.InfiniteRetries: Messages will not be forwarded to the dead letter queue by the Encore framework
	MaxRetries int
}

RetryPolicy defines how a subscription should handle retries after errors either delivering the message or processing the message.

The values given to this structure are parsed at compile time, such that the correct Cloud resources can be provisioned to support the queue.

As such the values given here may be clamped to the supported values by the target cloud. (i.e. min/max values brought within the supported range by the target cloud).

type Subscription

type Subscription[T any] struct {
	// contains filtered or unexported fields
}

Subscription represents a subscription to a Topic.

func NewSubscription

func NewSubscription[T any](topic *Topic[T], name string, cfg SubscriptionConfig[T]) (_ *Subscription[T])

NewSubscription is used to declare a Subscription to a topic. The passed in handler will be called for each message published to the topic.

A call to NewSubscription can only be made when declaring a package level variable. Any calls to this function made outside a package level variable declaration will result in a compiler error.

The subscription name must be unique for that topic. Subscription names must be defined in kebab-case (lowercase alphanumerics and hyphen separated). The subscription name must start with a letter and end with either a letter or number. It cannot be longer than 63 characters.

Once created and deployed never change the subscription name, or the topic name otherwise messages will be lost which could be in flight.

Example:

	import "encore.dev/pubsub"

	type MyEvent struct {
	  Foo string
	}

	var MyTopic = pubsub.NewTopic[*MyEvent]("my-topic", pubsub.TopicConfig{
	  DeliveryGuarantee: pubsub.AtLeastOnce,
	})

	var Subscription = pubsub.NewSubscription(MyTopic, "my-subscription", pubsub.SubscriptionConfig[*MyEvent]{
	  Handler:     HandleEvent,
	  RetryPolicy: &pubsub.RetryPolicy{MaxRetries: 10},
      MaxConcurrency: 5,
	})

	func HandleEvent(ctx context.Context, event *MyEvent) error {
	  rlog.Info("received foo")
	  return nil
	}

func (*Subscription[T]) Config added in v1.16.1

func (*Subscription[T]) Config() (_ SubscriptionConfig[T])

Config returns the subscription's configuration. It must not be modified by the caller.

func (*Subscription[T]) Meta added in v1.16.1

func (*Subscription[T]) Meta() (_ SubscriptionMeta[T])

Meta returns metadata about the topic.

type SubscriptionConfig

type SubscriptionConfig[T any] struct {
	// Handler is the function which will be called to process a message
	// sent on the topic.
	//
	// To reference a method on an [Encore service struct]
	// you can use the [MethodHandler] function. For example:
	//
	//	Handler: pubsub.MethodHandler((*MyService).MyMethod)
	//
	// It is important for the Handler function to block and not return
	// until all processing relating to the message has been completed.
	//
	// When the handler returns a nil error the message will be
	// acknowledged (acked) from the topic, and should not be redelivered.
	//
	// When this function returns a non-nil error the message will be
	// negatively acknowledged (nacked), which will cause a redelivery
	// attempt to be made (unless the retry policy's MaxRetries has been reached).
	//
	// The ctx passed to the handler will be cancelled when
	// the AckDeadline passes.
	//
	// This field is required.
	//
	// [Encore service struct]: https://encore.dev/docs/primitives/services-and-apis/service-structs
	Handler func(ctx context.Context, msg T) error

	// MaxConcurrency is the maximum number of messages which will be processed
	// simultaneously per instance of the service for this subscription.
	//
	// Note that this is per instance of the service, so if your service has
	// scaled to 10 instances and this is set to 10, then 100 messages could be
	// processed simultaneously.
	//
	// If the value is negative, then there will be no limit on the number
	// of messages processed simultaneously.
	//
	// Note: This is not supported by all cloud providers; specifically on GCP
	// when using Cloud Run instances on an unordered topic the subscription will
	// be configured as a Push Subscription and will have an adaptive concurrency
	// See [GCP Push Delivery Rate].
	//
	// This setting also has no effect on Encore Cloud environments.
	//
	// If not set, it uses a reasonable default based on the cloud provider.
	//
	// [GCP Push Delivery Rate]: https://cloud.google.com/pubsub/docs/push#push_delivery_rate
	MaxConcurrency int

	// AckDeadline is the time a consumer has to process a message
	// before it's returned to the subscription
	//
	// Default is 30 seconds, however the ack deadline must be at least
	// 1 second.
	AckDeadline time.Duration

	// MessageRetention is how long an undelivered message is kept
	// on the topic before it's purged
	// Default is 7 days.
	MessageRetention time.Duration

	// RetryPolicy defines how a message should be retried when
	// the subscriber returns an error
	RetryPolicy *RetryPolicy
}

SubscriptionConfig is used when creating a subscription

The values given here may be clamped to the supported values by the target cloud. (i.e. ack deadline may be brought within the supported range by the target cloud pubsub implementation).

type SubscriptionMeta added in v1.16.1

type SubscriptionMeta[T any] struct {
	// Name is the name of the subscription, as provided in the constructor to NewSubscription.
	Name string

	// Config is the subscriptions's configuration.
	Config SubscriptionConfig[T]

	// Topic provides metadata about the topic it subscribes to.
	Topic TopicMeta
}

SubscriptionMeta contains metadata about a subscription. The fields should not be modified by the caller. Additional fields may be added in the future.

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

Topic presents a flow of events of type T from any number of publishers to any number of subscribers.

Each subscription will receive a copy of each message published to the topic.

See NewTopic for more information on how to declare a Topic.

func NewTopic

func NewTopic[T any](name string, cfg TopicConfig) (_ *Topic[T])

NewTopic is used to declare a Topic. Encore will use static analysis to identify Topics and automatically provision them for you.

A call to NewTopic can only be made when declaring a package level variable. Any calls to this function made outside a package level variable declaration will result in a compiler error.

The topic name must be unique within an Encore application. Topic names must be defined in kebab-case (lowercase alphanumerics and hyphen seperated). The topic name must start with a letter and end with either a letter or number. It cannot be longer than 63 characters. Once created and deployed never change the topic name. When refactoring the topic name must stay the same. This allows for messages already on the topic to continue to be received after the refactored code is deployed.

Example:

 import "encore.dev/pubsub"

 type MyEvent struct {
   Foo string
 }

 var MyTopic = pubsub.NewTopic[*MyEvent]("my-topic", pubsub.TopicConfig{
   DeliveryGuarantee: pubsub.AtLeastOnce,
 })

//encore:api public
func DoFoo(ctx context.Context) error {
  msgID, err := MyTopic.Publish(ctx, &MyEvent{Foo: "bar"})
  if err != nil { return err }
  rlog.Info("foo published", "message_id", msgID)
  return nil
}

func (*Topic[T]) Meta added in v1.16.1

func (*Topic[T]) Meta() (_ TopicMeta)

Meta returns metadata about the topic.

func (*Topic[T]) Publish

func (*Topic[T]) Publish(ctx context.Context, msg T) (id string, err error)

Publish will publish a message to the topic and returns a unique message ID for the message.

This function will not return until the message has been successfully accepted by the topic.

If an error is returned, it is probable that the message failed to be published, however it is possible that the message could still be received by subscriptions to the topic.

type TopicConfig

type TopicConfig struct {
	// DeliveryGuarantee is used to configure the delivery guarantee of a Topic
	//
	// This field is required.
	DeliveryGuarantee DeliveryGuarantee

	// OrderingAttribute is the message attribute to use as a ordering key for
	// messages and delivery will ensure that messages with the same value will
	// be delivered in the order they where published.
	//
	// If OrderingAttribute is not set, messages can be delivered in any order.
	//
	// It is important to note, that in the case of an error being returned by a
	// subscription handler, the message will be retried before any subsequent
	// messages for that ordering key are delivered. This means depending on the
	// retry configuration, a large backlog of messages for a given ordering key
	// may build up. When using OrderingAttribute, it is recommended to use reason
	// about your failure modes and set the retry configuration appropriately.
	//
	// Once the maximum number of retries has been reached, the message will be
	// forwarded to the dead letter queue, and the next message for that ordering
	// key will be delivered.
	//
	// To create attributes on a message, use the `pubsub-attr` struct tag:
	//
	//	type UserEvent struct {
	//		UserID string `pubsub-attr:"user-id"`
	//		Action string
	//	}
	//
	//  var topic = pubsub.NewTopic[UserEvent]("user-events", pubsub.TopicConfig{
	// 		DeliveryGuarantee: pubsub.AtLeastOnce,
	//		OrderingAttribute: "user-id", // Messages with the same user-id will be delivered in the order they where published
	//	})
	//
	//  topic.Publish(ctx, &UserEvent{UserID: "1", Action: "login"})  // This message will be delivered before the logout
	//  topic.Publish(ctx, &UserEvent{UserID: "2", Action: "login"})  // This could be delivered at any time because it has a different user id
	//  topic.Publish(ctx, &UserEvent{UserID: "1", Action: "logout"}) // This message will be delivered after the first message
	//
	// By using OrderingAttribute, the throughput will be limited depending on the cloud provider:
	//
	// - AWS: 300 messages per second for the topic (see [AWS SQS Quotas]).
	// - GCP: 1MB/s for each ordering key (see [GCP PubSub Quotas]).
	//
	// Note: OrderingAttribute currently has no effect during local development.
	//
	// [AWS SQS Quotas]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
	// [GCP PubSub Quotas]: https://cloud.google.com/pubsub/quotas#resource_limits
	OrderingAttribute string
}

TopicConfig is used when creating a Topic

type TopicMeta added in v1.16.1

type TopicMeta struct {
	// Name is the name of the topic, as provided in the constructor to NewTopic.
	Name string
	// Config is the topic's configuration.
	Config TopicConfig
}

TopicMeta contains metadata about a topic. The fields should not be modified by the caller. Additional fields may be added in the future.

type TopicPerms added in v1.16.1

type TopicPerms[T any] interface {
	Meta() TopicMeta
}

TopicPerms is the type constraint for all permission-declaring interfaces that can be used with TopicRef.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL