nakadi

v1.4.6
Published: Aug 22, 2018 License: MIT Imports: 17 Imported by: 0

README

go-nakadi

go-nakadi is a client for the Nakadi event broker, written in and for Go. The package provides convenient access to many features of Nakadi's API.

The package can be used to manage event type definitions. The EventAPI can be used to inspect existing event types, to create new event types, and to update existing ones. The SubscriptionAPI provides subscription management: existing subscriptions can be fetched from Nakadi, and new ones can be created. The PublishAPI of this library is used to publish events of all event type categories via Nakadi. Finally, the package implements a StreamAPI, which enables event processing on top of Nakadi's subscription-based high-level API.

To make communication with Nakadi more resilient, all sub APIs of this package can be configured to retry failed requests using an exponential back-off algorithm. Please consult the package documentation for further details.

Versions and stability

This package can be considered stable and ready to use. All releases follow the rules of semantic versioning.

Although the master branch is supposed to remain stable, there is no guarantee that breaking changes will not be merged into master when major versions are released. Therefore the repository contains version tags in order to support vendoring tools such as glide. The tag names follow common conventions and have the format v1.0.0.

Dependencies

Build dependencies

  • github.com/cenkalti/backoff
  • github.com/pkg/errors

Test dependencies

  • github.com/stretchr/testify
  • gopkg.in/jarcoal/httpmock.v1

Run unit and integration tests

In order to run the unit and integration tests, all of the above dependencies must be installed. Furthermore, these tests require a running Nakadi instance on the local computer.

To run all tests invoke the following command within the go-nakadi root directory:

go test -tags=integration .

License

This project is open source and published under the MIT license.

Documentation

Overview

Package nakadi is a client library for the Nakadi event broker. It provides convenient access to many features of Nakadi's API. The package can be used to manage event type definitions.

The EventAPI can be used to inspect existing event types, to create new event types, and to update existing ones. The SubscriptionAPI provides subscription management: existing subscriptions can be fetched from Nakadi, and new ones can be created. The PublishAPI of this package is used to publish events of all event type categories via Nakadi. Finally, the package implements a StreamAPI, which enables event processing on top of Nakadi's subscription-based high-level API.

To make communication with Nakadi more resilient, all sub APIs of this package can be configured to retry failed requests using an exponential back-off algorithm.

Example (Complete)
// create a new client
client := nakadi.New("http://localhost:8080", &nakadi.ClientOptions{ConnectionTimeout: 500 * time.Millisecond})

// create an event API and a new event type
eventAPI := nakadi.NewEventAPI(client, &nakadi.EventOptions{Retry: true})
eventType := &nakadi.EventType{
	Name:                 "test-type",
	OwningApplication:    "test-app",
	Category:             "data",
	EnrichmentStrategies: []string{"metadata_enrichment"},
	PartitionStrategy:    "random",
	Schema: &nakadi.EventTypeSchema{
		Type:   "json_schema",
		Schema: `{"properties":{"test":{"type":"string"}}}`,
	},
}
err := eventAPI.Create(eventType)
if err != nil {
	log.Fatal(err)
}

// create a new subscription API and a new subscription
subAPI := nakadi.NewSubscriptionAPI(client, &nakadi.SubscriptionOptions{Retry: true})
sub := &nakadi.Subscription{
	OwningApplication: "another-app",
	EventTypes:        []string{"test-type"},
	ReadFrom:          "begin",
}
sub, err = subAPI.Create(sub)
if err != nil {
	log.Fatal(err)
}

// create a publish API and publish events
pubAPI := nakadi.NewPublishAPI(client, eventType.Name, nil)
event := nakadi.DataChangeEvent{
	Metadata: nakadi.EventMetadata{
		EID:        "9aabcd94-7ebd-11e7-898b-97df92934aa5",
		OccurredAt: time.Now(),
	},
	Data:     map[string]string{"test": "some value"},
	DataOP:   "U",
	DataType: "test",
}
err = pubAPI.PublishDataChangeEvent([]nakadi.DataChangeEvent{event})
if err != nil {
	log.Fatal(err)
}
fmt.Println("event published")

// create a new stream and read one event
stream := nakadi.NewStream(client, sub.ID, nil)
cursor, _, err := stream.NextEvents()
if err != nil {
	log.Fatal(err)
}

fmt.Println("1 event received")

stream.CommitCursor(cursor)
stream.Close()

subAPI.Delete(sub.ID)
eventAPI.Delete(eventType.Name)
Output:

event published
1 event received

Types

type BatchItemResponse

type BatchItemResponse struct {
	EID              string `json:"eid"`
	PublishingStatus string `json:"publishing_status"`
	Step             string `json:"step"`
	Detail           string `json:"detail"`
}

BatchItemResponse describes the publishing result of a single event in a batch. If a batch is only partially published, each batch item response states whether a single event was successfully published or not.

type BatchItemsError

type BatchItemsError []BatchItemResponse

BatchItemsError represents an error which contains information about the publishing status of each single event in a batch.

func (BatchItemsError) Error

func (err BatchItemsError) Error() string

Error implements the error interface for BatchItemsError.

func (BatchItemsError) Format added in v1.4.5

func (err BatchItemsError) Format(s fmt.State, verb rune)

Format implements fmt.Formatter for BatchItemsError.

type BusinessEvent

type BusinessEvent struct {
	Metadata    EventMetadata `json:"metadata"`
	OrderNumber string        `json:"order_number"`
}

BusinessEvent represents a Nakadi event from the category "business".

type Client

type Client struct {
	// contains filtered or unexported fields
}

A Client represents a basic configuration to access a Nakadi instance. The client is used to configure other sub APIs of the `go-nakadi` package.

func New

func New(url string, options *ClientOptions) *Client

New creates a new Nakadi client. New receives the URL of the Nakadi instance the client should connect to. In addition the second parameter options can be used to configure the behavior of the client and of all sub APIs in this package. The options may be nil.

type ClientOptions

type ClientOptions struct {
	TokenProvider     func() (string, error)
	ConnectionTimeout time.Duration
}

ClientOptions contains all non mandatory parameters used to instantiate the Nakadi client.
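
The TokenProvider field accepts any function with the signature func() (string, error). The following is a minimal sketch of such a function, assuming that obtaining a token is expensive and that reusing it for a fixed time is acceptable. The names cachedTokenProvider and fetch are hypothetical and not part of this package, and how often the client calls the provider is not specified here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// cachedTokenProvider wraps fetch (a hypothetical function that obtains a
// fresh token) and reuses its result for ttl. The returned function has the
// same signature as the TokenProvider field of ClientOptions.
func cachedTokenProvider(fetch func() (string, error), ttl time.Duration) func() (string, error) {
	var (
		mu      sync.Mutex
		token   string
		expires time.Time
	)
	return func() (string, error) {
		mu.Lock()
		defer mu.Unlock()
		// Reuse the cached token while it is still considered valid.
		if token != "" && time.Now().Before(expires) {
			return token, nil
		}
		t, err := fetch()
		if err != nil {
			return "", err
		}
		if t == "" {
			return "", errors.New("token provider: empty token")
		}
		token, expires = t, time.Now().Add(ttl)
		return token, nil
	}
}

func main() {
	calls := 0
	provider := cachedTokenProvider(func() (string, error) {
		calls++
		return "example-token", nil
	}, time.Hour)

	for i := 0; i < 3; i++ {
		token, err := provider()
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(token)
	}
	fmt.Println("fetch calls:", calls)
}
```

A function built this way could be assigned to ClientOptions.TokenProvider before calling New.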

type Cursor

type Cursor struct {
	Partition      string `json:"partition"`
	Offset         string `json:"offset"`
	EventType      string `json:"event_type"`
	CursorToken    string `json:"cursor_token"`
	NakadiStreamID string `json:"-"`
}

A Cursor marks the current read position in a stream. It is returned along with each received batch of events and is furthermore used to commit a batch of events (as well as all previous events).

type DataChangeEvent

type DataChangeEvent struct {
	Metadata EventMetadata `json:"metadata"`
	Data     interface{}   `json:"data"`
	DataOP   string        `json:"data_op"`
	DataType string        `json:"data_type"`
}

DataChangeEvent is a Nakadi event from the event category "data".

type EventAPI

type EventAPI struct {
	// contains filtered or unexported fields
}

EventAPI is a sub API for inspecting and managing event types on a Nakadi instance.

func NewEventAPI

func NewEventAPI(client *Client, options *EventOptions) *EventAPI

NewEventAPI creates a new instance of an EventAPI implementation which can be used to manage event types on a specific Nakadi service. The last parameter is a struct containing only optional parameters. The options may be nil.

func (*EventAPI) Create

func (e *EventAPI) Create(eventType *EventType) error

Create saves a new event type.

func (*EventAPI) Delete

func (e *EventAPI) Delete(name string) error

Delete removes an event type.

func (*EventAPI) Get

func (e *EventAPI) Get(name string) (*EventType, error)

Get returns an event type based on its name.

func (*EventAPI) List

func (e *EventAPI) List() ([]*EventType, error)

List returns all registered event types.

func (*EventAPI) Update

func (e *EventAPI) Update(eventType *EventType) error

Update updates an existing event type.

type EventMetadata

type EventMetadata struct {
	EID        string     `json:"eid"`
	OccurredAt time.Time  `json:"occurred_at"`
	EventType  string     `json:"event_type,omitempty"`
	Partition  string     `json:"partition,omitempty"`
	ParentEIDs []string   `json:"parent_eids,omitempty"`
	FlowID     string     `json:"flow_id,omitempty"`
	ReceivedAt *time.Time `json:"received_at,omitempty"`
}

EventMetadata represents the meta information which comes along with all Nakadi events. For publishing purposes only the fields eid and occurred_at must be present.
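
To illustrate which fields end up in the published JSON, here is a small sketch that marshals a local copy of the struct above with only EID and OccurredAt set. The helper exampleMetadataJSON is hypothetical; the point is that every field tagged omitempty is dropped from the output when left at its zero value.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// EventMetadata is a local copy of the struct documented above, so that the
// example compiles without importing the package.
type EventMetadata struct {
	EID        string     `json:"eid"`
	OccurredAt time.Time  `json:"occurred_at"`
	EventType  string     `json:"event_type,omitempty"`
	Partition  string     `json:"partition,omitempty"`
	ParentEIDs []string   `json:"parent_eids,omitempty"`
	FlowID     string     `json:"flow_id,omitempty"`
	ReceivedAt *time.Time `json:"received_at,omitempty"`
}

// exampleMetadataJSON encodes metadata that carries only the two fields
// required for publishing.
func exampleMetadataJSON() (string, error) {
	meta := EventMetadata{
		EID:        "9aabcd94-7ebd-11e7-898b-97df92934aa5",
		OccurredAt: time.Date(2018, time.August, 22, 0, 0, 0, 0, time.UTC),
	}
	b, err := json.Marshal(meta)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := exampleMetadataJSON()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
	// Prints: {"eid":"9aabcd94-7ebd-11e7-898b-97df92934aa5","occurred_at":"2018-08-22T00:00:00Z"}
}
```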

type EventOptions

type EventOptions struct {
	// Whether or not methods of the EventAPI retry when a request fails. If
	// set to false, InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
	// no effect (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when performing a request.
	// Once this value is reached the exponential backoff is halted and the request
	// fails with an error.
	MaxElapsedTime time.Duration
}

EventOptions is a set of optional parameters used to configure the EventAPI.
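
How InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime interact can be sketched with a simplified schedule. This is not the package's implementation: it assumes that the interval doubles after each attempt and that no random jitter is applied, whereas the back-off library actually used may apply a different factor and randomization. The function name retrySchedule is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// retrySchedule lists the waiting times of a simplified exponential back-off.
// Assumptions: the interval doubles after every attempt and there is no jitter.
func retrySchedule(initial, maxInterval, maxElapsed time.Duration) []time.Duration {
	if initial <= 0 {
		return nil
	}
	if maxInterval < initial {
		maxInterval = initial
	}
	var schedule []time.Duration
	var elapsed time.Duration
	interval := initial
	// Keep retrying while the next wait still fits into the time budget.
	for maxElapsed-elapsed >= interval {
		schedule = append(schedule, interval)
		elapsed += interval
		if interval > maxInterval/2 {
			interval = maxInterval // the interval stays constant from here on
		} else {
			interval *= 2
		}
	}
	return schedule
}

func main() {
	s := retrySchedule(100*time.Millisecond, 400*time.Millisecond, time.Second)
	fmt.Println(s) // [100ms 200ms 400ms]
}
```

With these values the waits add up to 700ms; a fourth wait of 400ms would exceed the one-second budget, so the request would fail at that point.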

type EventType

type EventType struct {
	Name                 string               `json:"name"`
	OwningApplication    string               `json:"owning_application"`
	Category             string               `json:"category"`
	EnrichmentStrategies []string             `json:"enrichment_strategies,omitempty"`
	PartitionStrategy    string               `json:"partition_strategy,omitempty"`
	CompatibilityMode    string               `json:"compatibility_mode,omitempty"`
	Schema               *EventTypeSchema     `json:"schema"`
	PartitionKeyFields   []string             `json:"partition_key_fields"`
	DefaultStatistics    *EventTypeStatistics `json:"default_statistics,omitempty"`
	Options              *EventTypeOptions    `json:"options,omitempty"`
	CreatedAt            time.Time            `json:"created_at,omitempty"`
	UpdatedAt            time.Time            `json:"updated_at,omitempty"`
}

An EventType defines a kind of event that can be processed on a Nakadi service.

type EventTypeOptions

type EventTypeOptions struct {
	RetentionTime int64 `json:"retention_time"`
}

EventTypeOptions provide additional parameters for tuning Nakadi.

type EventTypeSchema

type EventTypeSchema struct {
	Version   string    `json:"version,omitempty"`
	Type      string    `json:"type"`
	Schema    string    `json:"schema"`
	CreatedAt time.Time `json:"created_at,omitempty"`
}

EventTypeSchema is a non optional description of the schema on an event type.

type EventTypeStatistics

type EventTypeStatistics struct {
	MessagesPerMinute int `json:"messages_per_minute"`
	MessageSize       int `json:"message_size"`
	ReadParallelism   int `json:"read_parallelism"`
	WriteParallelism  int `json:"write_parallelism"`
}

EventTypeStatistics describes operational statistics for an event type. These statistics are used by Nakadi to optimize the throughput of events of a certain kind. They are provided on event type creation.

type Operation added in v1.2.0

type Operation func(int, string, []byte) error

Operation defines a procedure that consumes the event data from a processor. An operation can either succeed or fail with an error. Operation receives three parameters: the stream number or position in the processor, the Nakadi stream ID of the underlying stream, and the JSON-encoded event payload.
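
As an illustration, the sketch below builds an Operation that decodes a payload and counts the events in it. It redeclares the Operation type locally so that it compiles on its own. Two assumptions are made: that the payload is a JSON array with one element per event, and that operations may be called concurrently when several streams are used, which is why the counter is updated atomically. The name countingOperation is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

// Operation mirrors the function type documented above.
type Operation func(int, string, []byte) error

// countingOperation returns an Operation that adds the number of events in
// each batch to *total. Assumption: the payload is a JSON array of events.
func countingOperation(total *int64) Operation {
	return func(streamNo int, streamID string, payload []byte) error {
		var events []json.RawMessage
		if err := json.Unmarshal(payload, &events); err != nil {
			// Returning an error signals that the batch was not processed.
			return fmt.Errorf("stream %d (%s): decoding batch: %w", streamNo, streamID, err)
		}
		atomic.AddInt64(total, int64(len(events)))
		return nil
	}
}

func main() {
	var total int64
	op := countingOperation(&total)

	if err := op(0, "example-stream", []byte(`[{"test":"a"},{"test":"b"}]`)); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("events processed:", atomic.LoadInt64(&total))
}
```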

type PartitionStats added in v1.1.0

type PartitionStats struct {
	Partition        string `json:"partition"`
	State            string `json:"state"`
	UnconsumedEvents int    `json:"unconsumed_events"`
	StreamID         string `json:"stream_id"`
}

PartitionStats represents statistical information for a particular partition.

type Processor added in v1.2.0

type Processor struct {
	sync.Mutex
	// contains filtered or unexported fields
}

A Processor for Nakadi events. The Processor is a high-level API for consuming events from Nakadi. It can process event batches from multiple partitions (streams) and can be configured with a certain event rate that limits the number of processed events per minute. The cursors of event batches that were successfully processed are automatically committed.

func NewProcessor added in v1.2.0

func NewProcessor(client *Client, subscriptionID string, options *ProcessorOptions) *Processor

NewProcessor creates a new processor for a given subscription ID. The constructor receives a configured Nakadi client as first parameter. Furthermore a valid subscription ID must be provided. The last parameter is a struct containing only optional parameters. The options may be nil; in this case the processor falls back to the defaults defined in the ProcessorOptions.

func (*Processor) Start added in v1.2.0

func (p *Processor) Start(operation Operation) error

Start begins event processing. All event batches received from the underlying streams are passed to the operation function. If the operation function terminates without error, the respective cursor will be automatically committed to Nakadi. If the operation terminates with an error, the underlying stream will be halted and a new stream will continue to pass event batches to the operation function.

Event processing goes on indefinitely unless the processor is stopped via its Stop method. Start returns an error if the processor is already running.

func (*Processor) Stop added in v1.2.0

func (p *Processor) Stop() error

Stop halts all streams and terminates event processing. Stop returns an error if the processor is not running.

type ProcessorOptions added in v1.2.0

type ProcessorOptions struct {
	// The maximum number of Events in each chunk (and therefore per partition) of the stream (default: 1)
	BatchLimit uint
	// Maximum time in seconds to wait for the flushing of each chunk (per partition). (default: 30)
	FlushTimeout uint
	// The number of parallel streams the Processor will use to consume events (default: 1)
	StreamCount uint
	// Limits the number of events that the processor will handle per minute. This value is an
	// upper bound: if some streams are not healthy (for example because StreamCount exceeds the
	// number of partitions), or if the actual batch size is lower than BatchLimit, the actual number
	// of processed events can be much lower. 0 is interpreted as no limit at all (default: no limit)
	EventsPerMinute uint
	// The number of uncommitted events Nakadi will stream before pausing the stream. When the stream
	// is paused and a commit arrives, the stream resumes. If MaxUncommittedEvents is lower than
	// BatchLimit, the effective batch size is bounded above by MaxUncommittedEvents. (default: 10, minimum: 1)
	MaxUncommittedEvents uint
	// The initial (minimal) retry interval used for the exponential backoff. This value is applied for
	// stream initialization as well as for cursor commits.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches this value
	// the retry intervals remain constant. This value is applied for stream initialization as well as
	// for cursor commits.
	MaxRetryInterval time.Duration
	// CommitMaxElapsedTime is the maximum time spent on retries when committing a cursor. Once this value
	// is reached the exponential backoff is halted and the cursor will not be committed.
	CommitMaxElapsedTime time.Duration
	// NotifyErr is called when an error occurs that leads to a retry. This notify function can be used to
	// detect unhealthy streams. The first parameter indicates the number of the stream that encountered
	// the error.
	NotifyErr func(uint, error, time.Duration)
	// NotifyOK is called whenever an operation completes successfully. This notify function can be used
	// to detect that a stream is healthy again. The first parameter indicates the number of the stream
	// that just regained health.
	NotifyOK func(uint)
}

ProcessorOptions contains optional parameters that are used to create a Processor.
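
The relationship between EventsPerMinute, BatchLimit, and StreamCount can be made concrete with a little arithmetic. The sketch below is one way to derive a pacing interval from these options and is not taken from the package's source: it assumes that every stream always receives full batches and that each stream waits the same amount of time between two batches. The function name minBatchInterval is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// minBatchInterval returns how long each stream would have to wait between
// two batches so that all streams together stay at or below eventsPerMinute.
// Zero values for batchLimit and streamCount fall back to the documented
// default of 1; an eventsPerMinute of 0 means no limit.
func minBatchInterval(eventsPerMinute, batchLimit, streamCount uint) time.Duration {
	if eventsPerMinute == 0 {
		return 0 // no limit, so no waiting is needed
	}
	if batchLimit == 0 {
		batchLimit = 1
	}
	if streamCount == 0 {
		streamCount = 1
	}
	// Events handled when every stream processes one full batch.
	eventsPerRound := time.Duration(batchLimit) * time.Duration(streamCount)
	return time.Minute * eventsPerRound / time.Duration(eventsPerMinute)
}

func main() {
	// 2 streams with batches of 10 events and a limit of 600 events per minute:
	// each stream may process one batch every 2 seconds.
	fmt.Println(minBatchInterval(600, 10, 2)) // 2s
}
```

If batches are smaller than BatchLimit or a stream is idle, the same interval yields fewer events per minute, which matches the description of EventsPerMinute as an upper bound.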

type PublishAPI

type PublishAPI struct {
	// contains filtered or unexported fields
}

PublishAPI is a sub API for publishing Nakadi events. All publish methods emit events as a single batch. If a publish method returns an error, the caller should check whether the error is a BatchItemsError in order to verify which events of a batch have been published.
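
The check described above might look like the following sketch. It uses local stand-in copies of BatchItemResponse and BatchItemsError so that it compiles on its own; the Error method here returns a placeholder message and does not reproduce the package's actual text. The helper unpublishedEIDs is hypothetical, and treating "submitted" as the only successful publishing status is an assumption based on the Nakadi API.

```go
package main

import (
	"errors"
	"fmt"
)

// BatchItemResponse and BatchItemsError are local copies of the documented types.
type BatchItemResponse struct {
	EID              string `json:"eid"`
	PublishingStatus string `json:"publishing_status"`
	Step             string `json:"step"`
	Detail           string `json:"detail"`
}

type BatchItemsError []BatchItemResponse

// Error returns a placeholder message; the real method's text may differ.
func (err BatchItemsError) Error() string {
	return fmt.Sprintf("batch error with %d item responses", len(err))
}

// unpublishedEIDs returns the EIDs of all events that were not published.
// It returns nil if err is not (or does not wrap) a BatchItemsError.
func unpublishedEIDs(err error) []string {
	var batchErr BatchItemsError
	if !errors.As(err, &batchErr) {
		return nil
	}
	var eids []string
	for _, item := range batchErr {
		// Assumption: "submitted" marks a successfully published event.
		if item.PublishingStatus != "submitted" {
			eids = append(eids, item.EID)
		}
	}
	return eids
}

func main() {
	// Simulated result of a partially published batch.
	var err error = BatchItemsError{
		{EID: "event-1", PublishingStatus: "submitted"},
		{EID: "event-2", PublishingStatus: "failed", Step: "validating", Detail: "invalid schema"},
	}
	fmt.Println(unpublishedEIDs(err)) // [event-2]
}
```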

func NewPublishAPI

func NewPublishAPI(client *Client, eventType string, options *PublishOptions) *PublishAPI

NewPublishAPI creates a new instance of the PublishAPI which can be used to publish Nakadi events. As for all sub APIs of the `go-nakadi` package NewPublishAPI receives a configured Nakadi client. Furthermore the name of the event type must be provided. The last parameter is a struct containing only optional parameters. The options may be nil.

func (*PublishAPI) Publish

func (p *PublishAPI) Publish(events interface{}) error

Publish is used to emit a batch of undefined events, but can also be used to publish data change or business events. Depending on the options used when creating the PublishAPI, this method will retry publishing the events if they were not successfully published.

func (*PublishAPI) PublishBusinessEvent

func (p *PublishAPI) PublishBusinessEvent(events []BusinessEvent) error

PublishBusinessEvent emits a batch of business events. Depending on the options used when creating the PublishAPI, this method will retry publishing the events if they were not successfully published.

func (*PublishAPI) PublishDataChangeEvent

func (p *PublishAPI) PublishDataChangeEvent(events []DataChangeEvent) error

PublishDataChangeEvent emits a batch of data change events. Depending on the options used when creating the PublishAPI, this method will retry publishing the events if they were not successfully published.

type PublishOptions

type PublishOptions struct {
	// Whether or not publish methods retry when publishing fails. If set to false,
	// InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have no effect
	// (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when publishing events. Once
	// this value is reached the exponential backoff is halted and the events will not be
	// published.
	MaxElapsedTime time.Duration
}

PublishOptions is a set of optional parameters used to configure the PublishAPI.

type StreamAPI

type StreamAPI struct {
	// contains filtered or unexported fields
}

A StreamAPI is a sub API which is used to consume events from a specific subscription using Nakadi's high level stream API. In order to ensure that only successfully processed events are committed, it is crucial to commit cursors of respective event batches in the same order they were received.
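
The ordering requirement can be sketched as a loop that commits a cursor only after its batch was handled, and before the next batch is read. The sketch is written against a small interface whose two methods have the same signatures as NextEvents and CommitCursor below; the interface eventStream, the function consume, and the in-memory fakeStream are hypothetical and exist only to make the example runnable without a Nakadi instance.

```go
package main

import (
	"errors"
	"fmt"
)

// Cursor is a local copy of the documented struct.
type Cursor struct {
	Partition      string `json:"partition"`
	Offset         string `json:"offset"`
	EventType      string `json:"event_type"`
	CursorToken    string `json:"cursor_token"`
	NakadiStreamID string `json:"-"`
}

// eventStream lists the two methods the loop needs.
type eventStream interface {
	NextEvents() (Cursor, []byte, error)
	CommitCursor(cursor Cursor) error
}

// consume handles batches one at a time and commits each cursor in the order
// it was received. A batch whose handler fails is never committed.
func consume(stream eventStream, handle func([]byte) error) error {
	for {
		cursor, events, err := stream.NextEvents()
		if err != nil {
			return err
		}
		if err := handle(events); err != nil {
			return err
		}
		if err := stream.CommitCursor(cursor); err != nil {
			return err
		}
	}
}

var errStreamClosed = errors.New("stream closed")

// fakeStream is an in-memory stand-in that records committed offsets.
type fakeStream struct {
	offsets   []string
	committed []string
}

func (f *fakeStream) NextEvents() (Cursor, []byte, error) {
	if len(f.offsets) == 0 {
		return Cursor{}, nil, errStreamClosed
	}
	c := Cursor{Partition: "0", Offset: f.offsets[0]}
	f.offsets = f.offsets[1:]
	return c, []byte("[]"), nil
}

func (f *fakeStream) CommitCursor(c Cursor) error {
	f.committed = append(f.committed, c.Offset)
	return nil
}

func main() {
	fs := &fakeStream{offsets: []string{"001", "002", "003"}}
	err := consume(fs, func(events []byte) error { return nil })
	fmt.Println(err)          // stream closed
	fmt.Println(fs.committed) // [001 002 003]
}
```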

func NewStream

func NewStream(client *Client, subscriptionID string, options *StreamOptions) *StreamAPI

NewStream is used to instantiate a new stream processing sub API. As for all sub APIs of the `go-nakadi` package NewStream receives a configured Nakadi client. Furthermore a valid subscription ID must be provided. Use the SubscriptionAPI in order to obtain subscriptions. The options parameter can be used to configure the behavior of the stream. The options may be nil.

func (*StreamAPI) Close

func (s *StreamAPI) Close() error

Close ends the stream.

func (*StreamAPI) CommitCursor

func (s *StreamAPI) CommitCursor(cursor Cursor) error

CommitCursor commits a cursor to Nakadi.

func (*StreamAPI) NextEvents

func (s *StreamAPI) NextEvents() (Cursor, []byte, error)

NextEvents reads the next batch of events from the stream and returns the encoded events along with the respective cursor. It blocks until the batch of events can be read from the stream, or the stream is closed.

type StreamOptions

type StreamOptions struct {
	// The maximum number of Events in each chunk (and therefore per partition) of the stream (default: 1)
	BatchLimit uint
	// Maximum time in seconds to wait for the flushing of each chunk (per partition). (default: 30)
	FlushTimeout uint
	// The number of uncommitted events Nakadi will stream before pausing the stream. When the stream
	// is paused and a commit arrives, the stream resumes. If MaxUncommittedEvents is lower than
	// BatchLimit, the effective batch size is bounded above by MaxUncommittedEvents. (default: 10, minimum: 1)
	MaxUncommittedEvents uint
	// The initial (minimal) retry interval used for the exponential backoff. This value is applied for
	// stream initialization as well as for cursor commits.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches this value
	// the retry intervals remain constant. This value is applied for stream initialization as well as
	// for cursor commits.
	MaxRetryInterval time.Duration
	// CommitMaxElapsedTime is the maximum time spent on retries when committing a cursor. Once this value
	// is reached the exponential backoff is halted and the cursor will not be committed.
	CommitMaxElapsedTime time.Duration
	// Whether or not CommitCursor will retry when a request fails. If
	// set to false, InitialRetryInterval, MaxRetryInterval, and CommitMaxElapsedTime have
	// no effect for commit requests (default: false).
	CommitRetry bool
	// NotifyErr is called when an error occurs that leads to a retry. This notify function can be used to
	// detect unhealthy streams.
	NotifyErr func(error, time.Duration)
	// NotifyOK is called whenever a successful operation was completed. This notify function can be used
	// to detect that a stream is healthy again.
	NotifyOK func()
}

StreamOptions contains optional parameters that are used to create a StreamAPI.

type Subscription

type Subscription struct {
	ID                string    `json:"id,omitempty"`
	OwningApplication string    `json:"owning_application"`
	EventTypes        []string  `json:"event_types"`
	ConsumerGroup     string    `json:"consumer_group,omitempty"`
	ReadFrom          string    `json:"read_from,omitempty"`
	CreatedAt         time.Time `json:"created_at,omitempty"`
}

Subscription represents a subscription as used by the Nakadi high level API.

type SubscriptionAPI

type SubscriptionAPI struct {
	// contains filtered or unexported fields
}

SubscriptionAPI is a sub API that is used to manage subscriptions.

func NewSubscriptionAPI

func NewSubscriptionAPI(client *Client, options *SubscriptionOptions) *SubscriptionAPI

NewSubscriptionAPI creates a new instance of the SubscriptionAPI. As for all sub APIs of the `go-nakadi` package NewSubscriptionAPI receives a configured Nakadi client. The last parameter is a struct containing only optional parameters. The options may be nil.

func (*SubscriptionAPI) Create

func (s *SubscriptionAPI) Create(subscription *Subscription) (*Subscription, error)

Create initializes a new subscription. If the subscription already exists, the pre-existing subscription is returned.

func (*SubscriptionAPI) Delete

func (s *SubscriptionAPI) Delete(id string) error

Delete removes an existing subscription.

func (*SubscriptionAPI) Get

func (s *SubscriptionAPI) Get(id string) (*Subscription, error)

Get obtains a single subscription identified by its ID.

func (*SubscriptionAPI) GetStats added in v1.1.0

func (s *SubscriptionAPI) GetStats(id string) ([]*SubscriptionStats, error)

GetStats returns statistical information for a subscription.

func (*SubscriptionAPI) List

func (s *SubscriptionAPI) List() ([]*Subscription, error)

List returns all available subscriptions.

type SubscriptionOptions

type SubscriptionOptions struct {
	// Whether or not methods of the SubscriptionAPI retry when a request fails. If
	// set to false, InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
	// no effect (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when performing a request.
	// Once this value is reached the exponential backoff is halted and the request
	// fails with an error.
	MaxElapsedTime time.Duration
}

SubscriptionOptions is a set of optional parameters used to configure the SubscriptionAPI.

type SubscriptionStats added in v1.1.0

type SubscriptionStats struct {
	EventType  string            `json:"event_type"`
	Partitions []*PartitionStats `json:"partitions"`
}

SubscriptionStats represents detailed statistics for a subscription.
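
A typical use of these statistics is to estimate how far a consumer lags behind. The sketch below sums UnconsumedEvents over all partitions of all event types, using local copies of SubscriptionStats and PartitionStats so that it compiles on its own. The helper totalUnconsumed is hypothetical; in practice the input would be the result of GetStats.

```go
package main

import "fmt"

// PartitionStats and SubscriptionStats are local copies of the documented types.
type PartitionStats struct {
	Partition        string `json:"partition"`
	State            string `json:"state"`
	UnconsumedEvents int    `json:"unconsumed_events"`
	StreamID         string `json:"stream_id"`
}

type SubscriptionStats struct {
	EventType  string            `json:"event_type"`
	Partitions []*PartitionStats `json:"partitions"`
}

// totalUnconsumed adds up the unconsumed events of every partition.
// Nil entries are skipped so that partial results do not cause a panic.
func totalUnconsumed(stats []*SubscriptionStats) int {
	total := 0
	for _, s := range stats {
		if s == nil {
			continue
		}
		for _, p := range s.Partitions {
			if p != nil {
				total += p.UnconsumedEvents
			}
		}
	}
	return total
}

func main() {
	stats := []*SubscriptionStats{
		{
			EventType: "test-type",
			Partitions: []*PartitionStats{
				{Partition: "0", UnconsumedEvents: 3},
				{Partition: "1", UnconsumedEvents: 4},
			},
		},
	}
	fmt.Println("unconsumed events:", totalUnconsumed(stats)) // unconsumed events: 7
}
```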

type UndefinedEvent

type UndefinedEvent struct {
	Metadata EventMetadata `json:"metadata"`
}

UndefinedEvent can be embedded in structs representing Nakadi events from the event category "undefined".
