nakadi

package module
v1.0.0
Published: Aug 28, 2017 License: MIT Imports: 11 Imported by: 0

README

go-nakadi

go-nakadi is a client for the Nakadi event broker, written in and for Go. The package provides convenient access to many features of Nakadi's API.

The package can be used to manage event type definitions. The EventAPI can be used to inspect existing event types, to create new ones, and to update existing ones. The SubscriptionAPI provides subscription management: existing subscriptions can be fetched from Nakadi and new ones can be created. The PublishAPI of this library is used to publish events of all event type categories via Nakadi. Finally, the package implements a StreamAPI, which enables event processing on top of Nakadi's subscription-based high-level API.

To make communication with Nakadi more resilient, all sub APIs of this package can be configured to retry failed requests using an exponential back-off algorithm. Please consult the package documentation for further details.

Versions and stability

This package can be considered stable and ready to use. All releases follow the rules of semantic versioning.

Although the master branch is supposed to remain stable, there is no guarantee that breaking changes will not be merged into master when major versions are released. Therefore the repository contains version tags in order to support vendoring tools such as glide. The tag names follow common conventions and have the format v1.0.0.

Dependencies

Build dependencies

  • github.com/cenkalti/backoff
  • github.com/pkg/errors

Test dependencies

  • github.com/stretchr/testify
  • gopkg.in/jarcoal/httpmock.v1

Run unit and integration tests

In order to run the unit and integration tests, all of the above dependencies must be installed. Furthermore, these tests require a running Nakadi instance on the local computer.

To run all tests invoke the following command within the go-nakadi root directory:

go test -tags=integration .

License

This project is open source and published under the MIT license.

Documentation

Overview

Package nakadi is a client library for the Nakadi event broker. It provides convenient access to many features of Nakadi's API. The package can be used to manage event type definitions.

The EventAPI can be used to inspect existing event types, to create new ones, and to update existing ones. The SubscriptionAPI provides subscription management: existing subscriptions can be fetched from Nakadi and new ones can be created. The PublishAPI of this package is used to publish events of all event type categories via Nakadi. Finally, the package implements a StreamAPI, which enables event processing on top of Nakadi's subscription-based high-level API.

To make communication with Nakadi more resilient, all sub APIs of this package can be configured to retry failed requests using an exponential back-off algorithm.

Example (Complete)
// create a new client
client := nakadi.New("http://localhost:8080", &nakadi.ClientOptions{ConnectionTimeout: 500 * time.Millisecond})

// create an event API and a new event type
eventAPI := nakadi.NewEventAPI(client, &nakadi.EventOptions{Retry: true})
eventType := &nakadi.EventType{
	Name:                 "test-type",
	OwningApplication:    "test-app",
	Category:             "data",
	EnrichmentStrategies: []string{"metadata_enrichment"},
	PartitionStrategy:    "random",
	Schema: &nakadi.EventTypeSchema{
		Type:   "json_schema",
		Schema: `{"properties":{"test":{"type":"string"}}}`,
	},
}
err := eventAPI.Create(eventType)
if err != nil {
	log.Fatal(err)
}

// create a new subscription API and a new subscription
subAPI := nakadi.NewSubscriptionAPI(client, &nakadi.SubscriptionOptions{Retry: true})
sub := &nakadi.Subscription{
	OwningApplication: "another-app",
	EventTypes:        []string{"test-type"},
	ReadFrom:          "begin",
}
sub, err = subAPI.Create(sub)
if err != nil {
	log.Fatal(err)
}

// create a publish API and publish events
pubAPI := nakadi.NewPublishAPI(client, eventType.Name, nil)
event := nakadi.DataChangeEvent{
	Metadata: nakadi.EventMetadata{
		EID:        "9aabcd94-7ebd-11e7-898b-97df92934aa5",
		OccurredAt: time.Now(),
	},
	Data:     map[string]string{"test": "some value"},
	DataOP:   "U",
	DataType: "test",
}
err = pubAPI.PublishDataChangeEvent([]nakadi.DataChangeEvent{event})
if err != nil {
	log.Fatal(err)
}
fmt.Println("event published")

// create a new stream and read one event
stream := nakadi.NewStream(client, sub.ID, nil)
cursor, _, err := stream.NextEvents()
if err != nil {
	log.Fatal(err)
}

fmt.Println("1 event received")

stream.CommitCursor(cursor)
stream.Close()

subAPI.Delete(sub.ID)
eventAPI.Delete(eventType.Name)
Output:

event published
1 event received

Types

type BatchItemResponse

type BatchItemResponse struct {
	EID              string `json:"eid"`
	PublishingStatus string `json:"publishing_status"`
	Step             string `json:"step"`
	Detail           string `json:"detail"`
}

BatchItemResponse describes the publishing result of a single event. If a batch is only partially published, each batch item response states whether the respective event was published successfully.

type BatchItemsError

type BatchItemsError []BatchItemResponse

BatchItemsError represents an error which contains information about the publishing status of each single event in a batch.

func (BatchItemsError) Error

func (err BatchItemsError) Error() string

Error implements the error interface for BatchItemsError.

type BusinessEvent

type BusinessEvent struct {
	Metadata    EventMetadata `json:"metadata"`
	OrderNumber string        `json:"order_number"`
}

BusinessEvent represents a Nakadi event from the category "business".

type Client

type Client struct {
	// contains filtered or unexported fields
}

A Client represents a basic configuration to access a Nakadi instance. The client is used to configure other sub APIs of the `go-nakadi` package.

func New

func New(url string, options *ClientOptions) *Client

New creates a new Nakadi client. New receives the URL of the Nakadi instance the client should connect to. In addition the second parameter options can be used to configure the behavior of the client and of all sub APIs in this package. The options may be nil.

type ClientOptions

type ClientOptions struct {
	TokenProvider     func() (string, error)
	ConnectionTimeout time.Duration
}

ClientOptions contains all non mandatory parameters used to instantiate the Nakadi client.

type Cursor

type Cursor struct {
	Partition      string `json:"partition"`
	Offset         string `json:"offset"`
	EventType      string `json:"event_type"`
	CursorToken    string `json:"cursor_token"`
	NakadiStreamID string `json:"-"`
}

A Cursor marks the current read position in a stream. It is returned along with each received batch of events and is furthermore used to commit a batch of events (as well as all previous events).

type DataChangeEvent

type DataChangeEvent struct {
	Metadata EventMetadata `json:"metadata"`
	Data     interface{}   `json:"data"`
	DataOP   string        `json:"data_op"`
	DataType string        `json:"data_type"`
}

DataChangeEvent is a Nakadi event from the event category "data".

type EventAPI

type EventAPI struct {
	// contains filtered or unexported fields
}

EventAPI is a sub API for inspecting and managing event types on a Nakadi instance.

func NewEventAPI

func NewEventAPI(client *Client, options *EventOptions) *EventAPI

NewEventAPI creates a new instance of an EventAPI implementation which can be used to manage event types on a specific Nakadi service. The last parameter is a struct containing only optional parameters. The options may be nil.

func (*EventAPI) Create

func (e *EventAPI) Create(eventType *EventType) error

Create saves a new event type.

func (*EventAPI) Delete

func (e *EventAPI) Delete(name string) error

Delete removes an event type.

func (*EventAPI) Get

func (e *EventAPI) Get(name string) (*EventType, error)

Get returns an event type based on its name.

func (*EventAPI) List

func (e *EventAPI) List() ([]*EventType, error)

List returns all registered event types.

func (*EventAPI) Update

func (e *EventAPI) Update(eventType *EventType) error

Update updates an existing event type.

type EventMetadata

type EventMetadata struct {
	EID        string     `json:"eid"`
	OccurredAt time.Time  `json:"occurred_at"`
	EventType  string     `json:"event_type,omitempty"`
	Partition  string     `json:"partition,omitempty"`
	ParentEIDs []string   `json:"parent_eids,omitempty"`
	FlowID     string     `json:"flow_id,omitempty"`
	ReceivedAt *time.Time `json:"received_at,omitempty"`
}

EventMetadata represents the meta information which comes along with all Nakadi events. For publishing purposes only the fields eid and occurred_at must be present.

type EventOptions

type EventOptions struct {
	// Whether or not methods of the EventAPI retry when a request fails. If
	// set to false, InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
	// no effect (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when performing a request.
	// Once this value is reached the exponential backoff is halted and the request will
	// fail with an error.
	MaxElapsedTime time.Duration
}

EventOptions is a set of optional parameters used to configure the EventAPI.

type EventType

type EventType struct {
	Name                 string               `json:"name"`
	OwningApplication    string               `json:"owning_application"`
	Category             string               `json:"category"`
	EnrichmentStrategies []string             `json:"enrichment_strategies,omitempty"`
	PartitionStrategy    string               `json:"partition_strategy,omitempty"`
	CompatibilityMode    string               `json:"compatibility_mode,omitempty"`
	Schema               *EventTypeSchema     `json:"schema"`
	PartitionKeyFields   []string             `json:"partition_key_fields"`
	DefaultStatistics    *EventTypeStatistics `json:"default_statistics,omitempty"`
	Options              *EventTypeOptions    `json:"options,omitempty"`
	CreatedAt            time.Time            `json:"created_at,omitempty"`
	UpdatedAt            time.Time            `json:"updated_at,omitempty"`
}

An EventType defines a kind of event that can be processed on a Nakadi service.

type EventTypeOptions

type EventTypeOptions struct {
	RetentionTime int64 `json:"retention_time"`
}

EventTypeOptions provide additional parameters for tuning Nakadi.

type EventTypeSchema

type EventTypeSchema struct {
	Version   string    `json:"version,omitempty"`
	Type      string    `json:"type"`
	Schema    string    `json:"schema"`
	CreatedAt time.Time `json:"created_at,omitempty"`
}

EventTypeSchema is a non-optional description of the schema of an event type.

type EventTypeStatistics

type EventTypeStatistics struct {
	MessagesPerMinute int `json:"messages_per_minute"`
	MessageSize       int `json:"message_size"`
	ReadParallelism   int `json:"read_parallelism"`
	WriteParallelism  int `json:"write_parallelism"`
}

EventTypeStatistics describes operational statistics for an event type. These statistics are used by Nakadi to optimize the throughput of events of a certain kind. They are provided on event type creation.

type PublishAPI

type PublishAPI struct {
	// contains filtered or unexported fields
}

PublishAPI is a sub API for publishing Nakadi events. All publish methods emit events as a single batch. If a publish method returns an error, the caller should check whether the error is a BatchItemsError in order to verify which events of a batch have been published.

func NewPublishAPI

func NewPublishAPI(client *Client, eventType string, options *PublishOptions) *PublishAPI

NewPublishAPI creates a new instance of the PublishAPI which can be used to publish Nakadi events. As for all sub APIs of the `go-nakadi` package NewPublishAPI receives a configured Nakadi client. Furthermore the name of the event type must be provided. The last parameter is a struct containing only optional parameters. The options may be nil.

func (*PublishAPI) Publish

func (p *PublishAPI) Publish(events interface{}) error

Publish is used to emit a batch of undefined events, but it can also be used to publish data change or business events. Depending on the options used when creating the PublishAPI, this method will retry publishing the events if they were not successfully published.

func (*PublishAPI) PublishBusinessEvent

func (p *PublishAPI) PublishBusinessEvent(events []BusinessEvent) error

PublishBusinessEvent emits a batch of business events. Depending on the options used when creating the PublishAPI, this method will retry publishing the events if they were not successfully published.

func (*PublishAPI) PublishDataChangeEvent

func (p *PublishAPI) PublishDataChangeEvent(events []DataChangeEvent) error

PublishDataChangeEvent emits a batch of data change events. Depending on the options used when creating the PublishAPI, this method will retry publishing the events if they were not successfully published.

type PublishOptions

type PublishOptions struct {
	// Whether or not publish methods retry when publishing fails. If set to false,
	// InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have no effect
	// (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when publishing events. Once
	// this value is reached the exponential backoff is halted and the events will not be
	// published.
	MaxElapsedTime time.Duration
}

PublishOptions is a set of optional parameters used to configure the PublishAPI.

type StreamAPI

type StreamAPI struct {
	// contains filtered or unexported fields
}

A StreamAPI is a sub API which is used to consume events from a specific subscription using Nakadi's high level stream API. In order to ensure that only successfully processed events are committed, it is crucial to commit cursors of respective event batches in the same order they were received.

func NewStream

func NewStream(client *Client, subscriptionID string, options *StreamOptions) *StreamAPI

NewStream is used to instantiate a new stream processing sub API. As for all sub APIs of the `go-nakadi` package, NewStream receives a configured Nakadi client. Furthermore, a valid subscription ID must be provided. Use the SubscriptionAPI in order to obtain subscriptions. The options parameter can be used to configure the behavior of the stream. The options may be nil.

func (*StreamAPI) Close

func (s *StreamAPI) Close() error

Close ends the stream.

func (*StreamAPI) CommitCursor

func (s *StreamAPI) CommitCursor(cursor Cursor) error

CommitCursor commits a cursor to Nakadi.

func (*StreamAPI) NextEvents

func (s *StreamAPI) NextEvents() (Cursor, []byte, error)

NextEvents reads the next batch of events from the stream and returns the encoded events along with the respective cursor. It blocks until the batch of events can be read from the stream, or the stream is closed.
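
Since the events are returned in encoded form, the caller has to decode them. The sketch below assumes that the returned bytes hold a JSON array of events, which should be verified against the package source. The type dataChange and the function decodeBatch are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dataChange is a hypothetical, reduced event type for this sketch. It keeps
// only the payload fields of a data change event.
type dataChange struct {
	Data     map[string]string `json:"data"`
	DataOP   string            `json:"data_op"`
	DataType string            `json:"data_type"`
}

// decodeBatch decodes the bytes of one batch, assuming they contain a JSON
// array of events.
func decodeBatch(encoded []byte) ([]dataChange, error) {
	var events []dataChange
	if err := json.Unmarshal(encoded, &events); err != nil {
		return nil, err
	}
	return events, nil
}

func main() {
	// Hand-written sample input standing in for the bytes of a real batch.
	encoded := []byte(`[{"data":{"test":"some value"},"data_op":"U","data_type":"test"}]`)
	events, err := decodeBatch(encoded)
	if err != nil {
		fmt.Println("decoding failed:", err)
		return
	}
	for _, event := range events {
		fmt.Println(event.DataOP, event.DataType, event.Data["test"])
	}
}
```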

type StreamOptions

type StreamOptions struct {
	// The maximum number of Events in each chunk (and therefore per partition) of the stream (default: 1)
	BatchLimit int
	// The initial (minimal) retry interval used for the exponential backoff. This value is applied for
	// stream initialization as well as for cursor commits.
	InitialRetryInterval time.Duration
	// MaxRetryInterval the maximum retry interval. Once the exponential backoff reaches this value
	// the retry intervals remain constant. This value is applied for stream initialization as well as
	// for cursor commits.
	MaxRetryInterval time.Duration
	// CommitMaxElapsedTime is the maximum time spent on retries when committing a cursor. Once this value
	// is reached the exponential backoff is halted and the cursor will not be committed.
	CommitMaxElapsedTime time.Duration
	// NotifyErr is called when an error occurs that leads to a retry. This notify function can be used to
	// detect unhealthy streams.
	NotifyErr func(error, time.Duration)
	// NotifyOK is called whenever a successful operation was completed. This notify function can be used
	// to detect that a stream is healthy again.
	NotifyOK func()
}

StreamOptions contains optional parameters that are used to create a StreamAPI.

type Subscription

type Subscription struct {
	ID                string    `json:"id,omitempty"`
	OwningApplication string    `json:"owning_application"`
	EventTypes        []string  `json:"event_types"`
	ConsumerGroup     string    `json:"consumer_group,omitempty"`
	ReadFrom          string    `json:"read_from,omitempty"`
	CreatedAt         time.Time `json:"created_at,omitempty"`
}

Subscription represents a subscription as used by the Nakadi high level API.

type SubscriptionAPI

type SubscriptionAPI struct {
	// contains filtered or unexported fields
}

SubscriptionAPI is a sub API that is used to manage subscriptions.

func NewSubscriptionAPI

func NewSubscriptionAPI(client *Client, options *SubscriptionOptions) *SubscriptionAPI

NewSubscriptionAPI creates a new instance of the SubscriptionAPI. As for all sub APIs of the `go-nakadi` package, NewSubscriptionAPI receives a configured Nakadi client. The last parameter is a struct containing only optional parameters. The options may be nil.

func (*SubscriptionAPI) Create

func (s *SubscriptionAPI) Create(subscription *Subscription) (*Subscription, error)

Create initializes a new subscription. If the subscription already exists, the existing subscription is returned.

func (*SubscriptionAPI) Delete

func (s *SubscriptionAPI) Delete(id string) error

Delete removes an existing subscription.

func (*SubscriptionAPI) Get

func (s *SubscriptionAPI) Get(id string) (*Subscription, error)

Get obtains a single subscription identified by its ID.

func (*SubscriptionAPI) List

func (s *SubscriptionAPI) List() ([]*Subscription, error)

List returns all available subscriptions.

type SubscriptionOptions

type SubscriptionOptions struct {
	// Whether or not methods of the SubscriptionAPI retry when a request fails. If
	// set to false, InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
	// no effect (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when performing a request.
	// Once this value is reached the exponential backoff is halted and the request will
	// fail with an error.
	MaxElapsedTime time.Duration
}

SubscriptionOptions is a set of optional parameters used to configure the SubscriptionAPI.

type UndefinedEvent

type UndefinedEvent struct {
	Metadata EventMetadata `json:"metadata"`
}

UndefinedEvent can be embedded in structs representing Nakadi events from the event category "undefined".
