pubsubkit

package
v0.0.25
Published: Oct 13, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package pubsubkit provides helpers for interacting with GCP Pub/Sub.

Constants

This section is empty.

Variables

var (
	// ErrSubscriptionNotFound ...
	ErrSubscriptionNotFound = errors.New("pubsub subscription doesn't exists")
	// ErrTopicNotFound ...
	ErrTopicNotFound = errors.New("pubsub topic doesn't exists")
)
var (
	// ErrInvalidSubscription is returned when trying to receive messages from a nil subscription.
	ErrInvalidSubscription = errors.New("pubsubkit: subscription cannot be nil")

	// ErrInvalidSubscriptionHandler is returned when trying to receive messages from a subscription using a nil handler.
	ErrInvalidSubscriptionHandler = errors.New("pubsubkit: handler cannot be nil")
)

Functions

func NewPubSubClient

func NewPubSubClient(projectID string, opts ...option.ClientOption) (*pubsub.Client, error)

NewPubSubClient returns a new Pub/Sub client, with a 5s timeout.

func NewPubSubSubscription

func NewPubSubSubscription(
	client *pubsub.Client,
	subID string,
	cfg pubsub.SubscriptionConfig,
	opts ...Option,
) (*pubsub.Subscription, error)

NewPubSubSubscription returns a new Pub/Sub topic subscriber, with a 5s timeout.

func NewPubSubTopic

func NewPubSubTopic(
	client *pubsub.Client,
	topicID string,
	cfg *pubsub.TopicConfig,
	opts ...Option,
) (*pubsub.Topic, error)

NewPubSubTopic returns a new Pub/Sub topic publisher, with a 5s timeout.

func ReceiveSubscription

func ReceiveSubscription(
	ctx context.Context,
	sub *pubsub.Subscription,
	handler WorkerHandlerFunc,
	opts ...Option,
) (err error)

ReceiveSubscription blocks while receiving messages from a Pub/Sub subscription. Call it in a goroutine if you'd like to do something else in the meantime.

go func() {
	if err := pubsubkit.ReceiveSubscription(...); err != nil {
		// handle error
	}
}()

It calls `Nack()` on a message when the handler returns an error and a dead letter topic (DLT) is found. It calls `Ack()` when the handler succeeds, or when the handler returns an error and no DLT is found. It also logs the process using the `toolkit/log` package.

Types

type Message

type Message interface {
	// ID identifies this message.
	// This ID is assigned by the server and is populated for Messages obtained from a subscription.
	// This field is read-only.
	ID() string

	// Data is the actual data in the message.
	Data() []byte

	// Attributes represents the key-value pairs the current message
	// is labelled with.
	Attributes() map[string]string

	// The time at which the message was published.
	// This is populated by the server for Messages obtained from a subscription.
	// This field is read-only.
	PublishTime() time.Time

	// DeliveryAttempt is the number of times a message has been delivered.
	// This is part of the dead lettering feature that forwards messages that
	// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
	// If dead lettering is enabled, this will be set on all attempts, starting
	// with value 1. Otherwise, the value will be nil.
	// This field is read-only.
	DeliveryAttempt() int

	// DLTSupported reports whether the message's topic has a DLT or not.
	DLTSupported() bool

	// OrderingKey identifies related messages for which publish order should
	// be respected. If empty string is used, message will be sent unordered.
	OrderingKey() string
}

Message wraps *pubsub.Message without the Nack() and Ack() methods. It is designed to be passed as a parameter to `WorkerHandlerFunc`.

type Option

type Option func(*Options)

Option sets options for connecting to Pub/Sub.

func WithAutoCreate

func WithAutoCreate() Option

WithAutoCreate creates the Pub/Sub resource when it does not exist yet.

func WithoutCheckExistance

func WithoutCheckExistance() Option

WithoutCheckExistance bypasses the Pub/Sub resource existence validations.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options ...
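
Option and Options follow Go's functional options pattern: each Option is a function that mutates an Options value. Because the fields of Options are unexported and not shown here, the sketch below uses stand-in types with hypothetical field names (autoCreate, skipExistenceCheck) purely to illustrate how such options are typically applied.

```go
package main

import "fmt"

// options is a stand-in for the package's Options struct.
// Both field names are hypothetical; the real fields are unexported.
type options struct {
	autoCreate         bool
	skipExistenceCheck bool
}

// option mirrors the shape of `type Option func(*Options)`.
type option func(*options)

// withAutoCreate is a stand-in for WithAutoCreate.
func withAutoCreate() option {
	return func(o *options) { o.autoCreate = true }
}

// withoutCheckExistence is a stand-in for WithoutCheckExistance.
func withoutCheckExistence() option {
	return func(o *options) { o.skipExistenceCheck = true }
}

// applyOptions starts from the zero value and applies each option in order.
func applyOptions(opts ...option) options {
	var o options
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", applyOptions())
	fmt.Printf("%+v\n", applyOptions(withAutoCreate()))
	fmt.Printf("%+v\n", applyOptions(withAutoCreate(), withoutCheckExistence()))
}
```

The pattern keeps the constructor signatures stable: new behaviours can be added as new Option functions without changing existing callers.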

type WorkerHandlerFunc

type WorkerHandlerFunc func(ctx context.Context, msg Message) error

WorkerHandlerFunc handles a single message received from a *pubsub.Subscription. Your handler should be idempotent, since GCP Pub/Sub might deliver a message more than once, or messages might arrive out of order.
