v1

package
v0.73.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TASK_PROCESSING_QUEUE staticQueue = "task_processing_queue_v2"
	OLAP_QUEUE            staticQueue = "olap_queue_v2"
)
View Source
const (
	Scheduler = "scheduler"
)

Variables

View Source
var (
	PUB_FLUSH_INTERVAL  = 10 * time.Millisecond
	PUB_BUFFER_SIZE     = 1000
	PUB_MAX_CONCURRENCY = 1
	PUB_TIMEOUT         = 10 * time.Second
)

nolint: staticcheck

View Source
var (
	SUB_FLUSH_INTERVAL  = 10 * time.Millisecond
	SUB_BUFFER_SIZE     = 1000
	SUB_MAX_CONCURRENCY = 10
)

nolint: staticcheck

Functions

func DecodeAndValidateSingleton

func DecodeAndValidateSingleton(dv datautils.DataDecoderValidator, payloads [][]byte, target interface{}) error

func GetTenantExchangeName

func GetTenantExchangeName(t string) string

func JSONConvert

func JSONConvert[T any](payloads [][]byte) []*T

func NewRandomStaticQueue

func NewRandomStaticQueue() staticQueue

func NoOpHook

func NoOpHook(task *Message) error

func QueueTypeFromDispatcherID

func QueueTypeFromDispatcherID(d string) consumerQueue

func QueueTypeFromPartitionIDAndController

func QueueTypeFromPartitionIDAndController(p, controller string) consumerQueue

func QueueTypeFromTickerID

func QueueTypeFromTickerID(t string) consumerQueue

func TenantEventConsumerQueue

func TenantEventConsumerQueue(t string) fanoutQueue

func WithBufferSize

func WithBufferSize(bufferSize int) mqSubBufferOptFunc

func WithDisableImmediateFlush

func WithDisableImmediateFlush(disableImmediateFlush bool) mqSubBufferOptFunc

"Immediate flush" means that if we haven't flushed yet, we can flush immediately without waiting on the flush interval timer.

func WithFlushInterval

func WithFlushInterval(flushInterval time.Duration) mqSubBufferOptFunc

func WithKind

func WithKind(kind SubBufferKind) mqSubBufferOptFunc

func WithMaxConcurrency

func WithMaxConcurrency(maxConcurrency int) mqSubBufferOptFunc

Types

type AckHook

type AckHook func(task *Message) error

type DstFunc

type DstFunc func(tenantId, msgId string, payloads [][]byte) error

type MQPubBuffer

type MQPubBuffer struct {
	// contains filtered or unexported fields
}

MQPubBuffer buffers messages coming out of the task queue, groups them by tenantId and msgId, and then flushes them to the task handler as necessary.

func NewMQPubBuffer

func NewMQPubBuffer(mq MessageQueue) *MQPubBuffer

func (*MQPubBuffer) Pub

func (m *MQPubBuffer) Pub(ctx context.Context, queue Queue, msg *Message, wait bool) error

func (*MQPubBuffer) Stop

func (m *MQPubBuffer) Stop()

type MQSubBuffer

type MQSubBuffer struct {
	// contains filtered or unexported fields
}

MQSubBuffer buffers messages coming out of the task queue, groups them by tenantId and msgId, and then flushes them to the task handler as necessary.

func NewMQSubBuffer

func NewMQSubBuffer(queue Queue, mq MessageQueue, dst DstFunc, fs ...mqSubBufferOptFunc) *MQSubBuffer

func (*MQSubBuffer) Start

func (m *MQSubBuffer) Start() (func() error, error)

type Message

type Message struct {
	// ID is the ID of the task.
	ID string `json:"id"`

	// Payloads is the list of payloads.
	Payloads [][]byte `json:"messages"`

	// TenantID is the tenant ID.
	TenantID string `json:"tenant_id"`

	// Whether the message should immediately expire if it reaches the queue without an active consumer.
	ImmediatelyExpire bool `json:"immediately_expire"`

	// Whether the message should be persisted to disk
	Persistent bool `json:"persistent"`

	// OtelCarrier is the OpenTelemetry carrier for the task.
	OtelCarrier map[string]string `json:"otel_carrier"`

	// Retries is the number of retries for the task.
	Retries int `json:"retries"`
}

func NewTenantMessage

func NewTenantMessage[T any](tenantId, id string, immediatelyExpire, persistent bool, payloads ...T) (*Message, error)

func (*Message) Serialize

func (t *Message) Serialize() ([]byte, error)

func (*Message) SetOtelCarrier

func (t *Message) SetOtelCarrier(otelCarrier map[string]string)

type MessageQueue

type MessageQueue interface {
	// Clone copies the message queue with a new instance.
	Clone() (func() error, MessageQueue)

	// SetQOS sets the quality of service for the message queue.
	SetQOS(prefetchCount int)

	// SendMessage sends a message to the message queue.
	SendMessage(ctx context.Context, queue Queue, msg *Message) error

	// Subscribe subscribes to the task queue. It returns a cleanup function that should be called when the
	// subscription is no longer needed.
	Subscribe(queue Queue, preAck AckHook, postAck AckHook) (func() error, error)

	// RegisterTenant registers a new pub/sub mechanism for a tenant. This should be called when a
	// new tenant is created. If this is not called, implementors should ensure that there's a check
	// on the first message to a tenant to ensure that the tenant is registered, and store the tenant
	// in an LRU cache which lives in-memory.
	RegisterTenant(ctx context.Context, tenantId string) error

	// IsReady returns true if the task queue is ready to accept tasks.
	IsReady() bool
}

type PubFunc

type PubFunc func(m *Message) error

type Queue

type Queue interface {
	// Name returns the name of the queue.
	Name() string

	// Durable returns true if this queue should survive task queue restarts.
	Durable() bool

	// AutoDeleted returns true if this queue should be deleted when the last consumer unsubscribes.
	AutoDeleted() bool

	// Exclusive returns true if this queue should only be accessed by the current connection.
	Exclusive() bool

	// FanoutExchangeKey returns which exchange the queue should be subscribed to. This is only currently relevant
	// to tenant pub/sub queues.
	//
	// In RabbitMQ terminology, the existence of a subscriber key means that the queue is bound to a fanout
	// exchange, and a new random queue is generated for each connection when connections are retried.
	FanoutExchangeKey() string

	// DLQ returns the queue's dead letter queue, if it exists.
	DLQ() Queue

	// IsDLQ returns true if the queue is a dead letter queue.
	IsDLQ() bool
}

type SharedBufferedTenantReader

type SharedBufferedTenantReader struct {
	// contains filtered or unexported fields
}

func NewSharedBufferedTenantReader

func NewSharedBufferedTenantReader(mq MessageQueue) *SharedBufferedTenantReader

func (*SharedBufferedTenantReader) Subscribe

func (s *SharedBufferedTenantReader) Subscribe(tenantId string, f DstFunc) (func() error, error)

type SharedTenantReader

type SharedTenantReader struct {
	// contains filtered or unexported fields
}

func NewSharedTenantReader

func NewSharedTenantReader(mq MessageQueue) *SharedTenantReader

func (*SharedTenantReader) Subscribe

func (s *SharedTenantReader) Subscribe(tenantId string, postAck AckHook) (func() error, error)

type SubBufferKind

type SubBufferKind string
const (
	PostAck SubBufferKind = "postAck"
	PreAck  SubBufferKind = "preAck"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL