bps

package module
v0.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2022 License: Apache-2.0 Imports: 8 Imported by: 5

README

BPS

Build Status GoDoc Go Report Card Gem Version

Multi-backend abstraction for message processing and pubsub queues for Go and Ruby.

Documentation

Check auto-generated documentation:

Install

# go:
go get -u github.com/bsm/bps
go get -u github.com/bsm/bps/kafka
go get -u github.com/bsm/bps/nats
go get -u github.com/bsm/bps/stan

# ruby:
bundle add 'bps-kafka'
bundle add 'bps-nats'
bundle add 'bps-stan'

Backends: Go

Backends: Ruby

Publishing: Go

package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

func main() {
	ctx := context.Background()
	pub := bps.NewInMemPublisher()
	defer pub.Close()

	topicA := pub.Topic("topic-a")
	topicB := pub.Topic("topic-b")

	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-1"),
	})
	_ = topicB.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})
	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})

	fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
	fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))

}

Publishing: Ruby

require 'bps/kafka'

pub = BPS::Publisher.resolve('kafka://localhost:9092')
top = pub.topic('topic')

top.publish('foo')
top.publish('bar')

pub.close

To seed multiple brokers, use:

BPS::Publisher.resolve('kafka://10.0.0.1,10.0.0.2,10.0.0.3:9092')

If your brokers are on different ports, try:

BPS::Publisher.resolve('kafka://10.0.0.1%3A9092,10.0.0.2%3A9093,10.0.0.3%3A9094')

Subscribing: Go

package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

func main() {
	ctx := context.Background()
	pub := bps.NewInMemPublisher()
	defer pub.Close()

	topicA := pub.Topic("topic-a")
	topicB := pub.Topic("topic-b")

	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-1"),
	})
	_ = topicB.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})
	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})

	fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
	fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))

}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenClientID

func GenClientID() string

GenClientID generates random client ID. It is guaranteed to: start with "bps-", only contain [0-9A-Za-z-]

func RegisterPublisher

func RegisterPublisher(scheme string, factory PublisherFactory)

RegisterPublisher registers a new protocol with a scheme and a corresponding PublisherFactory.

func RegisterSubscriber

func RegisterSubscriber(scheme string, factory SubscriberFactory)

RegisterSubscriber registers a new protocol with a scheme and a corresponding SubscriberFactory.

Types

type Handler

type Handler interface {
	Handle(SubMessage)
}

Handler defines a message handler. Consuming can be stopped by returning bps.Done.

func SafeHandler added in v0.1.0

func SafeHandler(h Handler) Handler

SafeHandler wraps a handler with a mutex to synchronize access. It is intended to be used only by subscriber implementations which need it. It shouldn't be used by lib consumer.

type HandlerFunc

type HandlerFunc func(SubMessage)

HandlerFunc is a func-based handler adapter.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(msg SubMessage)

Handle handles a single message.

type InMemPubTopic

type InMemPubTopic struct {
	// contains filtered or unexported fields
}

InMemPubTopic is an in-memory implementation of a Topic. Useful for tests.

func (*InMemPubTopic) Messages

func (t *InMemPubTopic) Messages() []*PubMessage

Messages returns published messages.

func (*InMemPubTopic) Publish

func (t *InMemPubTopic) Publish(_ context.Context, msg *PubMessage) error

Publish implements Topic.

type InMemPublisher

type InMemPublisher struct {
	// contains filtered or unexported fields
}

InMemPublisher is an in-memory publisher implementation which can be used for tests.

func NewInMemPublisher

func NewInMemPublisher() *InMemPublisher

NewInMemPublisher returns an initialised publisher.

func (*InMemPublisher) Close

func (*InMemPublisher) Close() error

Close implements Publisher.

func (*InMemPublisher) Topic

func (p *InMemPublisher) Topic(name string) PubTopic

Topic implements Publisher interface. It will auto-provision a topic if it does not exist.

type InMemSubTopic

type InMemSubTopic struct {
	// contains filtered or unexported fields
}

InMemSubTopic is a subscriber topic handle, that consumes messages from seeded data. It is useful mainly for testing.

func NewInMemSubTopic

func NewInMemSubTopic(msgs []SubMessage) *InMemSubTopic

NewInMemSubTopic returns new seeded in-memory subscriber topic handle.

func (*InMemSubTopic) Subscribe

func (s *InMemSubTopic) Subscribe(handler Handler, _ ...SubOption) (Subscription, error)

Subscribe subscribes to in-memory messages by topic. It starts handling from the first (oldest) available message.

type InMemSubscriber

type InMemSubscriber struct {
	// contains filtered or unexported fields
}

InMemSubscriber is a subscriber, that consumes messages from seeded data. It is useful mainly for testing.

func NewInMemSubscriber

func NewInMemSubscriber(messagesByTopic map[string][]SubMessage) *InMemSubscriber

NewInMemSubscriber returns new subscriber, that consumes messages from seeded data.

func (*InMemSubscriber) Close

func (s *InMemSubscriber) Close() error

Close forgets seeded messages.

func (*InMemSubscriber) Replace added in v0.2.0

func (s *InMemSubscriber) Replace(messagesByTopic map[string][]SubMessage)

Replace replaces messages. It does not affect already used topics (they will return messages, available before Replace).

func (*InMemSubscriber) Topic

func (s *InMemSubscriber) Topic(topic string) SubTopic

Topic returns named topic handle. Seeded messages are copied for each topic handle

type PubMessage

