kawa

v0.0.5
Published: Nov 27, 2023 License: Apache-2.0 Imports: 7 Imported by: 1

README

kawa

  

kawa ("Kaa-Wah") is an opinionated framework for scalable, reliable stream processing.

kawad ("Kaa-Wah-Dee") is a daemon for collecting system logs and metrics.

Installation

Kawad

Find the package for your OS and architecture on the releases page. Download that file to the machine, and install it somewhere on your $PATH.

curl -L https://github.com/runreveal/kawa/releases/download/<RELEASE_VERSION>/kawa-linux-amd64.tar.gz | sudo tar --directory /usr/local/bin -xz

Copy an example config from the examples/ directory, then run it! There is also an example for deploying kawad as a systemd service, and Kubernetes examples are planned.

Kawa

Add the library to your project as you would any other Go library:

go get -u github.com/runreveal/kawa

Design and Rationale

See https://blog.runreveal.com/kawa-the-event-processor-for-the-grug-brained-developer/

Roadmap

  • Ensure that consumers of kawa aren't subject to all the dependencies of the kawa program.
  • Related: consider breaking apart the library from the daemon.
  • Event Routing and/or Multiple Processors in kawa program
  • Dynamic Sources (e.g. Kafka Consumer Groups)

Disclaimer

This is nascent software, subject to breaking changes as we reach a good working set of APIs, interfaces and data models. Please try it out and help shape the direction of the project by giving us feedback!

Getting started using Kawad

An example use case might be shipping your nginx logs to S3. Save the following as config.json, and fill in your bucket name and region.

{
  "sources": [
    {
      "type": "syslog",
      "addr": "0.0.0.0:5514",
      "contentType": "application/json; rrtype=nginx-json"
    }
  ],
  "destinations": [
    {
      "type": "s3",
      "bucketName": "{{YOUR-S3-BUCKET-NAME}}",
      "bucketRegion": "us-east-2"
    }
  ]
}

Next, add the following line to your nginx server config.

server {
    access_log syslog:server=127.0.0.1:5514;
    # ... other config ...
}

Run it!

$ kawa run --config config.json

Development & Extension

The source and destination interfaces are designed for simplicity of use and implementation. It should be easy to use the sources and destinations in an abstract manner without knowing the underlying implementation, and it should be relatively easy to implement sources and destinations of various types.

The library provides multiple abstractions suitable for different purposes.

The easiest way to get started is by using the polling/batch implementations for sources/destinations, respectively, since they require less overhead in terms of accounting for offset tracking to ensure at-least-once reliable processing.

Extensions under the x package provide either generic or []byte based sources, destinations and utility functions which aren't part of the core functionality of kawa. They're provided for re-use in other applications.

To use them, import them into your program and apply the proper serialization techniques relevant to your application. See examples of this in practice in the cmd/kawad/internal package, where we use it for system logs.

Configure and Run Design Pattern

The "Configure and Run" pattern is a pattern discovered while writing this framework that works nicely with other patterns and libraries in Go.

The general idea is as follows. Each struct maintaining long-running goroutines can be made easy to reason about and operate by splitting its configuration and runtime into two separate stages.

The first stage, "Configure", is simply the initialization of the struct. Most often, this is the New function for the struct, with required arguments passed in first, and options passed in as a variadic functional options slice afterwards. Occasionally, this may involve also implementing a translation layer for serialization of the struct from JSON or some other serialization format (see cmd/kawad/config.go for an example of this pattern).

The next stage, "Run", involves implementing the simple Runner interface:

type Runner interface {
	Run(ctx context.Context) error
}

Implementing this interface consistently across all instances of structs that have long-running processes means that we can easily implement cancellation across a broad number of distinct types via a common context variable.

It also means that any goroutine can trigger a shutdown by returning an error from the Run routine.

While not absolutely required, following this pattern will enable the source or destination to be seamlessly integrated into the daemon.
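As a rough illustration of the two stages, here is a minimal sketch. The Ticker type, its TickerOption type, and WithInterval are hypothetical names invented for this example and are not part of kawa; only the Runner interface is taken from the text above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Runner mirrors the interface shown above.
type Runner interface {
	Run(ctx context.Context) error
}

// Ticker is a hypothetical long-running component used only for illustration.
type Ticker struct {
	name     string
	interval time.Duration
	ticks    int
}

// TickerOption is a functional option for Ticker (hypothetical).
type TickerOption func(*Ticker)

// WithInterval overrides the default tick interval.
func WithInterval(d time.Duration) TickerOption {
	return func(t *Ticker) { t.interval = d }
}

// NewTicker is the "Configure" stage: required arguments first, options after.
func NewTicker(name string, opts ...TickerOption) *Ticker {
	t := &Ticker{name: name, interval: time.Second}
	for _, opt := range opts {
		opt(t)
	}
	return t
}

// Run is the "Run" stage: it blocks until ctx is done, then returns the
// context error wrapped with the component name.
func (t *Ticker) Run(ctx context.Context) error {
	tk := time.NewTicker(t.interval)
	defer tk.Stop()
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("%s: %w", t.name, ctx.Err())
		case <-tk.C:
			t.ticks++
		}
	}
}

// runDemo configures a Ticker, runs it for 50ms, and reports what happened.
func runDemo() (ticks int, timedOut bool) {
	t := NewTicker("demo", WithInterval(5*time.Millisecond))
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	var r Runner = t
	err := r.Run(ctx)
	return t.ticks, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	ticks, timedOut := runDemo()
	fmt.Println(ticks > 0, timedOut)
}
```

Because every component exposes the same Run(ctx) method, a caller can start many of them with one shared context and stop them all by canceling it.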

Implementing Sources

Sources are things that you read message-oriented data from. At the most basic level, it's a collection of bytes that together represent some discrete event.

Polling Source

We recommend implementing polling sources when querying an API, or whenever it's easiest to implement a function to periodically get called. The following is the interface which needs to be satisfied to implement a polling source.

type Poller[T any] interface {
	Poll(context.Context, int) ([]kawa.Message[T], func(), error)
}
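A polling source might look something like the sketch below. It uses a local stand-in for kawa.Message so that it compiles on its own, and the slicePoller type is hypothetical. The sketch assumes that the int argument is a maximum batch size and that the returned func acknowledges the whole batch; the interface above does not spell out either point.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in for kawa.Message so the sketch is self-contained.
type Message[T any] struct {
	Key   string
	Value T
}

// Poller mirrors the interface shown above, using the local Message type.
type Poller[T any] interface {
	Poll(context.Context, int) ([]Message[T], func(), error)
}

// slicePoller is a hypothetical poller over an in-memory slice. It stands in
// for an API client that returns pages of results.
type slicePoller struct {
	items  []string
	offset int // index of the next unread item
	acked  int // number of items acknowledged so far
}

// Poll returns up to limit items. The limit semantics and the batch-level ack
// are assumptions made for this sketch.
func (p *slicePoller) Poll(ctx context.Context, limit int) ([]Message[string], func(), error) {
	if err := ctx.Err(); err != nil {
		return nil, nil, err
	}
	end := p.offset + limit
	if end > len(p.items) {
		end = len(p.items)
	}
	batch := p.items[p.offset:end]
	msgs := make([]Message[string], 0, len(batch))
	for _, it := range batch {
		msgs = append(msgs, Message[string]{Value: it})
	}
	p.offset = end
	ack := func() { p.acked = end }
	return msgs, ack, nil
}

// drain polls until an empty batch comes back, acknowledging each batch, and
// returns the size of every batch it saw.
func drain(p Poller[string], limit int) ([]int, error) {
	var sizes []int
	for {
		msgs, ack, err := p.Poll(context.Background(), limit)
		if err != nil {
			return sizes, err
		}
		if len(msgs) == 0 {
			return sizes, nil
		}
		sizes = append(sizes, len(msgs))
		if ack != nil {
			ack()
		}
	}
}

func main() {
	p := &slicePoller{items: []string{"a", "b", "c", "d", "e"}}
	sizes, err := drain(p, 2)
	fmt.Println(sizes, p.acked, err) // [2 2 1] 5 <nil>
}
```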

Streaming Source

We recommend implementing streaming sources when the source either implements its own batching semantics (like Kafka), or when message latency is more important than message volume.

Implementing Destinations

Destinations are things that you write message-oriented data to.

Batch Destination

Implementing a batch destination is the easiest way to process messages as a batch being written to some persistent storage. The library handles timeouts, batch size, and parallel writes at the configuration level, so a destination only has to implement a single method, Flush.

type Flusher[T any] interface {
	Flush(context.Context, []kawa.Message[T]) error
}
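To make the shape of a Flush implementation concrete, here is a minimal sketch. Message is a local stand-in for kawa.Message, and memoryStore is a hypothetical destination that keeps each batch in memory as one newline-delimited object; a real destination would write to persistent storage instead.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message is a local stand-in for kawa.Message so the sketch is self-contained.
type Message[T any] struct {
	Key   string
	Value T
}

// Flusher mirrors the interface shown above, using the local Message type.
type Flusher[T any] interface {
	Flush(context.Context, []Message[T]) error
}

// memoryStore is a hypothetical destination. Each flushed batch becomes one
// newline-delimited "object" held in memory.
type memoryStore struct {
	objects []string
}

// Flush writes the whole batch as a single object, or returns the context
// error if the context has already finished.
func (m *memoryStore) Flush(ctx context.Context, msgs []Message[string]) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if len(msgs) == 0 {
		return nil
	}
	var sb strings.Builder
	for _, msg := range msgs {
		sb.WriteString(msg.Value)
		sb.WriteByte('\n')
	}
	m.objects = append(m.objects, sb.String())
	return nil
}

// flushValues wraps plain strings in messages and flushes them as one batch.
func flushValues(f Flusher[string], values ...string) error {
	batch := make([]Message[string], len(values))
	for i, v := range values {
		batch[i] = Message[string]{Value: v}
	}
	return f.Flush(context.Background(), batch)
}

func main() {
	store := &memoryStore{}
	err := flushValues(store, "GET /", "GET /health")
	fmt.Printf("%q %v\n", store.objects, err)
}
```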

Streaming Destination

We recommend implementing streaming destinations when the destination either implements its own batching semantics (like Kafka), or when message latency is more important than message volume.

Supported sources

  • syslog
  • scanner
  • journald
  • mqtt
  • windows event logs

Supported destinations

  • s3 / r2
  • printer
  • runreveal
  • mqtt

Configuring the Daemon

Source Configuration

syslog

With the syslog config, an address and content type can be set.

{
    "type":"syslog",
    "addr":"0.0.0.0:5514"
}

journald

Journald has no configuration; just set the type and kawa will read from journald.

{
    "type":"journald"
}

scanner

Read from stdin. Useful for testing or doing something you probably shouldn't.

{
    "type":"scanner"
}

MQTT

MQTT will listen on the supplied topic for new events.

broker and clientID are required to receive data. clientID must be unique among all MQTT sources and destinations. If topic is not supplied, it defaults to the wildcard #.

Do not read events from the same topic that an MQTT destination is sending to; otherwise kawa will create an infinite loop and eventually crash.

{
  "type": "mqtt",
  "broker": "mqtt://broker.mqtt:1883",
  "clientID": "kawa_src",
  "userName": "",
  "password": "",
  "topic": "kawa/src",

  "qos": 1, // Optional; defaults to 1 if omitted
  "retained": false // Optional; defaults to false if omitted
}

Windows Event Logs

Listen for new Windows event logs on the specified channel.

Windows event log collection only works on Windows machines. Use the Windows build to run Kawad on a Windows machine. Kawad will need to be run as an administrator to have access to the event log stream.

The source config takes a required channel and an optional query. The channel is the full name of the Windows event log; for example, to collect the operational logs for the TaskScheduler, the channel would be 'Microsoft-Windows-TaskScheduler/Operational'. The query is a filter that limits the logs collected to specific events. See the Microsoft documentation for how filtering works and how to write a query.

The following example shows how to log every Security event on the machine.

{
    "type": "eventlog",
    "channel": "Security",
    "query": "*"
}

Destination Configuration

RunReveal

webhookURL is the only config argument, and it is required.

{
    "type":"runreveal",
    "webhookURL": "https://api.runreveal.com/....."
}

S3

The s3 destination is compatible with S3 and other S3-compatible interfaces. By default, it pulls credentials from the standard places the AWS SDK looks, but they can optionally be set in the configuration.

customEndpoint must be set for custom destinations; in that case bucketRegion will probably not be set. bucketName is the only required argument.

batchSize defaults to 100 and can be tuned up or down to suit your message volume.

{
    "type":"s3",
    "bucketName":"my-cool-log-bucket",
    "bucketRegion":"us-east-2",
    "batchSize":1000
}

Printer

Printer will print the results to stdout. Useful for testing and development.

{
    "type":"printer"
}

MQTT

MQTT will send events to the supplied topic.

broker and clientID are required to send data. clientID must be unique among all MQTT sources and destinations. If topic is not supplied, it defaults to the wildcard #.

{
  "type": "mqtt",
  "broker": "mqtt://broker.mqtt:1883",
  "clientID": "kawa_dst",
  "userName": "",
  "password": "",
  "topic": "kawa/dest",

  "qos": 1, // Optional; defaults to 1 if omitted
  "retained": false // Optional; defaults to false if omitted
}

Source / Destination Wishlist

  • Kafka
  • redis
  • NATS
  • amqp
  • pubsub
  • Kinesis
  • memcache?
  • zmq?
  • NSQ?

Documentation

Functions

func Ack added in v0.0.4

func Ack(ack func())

Ack is a convenience function for calling the ack function after checking if it's nil.
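The behaviour described here can be sketched in a few lines. The lowercase ack below is a local re-creation written for illustration, based only on the sentence above; it is not copied from the library source.

```go
package main

import "fmt"

// ack calls fn only when it is non-nil, so callers do not have to repeat the
// nil check at every call site.
func ack(fn func()) {
	if fn != nil {
		fn()
	}
}

func main() {
	count := 0
	ack(nil)                // safe: a nil ack is a no-op
	ack(func() { count++ }) // a non-nil ack is invoked once
	fmt.Println(count)      // 1
}
```

This matters because sources are free to return a nil ack function when there is nothing to acknowledge.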

func Metrics added in v0.0.4

func Metrics(b bool) func(*Options)

func Parallelism

func Parallelism(n int) func(*Options)

func Tracing added in v0.0.4

func Tracing(b bool) func(*Options)
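Metrics, Parallelism, and Tracing all return a func(*Options), which is the functional options pattern. The sketch below re-creates the documented signatures locally to show how such options might be applied. The function bodies, the applyOptions helper, and the default Parallelism of 1 are assumptions made for this example, not the library's implementation.

```go
package main

import "fmt"

// Options and Option mirror the types documented on this page.
type Options struct {
	Parallelism int
	Tracing     bool
	Metrics     bool
}

type Option func(*Options)

// The three constructors below have the documented signatures; their bodies
// are assumed.
func Parallelism(n int) func(*Options) {
	return func(o *Options) { o.Parallelism = n }
}

func Tracing(b bool) func(*Options) {
	return func(o *Options) { o.Tracing = b }
}

func Metrics(b bool) func(*Options) {
	return func(o *Options) { o.Metrics = b }
}

// applyOptions is a hypothetical helper: it starts from assumed defaults and
// applies each option in order, so later options win.
func applyOptions(opts ...Option) Options {
	o := Options{Parallelism: 1} // assumed default
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(Parallelism(4), Metrics(true))
	fmt.Printf("%+v\n", o) // {Parallelism:4 Tracing:false Metrics:true}
}
```

A plain func(*Options) value can be passed where an Option is expected because the two types share the same underlying type, which is why these constructors can be handed directly to New as variadic options.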

func TransformUnmarshalJSON

func TransformUnmarshalJSON[T any](bs []byte) (T, error)
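The page gives no description for this function. Judging by its name and signature, it decodes JSON bytes into a value of type T, which would make it usable as a DeserFunc. The sketch below shows a function of that shape; unmarshalJSON and the accessLog type are local names invented for this example, and the body is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unmarshalJSON has the same shape as TransformUnmarshalJSON. Its body is an
// assumption based on the function name.
func unmarshalJSON[T any](bs []byte) (T, error) {
	var v T
	err := json.Unmarshal(bs, &v)
	return v, err
}

// accessLog is a hypothetical event type.
type accessLog struct {
	Status int    `json:"status"`
	Path   string `json:"path"`
}

func main() {
	entry, err := unmarshalJSON[accessLog]([]byte(`{"status":200,"path":"/health"}`))
	fmt.Println(entry.Status, entry.Path, err) // 200 /health <nil>
}
```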

Types

type Attributes

type Attributes interface {
	Unwrap() Attributes
}
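Since Attributes exposes only Unwrap, a consumer presumably reaches source-specific metadata by walking the chain and type-asserting along the way, much like errors.As does for errors. The sketch below illustrates that idea; partitionAttrs, traceAttrs, and findAttrs are hypothetical and are not part of kawa.

```go
package main

import "fmt"

// Attributes mirrors the interface shown above.
type Attributes interface {
	Unwrap() Attributes
}

// partitionAttrs and traceAttrs are hypothetical attribute sets. Each one
// wraps an optional parent.
type partitionAttrs struct {
	Partition int
	parent    Attributes
}

func (a partitionAttrs) Unwrap() Attributes { return a.parent }

type traceAttrs struct {
	TraceID string
	parent  Attributes
}

func (a traceAttrs) Unwrap() Attributes { return a.parent }

// findAttrs walks the Unwrap chain and returns the first attribute set whose
// concrete type is T.
func findAttrs[T Attributes](attrs Attributes) (T, bool) {
	for attrs != nil {
		if t, ok := attrs.(T); ok {
			return t, true
		}
		attrs = attrs.Unwrap()
	}
	var zero T
	return zero, false
}

func main() {
	attrs := traceAttrs{TraceID: "abc123", parent: partitionAttrs{Partition: 3}}
	p, ok := findAttrs[partitionAttrs](attrs)
	fmt.Println(p.Partition, ok) // 3 true
}
```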

type ByteSource

type ByteSource interface {
	Recv(context.Context) (Message[[]byte], func(), error)
}

type Config

type Config[T1, T2 any] struct {
	Source      Source[T1]
	Destination Destination[T2]
	Handler     Handler[T1, T2]
}

type DeserFunc

type DeserFunc[T any] func([]byte) (T, error)

type DeserializationSource

type DeserializationSource[T any] struct {
	// contains filtered or unexported fields
}

func NewDeserSource

func NewDeserSource[T any](src ByteSource, deser DeserFunc[T]) DeserializationSource[T]

func (DeserializationSource[T]) Recv

func (ds DeserializationSource[T]) Recv(ctx context.Context) (Message[T], func(), error)
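The signatures suggest that DeserializationSource wraps a ByteSource and decodes each payload with the supplied DeserFunc. The sketch below re-creates that idea with local types. deserSource, oneShot, and recvInt are hypothetical names, and the error handling shown is an assumption rather than the library's behaviour.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Message, ByteSource and DeserFunc are local stand-ins mirroring the
// documented types.
type Message[T any] struct {
	Key   string
	Value T
	Topic string
}

type ByteSource interface {
	Recv(context.Context) (Message[[]byte], func(), error)
}

type DeserFunc[T any] func([]byte) (T, error)

// deserSource wraps a ByteSource and decodes each payload into a T.
type deserSource[T any] struct {
	src   ByteSource
	deser DeserFunc[T]
}

// Recv receives raw bytes, decodes them, and passes the ack func through
// unchanged so the underlying source still tracks delivery.
func (ds deserSource[T]) Recv(ctx context.Context) (Message[T], func(), error) {
	raw, ack, err := ds.src.Recv(ctx)
	if err != nil {
		return Message[T]{}, nil, fmt.Errorf("receiving: %w", err)
	}
	val, err := ds.deser(raw.Value)
	if err != nil {
		return Message[T]{}, nil, fmt.Errorf("deserializing: %w", err)
	}
	return Message[T]{Key: raw.Key, Value: val, Topic: raw.Topic}, ack, nil
}

// oneShot is a hypothetical ByteSource that always returns a fixed payload.
type oneShot struct{ payload []byte }

func (o oneShot) Recv(ctx context.Context) (Message[[]byte], func(), error) {
	return Message[[]byte]{Value: o.payload}, func() {}, nil
}

// recvInt decodes a single integer payload through deserSource.
func recvInt(payload string) (int, error) {
	ds := deserSource[int]{
		src:   oneShot{payload: []byte(payload)},
		deser: func(b []byte) (int, error) { return strconv.Atoi(string(b)) },
	}
	msg, _, err := ds.Recv(context.Background())
	return msg.Value, err
}

func main() {
	fmt.Println(recvInt("42")) // 42 <nil>
	_, err := recvInt("oops")
	fmt.Println(err != nil) // true
}
```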

type Destination

type Destination[T any] interface {
	// Send sends the passed in messages to the Destination. Implementations
	// _must_ listen on <-ctx.Done() and return ctx.Err() if the context finishes
	// while waiting to send messages.
	//
	// *Send need not be blocking*.  In the case of a non-blocking call to send,
	// it's expected that ack will be called _only after_ the message has been
	// successfully written to the Destination.
	//
	// All errors which are retryable must be handled inside the Send func, or
	// otherwise handled internally.  Any errors returned from Send indicate a
	// fatal error to the processor, and the processor will terminate.  If you
	// want to be able to delegate the responsibility of deciding retryable
	// errors to the user of the Destination, then allow the user to register a
	// callback, e.g. `IsRetryable(err error) bool`, when instantiating a
	// Destination.
	//
	// The second argument value is the acknowledgement function.  Ack is called
	// when the message has been successfully written to the Destination.  It
	// should not be called twice.  Sources may panic if ack is called twice as
	// it indicates a logical flaw for delivery guarantees within the program.
	//
	// In the case of sending to multiple destinations, or teeing the data stream
	// inside a processor's handler function, then the programmer must decide
	// themselves how to properly acknowledge the event, and recognize that
	// destinations will probably be acknowledging the message as well.
	Send(context.Context, func(), ...Message[T]) error
}

Destination defines the abstraction for writing messages to an external entity. Most notable implementations are queues (Kafka, RabbitMQ, Redis), but anything which is message oriented could be made into a Destination (e.g. a newline-delimited-JSON file could conceivably be a Destination).
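The contract in the comments above (honour ctx, call ack only after a successful write) can be sketched as follows. Message and Destination are local stand-ins mirroring the documented types, and memoryDest is a hypothetical destination that stores values in memory.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a local stand-in for kawa.Message so the sketch is self-contained.
type Message[T any] struct {
	Key   string
	Value T
}

// Destination mirrors the interface shown above, using the local Message type.
type Destination[T any] interface {
	Send(context.Context, func(), ...Message[T]) error
}

// memoryDest is a hypothetical destination that keeps values in memory.
type memoryDest struct {
	mu     sync.Mutex
	stored []string
}

// Send stores the messages and then acknowledges them.
func (d *memoryDest) Send(ctx context.Context, ack func(), msgs ...Message[string]) error {
	// Return promptly if the context has already finished.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	d.mu.Lock()
	for _, m := range msgs {
		d.stored = append(d.stored, m.Value)
	}
	d.mu.Unlock()
	// Acknowledge only after the write has succeeded, and tolerate a nil ack.
	if ack != nil {
		ack()
	}
	return nil
}

// sendAll sends the values in one call and reports whether ack was invoked.
func sendAll(d Destination[string], values ...string) (acked bool, err error) {
	msgs := make([]Message[string], len(values))
	for i, v := range values {
		msgs[i] = Message[string]{Value: v}
	}
	err = d.Send(context.Background(), func() { acked = true }, msgs...)
	return acked, err
}

func main() {
	d := &memoryDest{}
	acked, err := sendAll(d, "a", "b")
	fmt.Println(d.stored, acked, err) // [a b] true <nil>
}
```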

type DestinationFunc

type DestinationFunc[T any] func(context.Context, func(), ...Message[T]) error

func (DestinationFunc[T]) Send

func (df DestinationFunc[T]) Send(ctx context.Context, ack func(), msgs ...Message[T]) error

type Handler

type Handler[T1, T2 any] interface {
	Handle(context.Context, Message[T1]) ([]Message[T2], error)
}

Handler defines a function which operates on a single event of type T1 and returns a list of events of type T2. T1 and T2 may be equivalent types. Returning an empty slice and a nil error indicates that the message passed in was processed successfully, no output was necessary, and therefore should be acknowledged by the processor as having been processed successfully.

func Pipe

func Pipe[T any]() Handler[T, T]

type HandlerFunc

type HandlerFunc[T1, T2 any] func(context.Context, Message[T1]) ([]Message[T2], error)

func (HandlerFunc[T1, T2]) Handle

func (hf HandlerFunc[T1, T2]) Handle(ctx context.Context, msg Message[T1]) ([]Message[T2], error)
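HandlerFunc follows the same adapter idea as http.HandlerFunc: a plain function gains a Handle method and so satisfies Handler. The sketch below re-creates the documented types locally and adds a hypothetical lineLength handler that drops blank lines and emits the length of every other line.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message, Handler and HandlerFunc are local stand-ins mirroring the
// documented types.
type Message[T any] struct {
	Key   string
	Value T
}

type Handler[T1, T2 any] interface {
	Handle(context.Context, Message[T1]) ([]Message[T2], error)
}

type HandlerFunc[T1, T2 any] func(context.Context, Message[T1]) ([]Message[T2], error)

// Handle calls the underlying function, which is what lets a plain func act
// as a Handler.
func (hf HandlerFunc[T1, T2]) Handle(ctx context.Context, msg Message[T1]) ([]Message[T2], error) {
	return hf(ctx, msg)
}

// lineLength is a hypothetical handler. Blank lines produce no output and a
// nil error, which marks the input as processed with nothing to emit.
func lineLength(_ context.Context, msg Message[string]) ([]Message[int], error) {
	line := strings.TrimSpace(msg.Value)
	if line == "" {
		return nil, nil
	}
	return []Message[int]{{Key: msg.Key, Value: len(line)}}, nil
}

// handleValue runs the handler on one value and returns the emitted values.
func handleValue(h Handler[string, int], v string) ([]int, error) {
	out, err := h.Handle(context.Background(), Message[string]{Value: v})
	vals := make([]int, 0, len(out))
	for _, m := range out {
		vals = append(vals, m.Value)
	}
	return vals, err
}

func main() {
	var h Handler[string, int] = HandlerFunc[string, int](lineLength)
	fmt.Println(handleValue(h, "  hello ")) // [5] <nil>
	fmt.Println(handleValue(h, "   "))      // [] <nil>
}
```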

type Message

type Message[T any] struct {
	// Key represents the key of this message.  This field is intended to be used
	// primarily as an input into sharding functions to determine how a message
	// should be routed within a topic.
	Key string
	// Value is the embedded value of this message.  It is the object of interest
	// to the users of this library.  It can be any serializable type so long as
	// the sources and destinations know how to serialize it.
	Value T
	// Topic indicates which topic this message came from (if applicable).  It
	// should not be used as a means to set the output topic for destinations.
	Topic string
	// Attributes are inspired by context.Context and are used as a means to pass
	// metadata from a source implementation through to a consumer.  See examples
	// for details.
	Attributes Attributes
}

Message is the data wrapper which accepts any serializable type as its embedded Value as well as some other metadata.

type MsgAck added in v0.0.4

type MsgAck[T any] struct {
	Msg Message[T]
	Ack func()
}

MsgAck is a utility type which is used to pass a message and its corresponding ack function through a channel internal to a source or destination.

type Option

type Option func(*Options)

type Options

type Options struct {
	Parallelism int
	Tracing     bool
	Metrics     bool
}

type Processor

type Processor[T1, T2 any] struct {
	// contains filtered or unexported fields
}

func New

func New[T1, T2 any](c Config[T1, T2], opts ...Option) (*Processor[T1, T2], error)

New instantiates a new Processor. `Processor.Run` must be called after calling `New` before events will be processed.

func (*Processor[T1, T2]) Run

func (p *Processor[T1, T2]) Run(ctx context.Context) error

Run is a blocking call that runs until either ctx is canceled or an unrecoverable error is encountered. Any error returned from a source, a destination, or the handler func is wrapped and returned. If the passed-in context is canceled, Run does not return the context.Canceled error, which indicates that a clean shutdown was successful. Run returns ctx.Err() in other cases where context termination leads to shutdown of the processor.

type Source

type Source[T any] interface {
	// Recv should block until Message is available to be returned from the
	// source.  Implementations _must_ listen on <-ctx.Done() and return
	// ctx.Err() if the context finishes while waiting for new messages.
	//
	// All errors which are retryable must be handled inside the Recv func, or
	// otherwise handled internally.  Any errors returned from Recv indicate a
	// fatal error to the processor, and the processor will terminate.  If you
	// want to be able to delegate the responsibility of deciding retryable
	// errors to the user of the Source, then allow the user to register a
	// callback, e.g. `IsRetryable(err error) bool`, on source instantiation.
	//
	// The second return value is the acknowledgement function.  Ack is called when
	// the message returned from Recv has been successfully written to its
	// destination.  It should not be called twice.  Sources may panic in that
	// scenario as it indicates a logical flaw for delivery guarantees within the
	// program.
	//
	// In the case of sending to multiple destinations, or teeing the data stream
	// inside a processor's handler function, then the programmer must decide
	// themselves how to properly acknowledge the event, and recognize that
	// destinations will probably be acknowledging the message as well.
	Recv(context.Context) (Message[T], func(), error)
}

Source defines the abstraction through which kawa consumes or receives messages from an external entity. The most notable implementations are queues (Kafka, RabbitMQ, Redis), but anything which is message oriented could be made into a source (e.g. a newline-delimited-JSON file could conceivably be a source).
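A minimal sketch of a source that follows the Recv contract described in the comments above: it blocks until a message is available and returns ctx.Err() if the context finishes first. Message and Source are local stand-ins mirroring the documented types; chanSource and recvAndAck are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in for kawa.Message so the sketch is self-contained.
type Message[T any] struct {
	Key   string
	Value T
}

// Source mirrors the interface shown above, using the local Message type.
type Source[T any] interface {
	Recv(context.Context) (Message[T], func(), error)
}

// chanSource is a hypothetical source fed by a channel. The acked counter is
// not safe for concurrent acks; a real source would need synchronization.
type chanSource struct {
	ch    chan string
	acked int
}

// Recv blocks until a value arrives or the context finishes.
func (s *chanSource) Recv(ctx context.Context) (Message[string], func(), error) {
	select {
	case <-ctx.Done():
		return Message[string]{}, nil, ctx.Err()
	case v := <-s.ch:
		return Message[string]{Value: v}, func() { s.acked++ }, nil
	}
}

// recvAndAck receives one message and acknowledges it. When cancelFirst is
// true the context is canceled before the call, to exercise the error path.
func recvAndAck(s Source[string], cancelFirst bool) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	msg, ack, err := s.Recv(ctx)
	if err != nil {
		return "", err
	}
	if ack != nil {
		ack()
	}
	return msg.Value, nil
}

func main() {
	src := &chanSource{ch: make(chan string, 1)}
	src.ch <- "event-1"
	fmt.Println(recvAndAck(src, false)) // event-1 <nil>
	_, err := recvAndAck(src, true)
	fmt.Println(err) // context canceled
}
```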

type SourceFunc

type SourceFunc[T any] func(context.Context) (Message[T], func(), error)

func (SourceFunc[T]) Recv

func (sf SourceFunc[T]) Recv(ctx context.Context) (Message[T], func(), error)
