pulsarutils

package
v0.3.45 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Ack

func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, wg *sync.WaitGroup)

Ack will ack all pulsar messages coming in on the msgs channel. The incoming messages contain a consumer id which corresponds to the index of the consumer that should be used to perform the ack. In theory, the acks could be done in parallel, however its unlikely that they will be a performance bottleneck

func CompactAndPublishSequences

func CompactAndPublishSequences(ctx context.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxMessageSizeInBytes int) error

CompactAndPublishSequences reduces the number of sequences to the smallest possible, while respecting per-job set ordering and max Pulsar message size, and then publishes to Pulsar.

func IsPulsarError

func IsPulsarError(err error) bool

IsPulsarError returns true if there is a pulsar.Error in the chain.

func NewMessageId

func NewMessageId(id int) pulsar.MessageID

func NewPulsarClient

func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error)

func ParsePulsarCompressionLevel

func ParsePulsarCompressionLevel(compressionLevelStr string) (pulsar.CompressionLevel, error)

func ParsePulsarCompressionType

func ParsePulsarCompressionType(compressionTypeStr string) (pulsar.CompressionType, error)

func PublishSequences

func PublishSequences(ctx context.Context, producer pulsar.Producer, sequences []*armadaevents.EventSequence) error

PublishSequence publishes several event sequences to Pulsar. For efficiency, all sequences are queued for publishing and then flushed. Returns once all sequences have been received by Pulsar.

To reduce the number of separate sequences sent and ensure limit message size, call eventutil.CompactEventSequences(sequences) and eventutil.LimitSequencesByteSize(sequences, int(srv.MaxAllowedMessageSize)) before passing to this function.

func PulsarError

func PulsarError(err error) *pulsar.Error

PulsarError returns the first pulsar.Error in the chain, or nil if no such error is found.

func Receive

func Receive(
	ctx context.Context,
	consumer pulsar.Consumer,
	receiveTimeout time.Duration,
	backoffTime time.Duration,
	m *commonmetrics.Metrics,
) chan pulsar.Message

Types

type ConsumerMessage

type ConsumerMessage struct {
	Message    pulsar.Message
	ConsumerId int
}

ConsumerMessage wraps a pulsar message and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.

type ConsumerMessageId

type ConsumerMessageId struct {
	MessageId  pulsar.MessageID
	Index      int64
	ConsumerId int
}

ConsumerMessageId wraps a pulsar message id and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.

func NewConsumerMessageId

func NewConsumerMessageId(id int) *ConsumerMessageId

type MockMessageId

type MockMessageId struct {
	pulsar.MessageID
	// contains filtered or unexported fields
}

type MockPulsarMessage

type MockPulsarMessage struct {
	pulsar.Message
	// contains filtered or unexported fields
}

func EmptyPulsarMessage

func EmptyPulsarMessage(id int, publishTime time.Time) MockPulsarMessage

func NewPulsarMessage

func NewPulsarMessage(id int, publishTime time.Time, payload []byte) MockPulsarMessage

func (MockPulsarMessage) ID

func (MockPulsarMessage) Payload

func (m MockPulsarMessage) Payload() []byte

func (MockPulsarMessage) Properties

func (MockPulsarMessage) Properties() map[string]string

func (MockPulsarMessage) PublishTime

func (m MockPulsarMessage) PublishTime() time.Time

type PulsarMessageId

type PulsarMessageId struct {
	// contains filtered or unexported fields
}

PulsarMessageId implements the pulsar.MessageID interface (which uniquely identifies a Pulsar message). We need this since the pulsar client library does not export a MessageID implementation. For PulsarMessageId, we provide, e.g., comparison functions.

func FromMessageId

func FromMessageId(id pulsar.MessageID) *PulsarMessageId

FromMessageId converts a pulsar.MessageID interface type to a *PulsarMessageId, which can be used, e.g., for comparison.

func New

func New(ledgerID, entryID int64, partitionIdx, batchIdx int32) *PulsarMessageId

func (*PulsarMessageId) BatchIdx

func (id *PulsarMessageId) BatchIdx() int32

func (*PulsarMessageId) EntryID

func (id *PulsarMessageId) EntryID() int64

func (*PulsarMessageId) Equal

func (id *PulsarMessageId) Equal(other pulsar.MessageID) (bool, error)

func (*PulsarMessageId) Greater

func (id *PulsarMessageId) Greater(other pulsar.MessageID) (bool, error)

Greater returns true if id occurred after other, or an error if the message ids are not comparable (i.e., if they are from different partitions).

func (*PulsarMessageId) GreaterEqual

func (id *PulsarMessageId) GreaterEqual(other pulsar.MessageID) (bool, error)

func (*PulsarMessageId) LedgerID

func (id *PulsarMessageId) LedgerID() int64

func (*PulsarMessageId) PartitionIdx

func (id *PulsarMessageId) PartitionIdx() int32

func (*PulsarMessageId) Serialize

func (id *PulsarMessageId) Serialize() []byte

func (*PulsarMessageId) String

func (id *PulsarMessageId) String() string

type PulsarToChannel

type PulsarToChannel struct {
	Consumer pulsar.Consumer
	C        chan pulsar.Message
}

PulsarToChannel is a service for receiving messages from Pulsar and forwarding those on C.

func NewPulsarToChannel

func NewPulsarToChannel(consumer pulsar.Consumer) *PulsarToChannel

func (*PulsarToChannel) Run

func (srv *PulsarToChannel) Run(ctx context.Context) error

Run starts the service.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL