Documentation
Index ¶
- func Consume(f Consumer)
- func ConsumeNonBlock(f Consumer)
- type BatchSendOption
- type Consumer
- type Message
- type MessageBatch
- type MessageSendRequest
- func NewBytesMessageSendRequest(content []byte, opts ...SendOption) *MessageSendRequest
- func NewJSONMessageSendRequest(content any, opts ...SendOption) *MessageSendRequest
- func NewTextMessageSendRequest(content string, opts ...SendOption) *MessageSendRequest
- func NewV8MessageSendRequest(content js.Value, opts ...SendOption) *MessageSendRequest
- type Producer
- func (p *Producer) SendBatch(messages []*MessageSendRequest, opts ...BatchSendOption) error
- func (p *Producer) SendBytes(body []byte, opts ...SendOption) error
- func (p *Producer) SendJSON(body any, opts ...SendOption) error
- func (p *Producer) SendText(body string, opts ...SendOption) error
- func (p *Producer) SendV8(body js.Value, opts ...SendOption) error
- type RetryOption
- type SendOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶ added in v0.28.0
func Consume(f Consumer)
Consume sets the Consumer function to receive batches of messages from Cloudflare Queues. NOTE: This function blocks the current goroutine and is intended for workers whose only purpose is to consume a Cloudflare Queue. If the worker has other purposes (e.g. handling HTTP requests), use ConsumeNonBlock instead.
func ConsumeNonBlock ¶ added in v0.28.0
func ConsumeNonBlock(f Consumer)
ConsumeNonBlock sets the Consumer function to receive batches of messages from Cloudflare Queues. This function is intended for workers that have other purposes (e.g. handling HTTP requests). It does not block, so the worker continues to execute other tasks while receiving messages. ConsumeNonBlock should be called before setting other blocking handlers (e.g. workers.Serve).
Types ¶
type BatchSendOption ¶
type BatchSendOption func(*batchSendOptions)
func WithBatchDelaySeconds ¶
func WithBatchDelaySeconds(d time.Duration) BatchSendOption
WithBatchDelaySeconds changes the number of seconds to delay the message.
type Consumer ¶ added in v0.28.0
type Consumer func(batch *MessageBatch) error
Consumer is a function that receives a batch of messages from Cloudflare Queues. The function should be set using Consume or ConsumeNonBlock. A returned error causes the batch to be retried (unless the batch or individual messages are acked). NOTE: to run a long-running message processing task within the Consumer, use cloudflare.WaitUntil. This postpones message acknowledgment until the task completes, without blocking queue consumption.
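The interaction between the returned error and explicit acknowledgment can be sketched as a small decision function. This is only a model of the behavior described above, not code from the package: the names mark, outcome and resolve are hypothetical, and the last case (a message with no explicit mark and a nil error counts as delivered) is an assumption about how the platform treats a successful consumer call.

```go
package main

import (
	"errors"
	"fmt"
)

// mark is a hypothetical label for what the consumer did to a message
// explicitly; it is not a type from the package.
type mark int

const (
	markNone  mark = iota // neither Ack nor Retry was called
	markAck               // Ack (or AckAll) was called
	markRetry             // Retry (or RetryAll) was called
)

// outcome is a hypothetical label for what happens to the message afterwards.
type outcome string

const (
	delivered outcome = "delivered"
	retried   outcome = "retried"
)

// errProcessing is a sample error a consumer might return.
var errProcessing = errors.New("processing failed")

// resolve models the documented precedence: an explicit ack wins over the
// error returned by the consumer; otherwise an explicit retry or a non-nil
// error leads to redelivery.
func resolve(m mark, consumerErr error) outcome {
	switch {
	case m == markAck:
		return delivered
	case m == markRetry || consumerErr != nil:
		return retried
	default:
		// Assumption: success without an explicit mark counts as delivered.
		return delivered
	}
}

func main() {
	fmt.Println(resolve(markAck, errProcessing))  // acked, consumer failed
	fmt.Println(resolve(markNone, errProcessing)) // not acked, consumer failed
	fmt.Println(resolve(markRetry, nil))          // explicit retry
	fmt.Println(resolve(markNone, nil))           // success, nothing explicit
}
```

In practice this means a consumer can ack the messages it has already handled and still return an error for the rest of the batch.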
type Message ¶ added in v0.28.0
type Message struct {
	// ID - The unique Cloudflare-generated identifier of the message
	ID string
	// Timestamp - The time when the message was enqueued
	Timestamp time.Time
	// Body - The message body. Can be accessed directly or through the conversion helpers StringBody, BytesBody, IntBody, FloatBody.
	Body js.Value
	// Attempts - The number of times the message delivery has been retried.
	Attempts int
	// contains filtered or unexported fields
}
Message represents a message of the batch received by the consumer.
func (*Message) Ack ¶ added in v0.28.0
func (m *Message) Ack()
Ack acknowledges the message as successfully delivered, regardless of the result returned from the consuming function.
func (*Message) Retry ¶ added in v0.28.0
func (m *Message) Retry(opts ...RetryOption)
Retry marks the message to be re-delivered. The message will be retried after the optional delay configured with RetryOption.
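A consumer that decides per message whether to ack or retry might look roughly like the sketch below. To keep it runnable without the Workers runtime, message and messageBatch are local stand-ins that only record which method was called; the stand-in Retry takes the delay directly instead of RetryOption values, and handle is a hypothetical processing step.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// message is a local stand-in for the package's Message type.
// It only records which method was called on it.
type message struct {
	ID         string
	Body       string
	acked      bool
	retried    bool
	retryAfter time.Duration
}

// Ack records that the message was acknowledged.
func (m *message) Ack() { m.acked = true }

// Retry records that redelivery was requested after the given delay.
// Simplification: the real method takes RetryOption values instead.
func (m *message) Retry(delay time.Duration) {
	m.retried = true
	m.retryAfter = delay
}

// messageBatch is a local stand-in for the package's MessageBatch type.
type messageBatch struct {
	Queue    string
	Messages []*message
}

// handle is a hypothetical processing step that rejects empty bodies.
func handle(m *message) error {
	if m.Body == "" {
		return errors.New("empty body")
	}
	return nil
}

// consume acks every message that was processed and asks for the failed
// ones to be redelivered after 30 seconds.
func consume(batch *messageBatch) error {
	for _, m := range batch.Messages {
		if err := handle(m); err != nil {
			m.Retry(30 * time.Second)
			continue
		}
		m.Ack()
	}
	// Every message was marked explicitly, so no error is returned.
	return nil
}

func main() {
	batch := &messageBatch{
		Queue: "example-queue",
		Messages: []*message{
			{ID: "1", Body: "hello"},
			{ID: "2", Body: ""},
		},
	}
	if err := consume(batch); err != nil {
		fmt.Println("consume failed:", err)
		return
	}
	for _, m := range batch.Messages {
		fmt.Printf("%s acked=%t retried=%t\n", m.ID, m.acked, m.retried)
	}
}
```

With the package's own types, the retry call would presumably be written as m.Retry(WithRetryDelay(30 * time.Second)), using the RetryOption documented further down.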
func (*Message) StringBody ¶ added in v0.28.0
type MessageBatch ¶ added in v0.28.0
type MessageBatch struct {
	// Queue - The name of the queue from which the messages were received
	Queue string
	// Messages - The messages in the batch
	Messages []*Message
	// contains filtered or unexported fields
}
MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the worker configuration.
- https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer
- https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (*MessageBatch) AckAll ¶ added in v0.28.0
func (b *MessageBatch) AckAll()
AckAll acknowledges all messages in the batch as successfully delivered, regardless of the result returned from the consuming function.
func (*MessageBatch) RetryAll ¶ added in v0.28.0
func (b *MessageBatch) RetryAll(opts ...RetryOption)
RetryAll marks all messages in the batch to be re-delivered. The messages will be retried after the optional delay configured with RetryOption.
type MessageSendRequest ¶ added in v0.28.0
type MessageSendRequest struct {
// contains filtered or unexported fields
}
MessageSendRequest is a wrapper type used for sending message batches. See: https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagesendrequest
func NewBytesMessageSendRequest ¶ added in v0.28.0
func NewBytesMessageSendRequest(content []byte, opts ...SendOption) *MessageSendRequest
NewBytesMessageSendRequest creates a single byte array message to be batched before sending to a queue.
func NewJSONMessageSendRequest ¶ added in v0.28.0
func NewJSONMessageSendRequest(content any, opts ...SendOption) *MessageSendRequest
NewJSONMessageSendRequest creates a single JSON message to be batched before sending to a queue.
func NewTextMessageSendRequest ¶ added in v0.28.0
func NewTextMessageSendRequest(content string, opts ...SendOption) *MessageSendRequest
NewTextMessageSendRequest creates a single text message to be batched before sending to a queue.
func NewV8MessageSendRequest ¶ added in v0.28.0
func NewV8MessageSendRequest(content js.Value, opts ...SendOption) *MessageSendRequest
NewV8MessageSendRequest creates a single raw JS value message to be batched before sending to a queue.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
NewProducer creates a new Producer object to send messages to a queue. queueName is the name of the queue environment variable to send messages to. In Cloudflare API documentation, this object represents the Queue.
func (*Producer) SendBatch ¶
func (p *Producer) SendBatch(messages []*MessageSendRequest, opts ...BatchSendOption) error
SendBatch sends multiple messages to a queue. This function allows setting options for each message.
func (*Producer) SendBytes ¶
func (p *Producer) SendBytes(body []byte, opts ...SendOption) error
SendBytes sends a single byte array message to a queue.
func (*Producer) SendJSON ¶
func (p *Producer) SendJSON(body any, opts ...SendOption) error
SendJSON sends a single JSON message to a queue.
type RetryOption ¶ added in v0.28.0
type RetryOption func(*retryOptions)
func WithRetryDelay ¶ added in v0.28.0
func WithRetryDelay(d time.Duration) RetryOption
WithRetryDelay sets the delay before message delivery is retried. Note that the delay should be at least one second and is only precise to the second.
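Because the delay has whole-second granularity, a caller may want to normalize a computed duration before passing it to WithRetryDelay. The helper below is a hypothetical caller-side sketch, not part of the package: it raises anything under one second to one second and drops the sub-second remainder. How the package itself rounds is not stated here, so truncation is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// normalizeRetryDelay is a hypothetical helper that makes a duration fit
// the documented constraints: at least one second, whole seconds only.
func normalizeRetryDelay(d time.Duration) time.Duration {
	if d < time.Second {
		return time.Second
	}
	// Assumption: sub-second precision is dropped rather than rounded up.
	return d.Truncate(time.Second)
}

func main() {
	fmt.Println(normalizeRetryDelay(250 * time.Millisecond))  // 1s
	fmt.Println(normalizeRetryDelay(1500 * time.Millisecond)) // 1s
	fmt.Println(normalizeRetryDelay(90 * time.Second))        // 1m30s
}
```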
type SendOption ¶
type SendOption func(*sendOptions)
func WithDelaySeconds ¶
func WithDelaySeconds(d time.Duration) SendOption
WithDelaySeconds changes the number of seconds to delay the message.
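SendOption, BatchSendOption and RetryOption all follow the functional options pattern: each option is a function that mutates an unexported options struct. The sketch below shows the pattern with local stand-ins. The delaySeconds field, the applyOptions helper and the conversion to whole seconds are assumptions made for illustration, since the package's internals are not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// sendOptions is a stand-in for the package's unexported options struct.
// Assumption: the real struct is not documented, so this field is made up.
type sendOptions struct {
	delaySeconds int
}

// sendOption mirrors the shape of SendOption: a function that mutates
// the options struct.
type sendOption func(*sendOptions)

// withDelaySeconds mirrors the shape of WithDelaySeconds. Converting the
// duration to whole seconds is an assumption of this sketch.
func withDelaySeconds(d time.Duration) sendOption {
	return func(o *sendOptions) {
		o.delaySeconds = int(d.Seconds())
	}
}

// applyOptions is a hypothetical helper showing how a send function could
// fold its variadic options into one struct. Later options overwrite
// earlier ones.
func applyOptions(opts ...sendOption) sendOptions {
	var o sendOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withDelaySeconds(2 * time.Minute))
	fmt.Println(o.delaySeconds) // 120
}
```

The pattern lets functions such as SendText keep a short signature while new options are added later without breaking existing callers.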