type PubMessage struct {
	// ID is an optional message identifier.
	// It may not be supported by some implementations (then it is ignored).
	// Or may be used just to calculate partition the message.
	ID string `json:"id,omitempty"`

	// Data is the message payload.
	Data []byte `json:"data,omitempty"`

	// Attributes contains optional key-value labels.
	// It may not be supported by some implementations (then it is ignored).
	Attributes map[string]string `json:"attributes,omitempty"`
}

PubMessage represents a single message for publishing.

type PubTopic

type PubTopic interface {
	// Publish publishes a message to the topic.
	Publish(context.Context, *PubMessage) error
}

PubTopic is a publisher handle to a topic.

type Publisher

type Publisher interface {
	// Topic returns a topic handle by name.
	Topic(name string) PubTopic
	// Close closes the producer connection.
	Close() error
}

Publisher defines the main publisher interface.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bsm/bps"
)

func main() {
	ctx := context.Background()
	pub := bps.NewInMemPublisher()
	defer pub.Close()

	topicA := pub.Topic("topic-a")
	topicB := pub.Topic("topic-b")

	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-1"),
	})
	_ = topicB.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})
	_ = topicA.Publish(ctx, &bps.PubMessage{
		Data: []byte("message-2"),
	})

	fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages()))
	fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages()))

}
Output:

2
1

func NewPublisher

func NewPublisher(ctx context.Context, urlStr string) (Publisher, error)

NewPublisher inits to a publisher via URL.

pub, err := bps.NewPublisher(context.TODO(), "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/namespace")

type PublisherFactory

type PublisherFactory func(context.Context, *url.URL) (Publisher, error)

PublisherFactory constructs a publisher from a URL.

type RawSubMessage

type RawSubMessage []byte

RawSubMessage is an adapter for raw slice of bytes that behaves as a SubMessage.

func (RawSubMessage) Data

func (m RawSubMessage) Data() []byte

Data returns raw message bytes.

type StartPosition

type StartPosition string

StartPosition defines starting position to consume messages.

const (
	// PositionNewest tells to start consuming messages from the newest available
	// (published AFTER subscribing).
	PositionNewest StartPosition = "newest"

	// PositionOldest tells to start consuming messages from the oldest available
	// (published BEFORE subscribing).
	PositionOldest StartPosition = "oldest"
)

StartPosition options.

type SubMessage

type SubMessage interface {
	// Data returns raw (serialized) message data.
	Data() []byte
}

SubMessage defines a subscription message details.

type SubOption

type SubOption func(*SubOptions)

SubOption defines a single subscription option.

func IgnoreSubscriptionErrors

func IgnoreSubscriptionErrors() SubOption

IgnoreSubscriptionErrors configures subscription to silently ignore errors.

func StartAt

func StartAt(pos StartPosition) SubOption

StartAt configures subscription start position.

func WithErrorHandler

func WithErrorHandler(h func(error)) SubOption

WithErrorHandler configures subscription error handler.

type SubOptions

type SubOptions struct {
	// StartAt defines starting position to consume messages.
	// May not be supported by some implementations.
	// Default: implementation-specific (PositionNewest is recommended).
	StartAt StartPosition
	// ErrorHandler is a subscription error handler (system/implementation-specific errors).
	// Default: log errors to STDERR.
	ErrorHandler func(error)
}

SubOptions holds subscription options.

func (*SubOptions) Apply

func (o *SubOptions) Apply(options []SubOption) *SubOptions

Apply configures SubOptions struct by applying each single SubOption one by one.

It is meant to be used by pubsub implementations like this:

func (s *SubImpl) Subscribe(..., options ...bps.SubOption) error {
  opts := (&bps.SubOptions{
    // implementation-specific defaults
  }).Apply(options)
  ...
}

type SubTopic

type SubTopic interface {
	// Subscribe subscribes for topic messages and handles them in background
	// till error occurs or bps.Done is returned.
	// Handler is guaranteed to be called synchronously (messages are handled one by one).
	Subscribe(handler Handler, opts ...SubOption) (Subscription, error)
}

SubTopic defines a subscriber topic handle.

type Subscriber

type Subscriber interface {
	// Topic returns a subscriber topic handle.
	Topic(name string) SubTopic
	// Close closes the subscriber connection.
	Close() error
}

Subscriber defines the main subscriber interface.

Example
package main

import (
	"fmt"
	"time"

	"github.com/bsm/bps"
)

func main() {
	subscriber := bps.NewInMemSubscriber(
		map[string][]bps.SubMessage{
			"foo": {
				bps.RawSubMessage("foo1"),
				bps.RawSubMessage("foo2"),
			},
		},
	)
	defer subscriber.Close()

	subscription, err := subscriber.Topic("foo").Subscribe(
		bps.HandlerFunc(func(msg bps.SubMessage) {
			fmt.Printf("%s\n", msg.Data())
		}),
	)
	if err != nil {
		panic(err.Error())
	}
	defer subscription.Close()

	time.Sleep(time.Second) // wait to receive some messages

}
Output:

foo1
foo2

func NewSubscriber

func NewSubscriber(ctx context.Context, urlStr string) (Subscriber, error)

NewSubscriber inits to a subscriber via URL.

sub, err := bps.NewSubscriber(context.TODO(), "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/namespace")

type SubscriberFactory

type SubscriberFactory func(context.Context, *url.URL) (Subscriber, error)

SubscriberFactory constructs a subscriber from a URL.

type Subscription

type Subscription interface {
	// Close stops message handling and frees resources.
	// It is safe to be called multiple times.
	Close() error
}

Subscription defines a subscription-manager interface.

Directories

Path Synopsis
file module
internal
concurrent
Package concurrent provides concurrent primitives/shortcuts.
Package concurrent provides concurrent primitives/shortcuts.
kafka module
nats module
pubsub module
stan module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL