backend

package
v0.0.0-...-5a8c9f9 Latest Latest
Published: Apr 18, 2016 License: MIT Imports: 19 Imported by: 1

Documentation

Overview

Package backend provides the Backend interface for storing streams of data, along with its implementations for different storage engines.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Copy

func Copy(drain stream.Stream, sink Stream) (error, bool)

Copy a producer stream to a consumer stream.

Returns an optional error and, if the error is not nil, a flag indicating whether the error occurred in the consumer (false) or the producer (true).
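
The error-origin convention can be illustrated with a small self-contained sketch. The Event, Producer, and Consumer types below are hypothetical stand-ins, not the package's real stream types, and the io.EOF end-of-stream signal is an assumption; only the (error, bool) return shape follows the documented signature.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Event, Producer and Consumer are hypothetical stand-ins for the
// package's real stream types, which are not reproduced here.
type Event []byte

type Producer interface {
	// Next returns io.EOF when the stream is exhausted (an assumption).
	Next() (Event, error)
}

type Consumer interface {
	Add(Event) error
}

// copyStream mirrors the documented contract of Copy: the flag is true when
// the error came from the producer and false when it came from the consumer.
func copyStream(src Producer, dst Consumer) (error, bool) {
	for {
		evt, err := src.Next()
		if err == io.EOF {
			return nil, false
		}
		if err != nil {
			return err, true
		}
		if err := dst.Add(evt); err != nil {
			return err, false
		}
	}
}

// sliceProducer yields a fixed list of events, then io.EOF.
type sliceProducer struct{ events []Event }

func (p *sliceProducer) Next() (Event, error) {
	if len(p.events) == 0 {
		return nil, io.EOF
	}
	evt := p.events[0]
	p.events = p.events[1:]
	return evt, nil
}

// failingConsumer rejects every event.
type failingConsumer struct{}

func (failingConsumer) Add(Event) error { return errors.New("sink is full") }

func main() {
	src := &sliceProducer{events: []Event{Event("a"), Event("b")}}
	err, fromProducer := copyStream(src, failingConsumer{})
	fmt.Println(err, fromProducer)
}
```

A caller can use the flag to decide which side to tear down or retry: a consumer-side failure usually means the subscriber is gone, while a producer-side failure points at the storage being read.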

func CopyDirect

func CopyDirect(drain stream.Stream, sink *BufferedStream) (error, bool)

An analog of Copy for a BufferedStream sink.

func CreatorTypes

func CreatorTypes() []string

List all registered backend types.

func NewHandler

func NewHandler(b Backend, errorCb func(error)) http.Handler

Create an http.Handler that maps the URLs used by the HTTP backend to the methods of an object implementing the Backend interface.

func RegisterCreator

func RegisterCreator(btype string, creator BackendCreator) error

Register a backend creator for a backend type.

func RegisterDefault

func RegisterDefault()

Register backend creators provided by this library.

Types

type Backend

type Backend interface {
	// Get backend configuration.
	Config() (interface{}, error)
	// List all available streams.
	Streams() ([]string, error)
	// Get a BackendStream for the stream by its name.
	GetStream(name string) (BackendStream, error)
	// Delete all streams and supporting databases.
	Drop() error
	// Close the backend handler.
	Close() error
}

Backend is an interface for a stream storage system.

func Create

func Create(btype string, args interface{}) (Backend, error)

Create a backend by its type and config.

func NewDir

func NewDir(dir string) (Backend, error)

Create a backend that stores pushed events in files in a specified directory, one file per stream.

func NewHttp

func NewHttp(baseUrl string, p poster.Poster) Backend

Create a remote HTTP backend. It doesn't store anything itself and just sends commands to a remote server via HTTP.

func NewLedis

func NewLedis(dirname string) (Backend, error)

Create a backend that stores pushed events in ledisdb.

func NewMem

func NewMem() Backend

Create a backend that stores pushed events in memory, so they are not persisted to disk.

Can be used for mocking a real backend in tests.

func NewNil

func NewNil() Backend

Create a nil backend that doesn't store pushed events and just ignores them.

type BackendCreator

type BackendCreator func(args interface{}) (Backend, error)

BackendCreator is a function that creates a specific type of backend from a config.
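
The way creators, RegisterCreator, CreatorTypes, and Create fit together can be sketched as a map-backed registry. This is a simplified illustration, not the package's implementation: the Backend interface is cut down to one method, the lower-case function names are local to the example, and rejecting duplicate registrations is an assumption about why RegisterCreator returns an error.

```go
package main

import (
	"fmt"
	"sort"
)

// Backend is reduced to one method here; the real interface is larger.
type Backend interface {
	Close() error
}

type BackendCreator func(args interface{}) (Backend, error)

// creators is a hypothetical registry keyed by backend type.
// It is not safe for concurrent use in this sketch.
var creators = map[string]BackendCreator{}

// registerCreator refuses to overwrite an existing entry (an assumption).
func registerCreator(btype string, creator BackendCreator) error {
	if _, ok := creators[btype]; ok {
		return fmt.Errorf("backend type %q is already registered", btype)
	}
	creators[btype] = creator
	return nil
}

// creatorTypes lists the registered types in sorted order.
func creatorTypes() []string {
	types := make([]string, 0, len(creators))
	for btype := range creators {
		types = append(types, btype)
	}
	sort.Strings(types)
	return types
}

// create looks up the creator for btype and calls it with args.
func create(btype string, args interface{}) (Backend, error) {
	creator, ok := creators[btype]
	if !ok {
		return nil, fmt.Errorf("unknown backend type %q", btype)
	}
	return creator(args)
}

type nilBackend struct{}

func (nilBackend) Close() error { return nil }

func main() {
	_ = registerCreator("nil", func(interface{}) (Backend, error) {
		return nilBackend{}, nil
	})
	fmt.Println(creatorTypes())
	_, err := create("missing", nil)
	fmt.Println(err)
}
```

Keeping construction behind a string key lets a configuration file pick the storage engine without the calling code importing each implementation directly.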

type BackendStream

type BackendStream interface {
	Stream
	// Convert a relative interval into an absolute one, e.g. (0, -1) into (0, Len()).
	Interval(from int, to int) (uint, uint, error)
	// Read a range of events from the stream.
	Read(from uint, to uint) (stream.Stream, error)
	// Delete a range of events from the stream.
	Del(from uint, to uint) (bool, error)
	// Get the number of events in the stream.
	Len() (uint, error)
}

BackendStream is an interface for manipulating a stream of events stored on a backend.
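
The relative-to-absolute conversion performed by Interval can be sketched as a standalone function. The only documented data point is that (0, -1) becomes (0, Len()); the rule used below, where any negative index counts back from the end so that -1 maps to the length, and the bounds checks are assumptions that generalize from that example.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveInterval turns a relative (from, to) pair into absolute bounds for
// a stream of the given length. Negative values count back from the end,
// so -1 maps to length. This convention is an assumption.
func resolveInterval(from, to int, length uint) (uint, uint, error) {
	abs := func(i int) (uint, error) {
		if i < 0 {
			i += int(length) + 1
		}
		if i < 0 || uint(i) > length {
			return 0, errors.New("index out of range")
		}
		return uint(i), nil
	}
	lo, err := abs(from)
	if err != nil {
		return 0, 0, err
	}
	hi, err := abs(to)
	if err != nil {
		return 0, 0, err
	}
	if lo > hi {
		return 0, 0, errors.New("from is greater than to")
	}
	return lo, hi, nil
}

func main() {
	fmt.Println(resolveInterval(0, -1, 10))  // the whole stream
	fmt.Println(resolveInterval(-4, -1, 10)) // the last three events
}
```

The unsigned results can then be passed to Read or Del, which take absolute positions.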

type BufferedStream

type BufferedStream struct {
	// contains filtered or unexported fields
}

BufferedStream is an implementation of a stream that collects all pushed events in a buffer until Start() is called. It then flushes the buffered events to a base stream and passes all new incoming events directly to the base stream.

The use case for it is event subscription to a golfstream service. When you add a subscriber, golfstream returns a stream representing a range of the backend stream's history, and you might want to push that history to the subscriber before any new events.

To do so, wrap your subscriber with BufferedStream before adding it to the golfstream service, manually push the history, and call Start(), like this:

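// Wrap the subscriber so live events are held back until Start().
bsub := backend.Buffered(sub, 0)
history, _ := sback.AddSub(bstrName, bsub, 0, -1)
go func() {
	// Replay the history first, then release the buffered events.
	_, _ = backend.CopyDirect(history, bsub)
	bsub.Start()
}()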

func Buffered

func Buffered(s Stream, buf int) *BufferedStream

Create a buffered stream from a stream.

Using the original stream after creating a buffered stream from it is unsafe.

func (*BufferedStream) Add

func (self *BufferedStream) Add(evt stream.Event) error

An implementation of backend.Stream.Add for BufferedStream.

func (*BufferedStream) Close

func (self *BufferedStream) Close() error

An implementation of backend.Stream.Close for BufferedStream.

func (*BufferedStream) Start

func (self *BufferedStream) Start()

Tell the buffered stream that it can now safely push to the base stream.

type Stream

type Stream interface {
	// Push event to the stream.
	Add(stream.Event) error
	// Close the stream handler.
	Close() error
}

Stream is an interface for consuming events.
