pubsub

package
v1.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2022 License: MPL-2.0 Imports: 2 Imported by: 4

Documentation

Overview

Package pubsub provides Encore applications with the ability to create Pub/Sub Topics and multiple Subscriptions on those topics in a cloud-agnostic manner.

For more information see https://encore.dev/docs/develop/pubsub

Index

Constants

View Source
const (
	// it will attempt to immediately forward a message to the dead letter queue if the subscription Handler
	// returns any error or panics.
	//
	// Note: With some cloud providers, having no retries may not be supported, in which case the minimum number of
	// retries permitted by the provider will be used.
	NoRetries = -2 // InfiniteRetries is used to control deadletter queuing logic, when set as the MaxRetires within the RetryPolicy
	// it will attempt to always retry a message without ever sending it to the dead letter queue.
	//
	// Note: With some cloud providers, infinite retries may not be supported, in which case the maximum number of
	// retries permitted by the provider will be used.
	InfiniteRetries = -1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DeliveryGuarantee

type DeliveryGuarantee int

DeliveryGuarantee is used to configure the delivery contract for a topic

const (
	// a consumer at least once. This is supported by all cloud providers.
	AtLeastOnce DeliveryGuarantee = iota + 1
)

type RetryPolicy

type RetryPolicy struct {
	// The minimum time to wait between retries. Defaults to 10 seconds.
	MinBackoff time.Duration

	// The maximum time to wait between retries. Defaults to 10 minutes.
	MaxBackoff time.Duration

	// MaxRetries is used to control deadletter queuing logic, when:
	//   n == 0: A default value of 100 retries will be used
	//   n > 0:  Encore will forward a message to a dead letter queue after n retries
	//   n == pubsub.InfiniteRetries: Messages will not be forwarded to the dead letter queue by the Encore framework
	MaxRetries int
}

RetryPolicy defines how a subscription should handle retries after errors either delivering the message or processing the message.

The values given to this structure are parsed at compile time, such that the correct Cloud resources can be provisioned to support the queue.

As such the values given here may be clamped to the supported values by the target cloud. (i.e. min/max values brought within the supported range by the target cloud).

type Subscription

type Subscription[T any] struct {
	// contains filtered or unexported fields
}

Subscription represents a subscription to a Topic.

func NewSubscription

func NewSubscription[T any](topic *Topic[T], name string, subscriptionCfg SubscriptionConfig[T]) *Subscription[T]

NewSubscription is used to declare a Subscription to a topic. The passed in handler will be called for each message published to the topic.

A call to NewSubscription can only be made when declaring a package level variable. Any calls to this function made outside a package level variable declaration will result in a compiler error.

The subscription name must be unique for that topic. Subscription names must be defined in kebab-case (lowercase alphanumerics and hyphen seperated). The subscription name must start with a letter and end with either a letter or number. It cannot be longer than 63 characters.

Once created and deployed never change the subscription name, or the topic name otherwise messages will be lost which could be in flight.

Example:

import "encore.dev/pubsub"

type MyEvent struct {
  Foo string
}

var MyTopic = pubsub.NewTopic[*MyEvent]("my-topic", pubsub.TopicConfig{
  DeliveryGuarantee: pubsub.AtLeastOnce,
})

var Subscription = pubsub.NewSubscription(MyTopic, "my-subscription", pubsub.SubscriptionConfig[*MyEvent]{
  Handler:     HandleEvent,
  RetryPolicy: &pubsub.RetryPolicy { MaxRetries: 10 },
})

func HandleEvent(ctx context.Context, event *MyEvent) error {
  rlog.Info("received foo")
  return nil
}

type SubscriptionConfig

type SubscriptionConfig[T any] struct {
	// The function which will be called to process a message
	// sent on the topic.
	//
	// It is important for this function to block and not return
	// until all processing relating to the message has been completed.
	//
	// When this function returns a `nil`, the message will be
	// acknowledged (acked) from the topic, and should not be redelivered.
	//
	// When this function returns an `error`, the message will be
	// negatively acknowledged (nacked), which will cause a redelivery
	// attempt to be made (unless the retry policy's MaxRetries has been reached).
	//
	// This field is required.
	Handler func(ctx context.Context, msg T) error

	// AckDeadline is the time a consumer has to process a message
	// before it's returned to the subscription
	//
	// Default is 30 seconds, however the ack deadline must be at least
	// 1 second.
	AckDeadline time.Duration

	// MessageRetention is how long an undelivered message is kept
	// on the topic before it's purged
	// Default is 7 days.
	MessageRetention time.Duration

	// RetryPolicy defines how a message should be retried when
	// the subscriber returns an error
	RetryPolicy *RetryPolicy
}

SubscriptionConfig is used when creating a subscription

The values given here may be clamped to the supported values by the target cloud. (i.e. ack deadline may be brought within the supported range by the target cloud pubsub implementation).

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

Topic presents a flow of events of type T from any number of publishers to any number of subscribers.

Each subscription will receive a copy of each message published to the topic.

See NewTopic for more information on how to declare a Topic.

func NewTopic

func NewTopic[T any](name string, cfg TopicConfig) *Topic[T]

NewTopic is used to declare a Topic. Encore will use static analysis to identify Topics and automatically provision them for you.

A call to NewTopic can only be made when declaring a package level variable. Any calls to this function made outside a package level variable declaration will result in a compiler error.

The topic name must be unique within an Encore application. Topic names must be defined in kebab-case (lowercase alphanumerics and hyphen seperated). The topic name must start with a letter and end with either a letter or number. It cannot be longer than 63 characters. Once created and deployed never change the topic name. When refactoring the topic name must stay the same. This allows for messages already on the topic to continue to be received after the refactored code is deployed.

Example:

 import "encore.dev/pubsub"

 type MyEvent struct {
   Foo string
 }

 var MyTopic = pubsub.NewTopic[*MyEvent]("my-topic", pubsub.TopicConfig{
   DeliveryGuarantee: pubsub.AtLeastOnce,
 })

//encore:api public
func DoFoo(ctx context.Context) error {
  msgID, err := MyTopic.Publish(ctx, &MyEvent{Foo: "bar"})
  if err != nil { return err }
  rlog.Info("foo published", "message_id", msgID)
  return nil
}

func (*Topic[T]) Publish

func (*Topic[T]) Publish(ctx context.Context, msg T) (id string, err error)

Publish will publish a message to the topic and returns a unique message ID for the message.

This function will not return until the message has been successfully accepted by the topic.

If an error is returned, it is probable that the message failed to be published, however it is possible that the message could still be received by subscriptions to the topic.

type TopicConfig

type TopicConfig struct {
	// DeliveryGuarantee is used to configure the delivery guarantee of a Topic
	//
	// This field is required.
	DeliveryGuarantee DeliveryGuarantee
}

TopicConfig is used when creating a Topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL