figg

package module
v0.0.0-...-703b195 Latest
Published: Feb 5, 2023 License: MIT Imports: 8 Imported by: 2

README

Figg Go SDK

Usage

Connect

Connect blocks until the initial connection succeeds. If the connection later drops, the SDK reconnects automatically.

import (
	// The package is named figg although the import path ends in sdk.
	figg "github.com/dunstall/figg/sdk"
)

// Connect with default options.
client, err := figg.Connect("10.26.104.52:8119")
if err != nil {
	// handle err
}

Options such as WithReadBufLen can be passed to Connect. They are described in options.go.
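
The Option type documented below (type Option func(*Options)) suggests Go's functional options pattern. The following is a minimal, self-contained sketch of how such options are typically applied over defaults. The Options struct here is a local stand-in with only a few fields, and buildOptions is a hypothetical helper, not part of the SDK.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a local stand-in holding a subset of the SDK's Options fields.
type Options struct {
	ReadBufLen   int
	WindowSize   int
	PingInterval time.Duration
}

// Option mirrors the SDK's documented type: type Option func(*Options).
type Option func(*Options)

// WithReadBufLen and WithWindowSize are stand-ins with the same signatures
// as the SDK functions of the same names.
func WithReadBufLen(readBufLen int) Option {
	return func(o *Options) { o.ReadBufLen = readBufLen }
}

func WithWindowSize(windowSize int) Option {
	return func(o *Options) { o.WindowSize = windowSize }
}

// buildOptions is hypothetical: it starts from the documented defaults and
// applies each option in order, so later options override earlier ones.
func buildOptions(options ...Option) Options {
	opts := Options{
		ReadBufLen:   1 << 15, // DefaultReadBufLen
		WindowSize:   256,     // DefaultWindowSize
		PingInterval: 2 * time.Second,
	}
	for _, apply := range options {
		apply(&opts)
	}
	return opts
}

func main() {
	opts := buildOptions(WithReadBufLen(1<<16), WithWindowSize(64))
	fmt.Println(opts.ReadBufLen, opts.WindowSize, opts.PingInterval) // 65536 64 2s
}
```

With the real SDK the equivalent call would presumably look like figg.Connect(addr, figg.WithReadBufLen(1<<16), figg.WithWindowSize(64)).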

Subscribe

Subscribe to a topic to receive all messages published on that topic using Subscribe(name string, onMessage MessageCB, options ...TopicOption). Once subscribed, the SDK ensures no messages are dropped by recovering any messages that were missed while the client was disconnected.

Subscribe blocks until the server confirms the subscription is set up.

err := client.Subscribe("foo", func(m *figg.Message) {
	fmt.Println("message: ", string(m.Data), "offset: ", m.Offset)
})
if err != nil {
	// handle err
}

figg.Message is described in message.go. It contains a Data field holding the published data and an Offset field that points to the location of the next message in the topic.

The SDK uses this offset to automatically recover missed messages when the connection is dropped. It can also be passed as an option to Subscribe using WithOffset to continue from an old message (for example, an application may persist the offset of the last message received and resume from it later).
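
As a rough illustration of persisting an offset to resume later, the sketch below stores the last offset in a file and reads it back. saveOffset, loadOffset and the file name are hypothetical helpers invented for this example, not part of the SDK.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// saveOffset (hypothetical) writes the offset to path as decimal text.
func saveOffset(path string, offset uint64) error {
	return os.WriteFile(path, []byte(strconv.FormatUint(offset, 10)), 0o644)
}

// loadOffset (hypothetical) reads a previously saved offset. ok is false
// when no offset has been saved yet.
func loadOffset(path string) (offset uint64, ok bool, err error) {
	b, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return 0, false, nil
	}
	if err != nil {
		return 0, false, err
	}
	offset, err = strconv.ParseUint(strings.TrimSpace(string(b)), 10, 64)
	if err != nil {
		return 0, false, err
	}
	return offset, true, nil
}

func main() {
	path := filepath.Join(os.TempDir(), "figg-foo.offset")

	// In a real subscriber this would be called with m.Offset from the
	// message callback, ideally batched rather than once per message.
	if err := saveOffset(path, 42); err != nil {
		fmt.Println("save failed:", err)
		return
	}

	offset, ok, err := loadOffset(path)
	fmt.Println(offset, ok, err)

	// With the SDK, a loaded offset would then be passed to Subscribe,
	// along the lines of:
	//   client.Subscribe("foo", onMessage, figg.WithOffset(offset))
}
```

When loadOffset reports ok as false, the subscriber would simply omit WithOffset.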

Publish

Publish a message to topic foo using Publish(name string, data []byte, onACK func()).

client.Publish("foo", []byte("bar"), func() {
	fmt.Println("message acked")
})

To achieve high throughput, the SDK supports sending multiple messages before the first has been acknowledged (similar to TCP), though it limits the number of unacknowledged messages (configured with WithWindowSize). If the client's connection drops, all unacknowledged messages are retried (in order).
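
To make the window idea concrete, here is a minimal sketch of limiting in-flight messages with a buffered channel used as a counting semaphore. It illustrates the general technique only and is not the SDK's implementation. The window type and its methods are hypothetical.

```go
package main

import "fmt"

// window (hypothetical) limits the number of unacknowledged messages.
type window struct {
	slots chan struct{}
}

func newWindow(size int) *window {
	return &window{slots: make(chan struct{}, size)}
}

// acquire reserves a slot before a message is sent, blocking while the
// window is full.
func (w *window) acquire() { w.slots <- struct{}{} }

// tryAcquire is like acquire but returns false instead of blocking.
func (w *window) tryAcquire() bool {
	select {
	case w.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot when a message is acknowledged. It must only be
// called once per acquired slot, otherwise it blocks.
func (w *window) release() { <-w.slots }

// inFlight reports how many messages are currently unacknowledged.
func (w *window) inFlight() int { return len(w.slots) }

func main() {
	w := newWindow(2)
	w.acquire()
	w.acquire()
	fmt.Println(w.tryAcquire()) // false: window of 2 is full
	w.release()                 // first message acknowledged
	fmt.Println(w.tryAcquire()) // true
	fmt.Println(w.inFlight())   // 2
}
```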

If it's important to wait for each message to be acknowledged before sending the next, the wrapper PublishWaitForACK can be used.

Note you do not need to be subscribed to publish a message to a topic.

Documentation

Constants

const (
	DISCONNECTED = ConnState(iota)
	CONNECTED
)
const (
	DefaultReadBufLen   = 1 << 15 // 32 KB
	DefaultWindowSize   = 256
	DefaultPingInterval = 2 * time.Second
	DefaultMaxPingOut   = 2
)
const (
	// ATTACHED when the client is connected and is receiving messages published
	// to the topic.
	ATTACHED = TopicState(iota)
	// ATTACHING when the client's connection has dropped and the topic will
	// reattach once connected.
	ATTACHING
	// DETACHED when the user has explicitly unsubscribed from the topic.
	DETACHED
)

Variables

var (
	ErrAlreadySubscribed = errors.New("already subscribed")
)
var (
	ErrNotConnected = errors.New("not connected")
)

Functions

This section is empty.

Types

type ConnState

type ConnState int

func (ConnState) String

func (c ConnState) String() string

type ConnStateChangeCB

type ConnStateChangeCB func(state ConnState)

type Dialer

type Dialer interface {
	Dial(network string, address string) (net.Conn, error)
}
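
Any type with a matching Dial method satisfies this interface, and the standard library's *net.Dialer already does. As a sketch, a custom dialer with a fixed timeout might look like the following. The Dialer interface is copied locally so the example compiles on its own, and timeoutDialer is a hypothetical name.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// Dialer is a local copy of the SDK's interface so this sketch is
// self-contained.
type Dialer interface {
	Dial(network string, address string) (net.Conn, error)
}

// timeoutDialer (hypothetical) dials with a fixed connect timeout.
type timeoutDialer struct {
	timeout time.Duration
}

func (d timeoutDialer) Dial(network string, address string) (net.Conn, error) {
	return net.DialTimeout(network, address, d.timeout)
}

// Compile-time checks that both types satisfy the interface.
var (
	_ Dialer = timeoutDialer{}
	_ Dialer = (*net.Dialer)(nil)
)

func main() {
	d := timeoutDialer{timeout: time.Second}

	// An unknown network name fails immediately without touching the
	// network, which is enough to show the method being called.
	_, err := d.Dial("not-a-network", "127.0.0.1:8119")
	fmt.Println(err != nil) // true
}
```

With the SDK, such a dialer would presumably be supplied as figg.Connect(addr, figg.WithDialer(timeoutDialer{timeout: time.Second})).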

type Figg

type Figg struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(addr string, options ...Option) (*Figg, error)

Connect will attempt to connect to the given Figg node.

func (*Figg) Close

func (f *Figg) Close() error

func (*Figg) Publish

func (f *Figg) Publish(name string, data []byte, onACK func())

Publish publishes the data to the given topic. When the server acknowledges the message onACK is called.

func (*Figg) PublishNoACK

func (f *Figg) PublishNoACK(name string, data []byte)

PublishNoACK is the same as Publish except it doesn't wait for the message to be acknowledged.

func (*Figg) PublishWaitForACK

func (f *Figg) PublishWaitForACK(name string, data []byte)

PublishWaitForACK is similar to Publish except it blocks until the message is acknowledged. Note this will seriously limit throughput, so if high throughput is needed use Publish and don't wait for messages to be acknowledged before sending the next.
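
For intuition, a blocking publish can be layered on a callback-based Publish by waiting on a channel that the acknowledgement callback closes. The sketch below shows that technique with a fake publisher. It is not necessarily how PublishWaitForACK is implemented, and publisher, publishAndWait and fakePublisher are hypothetical names.

```go
package main

import "fmt"

// publisher (hypothetical) has the same shape as the SDK's Publish method.
type publisher interface {
	Publish(name string, data []byte, onACK func())
}

// publishAndWait publishes a message and blocks until onACK is called.
func publishAndWait(p publisher, name string, data []byte) {
	acked := make(chan struct{})
	p.Publish(name, data, func() { close(acked) })
	<-acked
}

// fakePublisher records each message and acknowledges it from another
// goroutine, standing in for a server round trip.
type fakePublisher struct {
	published []string
}

func (f *fakePublisher) Publish(name string, data []byte, onACK func()) {
	f.published = append(f.published, name+":"+string(data))
	go onACK()
}

func main() {
	p := &fakePublisher{}
	publishAndWait(p, "foo", []byte("bar"))
	publishAndWait(p, "foo", []byte("baz"))
	fmt.Println(p.published) // [foo:bar foo:baz]
}
```

Because each call waits for a full round trip, only one message is ever in flight, which is why this style limits throughput.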

func (*Figg) Subscribe

func (f *Figg) Subscribe(name string, onMessage MessageCB, options ...TopicOption) error

Subscribe to the given topic.

Note only one subscriber is allowed per topic.

func (*Figg) Unsubscribe

func (f *Figg) Unsubscribe(topic string)

type Message

type Message struct {
	// Data contains the published payload.
	Data []byte

	// Offset is the message's position in the topic. This can be used to recover
	// messages from this offset.
	Offset uint64
}

type MessageCB

type MessageCB func(m *Message)

type Option

type Option func(*Options)

func WithConnStateChangeCB

func WithConnStateChangeCB(cb ConnStateChangeCB) Option

func WithDialer

func WithDialer(dialer Dialer) Option

func WithLogger

func WithLogger(logger *zap.Logger) Option

func WithMaxPingOut

func WithMaxPingOut(maxPingOut int) Option

func WithPingInterval

func WithPingInterval(pingInterval time.Duration) Option

func WithReadBufLen

func WithReadBufLen(readBufLen int) Option

func WithReconnectBackoffCB

func WithReconnectBackoffCB(cb ReconnectBackoffCB) Option

func WithWindowSize

func WithWindowSize(windowSize int) Option

type Options

type Options struct {
	// Addr is the address of the Figg node.
	Addr string

	// ReadBufLen is the size of the read buffer on top of the socket.
	ReadBufLen int

	// Dialer is a custom dialer to connect to the server. If nil uses
	// net.Dialer with a 5 second timeout.
	Dialer Dialer

	// ReconnectBackoffCB is a callback to define a custom backoff strategy
	// when attempting to reconnect to the server. If nil uses a default
	// strategy where the retry interval doubles after each attempt, starting with a
	// 1 second interval after the first attempt, a maximum wait of 30
	// seconds, and adding 20% random jitter (see defaultReconnectBackoffCB).
	ReconnectBackoffCB ReconnectBackoffCB

	// ConnStateChangeCB is an optional callback called when the client's
	// connection state changes. Note this must not block.
	ConnStateChangeCB ConnStateChangeCB

	// WindowSize is the number of unacknowledged in-flight messages allowed
	// before Publish blocks. Defaults to 256.
	WindowSize int

	// PingInterval is the time between sending pings. Defaults to 2 seconds.
	PingInterval time.Duration

	// MaxPingOut is the maximum number of pings that have not received a
	// pong before determining the connection has dropped. Defaults to 2.
	MaxPingOut int

	// Logger is a custom logger to log events, which should be configured with
	// the desired logging level. If nil no logging is used.
	Logger *zap.Logger
}

type ReconnectBackoffCB

type ReconnectBackoffCB func(attempts int) time.Duration
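
A custom callback only needs to match this signature. The sketch below approximates the default strategy described in the Options comments (1 second initial wait, doubling, 30 second cap, 20% jitter). It is not the SDK's defaultReconnectBackoffCB. It assumes attempts starts at 1 and that jitter is added on top of the base wait, neither of which the documentation confirms. baseBackoff and exponentialBackoff are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// ReconnectBackoffCB mirrors the SDK's documented type.
type ReconnectBackoffCB func(attempts int) time.Duration

// baseBackoff returns the wait before jitter: 1s for the first attempt,
// doubling on each further attempt, capped at 30s.
func baseBackoff(attempts int) time.Duration {
	const (
		initialWait = time.Second
		maxWait     = 30 * time.Second
	)
	wait := initialWait
	for i := 1; i < attempts; i++ {
		wait *= 2
		if wait >= maxWait {
			return maxWait
		}
	}
	return wait
}

// exponentialBackoff adds up to 20% random jitter to baseBackoff
// (assumption: jitter is additive).
func exponentialBackoff(attempts int) time.Duration {
	base := baseBackoff(attempts)
	jitter := time.Duration(rand.Int63n(int64(base)/5 + 1))
	return base + jitter
}

func main() {
	var cb ReconnectBackoffCB = exponentialBackoff
	for attempts := 1; attempts <= 7; attempts++ {
		fmt.Println(attempts, baseBackoff(attempts), cb(attempts))
	}
}
```

With the SDK, the callback would presumably be supplied as figg.WithReconnectBackoffCB(exponentialBackoff).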

type TopicOption

type TopicOption func(*TopicOptions)

func WithOffset

func WithOffset(offset uint64) TopicOption

type TopicOptions

type TopicOptions struct {
	// Offset is the offset of an old message to subscribe from without missing
	// messages. This is only used if FromOffset is true, otherwise it is ignored.
	Offset     uint64
	FromOffset bool
}

type TopicState

type TopicState int

func (TopicState) String

func (s TopicState) String() string
