chanserv

package module
v0.0.0-...-76c834b Latest
Published: Nov 24, 2016 License: MIT Imports: 9 Imported by: 0

README

Chanserv

$ go get github.com/zenhotels/chanserv

Package chanserv provides a simple message queue framework built on nested Go channels that are served over AstraNet.

Wait, what?..

Explanatory post is here: http://tech.zenhotels.com/chanserv-and-astranet

Demo output

$ go test
2016/03/17 21:11:36 chanserv_test.go:23: will connect in 3 sec...
2016/03/17 21:11:39 chanserv_test.go:44: [HEAD] source @0, for request: hello
2016/03/17 21:11:39 chanserv_test.go:44: [HEAD] source @1, for request: hello
2016/03/17 21:11:39 chanserv_test.go:44: [HEAD] source @2, for request: hello
2016/03/17 21:11:39 chanserv_test.go:44: [HEAD] source @3, for request: hello
2016/03/17 21:11:39 chanserv_test.go:44: [HEAD] source @4, for request: hello

2016/03/17 21:11:39 chanserv_test.go:48: [FRAME 0 from @5] wait for me!
2016/03/17 21:11:39 chanserv_test.go:48: [FRAME 0 from @1] wait for me!
2016/03/17 21:11:39 chanserv_test.go:48: [FRAME 0 from @4] wait for me!
2016/03/17 21:11:39 chanserv_test.go:48: [FRAME 0 from @3] wait for me!
2016/03/17 21:11:39 chanserv_test.go:48: [FRAME 0 from @2] wait for me!

2016/03/17 21:11:40 chanserv_test.go:48: [FRAME 1 from @1] ok I'm ready
2016/03/17 21:11:41 chanserv_test.go:48: [FRAME 1 from @2] ok I'm ready
2016/03/17 21:11:42 chanserv_test.go:48: [FRAME 1 from @3] ok I'm ready
2016/03/17 21:11:43 chanserv_test.go:48: [FRAME 1 from @4] ok I'm ready
2016/03/17 21:11:44 chanserv_test.go:48: [FRAME 1 from @5] ok I'm ready
PASS

Benchmarks

Run on a MacBook with a 2.8 GHz Intel Core i5 and 8 GB of 1600 MHz DDR3 memory.

$ go test -run=none -bench=BenchmarkConnectChanserv -benchtime 10s
BenchmarkConnectChanserv-4         10000       1025092 ns/op     1012843 B/op        367 allocs/op
PASS

$ go test -run=none -bench=.
BenchmarkHeavyChanserv-4           1    14000851652 ns/op # 14 sec
BenchmarkFloodChanserv-4           1    27001783856 ns/op # 27 sec
PASS

License

MIT

Documentation

Overview

Package chanserv provides a simple message queue framework built on nested Go channels that are served over AstraNet.

Constants

const FrameSizeLimit = 100 * 1024 * 1024

FrameSizeLimit specifies the maximum payload size of a frame (100 MiB). This limit may be increased or lifted in the future.
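
As an illustration of how a sender might respect this limit, here is a minimal sketch. The constant frameSizeLimit only mirrors the documented value; checkFrameSize and errTooLarge are hypothetical names, and this page does not say how chanserv itself reacts to an oversized payload.

```go
package main

import (
	"errors"
	"fmt"
)

// frameSizeLimit mirrors the documented FrameSizeLimit value (100 MiB).
const frameSizeLimit = 100 * 1024 * 1024

// errTooLarge is a hypothetical error for this sketch, not part of chanserv.
var errTooLarge = errors.New("payload exceeds frame size limit")

// checkFrameSize reports an error when n bytes would not fit in one frame.
func checkFrameSize(n int) error {
	if n > frameSizeLimit {
		return errTooLarge
	}
	return nil
}

func main() {
	fmt.Println(checkFrameSize(1024))
	fmt.Println(checkFrameSize(frameSizeLimit + 1))
}
```

Checking before calling Bytes() on a large message lets the application split or reject the payload early instead of discovering the problem on the wire.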

Variables

var CompressionHeader = []byte("lz4!")
var ErrWrongSize = errors.New("wrong frame size")
var ErrWrongUncompressedSize = errors.New("wrong uncompressed frame size")

Functions

This section is empty.

Types

type Client

type Client interface {
	// LookupAndPost tries to discover the given vAddr and posts the body to the server's SourceFunc.
	// Provide additional tags to change the behaviour of the service discovery or to set additional
	// request params. Returns a new source subscription, or an error if any. The subscription channel
	// is closed upon a network error or upon success on the remote side.
	LookupAndPost(vAddr string, body []byte, tags map[RequestTag]string) (<-chan Source, error)
}

func NewClient

func NewClient(mpx Multiplexer, opts ...ClientOption) Client

NewClient initializes a new client using the provided multiplexer for the network capabilities. Refer to the client options if you want to specify timeouts and error callbacks.
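
The channel returned by LookupAndPost is a channel of sources, and each source carries its own channel of frames. The sketch below shows one way to drain such nested channels. It is self-contained: Frame, MetaData and Source are local copies of the documented interfaces, while memFrame, memSource, fakeSources and drain are hypothetical helpers standing in for what a real client would receive over the network.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Local copies of the documented interfaces, declared here so the sketch
// compiles without importing chanserv.
type Frame interface{ Bytes() []byte }

type MetaData interface{ RemoteAddr() string }

type Source interface {
	Header() []byte
	Meta() MetaData
	Out() <-chan Frame
}

// drain reads every source from the outer channel and every frame from each
// source's inner channel, returning "header:payload" strings in sorted order.
func drain(sources <-chan Source) []string {
	var (
		mu  sync.Mutex
		got []string
		wg  sync.WaitGroup
	)
	for src := range sources {
		wg.Add(1)
		go func(src Source) {
			defer wg.Done()
			for frame := range src.Out() {
				mu.Lock()
				got = append(got, string(src.Header())+":"+string(frame.Bytes()))
				mu.Unlock()
			}
		}(src)
	}
	wg.Wait()
	sort.Strings(got)
	return got
}

// memFrame and memSource are hypothetical in-memory stand-ins.
type memFrame []byte

func (f memFrame) Bytes() []byte { return f }

type memSource struct {
	header string
	out    chan Frame
}

func (s memSource) Header() []byte    { return []byte(s.header) }
func (s memSource) Meta() MetaData    { return nil }
func (s memSource) Out() <-chan Frame { return s.out }

// fakeSources builds n sources with two frames each; every channel is closed.
func fakeSources(n int) <-chan Source {
	sources := make(chan Source, n)
	for i := 0; i < n; i++ {
		out := make(chan Frame, 2)
		out <- memFrame("wait for me!")
		out <- memFrame("ok I'm ready")
		close(out)
		sources <- memSource{header: fmt.Sprintf("source @%d", i), out: out}
	}
	close(sources)
	return sources
}

func main() {
	for _, line := range drain(fakeSources(2)) {
		fmt.Println(line)
	}
}
```

With a real client, the argument to drain would be the channel returned by LookupAndPost. One goroutine per source keeps a slow source from delaying frames that other sources have already produced.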

type ClientOption

type ClientOption func(c *client)

ClientOption applies a configuration option to the client.
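
ClientOption follows the functional options pattern: each option is a function that mutates the client being built. The sketch below reproduces the pattern in isolation. The client struct, its field names and the with... helpers are hypothetical, since chanserv's client type is unexported and its fields are not documented on this page; only the default values are taken from the option descriptions.

```go
package main

import (
	"fmt"
	"time"
)

// client is a hypothetical stand-in for chanserv's unexported client struct.
type client struct {
	dialTimeout   time.Duration
	frameBufSize  int
	sourceBufSize int
}

// option mirrors the shape of ClientOption: a function that mutates the client.
type option func(c *client)

// withDialTimeout overrides the dial timeout.
func withDialTimeout(d time.Duration) option {
	return func(c *client) { c.dialTimeout = d }
}

// withFrameBufferSize overrides the frame channel buffer size.
func withFrameBufferSize(n int) option {
	return func(c *client) { c.frameBufSize = n }
}

// newClient starts from the documented defaults and applies options in order,
// so a later option wins over an earlier one.
func newClient(opts ...option) *client {
	c := &client{
		dialTimeout:   10 * time.Second,
		frameBufSize:  1024,
		sourceBufSize: 128,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newClient(withDialTimeout(2 * time.Second))
	fmt.Println(c.dialTimeout, c.frameBufSize, c.sourceBufSize)
}
```

The pattern keeps the constructor signature stable: new settings can be added as new option functions without breaking existing callers.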

func ClientDialTimeout

func ClientDialTimeout(d time.Duration) ClientOption

ClientDialTimeout specifies the timeout for dialing the remote host. Default value: 10s.

func ClientFrameBufferSize

func ClientFrameBufferSize(size int) ClientOption

ClientFrameBufferSize specifies the frame channel buffer size. Default value: 1024.

func ClientFrameReadTimeout

func ClientFrameReadTimeout(d time.Duration) ClientOption

ClientFrameReadTimeout specifies the timeout for reading frame data bytes from descendant chan connections that have been discovered.

func ClientMasterReadTimeout

func ClientMasterReadTimeout(d time.Duration) ClientOption

ClientMasterReadTimeout specifies the timeout for reading source announcement bytes from the master connection.

func ClientMasterWriteTimeout

func ClientMasterWriteTimeout(d time.Duration) ClientOption

ClientMasterWriteTimeout specifies the timeout for writing the request body bytes after the connection has been accepted by the server.

func ClientOnError

func ClientOnError(fn func(err error)) ClientOption

ClientOnError specifies a function to call upon an error.

func ClientSourceBufferSize

func ClientSourceBufferSize(size int) ClientOption

ClientSourceBufferSize specifies the source announcements channel buffer size. Default value: 128.

type Frame

type Frame interface {
	// Bytes returns a byte representation of the payload.
	Bytes() []byte
}

Frame represents the payload to send over the channel, allowing users to implement any serialisation logic themselves.

For example, your Message struct could implement the Bytes() method using Cap'n Proto or protobuf to return its byte representation.
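
A minimal sketch of such a type follows. To stay within the standard library it uses encoding/json instead of Cap'n Proto or protobuf; the Message struct and its fields are hypothetical, and Frame is a local copy of the documented interface.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Frame is a local copy of the documented interface.
type Frame interface {
	Bytes() []byte
}

// Message is a hypothetical application payload.
type Message struct {
	ID   int    `json:"id"`
	Text string `json:"text"`
}

// Bytes serialises the message as JSON. Bytes has no error return, so a
// marshalling failure is reported as a nil payload in this sketch.
func (m Message) Bytes() []byte {
	data, err := json.Marshal(m)
	if err != nil {
		return nil
	}
	return data
}

// Compile-time check that Message satisfies Frame.
var _ Frame = Message{}

func main() {
	var f Frame = Message{ID: 1, Text: "hello"}
	fmt.Println(string(f.Bytes())) // prints {"id":1,"text":"hello"}
}
```

Because Bytes() cannot return an error, a production type would probably validate or pre-encode the payload before the frame is handed to the channel.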

type MetaData

type MetaData interface {
	// RemoteAddr indicates the originating node's virtual address, e.g. VHyWCWr39kI:1697777
	RemoteAddr() string
}

MetaData describes a source. It is usually available on the client side only and is created by chanserv itself.

type Multiplexer

type Multiplexer interface {
	Bind(net, laddr string) (net.Listener, error)
	DialTimeout(network string, address string, timeout time.Duration) (net.Conn, error)
}

Multiplexer can be any muxer that is able to bind to some address and dial some address. Chanserv assumes this would be the AstraNet multiplexer that can handle millions of streams.
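
To show how small the interface is, here is a sketch of a multiplexer backed by package net. The tcpMux type and the echoOnce helper are hypothetical, and Multiplexer is a local copy of the documented interface (with the first Bind parameter renamed to network). Whether chanserv behaves well over plain TCP instead of AstraNet virtual addresses is an assumption this page does not confirm.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// Multiplexer is a local copy of the documented interface.
type Multiplexer interface {
	Bind(network, laddr string) (net.Listener, error)
	DialTimeout(network string, address string, timeout time.Duration) (net.Conn, error)
}

// tcpMux is a hypothetical multiplexer that delegates to package net.
type tcpMux struct{}

func (tcpMux) Bind(network, laddr string) (net.Listener, error) {
	return net.Listen(network, laddr)
}

func (tcpMux) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
	return net.DialTimeout(network, address, timeout)
}

// echoOnce binds to a free loopback port, sends msg through mpx and returns
// what the listening side received.
func echoOnce(mpx Multiplexer, msg string) (string, error) {
	ln, err := mpx.Bind("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()

	received := make(chan string, 1)
	go func() {
		conn, err := ln.Accept()
		if err != nil {
			received <- ""
			return
		}
		defer conn.Close()
		buf := make([]byte, len(msg))
		n, _ := io.ReadFull(conn, buf)
		received <- string(buf[:n])
	}()

	conn, err := mpx.DialTimeout("tcp", ln.Addr().String(), time.Second)
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	return <-received, nil
}

func main() {
	got, err := echoOnce(tcpMux{}, "hello")
	fmt.Println(got, err)
}
```

Any type with these two methods can be passed to NewClient or NewServer, which also makes it straightforward to substitute a test double.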

type RequestTag

type RequestTag int

RequestTag specifies additional options for a client's request.

const (
	TagMeta RequestTag = iota
	// TagBucket specifies the bucket hash for the hash-based balancing algorithm.
	// Use this if your multiplexer can take a hash into account when dialing hosts.
	TagBucket
)

type Server

type Server interface {
	// ListenAndServe starts listening for incoming connections on vAddr
	// and emits frame sources using the provided SourceFunc.
	ListenAndServe(vAddr string, src SourceFunc) error
}

func NewServer

func NewServer(mpx Multiplexer, opts ...ServerOption) Server

NewServer initializes a new server using the provided multiplexer for the network capabilities. Refer to the server options if you want to specify timeouts and error callbacks.

type ServerOption

type ServerOption func(s *server)

ServerOption applies a configuration option to the server.

func ServerChanAcceptTimeout

func ServerChanAcceptTimeout(d time.Duration) ServerOption

ServerChanAcceptTimeout specifies how long frame channels wait to be discovered by the client. Default value: 30s.

func ServerFrameWriteTimeout

func ServerFrameWriteTimeout(d time.Duration) ServerOption

ServerFrameWriteTimeout specifies the timeout for writing the frame bytes to the descendant chan connection.

func ServerMasterReadTimeout

func ServerMasterReadTimeout(d time.Duration) ServerOption

ServerMasterReadTimeout specifies the timeout for reading the client's request body after the master connection has been accepted.

func ServerMasterWriteTimeout

func ServerMasterWriteTimeout(d time.Duration) ServerOption

ServerMasterWriteTimeout specifies the timeout for writing the source announcement parts to the master connection (two parts).

func ServerMaxErrorMass

func ServerMaxErrorMass(mass int) ServerOption

ServerMaxErrorMass specifies the number of sequential serving errors tolerated before some action is taken, usually sleeping for 30 seconds before a new retry.

func ServerOnChanError

func ServerOnChanError(fn func(err error)) ServerOption

ServerOnChanError specifies a function to call upon a frame chan serving error.

func ServerOnError

func ServerOnError(fn func(err error)) ServerOption

ServerOnError specifies a function to call upon a master chan serving error.

func ServerOnMaxErrorMass

func ServerOnMaxErrorMass(fn func(mass int, err error)) ServerOption

ServerOnMaxErrorMass specifies the action to perform when the error mass is critical enough. By default it sleeps for 30 seconds before a new retry.
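
The idea of an error mass can be sketched as a counter of sequential failures that fires a callback once a threshold is reached. This is an illustration of the concept only, not chanserv's implementation: the errorMass type, the reset on success, the reset after the callback, and the use of >= for the threshold are all assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// errorMass counts sequential errors (hypothetical type for this sketch).
type errorMass struct {
	max   int
	mass  int
	onMax func(mass int, err error)
}

// observe records the outcome of one serving attempt. A success resets the
// count; reaching max fires the callback and starts counting again.
func (e *errorMass) observe(err error) {
	if err == nil {
		e.mass = 0
		return
	}
	e.mass++
	if e.mass >= e.max {
		e.onMax(e.mass, err)
		e.mass = 0
	}
}

// countTriggers replays a sequence of outcomes (true means success) and
// returns how many times the callback fired.
func countTriggers(max int, outcomes []bool) int {
	triggers := 0
	em := &errorMass{max: max, onMax: func(mass int, err error) { triggers++ }}
	for _, ok := range outcomes {
		if ok {
			em.observe(nil)
		} else {
			em.observe(errors.New("serving failed"))
		}
	}
	return triggers
}

func main() {
	// Two failures, a success, then three failures: only the last run counts.
	fmt.Println(countTriggers(3, []bool{false, false, true, false, false, false})) // prints 1
}
```

A callback passed to ServerOnMaxErrorMass has the same func(mass int, err error) shape as onMax here, so it could sleep, log, or alert instead of incrementing a counter.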

func ServerServingTimeout

func ServerServingTimeout(d time.Duration) ServerOption

ServerServingTimeout specifies the overall serving timeout. This timeout sets a deadline for the master channel and all the descendant frame channels.

func ServerSourcingTimeout

func ServerSourcingTimeout(d time.Duration) ServerOption

ServerSourcingTimeout specifies the timeout of source announcements in the master channel. The sourcing func must publish all the sources and close the sourcing channel before this timeout expires.
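
The contract described here (publish everything and close the channel before the deadline) can be illustrated with a select over the channel and a timer. This is a generic sketch of the idea, not chanserv's code; collectWithin, wellBehaved and forgetful are hypothetical names, and plain strings stand in for source announcements.

```go
package main

import (
	"fmt"
	"time"
)

// collectWithin reads announcements from ch until it is closed or until d
// elapses, whichever comes first. timedOut reports which of the two happened.
func collectWithin(ch <-chan string, d time.Duration) (got []string, timedOut bool) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return got, false
			}
			got = append(got, v)
		case <-timer.C:
			return got, true
		}
	}
}

// wellBehaved publishes two announcements and closes the channel.
func wellBehaved() <-chan string {
	ch := make(chan string, 2)
	ch <- "source @0"
	ch <- "source @1"
	close(ch)
	return ch
}

// forgetful publishes one announcement and never closes the channel.
func forgetful() <-chan string {
	ch := make(chan string, 1)
	ch <- "source @0"
	return ch
}

func main() {
	got, timedOut := collectWithin(wellBehaved(), time.Second)
	fmt.Println(len(got), timedOut)
	got, timedOut = collectWithin(forgetful(), 50*time.Millisecond)
	fmt.Println(len(got), timedOut)
}
```

The second call shows why closing matters: a sourcing function that forgets to close its channel looks the same to the reader as one that is still working, and only the timeout ends the wait.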

func ServerUseCompression

func ServerUseCompression(v bool) ServerOption

ServerUseCompression specifies whether the server must compress its output streams.

type Source

type Source interface {
	// Header gets the application data associated with this source. The source implementation
	// is not required to return any header bytes.
	Header() []byte
	// Meta returns MetaData that was created by chanserv on the client side.
	Meta() MetaData
	// Out is a read-only channel of frames, generated by some source.
	// On the server side the channel must be closed after sending all the available frames,
	// on the client side it will be closed by chanserv upon a network/timeout error or success on the remote side.
	Out() <-chan Frame
}

Source represents an announcement of a new frame source.

type SourceFunc

type SourceFunc func(reqBody []byte) <-chan Source

SourceFunc emits frame sources based on the request data provided. On the server side the channel must be closed after sending all the source announcements; on the client side it will be closed by chanserv upon a network/timeout error or success on the remote side.
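
A server-side SourceFunc therefore has two closing duties: close the channel of sources once all announcements are sent, and close each source's frame channel once its frames are sent. The sketch below shows both. The interfaces are local copies of the documented ones; frame, source, frames, newSourceFunc and countFrames are hypothetical helpers, and returning nil from Meta on the server side is an assumption based on the note that metadata is created by chanserv on the client side.

```go
package main

import "fmt"

// Local copies of the documented types.
type Frame interface{ Bytes() []byte }

type MetaData interface{ RemoteAddr() string }

type Source interface {
	Header() []byte
	Meta() MetaData
	Out() <-chan Frame
}

type SourceFunc func(reqBody []byte) <-chan Source

// frame and source are hypothetical implementations for this sketch.
type frame []byte

func (f frame) Bytes() []byte { return f }

type source struct {
	header []byte
	out    <-chan Frame
}

func (s source) Header() []byte    { return s.header }
func (s source) Meta() MetaData    { return nil } // assumed unused on the server side
func (s source) Out() <-chan Frame { return s.out }

// frames returns a closed channel preloaded with the given payloads.
func frames(payloads ...string) <-chan Frame {
	out := make(chan Frame, len(payloads))
	for _, p := range payloads {
		out <- frame(p)
	}
	close(out) // no more frames for this source
	return out
}

// newSourceFunc returns a SourceFunc that announces n sources per request.
func newSourceFunc(n int) SourceFunc {
	return func(reqBody []byte) <-chan Source {
		sources := make(chan Source)
		go func() {
			defer close(sources) // no more source announcements
			for i := 0; i < n; i++ {
				sources <- source{
					header: []byte(fmt.Sprintf("source @%d, for request: %s", i, reqBody)),
					out:    frames("wait for me!", "ok I'm ready"),
				}
			}
		}()
		return sources
	}
}

// countFrames consumes everything fn emits and returns the totals.
func countFrames(fn SourceFunc, body string) (nSources, nFrames int) {
	for src := range fn([]byte(body)) {
		nSources++
		for range src.Out() {
			nFrames++
		}
	}
	return nSources, nFrames
}

func main() {
	fmt.Println(countFrames(newSourceFunc(3), "hello")) // prints 3 6
}
```

A function built this way could be passed as the src argument of ListenAndServe. If either close is missing, the range loops in countFrames would block forever, which is the local equivalent of a client waiting until a timeout.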
