queues

package
v0.30.2
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2025 License: MIT Imports: 5 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume added in v0.28.0

func Consume(f Consumer)

Consume sets the Consumer function that receives batches of messages from Cloudflare Queues. NOTE: This function blocks the current goroutine and is intended for workers whose only purpose is to consume a Cloudflare Queue. If the worker has other purposes (e.g. handling HTTP requests), use ConsumeNonBlock instead.

func ConsumeNonBlock added in v0.28.0

func ConsumeNonBlock(f Consumer)

ConsumeNonBlock sets the Consumer function that receives batches of messages from Cloudflare Queues. This function is intended for workers that have other purposes (e.g. handling HTTP requests). It does not block while waiting for messages, so the worker can continue to execute other tasks. ConsumeNonBlock should be called before setting other blocking handlers (e.g. workers.Serve).

Types

type BatchSendOption

type BatchSendOption func(*batchSendOptions)

func WithBatchDelaySeconds

func WithBatchDelaySeconds(d time.Duration) BatchSendOption

WithBatchDelaySeconds sets how long delivery of the messages in the batch is delayed. The delay is passed as a time.Duration.

type Consumer added in v0.28.0

type Consumer func(batch *MessageBatch) error

Consumer is a function that receives a batch of messages from Cloudflare Queues. The function should be set using Consume or ConsumeNonBlock. A returned error causes the batch to be retried (unless the batch or individual messages are acked). NOTE: to run a long-running message processing task within the Consumer, use cloudflare.WaitUntil; this postpones the message acknowledgment until the task is completed without blocking queue consumption.
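The interaction between the returned error and per-message acknowledgment can be sketched without the Workers runtime. In the sketch below, message and batch are local stand-ins that only mimic the documented method names (StringBody, Ack, Retry); process is a hypothetical processing step. None of these definitions come from the package itself, and acknowledging unreadable messages is a design choice made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a local stand-in for the documented Message type. It only
// records which method was called, so the sketch runs on any platform.
type message struct {
	ID      string
	body    string
	acked   bool
	retried bool
}

func (m *message) StringBody() (string, error) {
	if m.body == "" {
		return "", errors.New("empty body")
	}
	return m.body, nil
}

func (m *message) Ack()   { m.acked = true }
func (m *message) Retry() { m.retried = true }

// batch is a local stand-in for the documented MessageBatch type.
type batch struct {
	Queue    string
	Messages []*message
}

// process is a hypothetical processing step that fails for one input.
func process(body string) error {
	if body == "fail" {
		return errors.New("temporary failure")
	}
	return nil
}

// consume has the same shape as a Consumer: it takes a batch and returns
// an error. Every message is settled individually, so it returns nil.
func consume(b *batch) error {
	for _, m := range b.Messages {
		body, err := m.StringBody()
		if err != nil {
			// Unreadable message: acknowledge it so it is not redelivered.
			m.Ack()
			continue
		}
		if err := process(body); err != nil {
			// Processing failed: ask for redelivery of this message only.
			m.Retry()
			continue
		}
		m.Ack()
	}
	return nil
}

func main() {
	b := &batch{Queue: "example", Messages: []*message{
		{ID: "1", body: "ok"},
		{ID: "2", body: "fail"},
		{ID: "3"},
	}}
	if err := consume(b); err != nil {
		fmt.Println("batch error:", err)
	}
	for _, m := range b.Messages {
		fmt.Printf("id=%s acked=%t retried=%t\n", m.ID, m.acked, m.retried)
	}
}
```

With the real package, a function of this shape would take *MessageBatch and be registered through Consume or ConsumeNonBlock.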

type Message added in v0.28.0

type Message struct {

	// ID - The unique Cloudflare-generated identifier of the message
	ID string
	// Timestamp - The time when the message was enqueued
	Timestamp time.Time
	// Body - The message body. It can be accessed directly or through conversion helpers such as StringBody, BytesBody, IntBody, FloatBody.
	Body js.Value
	// Attempts - The number of times the message delivery has been retried.
	Attempts int
	// contains filtered or unexported fields
}

Message represents a message of the batch received by the consumer.

func (*Message) Ack added in v0.28.0

func (m *Message) Ack()

Ack acknowledges the message as successfully delivered, regardless of the result returned from the consuming function.

func (*Message) BytesBody added in v0.28.0

func (m *Message) BytesBody() ([]byte, error)

func (*Message) Retry added in v0.28.0

func (m *Message) Retry(opts ...RetryOption)

Retry marks the message to be re-delivered. The message will be retried after the optional delay configured with RetryOption.

func (*Message) StringBody added in v0.28.0

func (m *Message) StringBody() (string, error)

type MessageBatch added in v0.28.0

type MessageBatch struct {

	// Queue - The name of the queue from which the messages were received
	Queue string

	// Messages - The messages in the batch
	Messages []*Message
	// contains filtered or unexported fields
}

MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the worker configuration.

func (*MessageBatch) AckAll added in v0.28.0

func (b *MessageBatch) AckAll()

AckAll acknowledges all messages in the batch as successfully delivered, regardless of the result returned from the consuming function.

func (*MessageBatch) RetryAll added in v0.28.0

func (b *MessageBatch) RetryAll(opts ...RetryOption)

RetryAll marks all messages in the batch to be re-delivered. The messages will be retried after the optional delay configured with RetryOption.

type MessageSendRequest added in v0.28.0

type MessageSendRequest struct {
	// contains filtered or unexported fields
}

MessageSendRequest is a wrapper type used for sending message batches. See: https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagesendrequest

func NewBytesMessageSendRequest added in v0.28.0

func NewBytesMessageSendRequest(content []byte, opts ...SendOption) *MessageSendRequest

NewBytesMessageSendRequest creates a single byte array message to be batched before sending to a queue.

func NewJSONMessageSendRequest added in v0.28.0

func NewJSONMessageSendRequest(content any, opts ...SendOption) *MessageSendRequest

NewJSONMessageSendRequest creates a single JSON message to be batched before sending to a queue.

func NewTextMessageSendRequest added in v0.28.0

func NewTextMessageSendRequest(content string, opts ...SendOption) *MessageSendRequest

NewTextMessageSendRequest creates a single text message to be batched before sending to a queue.

func NewV8MessageSendRequest added in v0.28.0

func NewV8MessageSendRequest(content js.Value, opts ...SendOption) *MessageSendRequest

NewV8MessageSendRequest creates a single raw JS value message to be batched before sending to a queue.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(queueName string) (*Producer, error)

NewProducer creates a new Producer object to send messages to a queue. queueName is the name of the queue environment variable to send messages to. In the Cloudflare API documentation, this object corresponds to the Queue.

func (*Producer) SendBatch

func (p *Producer) SendBatch(messages []*MessageSendRequest, opts ...BatchSendOption) error

SendBatch sends multiple messages to a queue. Options for each message are set when its MessageSendRequest is created (via SendOption); opts applies to the batch as a whole.

func (*Producer) SendBytes

func (p *Producer) SendBytes(body []byte, opts ...SendOption) error

SendBytes sends a single byte array message to a queue.

func (*Producer) SendJSON

func (p *Producer) SendJSON(body any, opts ...SendOption) error

SendJSON sends a single JSON message to a queue.

func (*Producer) SendText

func (p *Producer) SendText(body string, opts ...SendOption) error

SendText sends a single text message to a queue.

func (*Producer) SendV8

func (p *Producer) SendV8(body js.Value, opts ...SendOption) error

SendV8 sends a single raw JS value message to a queue.

type RetryOption added in v0.28.0

type RetryOption func(*retryOptions)

func WithRetryDelay added in v0.28.0

func WithRetryDelay(d time.Duration) RetryOption

WithRetryDelay sets the delay before message delivery is retried. Note that the delay should not be less than one second and is not more precise than one second.

type SendOption

type SendOption func(*sendOptions)

func WithDelaySeconds

func WithDelaySeconds(d time.Duration) SendOption

WithDelaySeconds sets how long delivery of the message is delayed. The delay is passed as a time.Duration.
