gomdb

package module
v0.4.0
Published: Dec 26, 2021 License: Apache-2.0 Imports: 8 Imported by: 1

README

Go Message DB Client

This module implements a thin Go wrapper around the Message DB message store. Message DB is an event store implemented on top of Postgres, ideal for event sourcing applications.

The client supports all Message DB read and write procedures, choosing to default to their simplest forms and providing configurability through options functions.

Getting started

// Open database connection
db, err := sql.Open("postgres", "dbname=message_store sslmode=disable user=message_store")
if err != nil {
    log.Fatalf("unexpected error opening db: %s", err)
}
defer db.Close()

// Set search path for schema
if _, err := db.Exec("SET search_path TO message_store,public;"); err != nil {
    log.Fatalf("setting search path: %s", err)
}

// Create client
client := gomdb.NewClient(db)

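// Read from stream (the category and ID below are placeholder values)
stream := gomdb.StreamIdentifier{Category: "user", ID: "123"}
msgs, err := client.GetStreamMessages(context.Background(), stream)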
if err != nil {
    log.Fatalf("reading from stream: %s", err)
}

log.Println(msgs)

See the examples or tests directory for more complete examples.

Subscriptions

Subscriptions are built on top of the GetStreamMessages and GetCategoryMessages methods and simply poll from the last read version or position.

subCtx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will stop the subscription

err := client.SubscribeToCategory(subCtx, "user",
    func(m *gomdb.Message) { // Message handler
        log.Printf("Received message: %v", m)
    },
    func(live bool) { // Liveness handler
        if live {
            log.Print("subscription is handling live messages!")
        } else {
            log.Print("subscription has fallen behind")
        }
    },
    func(err error) { // subscription dropped handler
        if err != nil {
            log.Fatalf("subscription dropped with error: %s", err)
        }
    },
)
if err != nil {
    log.Fatal(err)
}

Different polling strategies can be configured to reduce database reads for subscriptions that rarely receive messages. A default strategy can be set on the client, or a subscription-specific strategy can be set when creating a subscription.

// Client configured with exponential backoff as default strategy.
client := gomdb.NewClient(
    db,
    gomdb.WithDefaultPollingStrategy(
        gomdb.ExpBackoffPolling(
            50*time.Millisecond, // minimum polling delay on no messages read
            5*time.Second,       // maximum polling delay on no messages read
            2,                   // delay will double for every read that returns no messages
        ),
    ),
)

// Client configured with constant polling interval as default strategy.
client = gomdb.NewClient(
    db,
    gomdb.WithDefaultPollingStrategy(
        gomdb.ConstantPolling(100*time.Millisecond), // polling delay on no messages read
    ),
)

// Default strategy overridden for a specific subscription.
client.SubscribeToCategory(subCtx, "user",
    func(m *gomdb.Message) {}, // Message handler
    func(live bool) {},        // Liveness handler
    func(err error) {},        // subscription dropped handler
    gomdb.WithCategoryPollingStrategy(
        gomdb.ConstantPolling(time.Second)(), // poll every second
    ),
)

Running tests

The unit tests can be run with go test.

See the integration tests README for instructions on how to run integration tests.

Contributing

All contributions are welcome, especially from anyone with SQL experience who could tidy up how queries are run and how read errors are handled.

Documentation

Overview

Package gomdb provides a Client for calling Message DB procedures.

Constants

const (
	// NoStreamVersion is the expected version for a stream that doesn't exist.
	NoStreamVersion = int64(-1)
	// AnyVersion allows writing of a message regardless of the stream version.
	AnyVersion = int64(-2)
	// DefaultPollingInterval defines the default polling duration for
	// subscriptions.
	DefaultPollingInterval = 100 * time.Millisecond
)
const (
	// CorrelationKey attribute allows a component to tag an outbound message
	// with its origin
	CorrelationKey = "correlationStreamName"

	// WriteMessageSQL with (
	//   id,
	//   stream_name,
	//   type,
	//   data,
	//   metadata,
	//   expected_version
	// )
	WriteMessageSQL = "SELECT write_message($1, $2, $3, $4, $5, $6)"
	// GetStreamMessagesSQL with (
	//   stream_name,
	//   position,
	//   batch_size,
	//   condition
	// )
	GetStreamMessagesSQL = "SELECT * FROM get_stream_messages($1, $2, $3, $4)"
	// GetCategoryMessagesSQL with (
	//   category_name,
	//   position,
	//   batch_size,
	//   correlation,
	//   consumer_group_member,
	//   consumer_group_size,
	//   condition
	// )
	GetCategoryMessagesSQL = "SELECT * FROM get_category_messages($1, $2, $3, $4, $5, $6, $7)"
	// GetLastStreamMessageSQL with (stream_name)
	GetLastStreamMessageSQL = "SELECT * FROM get_last_stream_message($1)"
	// GetStreamVersionSQL with (stream_name)
	GetStreamVersionSQL = "SELECT * FROM stream_version($1)"
)
const StreamNameSeparator = "-"

StreamNameSeparator is the character used to separate the stream category from the stream ID in a stream name.
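A minimal sketch of how a stream name relates to its category and ID, using only the standard library. The helper names `joinStreamName` and `splitStreamName` are hypothetical and not part of gomdb. Splitting on the first separator only is the reason a category may not contain the separator, while an ID may.

```go
package main

import (
	"fmt"
	"strings"
)

// streamNameSeparator mirrors the documented StreamNameSeparator constant.
const streamNameSeparator = "-"

// joinStreamName builds "category-id". Hypothetical helper for illustration.
func joinStreamName(category, id string) string {
	return category + streamNameSeparator + id
}

// splitStreamName splits on the first separator only, so the ID may itself
// contain separators (for example a UUID) but the category may not.
func splitStreamName(name string) (category, id string) {
	parts := strings.SplitN(name, streamNameSeparator, 2)
	if len(parts) == 1 {
		return parts[0], ""
	}
	return parts[0], parts[1]
}

func main() {
	name := joinStreamName("user", "123-abc")
	category, id := splitStreamName(name)
	fmt.Println(name, category, id)
}
```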

Variables

var (
	// ErrInvalidReadStreamVersion is returned when the stream version inside a
	// read call is less than zero.
	ErrInvalidReadStreamVersion = errors.New("stream version cannot be less than 0")
	// ErrInvalidReadBatchSize is returned when the batch size inside a read
	// call is less than one.
	ErrInvalidReadBatchSize = errors.New("batch size must be greater than 0")
	// ErrInvalidReadPosition is returned when the stream position inside a
	// read call is less than zero.
	ErrInvalidReadPosition = errors.New("stream position cannot be less than 0")
	// ErrInvalidConsumerGroupMember is returned when the consumer group ID
	// index is either less than zero or greater than or equal to the consumer
	// group size.
	ErrInvalidConsumerGroupMember = errors.New("consumer group member must be >= 0 < group size")
	// ErrInvalidConsumerGroupSize is returned when the consumer group size is
	// less than zero.
	ErrInvalidConsumerGroupSize = errors.New("consumer group size must be 0 or greater (0 to disbale consumer groups)")
)
var (
	// ErrInvalidMessageID is returned when the proposed message ID is not a
	// valid UUID.
	ErrInvalidMessageID = errors.New("proposed message ID must be a valid UUID")
	// ErrMissingType is returned when the proposed message is missing the
	// message type.
	ErrMissingType = errors.New("proposed message must include Type")
	// ErrMissingData is returned when the proposed message is missing any
	// data.
	ErrMissingData = errors.New("proposed message must include Data")
	// ErrMissingCategory is returned when the stream identifier category is
	// missing.
	ErrMissingCategory = errors.New("category cannot be blank")
	// ErrInvalidCategory is returned when the stream identifier category
	// contains the reserved stream name separator character.
	ErrInvalidCategory = fmt.Errorf("category cannot contain separator (%s)", StreamNameSeparator)
	// ErrMissingStreamID is returned when the stream identifier ID is missing.
	ErrMissingStreamID = errors.New("ID cannot be blank")
)
var ErrUnexpectedStreamVersion = errors.New("unexpected stream version when writing message")

ErrUnexpectedStreamVersion is returned when a stream is not at the expected version when writing a message.
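The expected-version check can be illustrated with a small in-memory stand-in. Everything below is a sketch under stated assumptions: `memoryStream`, `isConflict`, and the lower-case constants are hypothetical local names that mirror the documented NoStreamVersion, AnyVersion, and ErrUnexpectedStreamVersion; none of it is gomdb's implementation. Checking with `errors.Is` works whether or not the caller receives the sentinel wrapped.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented gomdb values.
const (
	noStreamVersion = int64(-1)
	anyVersion      = int64(-2)
)

var errUnexpectedStreamVersion = errors.New("unexpected stream version when writing message")

// memoryStream is a hypothetical in-memory stream used only for illustration.
type memoryStream struct{ events []string }

// version is -1 for an empty stream and the index of the last event otherwise.
func (s *memoryStream) version() int64 { return int64(len(s.events)) - 1 }

// write appends the event if the stream is at expectedVersion (or if
// anyVersion is passed) and returns the new stream version.
func (s *memoryStream) write(event string, expectedVersion int64) (int64, error) {
	if expectedVersion != anyVersion && expectedVersion != s.version() {
		return 0, fmt.Errorf("writing %q: %w", event, errUnexpectedStreamVersion)
	}
	s.events = append(s.events, event)
	return s.version(), nil
}

// isConflict reports whether err signals a version conflict.
func isConflict(err error) bool {
	return errors.Is(err, errUnexpectedStreamVersion)
}

func main() {
	s := &memoryStream{}
	v, err := s.write("Registered", noStreamVersion)
	fmt.Println(v, err)
	_, err = s.write("Registered", noStreamVersion) // stale expectation
	fmt.Println(isConflict(err))
}
```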

Functions

func ConstantPolling added in v0.2.0

func ConstantPolling(interval time.Duration) func() PollingStrategy

ConstantPolling returns a factory for a PollingStrategy that polls at a constant interval.

func DynamicPolling added in v0.4.0

func DynamicPolling(target float64, step, min, max time.Duration) func() PollingStrategy

DynamicPolling returns a factory for a PollingStrategy that will dynamically adjust a subscription's polling delay by the step amount so as to hit a target read utilisation. Read utilisation is the number of messages retrieved over the number of messages expected.
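One way to read the description above is sketched below. It is not gomdb's implementation: the function name `dynamicPolling`, the choice to start at the minimum delay, and the direction of adjustment (shorter delay when utilisation is above target, longer when below) are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// PollingStrategy mirrors the documented gomdb type.
type PollingStrategy func(retrieved, expected int64) time.Duration

// dynamicPolling is a hypothetical sketch: it nudges the delay by step after
// every read so that retrieved/expected drifts towards target.
func dynamicPolling(target float64, step, minDelay, maxDelay time.Duration) func() PollingStrategy {
	return func() PollingStrategy {
		delay := minDelay // assumption: start at the shortest delay
		return func(retrieved, expected int64) time.Duration {
			utilisation := 0.0
			if expected > 0 {
				utilisation = float64(retrieved) / float64(expected)
			}
			switch {
			case utilisation > target:
				delay -= step // batches are fuller than wanted: poll sooner
			case utilisation < target:
				delay += step // batches are emptier than wanted: poll later
			}
			if delay < minDelay {
				delay = minDelay
			}
			if delay > maxDelay {
				delay = maxDelay
			}
			return delay
		}
	}
}

// dynamicDemo feeds a fixed series of read sizes (out of 10 expected) to the
// strategy and returns the resulting delays as strings.
func dynamicDemo() []string {
	strategy := dynamicPolling(0.5, 10*time.Millisecond, 10*time.Millisecond, 40*time.Millisecond)()
	var delays []string
	for _, retrieved := range []int64{0, 0, 0, 0, 10, 5, 10, 10, 10} {
		delays = append(delays, strategy(retrieved, 10).String())
	}
	return delays
}

func main() {
	fmt.Println(dynamicDemo())
}
```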

func ExpBackoffPolling added in v0.2.0

func ExpBackoffPolling(min, max time.Duration, multiplier float64) func() PollingStrategy

ExpBackoffPolling returns an exponential backoff polling strategy that starts at the min duration and is multiplied for every read that did not return any messages, up to the max duration. The backoff duration is reset to min every time a message is read.
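The behaviour described above can be sketched as a closure over the current delay. This is an illustration rather than the library's code: `expBackoff` is a hypothetical name, and returning a zero delay after a read that produced messages is an assumption, since the documentation only describes the delay after empty reads.

```go
package main

import (
	"fmt"
	"time"
)

// PollingStrategy mirrors the documented gomdb type.
type PollingStrategy func(retrieved, expected int64) time.Duration

// expBackoff is a hypothetical sketch of an exponential backoff strategy.
func expBackoff(minDelay, maxDelay time.Duration, multiplier float64) func() PollingStrategy {
	return func() PollingStrategy {
		delay := minDelay
		return func(retrieved, expected int64) time.Duration {
			if retrieved > 0 {
				delay = minDelay // reset the backoff once a message is read
				return 0         // assumption: poll again immediately
			}
			current := delay
			delay = time.Duration(float64(delay) * multiplier)
			if delay > maxDelay {
				delay = maxDelay
			}
			return current
		}
	}
}

// backoffDemo runs five empty reads, one read of three messages, then one
// more empty read, and returns the delays as strings.
func backoffDemo() []string {
	strategy := expBackoff(100*time.Millisecond, 500*time.Millisecond, 2)()
	var delays []string
	for _, retrieved := range []int64{0, 0, 0, 0, 0, 3, 0} {
		delays = append(delays, strategy(retrieved, 10).String())
	}
	return delays
}

func main() {
	fmt.Println(backoffDemo())
}
```

Because the closure holds mutable state, each subscription needs its own instance, which is consistent with the documented constructors returning `func() PollingStrategy` rather than a `PollingStrategy` directly.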

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client exposes the message-db interface.

func NewClient

func NewClient(db *sql.DB, opts ...ClientOption) *Client

NewClient returns a new message-db client for the provided database.

func (*Client) GetCategoryMessages

func (c *Client) GetCategoryMessages(ctx context.Context, category string, opts ...GetCategoryOption) ([]*Message, error)

GetCategoryMessages reads messages from a category. By default the category is read from the beginning of the message store with a batch size of 1000. Use GetCategoryOptions to adjust this behaviour and to configure consumer groups and filtering.

func (*Client) GetLastStreamMessage

func (c *Client) GetLastStreamMessage(ctx context.Context, stream StreamIdentifier) (*Message, error)

GetLastStreamMessage returns the last message for the specified stream, or nil if the stream is empty.

func (*Client) GetStreamMessages

func (c *Client) GetStreamMessages(ctx context.Context, stream StreamIdentifier, opts ...GetStreamOption) ([]*Message, error)

GetStreamMessages reads messages from an individual stream. By default the stream is read from the beginning with a batch size of 1000. Use GetStreamOptions to adjust this behaviour.

func (*Client) GetStreamVersion

func (c *Client) GetStreamVersion(ctx context.Context, stream StreamIdentifier) (int64, error)

GetStreamVersion returns the version of the specified stream. Always check the error value before using the returned version.

func (*Client) SubscribeToCategory added in v0.2.0

func (c *Client) SubscribeToCategory(
	ctx context.Context,
	category string,
	handleMessage MessageHandler,
	handleLiveness LivenessHandler,
	handleDropped SubDroppedHandler,
	opts ...GetCategoryOption,
) error

SubscribeToCategory subscribes to a category and asynchronously passes messages to the message handler in batches. Once a subscription has caught up, it will poll the database periodically for new messages. To stop a subscription, cancel the provided context. When a subscription catches up, it will call the LivenessHandler with true. If the subscription falls behind again, it will call the LivenessHandler with false. If there is an error while reading messages, the subscription will be stopped and the SubDroppedHandler will be called with the stopping error. If the subscription is cancelled, the SubDroppedHandler will be called with nil.

func (*Client) SubscribeToStream added in v0.2.0

func (c *Client) SubscribeToStream(
	ctx context.Context,
	stream StreamIdentifier,
	handleMessage MessageHandler,
	handleLiveness LivenessHandler,
	handleDropped SubDroppedHandler,
	opts ...GetStreamOption,
) error

SubscribeToStream subscribes to a stream and asynchronously passes messages to the message handler in batches. Once a subscription has caught up, it will poll the database periodically for new messages. To stop a subscription, cancel the provided context. When a subscription catches up, it will call the LivenessHandler with true. If the subscription falls behind again, it will call the LivenessHandler with false. If there is an error while reading messages, the subscription will be stopped and the SubDroppedHandler will be called with the stopping error. If the subscription is cancelled, the SubDroppedHandler will be called with nil.
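The subscription lifecycle described for both methods can be sketched as a polling loop over any batch reader. The code below is a simplified illustration, not gomdb's implementation: `subscribe`, `readFunc`, and the in-memory store are hypothetical, a batch shorter than the batch size is assumed to mean "caught up", and the loop runs synchronously in the caller where the library runs asynchronously.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// message is a hypothetical stand-in for a stored message.
type message struct {
	Position int64
	Body     string
}

// readFunc returns up to batchSize messages starting at position from.
type readFunc func(ctx context.Context, from int64, batchSize int) ([]message, error)

// subscribe polls read until ctx is cancelled or a read fails.
func subscribe(ctx context.Context, read readFunc, batchSize int, delay time.Duration,
	onMessage func(message), onLive func(bool), onDropped func(error)) {
	var next int64
	live := false
	for {
		if ctx.Err() != nil {
			onDropped(nil) // cancelled: report a clean stop
			return
		}
		msgs, err := read(ctx, next, batchSize)
		if err != nil {
			onDropped(err) // read failure: report the stopping error
			return
		}
		for _, m := range msgs {
			onMessage(m)
			next = m.Position + 1 // resume after the last handled message
		}
		caughtUp := len(msgs) < batchSize
		if caughtUp != live {
			live = caughtUp
			onLive(live)
		}
		if !caughtUp {
			continue // still behind: read the next batch immediately
		}
		select { // caught up: wait before polling again
		case <-ctx.Done():
		case <-time.After(delay):
		}
	}
}

// runDemo drains a three-message in-memory store and cancels once live.
func runDemo() (bodies []string, liveEvents []bool, dropped []error) {
	store := []message{{0, "a"}, {1, "b"}, {2, "c"}}
	read := func(_ context.Context, from int64, batchSize int) ([]message, error) {
		var out []message
		for _, m := range store {
			if m.Position >= from && len(out) < batchSize {
				out = append(out, m)
			}
		}
		return out, nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	subscribe(ctx, read, 2, 10*time.Millisecond,
		func(m message) { bodies = append(bodies, m.Body) },
		func(live bool) {
			liveEvents = append(liveEvents, live)
			if live {
				cancel() // stop the demo once it has caught up
			}
		},
		func(err error) { dropped = append(dropped, err) },
	)
	return bodies, liveEvents, dropped
}

func main() {
	fmt.Println(runDemo())
}
```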

func (*Client) WriteMessage

func (c *Client) WriteMessage(ctx context.Context, stream StreamIdentifier, message ProposedMessage, expectedVersion int64) (int64, error)

WriteMessage attempts to write the proposed message to the specified stream.

type ClientOption added in v0.2.0

type ClientOption func(*Client)

ClientOption is an option for modifying how the Message DB client operates.

func WithDefaultPollingStrategy added in v0.4.0

func WithDefaultPollingStrategy(strat func() PollingStrategy) ClientOption

WithDefaultPollingStrategy configures the client to use the specified PollingStrategy as the default for new subscriptions.

type GetCategoryOption

type GetCategoryOption func(*categoryConfig)

GetCategoryOption is an option for modifying how to read from a category.

func AsConsumerGroup

func AsConsumerGroup(member, size int64) GetCategoryOption

AsConsumerGroup specifies the consumer group options for this read. Size is used to specify the number of consumers, and member specifies which consumer is currently reading. Message DB uses consistent hashing on stream names within a category and then distributes the streams amongst the consumer group members.
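The idea of assigning each stream to exactly one group member can be sketched with a plain hash-modulo scheme. This is only a stand-in for illustration: the FNV hash and the helper names `memberFor` and `streamsFor` are assumptions, and Message DB performs its own hashing inside the database.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// memberFor maps a stream name to a member index in [0, size).
// size must be greater than zero. Hypothetical helper for illustration.
func memberFor(streamName string, size int64) int64 {
	h := fnv.New64a()
	h.Write([]byte(streamName)) // Write on a hash.Hash never returns an error
	return int64(h.Sum64() % uint64(size))
}

// streamsFor returns the streams that the given member would handle.
func streamsFor(names []string, member, size int64) []string {
	var mine []string
	for _, name := range names {
		if memberFor(name, size) == member {
			mine = append(mine, name)
		}
	}
	return mine
}

func main() {
	names := []string{"user-1", "user-2", "user-3", "user-4", "user-5"}
	const size = 3
	for member := int64(0); member < size; member++ {
		fmt.Println(member, streamsFor(names, member, size))
	}
}
```

Since the assignment depends only on the stream name and the group size, every message of a given stream goes to the same member, so per-stream ordering is preserved as long as the group size stays fixed.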

func FromPosition

func FromPosition(position int64) GetCategoryOption

FromPosition specifies the inclusive global position from which to read messages.

func WithCategoryBatchSize

func WithCategoryBatchSize(batchSize int64) GetCategoryOption

WithCategoryBatchSize specifies the batch size to read messages.

func WithCategoryCondition

func WithCategoryCondition(condition string) GetCategoryOption

WithCategoryCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"

func WithCategoryPollingStrategy added in v0.4.0

func WithCategoryPollingStrategy(strat PollingStrategy) GetCategoryOption

WithCategoryPollingStrategy sets the polling strategy for this category subscription. Polling Strategies are only used in subscriptions.

func WithCorrelation

func WithCorrelation(correlation string) GetCategoryOption

WithCorrelation sets the correlation value that messages will be filtered by. The correlation is compared against each message's metadata correlationStreamName field.

type GetStreamOption

type GetStreamOption func(*streamConfig)

GetStreamOption is an option for modifying how to read from a stream.

func FromVersion

func FromVersion(version int64) GetStreamOption

FromVersion specifies the inclusive version from which to read messages.

func WithStreamBatchSize

func WithStreamBatchSize(batchSize int64) GetStreamOption

WithStreamBatchSize specifies the batch size to read messages.

func WithStreamCondition

func WithStreamCondition(condition string) GetStreamOption

WithStreamCondition specifies an SQL condition to apply to the read request. For example: "messages.time::time >= current_time"

func WithStreamPollingStrategy added in v0.4.0

func WithStreamPollingStrategy(strat PollingStrategy) GetStreamOption

WithStreamPollingStrategy sets the polling strategy for this stream subscription. Polling Strategies are only used in subscriptions.

type LivenessHandler added in v0.2.0

type LivenessHandler func(bool)

LivenessHandler handles whether the subscription is in a "live" state or whether it is catching up.

type Message

type Message struct {
	ID             string
	Stream         StreamIdentifier
	Type           string
	Version        int64
	GlobalPosition int64
	Timestamp      time.Time
	// contains filtered or unexported fields
}

Message represents a message that was stored in message-db.

func (*Message) UnmarshalData

func (m *Message) UnmarshalData(i interface{}) error

UnmarshalData attempts to unmarshal the Message's data into the provided object.

func (*Message) UnmarshalMetadata

func (m *Message) UnmarshalMetadata(i interface{}) error

UnmarshalMetadata attempts to unmarshal the Message's metadata into the provided object.
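A sketch of the decoding pattern these two methods suggest, assuming the unexported payload is raw JSON (Message DB stores data and metadata as JSON). The `message` type, the `userRegistered` event, and `decodeUserName` are hypothetical stand-ins, not gomdb types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a local stand-in that keeps its payload as raw JSON bytes.
type message struct {
	data []byte
}

// unmarshalData decodes the raw payload into v, which must be a pointer.
func (m *message) unmarshalData(v interface{}) error {
	return json.Unmarshal(m.data, v)
}

// userRegistered is a hypothetical event payload.
type userRegistered struct {
	Name string `json:"name"`
}

// decodeUserName decodes a raw JSON payload and returns the name field.
func decodeUserName(raw string) (string, error) {
	m := &message{data: []byte(raw)}
	var event userRegistered
	if err := m.unmarshalData(&event); err != nil {
		return "", fmt.Errorf("decoding user registered event: %w", err)
	}
	return event.Name, nil
}

func main() {
	name, err := decodeUserName(`{"name":"Ada"}`)
	fmt.Println(name, err)
}
```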

type MessageHandler added in v0.2.0

type MessageHandler func(*Message)

MessageHandler handles messages as they appear after being written.

type PollingStrategy added in v0.2.0

type PollingStrategy func(retrieved, expected int64) time.Duration

PollingStrategy returns the delay duration before the next polling attempt based on how many messages were returned from the previous poll vs how many were expected.

type ProposedMessage

type ProposedMessage struct {
	ID       string
	Type     string
	Data     interface{}
	Metadata interface{}
}

ProposedMessage proposes a message to be written to message-db.

type StreamIdentifier

type StreamIdentifier struct {
	Category string
	ID       string
}

StreamIdentifier captures the two components of a message-db stream name.

func (StreamIdentifier) String

func (si StreamIdentifier) String() string

String returns the string representation of a StreamIdentifier.

type SubDroppedHandler added in v0.2.0

type SubDroppedHandler func(error)

SubDroppedHandler handles errors that appear and stop the subscription.

Directories

Path Synopsis
tests module
