Documentation
¶
Index ¶
- func GenClientID() string
- func RegisterPublisher(scheme string, factory PublisherFactory)
- func RegisterSubscriber(scheme string, factory SubscriberFactory)
- type Handler
- type HandlerFunc
- type InMemPubTopic
- type InMemPublisher
- type InMemSubTopic
- type InMemSubscriber
- type PubMessage
- type PubTopic
- type Publisher
- type PublisherFactory
- type RawSubMessage
- type StartPosition
- type SubMessage
- type SubOption
- type SubOptions
- type SubTopic
- type Subscriber
- type SubscriberFactory
- type Subscription
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenClientID ¶
func GenClientID() string
GenClientID generates random client ID. It is guaranteed to: start with "bps-", only contain [0-9A-Za-z-]
func RegisterPublisher ¶
func RegisterPublisher(scheme string, factory PublisherFactory)
RegisterPublisher registers a new protocol with a scheme and a corresponding PublisherFactory.
func RegisterSubscriber ¶
func RegisterSubscriber(scheme string, factory SubscriberFactory)
RegisterSubscriber registers a new protocol with a scheme and a corresponding SubscriberFactory.
Types ¶
type Handler ¶
type Handler interface {
Handle(SubMessage)
}
Handler defines a message handler. Consuming can be stopped by returning bps.Done.
func SafeHandler ¶ added in v0.1.0
SafeHandler wraps a handler with a mutex to synchronize access. It is intended to be used only by subscriber implementations which need it. It shouldn't be used by lib consumer.
type HandlerFunc ¶
type HandlerFunc func(SubMessage)
HandlerFunc is a func-based handler adapter.
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(msg SubMessage)
Handle handles a single message.
type InMemPubTopic ¶
type InMemPubTopic struct {
// contains filtered or unexported fields
}
InMemPubTopic is an in-memory implementation of a Topic. Useful for tests.
func (*InMemPubTopic) Messages ¶
func (t *InMemPubTopic) Messages() []*PubMessage
Messages returns published messages.
func (*InMemPubTopic) Publish ¶
func (t *InMemPubTopic) Publish(_ context.Context, msg *PubMessage) error
Publish implements Topic.
type InMemPublisher ¶
type InMemPublisher struct {
// contains filtered or unexported fields
}
InMemPublisher is an in-memory publisher implementation which can be used for tests.
func NewInMemPublisher ¶
func NewInMemPublisher() *InMemPublisher
NewInMemPublisher returns an initialised publisher.
func (*InMemPublisher) Topic ¶
func (p *InMemPublisher) Topic(name string) PubTopic
Topic implements Publisher interface. It will auto-provision a topic if it does not exist.
type InMemSubTopic ¶
type InMemSubTopic struct {
// contains filtered or unexported fields
}
InMemSubTopic is a subscriber topic handle, that consumes messages from seeded data. It is useful mainly for testing.
func NewInMemSubTopic ¶
func NewInMemSubTopic(msgs []SubMessage) *InMemSubTopic
NewInMemSubTopic returns new seeded in-memory subscriber topic handle.
func (*InMemSubTopic) Subscribe ¶
func (s *InMemSubTopic) Subscribe(handler Handler, _ ...SubOption) (Subscription, error)
Subscribe subscribes to in-memory messages by topic. It starts handling from the first (oldest) available message.
type InMemSubscriber ¶
type InMemSubscriber struct {
// contains filtered or unexported fields
}
InMemSubscriber is a subscriber, that consumes messages from seeded data. It is useful mainly for testing.
func NewInMemSubscriber ¶
func NewInMemSubscriber(messagesByTopic map[string][]SubMessage) *InMemSubscriber
NewInMemSubscriber returns new subscriber, that consumes messages from seeded data.
func (*InMemSubscriber) Close ¶
func (s *InMemSubscriber) Close() error
Close forgets seeded messages.
func (*InMemSubscriber) Replace ¶ added in v0.2.0
func (s *InMemSubscriber) Replace(messagesByTopic map[string][]SubMessage)
Replace replaces messages. It does not affect already used topics (they will return messages, available before Replace).
func (*InMemSubscriber) Topic ¶
func (s *InMemSubscriber) Topic(topic string) SubTopic
Topic returns named topic handle. Seeded messages are copied for each topic handle
type PubMessage ¶
type PubMessage struct { // ID is an optional message identifier. // It may not be supported by some implementations (then it is ignored). // Or may be used just to calculate partition the message. ID string `json:"id,omitempty"` // Data is the message payload. Data []byte `json:"data,omitempty"` // Attributes contains optional key-value labels. // It may not be supported by some implementations (then it is ignored). Attributes map[string]string `json:"attributes,omitempty"` }
PubMessage represents a single message for publishing.
type PubTopic ¶
type PubTopic interface { // Publish publishes a message to the topic. Publish(context.Context, *PubMessage) error }
PubTopic is a publisher handle to a topic.
type Publisher ¶
type Publisher interface { // Topic returns a topic handle by name. Topic(name string) PubTopic // Close closes the producer connection. Close() error }
Publisher defines the main publisher interface.
Example ¶
package main import ( "context" "fmt" "github.com/bsm/bps" ) func main() { ctx := context.Background() pub := bps.NewInMemPublisher() defer pub.Close() topicA := pub.Topic("topic-a") topicB := pub.Topic("topic-b") _ = topicA.Publish(ctx, &bps.PubMessage{ Data: []byte("message-1"), }) _ = topicB.Publish(ctx, &bps.PubMessage{ Data: []byte("message-2"), }) _ = topicA.Publish(ctx, &bps.PubMessage{ Data: []byte("message-2"), }) fmt.Println(len(topicA.(*bps.InMemPubTopic).Messages())) fmt.Println(len(topicB.(*bps.InMemPubTopic).Messages())) }
Output: 2 1
type PublisherFactory ¶
PublisherFactory constructs a publisher from a URL.
type RawSubMessage ¶
type RawSubMessage []byte
RawSubMessage is an adapter for raw slice of bytes that behaves as a SubMessage.
type StartPosition ¶
type StartPosition string
StartPosition defines starting position to consume messages.
const ( // PositionNewest tells to start consuming messages from the newest available // (published AFTER subscribing). PositionNewest StartPosition = "newest" // PositionOldest tells to start consuming messages from the oldest available // (published BEFORE subscribing). PositionOldest StartPosition = "oldest" )
StartPosition options.
type SubMessage ¶
type SubMessage interface { // Data returns raw (serialized) message data. Data() []byte }
SubMessage defines a subscription message details.
type SubOption ¶
type SubOption func(*SubOptions)
SubOption defines a single subscription option.
func IgnoreSubscriptionErrors ¶
func IgnoreSubscriptionErrors() SubOption
IgnoreSubscriptionErrors configures subscription to silently ignore errors.
func StartAt ¶
func StartAt(pos StartPosition) SubOption
StartAt configures subscription start position.
func WithErrorHandler ¶
WithErrorHandler configures subscription error handler.
type SubOptions ¶
type SubOptions struct { // StartAt defines starting position to consume messages. // May not be supported by some implementations. // Default: implementation-specific (PositionNewest is recommended). StartAt StartPosition // ErrorHandler is a subscription error handler (system/implementation-specific errors). // Default: log errors to STDERR. ErrorHandler func(error) }
SubOptions holds subscription options.
func (*SubOptions) Apply ¶
func (o *SubOptions) Apply(options []SubOption) *SubOptions
Apply configures SubOptions struct by applying each single SubOption one by one.
It is meant to be used by pubsub implementations like this:
func (s *SubImpl) Subscribe(..., options ...bps.SubOption) error { opts := (&bps.SubOptions{ // implementation-specific defaults }).Apply(options) ... }
type SubTopic ¶
type SubTopic interface { // Subscribe subscribes for topic messages and handles them in background // till error occurs or bps.Done is returned. // Handler is guaranteed to be called synchronously (messages are handled one by one). Subscribe(handler Handler, opts ...SubOption) (Subscription, error) }
SubTopic defines a subscriber topic handle.
type Subscriber ¶
type Subscriber interface { // Topic returns a subscriber topic handle. Topic(name string) SubTopic // Close closes the subscriber connection. Close() error }
Subscriber defines the main subscriber interface.
Example ¶
package main import ( "fmt" "time" "github.com/bsm/bps" ) func main() { subscriber := bps.NewInMemSubscriber( map[string][]bps.SubMessage{ "foo": { bps.RawSubMessage("foo1"), bps.RawSubMessage("foo2"), }, }, ) defer subscriber.Close() subscription, err := subscriber.Topic("foo").Subscribe( bps.HandlerFunc(func(msg bps.SubMessage) { fmt.Printf("%s\n", msg.Data()) }), ) if err != nil { panic(err.Error()) } defer subscription.Close() time.Sleep(time.Second) // wait to receive some messages }
Output: foo1 foo2
func NewSubscriber ¶
func NewSubscriber(ctx context.Context, urlStr string) (Subscriber, error)
NewSubscriber inits to a subscriber via URL.
sub, err := bps.NewSubscriber(context.TODO(), "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/namespace")
type SubscriberFactory ¶
SubscriberFactory constructs a subscriber from a URL.
type Subscription ¶
type Subscription interface { // Close stops message handling and frees resources. // It is safe to be called multiple times. Close() error }
Subscription defines a subscription-manager interface.