Documentation
¶
Index ¶
- Variables
- func RetryCount(val int) *int
- type BatchJSONOptions
- type BatchOptions
- type Client
- func (c *Client) Batch(options []BatchOptions) (results [][]PublishOrEnqueueResponse, err error)
- func (c *Client) BatchJSON(options []BatchJSONOptions) (results [][]PublishOrEnqueueResponse, err error)
- func (c *Client) Dlq() *Dlq
- func (c *Client) Enqueue(options EnqueueOptions) (result PublishOrEnqueueResponse, err error)
- func (c *Client) EnqueueJSON(options EnqueueJSONOptions) (result PublishOrEnqueueResponse, err error)
- func (c *Client) Events() *Events
- func (c *Client) Keys() *Keys
- func (c *Client) Messages() *Messages
- func (c *Client) Publish(options PublishOptions) (result PublishOrEnqueueResponse, err error)
- func (c *Client) PublishJSON(options PublishJSONOptions) (result PublishOrEnqueueResponse, err error)
- func (c *Client) Queues() *Queues
- func (c *Client) Schedules() *Schedules
- func (c *Client) UrlGroups() *UrlGroups
- type Dlq
- type DlqFilter
- type DlqMessage
- type Endpoint
- type EnqueueJSONOptions
- type EnqueueOptions
- type EnqueueUrlGroupJSONOptions
- type EnqueueUrlGroupOptions
- type Event
- type EventFilter
- type EventState
- type Events
- type Keys
- type ListDlqOptions
- type ListEventsOptions
- type Message
- type Messages
- type Options
- type PublishJSONOptions
- type PublishOptions
- type PublishOrEnqueueResponse
- type PublishUrlGroupJSONOptions
- type PublishUrlGroupOptions
- type Queue
- type QueueWithLag
- type Queues
- func (c *Queues) Delete(queue string) (err error)
- func (c *Queues) Get(name string) (schedule QueueWithLag, err error)
- func (c *Queues) List() (schedules []QueueWithLag, err error)
- func (c *Queues) Pause(queue string) (err error)
- func (c *Queues) Resume(queue string) (err error)
- func (c *Queues) Upsert(queue Queue) (err error)
- type Receiver
- type Schedule
- type ScheduleJSONOptions
- type ScheduleOptions
- type Schedules
- func (s *Schedules) Create(schedule ScheduleOptions) (string, error)
- func (s *Schedules) CreateJSON(schedule ScheduleJSONOptions) (scheduleId string, err error)
- func (s *Schedules) Delete(scheduleId string) (err error)
- func (s *Schedules) Get(scheduleId string) (schedule Schedule, err error)
- func (s *Schedules) List() (schedules []Schedule, err error)
- func (s *Schedules) Pause(scheduleId string) (err error)
- func (s *Schedules) Resume(scheduleId string) (err error)
- type SigningKeys
- type UrlGroup
- type UrlGroups
- func (u *UrlGroups) Delete(urlGroup string) (err error)
- func (u *UrlGroups) Enqueue(options EnqueueUrlGroupOptions) (result []PublishOrEnqueueResponse, err error)
- func (u *UrlGroups) EnqueueJSON(message EnqueueUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error)
- func (u *UrlGroups) Get(urlGroup string) (result UrlGroup, err error)
- func (u *UrlGroups) List() (result []UrlGroup, err error)
- func (u *UrlGroups) Publish(po PublishUrlGroupOptions) (result []PublishOrEnqueueResponse, err error)
- func (u *UrlGroups) PublishJSON(message PublishUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error)
- func (u *UrlGroups) RemoveEndpoints(urlGroup string, endpoints []Endpoint) (err error)
- func (u *UrlGroups) UpsertEndpoints(urlGroup string, endpoints []Endpoint) (err error)
- type VerifyOptions
Constants ¶
This section is empty.
Variables ¶
var (
ErrInvalidSignature = fmt.Errorf("failed to validate signature")
)
Functions ¶
func RetryCount ¶
Types ¶
type BatchJSONOptions ¶
type BatchJSONOptions struct { Queue string Url string UrlGroup string Api string Body map[string]any Method string Headers map[string]string Retries *int Callback string FailureCallback string Forward string Delay string NotBefore string DeduplicationId string ContentBasedDeduplication bool Timeout string }
type BatchOptions ¶
type BatchOptions struct { Queue string Url string UrlGroup string Api string Body string Method string ContentType string Headers map[string]string Retries *int Callback string FailureCallback string Forward string Delay string NotBefore string DeduplicationId string ContentBasedDeduplication bool Timeout string }
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
NewClient initializes a client instance with the given token and the default HTTP client.
func NewClientWith ¶
NewClientWith initializes a client with the given token and HTTP client.
func NewClientWithEnv ¶
func NewClientWithEnv() *Client
NewClientWithEnv initializes a client with the token from the QSTASH_TOKEN environment variable and the default HTTP client.
func (*Client) Batch ¶
func (c *Client) Batch(options []BatchOptions) (results [][]PublishOrEnqueueResponse, err error)
Batch publishes or enqueues multiple messages in a single request.
func (*Client) BatchJSON ¶
func (c *Client) BatchJSON(options []BatchJSONOptions) (results [][]PublishOrEnqueueResponse, err error)
BatchJSON publishes or enqueues multiple messages in a single request, automatically serializing the message bodies as JSON strings, and setting content type to `application/json`.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(options EnqueueOptions) (result PublishOrEnqueueResponse, err error)
Enqueue enqueues a message, after creating the queue if it does not exist.
func (*Client) EnqueueJSON ¶
func (c *Client) EnqueueJSON(options EnqueueJSONOptions) (result PublishOrEnqueueResponse, err error)
EnqueueJSON enqueues a message, after creating the queue if it does not exist. It automatically serializes the body as a JSON string and sets the content type to `application/json`.
func (*Client) Publish ¶
func (c *Client) Publish(options PublishOptions) (result PublishOrEnqueueResponse, err error)
Publish publishes a message to QStash.
func (*Client) PublishJSON ¶
func (c *Client) PublishJSON(options PublishJSONOptions) (result PublishOrEnqueueResponse, err error)
PublishJSON publishes a message to QStash, automatically serializing the body as a JSON string and setting the content type to `application/json`.
type Dlq ¶
type Dlq struct {
// contains filtered or unexported fields
}
Dlq (Dead Letter Queue) is a specialized queue used to store messages that cannot be processed successfully by the API. When the API fails to process a request due to reasons like bugs in the code, temporary issues with third-party services, or network problems, QStash will retry processing the message a few times. If the retries are unsuccessful, the message is then moved to the Dlq. This allows for these problematic messages to be handled manually, ensuring they don't get lost or cause further issues in the system.
func (*Dlq) DeleteMany ¶
DeleteMany deletes multiple messages from the Dlq and returns the number of deleted messages.
func (*Dlq) Get ¶
func (d *Dlq) Get(dlqId string) (dlqMessage DlqMessage, err error)
Get retrieves a message from the DLQ by its unique ID.
func (*Dlq) List ¶
func (d *Dlq) List(options ListDlqOptions) (messages []DlqMessage, cursor string, err error)
List retrieves all messages currently in the Dlq.
type DlqFilter ¶
type DlqFilter struct { // MessageId filters Dlq entries by the ID of the message. MessageId string // Url filters Dlq entries by the URL of the message. Url string // UrlGroup filters Dlq entries by URL group of the message. UrlGroup string // ScheduleId filters Dlq entries by schedule ID. ScheduleId string // Queue filters Dlq entries by queue name. Queue string // Api filters Dlq entries by the API name of the message. Api string // FromDate filters Dlq entries by starting time in milliseconds. FromDate time.Time // ToDate filters Dlq entries by ending time in milliseconds. ToDate time.Time // ResponseStatus filters Dlq entries by HTTP response status code of the message. ResponseStatus int // CallerIP filters Dlq entries by IP address of the publisher of the message. CallerIP string }
type DlqMessage ¶
type DlqMessage struct { Message // DlqId is the unique id within the Dlq. DlqId string `json:"dlqId"` // ResponseStatus is the HTTP status code of the last failed delivery attempt. ResponseStatus int `json:"responseStatus,omitempty"` // ResponseHeaders is the response headers of the last failed delivery attempt. ResponseHeaders http.Header `json:"responseHeader,omitempty"` // ResponseBody is the response body of the last failed delivery attempt if it is composed of UTF-8 characters only, empty otherwise. ResponseBody string `json:"responseBody,omitempty"` // ResponseBodyBase64 is the base64 encoded response body of the last failed delivery attempt if the response body contains non-UTF-8 characters, empty otherwise. ResponseBodyBase64 string `json:"responseBodyBase64,omitempty"` }
type EnqueueJSONOptions ¶
type EnqueueJSONOptions struct { Queue string PublishJSONOptions }
type EnqueueOptions ¶
type EnqueueOptions struct { Queue string PublishOptions }
type EnqueueUrlGroupJSONOptions ¶
type EnqueueUrlGroupJSONOptions struct { Queue string PublishUrlGroupJSONOptions }
type EnqueueUrlGroupOptions ¶
type EnqueueUrlGroupOptions struct { Queue string PublishUrlGroupOptions }
type Event ¶
type Event struct { // Time is the timestamp of this event in Unix time (milliseconds). Time int64 `json:"time"` // MessageId is the ID of the associated message. MessageId string `json:"messageId"` // State is the current state of the message. State EventState `json:"state"` // Error is set only if the status of the message is an error. Error string `json:"error,omitempty"` // NextDeliveryTime is the next scheduled time of the message in milliseconds. NextDeliveryTime int64 `json:"nextDeliveryTime,omitempty"` // Url is the destination url. Url string `json:"url"` // UrlGroup is the name of the url group if this message was sent through an url group, empty otherwise. UrlGroup string `json:"topicName,omitempty"` // EndpointName is the name of the endpoint if this message was sent through an url group, empty otherwise. EndpointName string `json:"endpointName,omitempty"` // Api is the name of the api if this message was sent to an api. Api string `json:"api,omitempty"` // QueueName is the name of the queue if this message is enqueued on a queue, empty otherwise. QueueName string `json:"queueName,omitempty"` // ScheduleId is the ID of the responsible schedule if the message is triggered by a schedule. ScheduleId string `json:"scheduleId,omitempty"` }
type EventFilter ¶
type EventFilter struct { // MessageId filters events by the ID of the message. MessageId string // State filters events by the state of the message. State EventState // Url filters events by the URL of the message. Url string // UrlGroup filters events by URL group of the message. UrlGroup string // Api filters events by the API name of the message. Api string // Queue filters events by queue name. Queue string // ScheduleId filters events by schedule ID. ScheduleId string // FromDate filters events by starting time in milliseconds. FromDate time.Time // ToDate filters events by ending time in milliseconds. ToDate time.Time }
type EventState ¶
type EventState string
var ( Created EventState = "CREATED" Active EventState = "ACTIVE" Retry EventState = "RETRY" Error EventState = "ERROR" Delivered EventState = "DELIVERED" Failed EventState = "FAILED" CancelRequested EventState = "CANCEL_REQUESTED" Canceled EventState = "CANCELED" )
type Keys ¶
type Keys struct {
// contains filtered or unexported fields
}
func (*Keys) Get ¶
func (k *Keys) Get() (keys SigningKeys, err error)
Get retrieves the current and next signing keys.
func (*Keys) Rotate ¶
func (k *Keys) Rotate() (keys SigningKeys, err error)
Rotate rotates the current signing key and gets the new signing key. The next signing key becomes the current signing key, and a new signing key is assigned to the next signing key.
type ListDlqOptions ¶
type ListEventsOptions ¶
type ListEventsOptions struct { // Cursor is the starting point for listing events. Cursor string // Count is the maximum number of events to return. Count int // Filter is the filter to apply. Filter EventFilter }
func (*ListEventsOptions) Params ¶
func (l *ListEventsOptions) Params() url.Values
type Message ¶
type Message struct { // MessageId is the unique identifier of the message. MessageId string `json:"messageId"` // Endpoint is the endpoint name of the message if the endpoint is given a name within the url group. Endpoint string `json:"endpointName,omitempty"` // Url is the address to which the message should be delivered. Url string `json:"url,omitempty"` // UrlGroup is the url group name if this message was sent to an url group, empty otherwise. UrlGroup string `json:"urlGroup,omitempty"` // Method is the HTTP method to use for the message. Method string `json:"method"` // Header is the HTTP headers sent to the endpoint. Header http.Header `json:"header"` // Body is the body of the message if it is composed of UTF-8 characters only, empty otherwise. Body string `json:"body,omitempty"` // BodyBase64 is the base64 encoded body if the body contains non-UTF-8 characters, empty otherwise. BodyBase64 string `json:"bodyBase64,omitempty"` // MaxRetries is the number of retries that should be attempted in case of delivery failure. MaxRetries int32 `json:"maxRetries"` // NotBefore is the unix timestamp in milliseconds before which the message should not be delivered. NotBefore int64 `json:"notBefore"` // CreatedAt is the unix timestamp in milliseconds when the message was created. CreatedAt int64 `json:"createdAt"` // Callback is the url which is called each time the message is attempted to be delivered. Callback string `json:"callback,omitempty"` // FailureCallback is the url which is called after the message has failed. FailureCallback string `json:"failureCallback,omitempty"` // ScheduleId is the id of the scheduled job of the message if the message is triggered by a schedule. ScheduleId string `json:"scheduleId,omitempty"` // CallerIP is the IP address of the publisher of this message. CallerIP string `json:"callerIP,omitempty"` // Queue is the queue name if this message was enqueued to a queue. 
Queue string `json:"queueName,omitempty"` // Api is the api name if this message was sent to an api. Api string `json:"api,omitempty"` }
type Messages ¶
type Messages struct {
// contains filtered or unexported fields
}
func (*Messages) Cancel ¶
Cancel cancels delivery of an existing message.
Cancelling a message will remove it from QStash and stop it from being delivered in the future. If a message is in flight to your API, it might be too late to cancel.
func (*Messages) CancelMany ¶
CancelMany cancels delivery of given messages.
type Options ¶
type Options struct { // Url is the base address of QStash, it's set to https://qstash.upstash.io by default. Url string // Token is the authorization token from the Upstash console. Token string // Client is the HTTP client used for sending requests. Client *http.Client }
type PublishJSONOptions ¶
type PublishOptions ¶
type PublishOrEnqueueResponse ¶
type PublishOrEnqueueResponse struct { // MessageId is the unique identifier of new message. MessageId string `json:"messageId"` // Deduplicated indicates whether the message is a duplicate that was not sent to the destination. Deduplicated bool `json:"deduplicated,omitempty"` // Url is the target address of the message if it was sent to a URL group, empty otherwise. Url string `json:"url,omitempty"` }
type PublishUrlGroupOptions ¶
type Queue ¶
type Queue struct { // Name is the name of the queue Name string `json:"queueName" validate:"required"` // Parallelism is the number of parallel consumers consuming from the queue. Parallelism int `json:"parallelism"` // IsPaused is whether the queue is paused or not. IsPaused bool `json:"paused"` }
type QueueWithLag ¶
type QueueWithLag struct { // Name is the name of the queue. Name string `json:"name"` // Parallelism is the number of parallel consumers consuming from the queue. Parallelism int `json:"parallelism"` // CreatedAt is the creation time of the queue, in unix milliseconds. CreatedAt int64 `json:"createdAt"` // UpdatedAt is the last update time of the queue, in unix milliseconds UpdatedAt int64 `json:"updatedAt"` // Lag is the number of unprocessed messages that exist in the queue. Lag int64 `json:"lag"` // IsPaused is whether the queue is paused or not. IsPaused bool `json:"paused"` }
type Queues ¶
type Queues struct {
// contains filtered or unexported fields
}
Queues in QStash are mechanisms that ensure ordered delivery (FIFO) and allow controlled parallelism in processing messages. Messages are queued and delivered one by one in a first-in-first-out order, ensuring each message is processed before the next one is activated. If a message fails due to an endpoint returning a non-2xx code, retries are attempted before moving to the next message. To avoid overwhelming an endpoint and improve throughput, parallelism can be configured, allowing multiple messages to be processed concurrently.
func (*Queues) Get ¶
func (c *Queues) Get(name string) (schedule QueueWithLag, err error)
Get retrieves a queue by its name.
func (*Queues) List ¶
func (c *Queues) List() (schedules []QueueWithLag, err error)
List retrieves all queues.
type Receiver ¶
Receiver offers a simple way to verify the signature of a request.
func NewReceiver ¶
func NewReceiverWithEnv ¶
func NewReceiverWithEnv() *Receiver
func (*Receiver) Verify ¶
func (r *Receiver) Verify(opts VerifyOptions) (err error)
Verify verifies the signature of a request. It tries to verify the signature with the current signing key. If that fails, maybe because you have rotated the keys recently, it will try to verify the signature with the next signing key.
type Schedule ¶
type Schedule struct { // Id is the unique id of the schedule. Id string `json:"scheduleId"` // CreatedAt is the creation time of the schedule, in unix milliseconds. CreatedAt int64 `json:"createdAt"` // Cron is the cron expression used to schedule the messages. Cron string `json:"cron"` // Destination is the destination url or url group. Destination string `json:"destination"` // Method is the HTTP method to use for the message. Method string `json:"method"` // Header is the headers of the message. Header map[string][]string `json:"header,omitempty"` // Body is the body of the scheduled message if it is composed of UTF-8 characters only, empty otherwise. Body string `json:"body,omitempty"` // BodyBase64 is the base64 encoded body if the scheduled message body contains non-UTF-8 characters, empty otherwise. BodyBase64 string `json:"bodyBase64,omitempty"` // Retries is the number of retries that should be attempted in case of delivery failure. Retries int32 `json:"retries"` // Delay is the delay in seconds before the message is delivered. Delay int32 `json:"delay,omitempty"` // Callback is the url which is called each time the message is attempted to be delivered. Callback string `json:"callback,omitempty"` // FailureCallback is the url which is called after the message has failed. FailureCallback string `json:"failureCallback,omitempty"` // LastScheduleTime is the timestamp in unix milli of the last scheduled message. LastScheduleTime int64 `json:"lastScheduleTime,omitempty"` // NextScheduleTime is the timestamp in unix milli of the next scheduled message. NextScheduleTime int64 `json:"nextScheduleTime,omitempty"` // LastScheduleStates is the message id state pair for last schedule. LastScheduleStates map[string]string `json:"lastScheduleStates,omitempty"` // CallerIP is the IP address of the creator of this schedule. CallerIP string `json:"callerIP,omitempty"` // IsPaused indicates whether the schedule is paused or not. IsPaused bool `json:"isPaused,omitempty"` }
type ScheduleJSONOptions ¶
type ScheduleOptions ¶
type Schedules ¶
type Schedules struct {
// contains filtered or unexported fields
}
Schedules in QStash allow you to publish messages at specified intervals instead of just once. You can create schedules using cron expressions. These expressions define the timing of message delivery, evaluated in the UTC timezone.
func (*Schedules) Create ¶
func (s *Schedules) Create(schedule ScheduleOptions) (string, error)
Create creates a schedule to send messages periodically and returns the ID of created schedule.
func (*Schedules) CreateJSON ¶
func (s *Schedules) CreateJSON(schedule ScheduleJSONOptions) (scheduleId string, err error)
CreateJSON creates a schedule to send messages periodically, automatically serializing the body as a JSON string and setting the content type to `application/json`.
type SigningKeys ¶
type UrlGroup ¶
type UrlGroup struct { // Name is the name of the url group. Name string `json:"name"` // CreatedAt is the creation time of the url group, in unix milliseconds. CreatedAt int64 `json:"createdAt"` // UpdatedAt is the last update time of the url group, in unix milliseconds. UpdatedAt int64 `json:"updatedAt"` // Endpoints is the list of endpoints that belong to the url group. Endpoints []Endpoint `json:"endpoints"` }
type UrlGroups ¶
type UrlGroups struct {
// contains filtered or unexported fields
}
UrlGroups in QStash are namespaces where you can publish messages that are then sent to multiple endpoints. After creating an url group, you can define multiple endpoints, each represented by a publicly available URL. When a message is published to an url group, it is distributed to all subscribed endpoints.
func (*UrlGroups) Enqueue ¶
func (u *UrlGroups) Enqueue(options EnqueueUrlGroupOptions) (result []PublishOrEnqueueResponse, err error)
Enqueue enqueues a message, after creating the queue if it does not exist.
func (*UrlGroups) EnqueueJSON ¶
func (u *UrlGroups) EnqueueJSON(message EnqueueUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error)
EnqueueJSON enqueues a message, after creating the queue if it does not exist. It automatically serializes the body as a JSON string and sets the content type to `application/json`.
func (*UrlGroups) Publish ¶
func (u *UrlGroups) Publish(po PublishUrlGroupOptions) (result []PublishOrEnqueueResponse, err error)
Publish publishes a message to QStash.
func (*UrlGroups) PublishJSON ¶
func (u *UrlGroups) PublishJSON(message PublishUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error)
PublishJSON publishes a message to QStash, automatically serializing the body as a JSON string and setting the content type to `application/json`.
func (*UrlGroups) RemoveEndpoints ¶
RemoveEndpoints removes one or more endpoints from an url group. If all endpoints have been removed, the url group will be deleted.
func (*UrlGroups) UpsertEndpoints ¶
UpsertEndpoints adds or updates one or more endpoints to an url group. If the url group or the endpoint does not exist, it will be created. If the endpoint exists, it will be updated.
type VerifyOptions ¶
type VerifyOptions struct { // Signature is the signature from the `Upstash-Signature` header. Signature string // Url is the address of the endpoint where the request was sent to. When set to `None`, the url is not checked. Url string // Body is the raw request body. Body string // Tolerance is the duration to tolerate when checking `nbf` and `exp` claims, to deal with small clock differences among different servers. Tolerance time.Duration }