dataproxy

package
v0.0.0-...-6260e1a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrInvalidGroupID reports an invalid group ID.
	ErrInvalidGroupID    = errors.New("invalid group ID")
	// ErrInvalidURL reports an invalid URL.
	ErrInvalidURL        = errors.New("invalid URL")
	// ErrNoEndpoint reports that the service has no endpoints.
	ErrNoEndpoint        = errors.New("service has no endpoints")
	// ErrNoAvailableWorker reports that no worker is available.
	ErrNoAvailableWorker = errors.New("no available worker")
)

variables

View Source
var (
	// DefaultURL is the default Manager URL used to discover the DataProxy cluster.
	// It points at a Manager on the local host (127.0.0.1:8083).
	DefaultURL = "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"
)

Functions

func NewDiscoverer

func NewDiscoverer(url, groupID string, lookupInterval time.Duration, log logger.Logger) (discoverer.Discoverer, error)

NewDiscoverer creates a new DataProxy discoverer.

Types

type Callback

type Callback func(message Message, err error)

Callback is the callback function signature of the DataProxy producer

type Client

// Client is the interface of a DataProxy client.
type Client interface {
	// Send sends a message and waits for the result.
	Send(ctx context.Context, msg Message) error
	// SendAsync sends a message asynchronously; when the message is sent or times out, the callback is called.
	SendAsync(ctx context.Context, msg Message, callback Callback)
	// Close flushes all pending messages to the server, waits for the results or a timeout, then closes the producer.
	Close()
}

Client is the interface of a DataProxy client

func NewClient

func NewClient(opts ...Option) (Client, error)

NewClient creates a dataproxy-go client instance.

type Message

// Message is the message to send.
type Message struct {
	GroupID  string            // InLong group ID
	StreamID string            // InLong stream ID
	Payload  []byte            // the content of the message
	Headers  map[string]string // message headers; currently not sent to the server
	MetaData interface{}       // arbitrary user data; not sent to the server, but available in the callback
}

Message is the message to send

type Option

type Option func(*Options)

Option is the Options helper.

func WithAddColumns

func WithAddColumns(cols map[string]string) Option

WithAddColumns sets AddColumns

func WithBatchingMaxMessages

func WithBatchingMaxMessages(n int) Option

WithBatchingMaxMessages sets BatchingMaxMessages

func WithBatchingMaxPublishDelay

func WithBatchingMaxPublishDelay(t time.Duration) Option

WithBatchingMaxPublishDelay sets BatchingMaxPublishDelay

func WithBatchingMaxSize

func WithBatchingMaxSize(n int) Option

WithBatchingMaxSize sets BatchingMaxSize

func WithBlockIfQueueIsFull

func WithBlockIfQueueIsFull(b bool) Option

WithBlockIfQueueIsFull sets BlockIfQueueIsFull

func WithBufferPool

func WithBufferPool(bp bufferpool.BufferPool) Option

WithBufferPool sets BufferPool

func WithBufferPoolSize

func WithBufferPoolSize(n int) Option

WithBufferPoolSize sets BufferPoolSize

func WithBytePool

func WithBytePool(bp bufferpool.BytePool) Option

WithBytePool sets BytePool

func WithBytePoolSize

func WithBytePoolSize(n int) Option

WithBytePoolSize sets BytePoolSize

func WithBytePoolWidth

func WithBytePoolWidth(n int) Option

WithBytePoolWidth sets BytePoolWidth

func WithConnTimeout

func WithConnTimeout(t time.Duration) Option

WithConnTimeout sets ConnTimeout

func WithGroupID

func WithGroupID(g string) Option

WithGroupID sets GroupID

func WithLogger

func WithLogger(log logger.Logger) Option

WithLogger sets Logger

func WithMaxPendingMessages

func WithMaxPendingMessages(n int) Option

WithMaxPendingMessages sets MaxPendingMessages

func WithMaxRetries

func WithMaxRetries(n int) Option

WithMaxRetries sets MaxRetries

func WithMetricsName

func WithMetricsName(name string) Option

WithMetricsName sets MetricsName

func WithMetricsRegistry

func WithMetricsRegistry(reg prometheus.Registerer) Option

WithMetricsRegistry sets MetricsRegistry

func WithReadBufferSize

func WithReadBufferSize(n int) Option

WithReadBufferSize sets ReadBufferSize

func WithSendTimeout

func WithSendTimeout(t time.Duration) Option

WithSendTimeout sets SendTimeout

func WithSocketRecvBufferSize

func WithSocketRecvBufferSize(n int) Option

WithSocketRecvBufferSize sets SocketRecvBufferSize

func WithSocketSendBufferSize

func WithSocketSendBufferSize(n int) Option

WithSocketSendBufferSize sets SocketSendBufferSize

func WithURL

func WithURL(u string) Option

WithURL sets URL

func WithUpdateInterval

func WithUpdateInterval(u time.Duration) Option

WithUpdateInterval sets UpdateInterval

func WithWorkerNum

func WithWorkerNum(n int) Option

WithWorkerNum sets WorkerNum

func WithWriteBufferSize

func WithWriteBufferSize(n int) Option

WithWriteBufferSize sets WriteBufferSize

type Options

// Options is the DataProxy Go client configuration.
type Options struct {
	GroupID                 string                // InLong group ID
	URL                     string                // the Manager URL for discovering the DataProxy cluster
	UpdateInterval          time.Duration         // interval to refresh the endpoint list, default: 5m
	ConnTimeout             time.Duration         // connection timeout, default: 3000ms
	WriteBufferSize         int                   // write buffer size in bytes, default: 8M
	ReadBufferSize          int                   // read buffer size in bytes, default: 1M
	SocketSendBufferSize    int                   // socket send buffer size in bytes, default: 8M
	SocketRecvBufferSize    int                   // socket receive buffer size in bytes, default: 1M
	BufferPool              bufferpool.BufferPool // encoding/decoding buffer pool, if not given, SDK will init a new one
	BytePool                bufferpool.BytePool   // encoding/decoding byte pool, if not given, SDK will init a new one
	BufferPoolSize          int                   // buffer pool size, default: 409600
	BytePoolSize            int                   // byte pool size, default: 409600
	BytePoolWidth           int                   // byte pool width, default: equals to BatchingMaxSize
	Logger                  logger.Logger         // debug logger, default: stdout
	MetricsName             string                // the unique metrics name of this SDK, used to isolate metrics when more than one client is initialized in one process
	MetricsRegistry         prometheus.Registerer // metrics registry, default: prometheus.DefaultRegisterer
	WorkerNum               int                   // worker number, default: 8
	SendTimeout             time.Duration         // send timeout, default: 30000ms
	MaxRetries              int                   // max retry count, default: 2
	BatchingMaxPublishDelay time.Duration         // the time period within which the messages sent will be batched, default: 20ms
	BatchingMaxMessages     int                   // the maximum number of messages permitted in a batch, default: 50
	BatchingMaxSize         int                   // the maximum number of bytes permitted in a batch, default: 40K
	MaxPendingMessages      int                   // the max size of the queue holding the messages pending to receive an acknowledgment from the broker, default: 204800
	BlockIfQueueIsFull      bool                  // whether Send and SendAsync block if the producer's message queue is full, default: false
	AddColumns              map[string]string     // additional columns to add to every message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy adds 2 more columns, worldid=xxx and ip=yyy, to all messages
	// contains filtered or unexported fields
}

Options is the DataProxy Go client configuration.

func (*Options) ValidateAndSetDefault

func (options *Options) ValidateAndSetDefault() error

ValidateAndSetDefault validates the options and sets up the default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL