Documentation ¶
Index ¶
- func Ack(ctx *armadacontext.Context, consumers []pulsar.Consumer, ...)
- func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, ...) error
- func IsPulsarError(err error) bool
- func NewMessageId(id int) pulsar.MessageID
- func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error)
- func PublishSequences(ctx *armadacontext.Context, producer pulsar.Producer, ...) error
- func PulsarError(err error) *pulsar.Error
- func Receive(ctx *armadacontext.Context, consumer pulsar.Consumer, ...) chan pulsar.Message
- type ConsumerMessage
- type ConsumerMessageId
- type MockMessageId
- type MockPulsarMessage
- type PulsarMessageId
- func (id *PulsarMessageId) BatchIdx() int32
- func (id *PulsarMessageId) EntryID() int64
- func (id *PulsarMessageId) Equal(other pulsar.MessageID) (bool, error)
- func (id *PulsarMessageId) Greater(other pulsar.MessageID) (bool, error)
- func (id *PulsarMessageId) GreaterEqual(other pulsar.MessageID) (bool, error)
- func (id *PulsarMessageId) LedgerID() int64
- func (id *PulsarMessageId) PartitionIdx() int32
- func (id *PulsarMessageId) Serialize() []byte
- func (id *PulsarMessageId) String() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Ack ¶
func Ack(ctx *armadacontext.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, backoffTime time.Duration, wg *sync.WaitGroup)
Ack will ack all pulsar messages coming in on the msgs channel. Each incoming message carries a consumer id, which is the index into consumers of the consumer that should be used to perform the ack. In theory, the acks could be done in parallel; however, it's unlikely that they will be a performance bottleneck.
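The routing described above can be sketched without the Pulsar client. This is a minimal illustration, not the package's implementation: acker, consumerMessageID, recordingAcker, and ackAll are hypothetical stand-ins, and the backoff and WaitGroup handling of the real Ack are left out.

```go
package main

import "fmt"

// acker is a hypothetical stand-in for the one consumer method this sketch needs.
type acker interface {
	AckID(messageID int) error
}

// consumerMessageID is a hypothetical stand-in for ConsumerMessageId:
// a message id plus the index of the consumer that received the message.
type consumerMessageID struct {
	MessageID  int
	ConsumerID int
}

// recordingAcker records the ids it has been asked to ack.
type recordingAcker struct {
	acked []int
}

func (r *recordingAcker) AckID(messageID int) error {
	r.acked = append(r.acked, messageID)
	return nil
}

// ackAll drains msgs sequentially and acks each id on the consumer selected
// by ConsumerID. It returns an error if a ConsumerID is out of range.
func ackAll(consumers []acker, msgs <-chan []*consumerMessageID) error {
	for batch := range msgs {
		for _, id := range batch {
			if id.ConsumerID < 0 || id.ConsumerID >= len(consumers) {
				return fmt.Errorf("no consumer with index %d", id.ConsumerID)
			}
			if err := consumers[id.ConsumerID].AckID(id.MessageID); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	a, b := &recordingAcker{}, &recordingAcker{}
	msgs := make(chan []*consumerMessageID, 1)
	msgs <- []*consumerMessageID{{MessageID: 1, ConsumerID: 0}, {MessageID: 2, ConsumerID: 1}}
	close(msgs)
	if err := ackAll([]acker{a, b}, msgs); err != nil {
		fmt.Println("ack failed:", err)
		return
	}
	fmt.Println("consumer 0 acked", a.acked, "consumer 1 acked", b.acked)
}
```

Acking sequentially keeps the loop simple, which matches the note above that parallel acks are unlikely to be needed.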
func CompactAndPublishSequences ¶
func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxMessageSizeInBytes uint, scheduler schedulers.Scheduler) error
CompactAndPublishSequences reduces the number of sequences to the smallest possible number, while respecting per-job set ordering and the maximum Pulsar message size, and then publishes them to Pulsar.
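The compaction step can be illustrated with simplified types. The sketch below is an assumption-laden illustration rather than the package's algorithm: sequence, compact, and limitSize are hypothetical, events are plain strings, and size is measured as string length instead of serialized message size.

```go
package main

import "fmt"

// sequence is a hypothetical, simplified stand-in for armadaevents.EventSequence.
type sequence struct {
	JobSet string
	Events []string
}

// compact merges all sequences with the same JobSet into one sequence,
// keeping the events of each job set in their original relative order.
func compact(seqs []sequence) []sequence {
	index := make(map[string]int) // job set -> position in out
	var out []sequence
	for _, s := range seqs {
		i, ok := index[s.JobSet]
		if !ok {
			i = len(out)
			index[s.JobSet] = i
			out = append(out, sequence{JobSet: s.JobSet})
		}
		out[i].Events = append(out[i].Events, s.Events...)
	}
	return out
}

// limitSize splits sequences so that the summed event length of each result
// stays within maxBytes. A single event larger than maxBytes is an error.
func limitSize(seqs []sequence, maxBytes int) ([]sequence, error) {
	var out []sequence
	for _, s := range seqs {
		cur := sequence{JobSet: s.JobSet}
		size := 0
		for _, e := range s.Events {
			if len(e) > maxBytes {
				return nil, fmt.Errorf("event of %d bytes exceeds limit %d", len(e), maxBytes)
			}
			if size+len(e) > maxBytes {
				// Start a new sequence for the same job set.
				out = append(out, cur)
				cur = sequence{JobSet: s.JobSet}
				size = 0
			}
			cur.Events = append(cur.Events, e)
			size += len(e)
		}
		if len(cur.Events) > 0 {
			out = append(out, cur)
		}
	}
	return out, nil
}

func main() {
	in := []sequence{
		{JobSet: "a", Events: []string{"a1"}},
		{JobSet: "b", Events: []string{"b1"}},
		{JobSet: "a", Events: []string{"a2", "a3"}},
	}
	out, err := limitSize(compact(in), 4)
	if err != nil {
		fmt.Println("compaction failed:", err)
		return
	}
	for _, s := range out {
		fmt.Println(s.JobSet, s.Events)
	}
}
```

Merging first and splitting second keeps the sequence count low while never reordering events within a job set.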
func IsPulsarError ¶
func IsPulsarError(err error) bool
IsPulsarError returns true if there is a pulsar.Error in the chain.
func NewMessageId ¶
func NewMessageId(id int) pulsar.MessageID
func NewPulsarClient ¶
func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error)
func PublishSequences ¶
func PublishSequences(ctx *armadacontext.Context, producer pulsar.Producer, sequences []*armadaevents.EventSequence, scheduler schedulers.Scheduler) error
PublishSequences publishes several event sequences to Pulsar. For efficiency, all sequences are queued for publishing and then flushed. Returns once all sequences have been received by Pulsar.
To reduce the number of separate sequences sent and to limit message size, call eventutil.CompactEventSequences(sequences) and eventutil.LimitSequencesByteSize(sequences, int(srv.MaxAllowedMessageSize)) before passing the sequences to this function.
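The queue-then-flush pattern described for PublishSequences can be sketched as follows. The asyncProducer interface, bufferingProducer, and publishAll are hypothetical stand-ins chosen for illustration; they are not the pulsar.Producer API and not this package's code.

```go
package main

import "fmt"

// asyncProducer is a hypothetical, reduced stand-in for a Pulsar producer.
type asyncProducer interface {
	SendAsync(payload []byte, done func(err error))
	Flush() error
}

// bufferingProducer queues payloads and delivers them when Flush is called.
type bufferingProducer struct {
	pending []func()
	sent    [][]byte
}

func (p *bufferingProducer) SendAsync(payload []byte, done func(err error)) {
	p.pending = append(p.pending, func() {
		p.sent = append(p.sent, payload)
		done(nil)
	})
}

func (p *bufferingProducer) Flush() error {
	for _, deliver := range p.pending {
		deliver()
	}
	p.pending = nil
	return nil
}

// publishAll queues every payload, flushes once, and then waits for one
// result per payload. It returns the first send error, if any.
func publishAll(p asyncProducer, payloads [][]byte) error {
	// Buffered so that callbacks never block, even if they run inside Flush.
	results := make(chan error, len(payloads))
	for _, payload := range payloads {
		p.SendAsync(payload, func(err error) { results <- err })
	}
	if err := p.Flush(); err != nil {
		return err
	}
	var first error
	for range payloads {
		if err := <-results; err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	p := &bufferingProducer{}
	err := publishAll(p, [][]byte{[]byte("seq-1"), []byte("seq-2")})
	fmt.Println("sent:", len(p.sent), "err:", err)
}
```

Queueing everything before a single flush lets the producer batch sends, while waiting on every callback preserves the guarantee that the call returns only after all sequences have been received.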
func PulsarError ¶
func PulsarError(err error) *pulsar.Error
PulsarError returns the first pulsar.Error in the chain, or nil if no such error is found.
Types ¶
type ConsumerMessage ¶
ConsumerMessage wraps a pulsar message and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.
type ConsumerMessageId ¶
ConsumerMessageId wraps a pulsar message id and an identifier for the consumer which originally received the corresponding message. This exists because we need to track which messages came from which consumers so that we can ACK them on the correct consumer.
func NewConsumerMessageId ¶
func NewConsumerMessageId(id int) *ConsumerMessageId
type MockMessageId ¶
type MockPulsarMessage ¶
func EmptyPulsarMessage ¶
func EmptyPulsarMessage(id int, publishTime time.Time) MockPulsarMessage
func NewPulsarMessage ¶
func NewPulsarMessage(id int, publishTime time.Time, payload []byte) MockPulsarMessage
func (MockPulsarMessage) ID ¶
func (m MockPulsarMessage) ID() pulsar.MessageID
func (MockPulsarMessage) Payload ¶
func (m MockPulsarMessage) Payload() []byte
func (MockPulsarMessage) Properties ¶
func (m MockPulsarMessage) Properties() map[string]string
func (MockPulsarMessage) PublishTime ¶
func (m MockPulsarMessage) PublishTime() time.Time
type PulsarMessageId ¶
type PulsarMessageId struct {
// contains filtered or unexported fields
}
PulsarMessageId implements the pulsar.MessageID interface (which uniquely identifies a Pulsar message). We need this since the pulsar client library does not export a MessageID implementation. PulsarMessageId also provides comparison methods such as Equal, Greater, and GreaterEqual.
func FromMessageId ¶
func FromMessageId(id pulsar.MessageID) *PulsarMessageId
FromMessageId converts a pulsar.MessageID interface type to a *PulsarMessageId, which can be used, e.g., for comparison.
func New ¶
func New(ledgerID, entryID int64, partitionIdx, batchIdx int32) *PulsarMessageId
func (*PulsarMessageId) BatchIdx ¶
func (id *PulsarMessageId) BatchIdx() int32
func (*PulsarMessageId) EntryID ¶
func (id *PulsarMessageId) EntryID() int64
func (*PulsarMessageId) Equal ¶
func (id *PulsarMessageId) Equal(other pulsar.MessageID) (bool, error)
func (*PulsarMessageId) Greater ¶
func (id *PulsarMessageId) Greater(other pulsar.MessageID) (bool, error)
Greater returns true if id occurred after other, or an error if the message ids are not comparable (i.e., if they are from different partitions).
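A comparison of this kind can be sketched with a local type. The example assumes that ids within one partition are ordered lexicographically by ledger id, then entry id, then batch index; that ordering, the messageID type, and the greater method are assumptions made for illustration and are not taken from this package.

```go
package main

import "fmt"

// messageID is a hypothetical stand-in for PulsarMessageId.
type messageID struct {
	ledgerID, entryID      int64
	partitionIdx, batchIdx int32
}

// greater reports whether id comes after other. Ids from different
// partitions are not comparable, so that case returns an error.
// Assumed ordering: ledger id, then entry id, then batch index.
func (id messageID) greater(other messageID) (bool, error) {
	if id.partitionIdx != other.partitionIdx {
		return false, fmt.Errorf(
			"ids from partitions %d and %d are not comparable",
			id.partitionIdx, other.partitionIdx,
		)
	}
	if id.ledgerID != other.ledgerID {
		return id.ledgerID > other.ledgerID, nil
	}
	if id.entryID != other.entryID {
		return id.entryID > other.entryID, nil
	}
	return id.batchIdx > other.batchIdx, nil
}

func main() {
	older := messageID{ledgerID: 1, entryID: 4, batchIdx: 9}
	newer := messageID{ledgerID: 1, entryID: 5}

	after, err := newer.greater(older)
	fmt.Println(after, err)

	_, err = newer.greater(messageID{partitionIdx: 1})
	fmt.Println(err != nil)
}
```

Returning an error for cross-partition comparisons, rather than false, forces callers to handle the case where no ordering exists.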
func (*PulsarMessageId) GreaterEqual ¶
func (id *PulsarMessageId) GreaterEqual(other pulsar.MessageID) (bool, error)
func (*PulsarMessageId) LedgerID ¶
func (id *PulsarMessageId) LedgerID() int64
func (*PulsarMessageId) PartitionIdx ¶
func (id *PulsarMessageId) PartitionIdx() int32
func (*PulsarMessageId) Serialize ¶
func (id *PulsarMessageId) Serialize() []byte
func (*PulsarMessageId) String ¶
func (id *PulsarMessageId) String() string