sabuhp

package module
Version: v0.6.3
Published: Jun 26, 2021 License: MIT Imports: 21 Imported by: 3

README

SabuHP

Power your backend with a simple service architecture that provides a direct connection into a function/processor network through supported protocols (HTTP, WebSocket, Server-Sent Events).

SabuHP exposes a two-server system, a Client Server and a Worker Server, that allows better scaling of client connections and business-logic processing in an age where message buses serve as the backbone of communication.

The client server exists to allow direct connections from clients (CLI, browsers), which can send request payloads directly to desired topics and receive responses from a target message bus. This decouples the definition of our APIs and their desired behaviour from how clients connect to and interact with them. The client server's purpose is to hide away the intricacies of accessing these message queues or buses, providing clear and familiar APIs that let clients work with such systems with ease.

The worker server exists to provide services that can be scaled horizontally; the only requirement is that they can connect to a message bus to listen for and process request payloads on target topics. This lets us decouple entirely how we connect from how we process messages or do work within a software system.

Protocols

SabuHP supports the following protocols for communicating with the service server, so that both backend and frontend clients can interact through them:

  • WebSocket
  • HTTP
  • HTTP Server Sent Events

Getting

go get -u github.com/ewe-studios/sabuhp

Client Server

The client server hosts all necessary client protocols (HTTP, WebSocket and Server-Sent Events routes), which allows clients (browsers, CLI agents) to connect to the SabuHP network, deliver requests and receive responses to those requests.


package main

import (
	"context"
	"log"

	"github.com/influx6/npkg/ndaemon"

	"github.com/ewe-studios/sabuhp"

	"github.com/ewe-studios/sabuhp/bus/redispub"
	"github.com/ewe-studios/sabuhp/servers/clientServer"
	redis "github.com/go-redis/redis/v8"
)

func main() {
	var ctx, canceler = context.WithCancel(context.Background())
	ndaemon.WaiterForKillWithSignal(ndaemon.WaitForKillChan(), canceler)

	var logger sabuhp.GoLogImpl

	var redisBus, busErr = redispub.Stream(redispub.Config{
		Logger: logger,
		Ctx:    ctx,
		Redis:  redis.Options{},
		Codec:  clientServer.DefaultCodec,
	})

	if busErr != nil {
		log.Fatalf("Failed to create bus connection: %q\n", busErr.Error())
	}

	var cs = clientServer.New(
		ctx,
		logger,
		redisBus,
		clientServer.WithHttpAddr("0.0.0.0:9650"),
	)

	log.Println("Starting client server")
	cs.Start()

	// Block until the server's error group finishes.
	if err := cs.ErrGroup.Wait(); err != nil {
		log.Fatalf("service group finished with error: %v", err)
	}
}

Worker Server

A worker server exposes a server with registered workers (functions, processors) that listen on the connected message bus for new requests to process. These servers can be scaled horizontally and, where the underlying message bus supports it, grouped into listen groups to create a cloud of processors that can scale out as load grows.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ewe-studios/sabuhp/actions"

	"github.com/ewe-studios/sabuhp/servers/serviceServer"

	"github.com/influx6/npkg/ndaemon"

	"github.com/ewe-studios/sabuhp"

	"github.com/ewe-studios/sabuhp/bus/redispub"
	"github.com/ewe-studios/sabuhp/servers/clientServer"
	redis "github.com/go-redis/redis/v8"
)

func main() {
	var ctx, canceler = context.WithCancel(context.Background())
	ndaemon.WaiterForKillWithSignal(ndaemon.WaitForKillChan(), canceler)

	var logger sabuhp.GoLogImpl

	var redisBus, busErr = redispub.Stream(redispub.Config{
		Logger: logger,
		Ctx:    ctx,
		Redis:  redis.Options{},
		Codec:  clientServer.DefaultCodec,
	})

	if busErr != nil {
		log.Fatalf("Failed to create bus connection: %q\n", busErr.Error())
	}

	var workers = actions.NewWorkerTemplateRegistry()
	var cs = serviceServer.New(
		ctx,
		logger,
		redisBus,
		serviceServer.WithWorkerRegistry(workers),
	)

	fmt.Println("Starting worker service")
	cs.Start()

	fmt.Println("Started worker service")
	if err := cs.ErrGroup.Wait(); err != nil {
		log.Fatalf("service group finished with error: %v", err)
	}

	fmt.Println("Closed worker service")
}

Contact

Ewetumo Alexander @influx6

License

Source code is available under the MIT License.

Documentation

Constants

const MessageContentType = "application/x-event-message"
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"

TimeFormat is the time format to use when generating times in HTTP headers. It is like time.RFC1123 but hard-codes GMT as the time zone. The time being formatted must be in UTC for Format to generate the correct format.

For parsing this time format, see ParseTime.
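The UTC requirement above is easy to miss, so here is a minimal sketch of it. The constant is copied from this page; `formatHeaderTime` and `sampleHeaderTime` are hypothetical helpers introduced only for illustration and are not part of sabuhp.

```go
package main

import (
	"fmt"
	"time"
)

// TimeFormat is copied from the constant documented above.
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"

// formatHeaderTime is a hypothetical helper (not part of sabuhp). It converts
// t to UTC first, because the layout hard-codes "GMT" instead of a zone verb.
func formatHeaderTime(t time.Time) string {
	return t.UTC().Format(TimeFormat)
}

// sampleHeaderTime formats a fixed instant that was created in a UTC+2 zone.
func sampleHeaderTime() string {
	zone := time.FixedZone("UTC+2", 2*60*60)
	return formatHeaderTime(time.Date(2021, time.June, 26, 14, 30, 0, 0, zone))
}

func main() {
	fmt.Println(sampleHeaderTime()) // Sat, 26 Jun 2021 12:30:00 GMT
}
```

Without the `UTC()` call the same instant would be printed as 14:30:00 with a misleading "GMT" label.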

Variables

var (
	SUBSCRIBE   = T("+SUB")
	UNSUBSCRIBE = T("-USUB")
	DONE        = T("+OK")
	NOTDONE     = T("-NOK")
)
var BodyToLargeErr = &requestTooLargeErr{Err: errors.New("http: req body to large")}
var ErrAlreadySubscribed = nerror.New("id is already used")

Functions

func IsTokenRune

func IsTokenRune(r rune) bool

func MaxBytesReader

func MaxBytesReader(r io.ReadCloser, n int64) io.ReadCloser

MaxBytesReader is similar to io.LimitReader but is intended for limiting the size of incoming request bodies. In contrast to io.LimitReader, MaxBytesReader's result is a ReadCloser, returns a non-EOF error for a Read beyond the limit, and closes the underlying reader when its Close method is called.

Returns a RequestToLargeErr object when the request body is too large.

MaxBytesReader prevents clients from accidentally or maliciously sending a large request and wasting server resources.
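The description above matches the wording of the standard library's `http.MaxBytesReader`, so the following sketch demonstrates the limiting behaviour with `net/http` instead of sabuhp. That sabuhp's two-argument variant behaves the same way is an assumption; `readLimited` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// readLimited reads body through http.MaxBytesReader with the given limit.
// The ResponseWriter argument is nil because no HTTP response is involved.
func readLimited(body string, limit int64) ([]byte, error) {
	limited := http.MaxBytesReader(nil, io.NopCloser(strings.NewReader(body)), limit)
	defer limited.Close()
	return io.ReadAll(limited)
}

func main() {
	// A body within the limit is read in full with a nil error.
	data, err := readLimited("hello", 10)
	fmt.Printf("%q %v\n", data, err)

	// A body over the limit yields a non-EOF error instead of silent truncation.
	_, err = readLimited("hello world", 5)
	fmt.Println("over limit, got error:", err != nil)
}
```

This differs from `io.LimitReader`, which would simply report EOF once the limit is reached.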

func SplitMessagesToGroups

func SplitMessagesToGroups(b []Message) (subGroups []Message, unsubGroups []Message, dataGroups []Message)

SplitMessagesToGroups will split messages into subscription, unsubscription and message groups.

Types

type BusBuilder

type BusBuilder struct {
	SendFunc         func(data ...Message)
	SendForReplyFunc func(tm time.Duration, from Topic, replyGroup string, data ...Message) *nthen.Future
	ListenFunc       func(topic string, grp string, handler TransportResponse) Channel
}

func (BusBuilder) Listen

func (t BusBuilder) Listen(topic string, grp string, handler TransportResponse) Channel

func (BusBuilder) Send

func (t BusBuilder) Send(data ...Message)

func (BusBuilder) SendForReply added in v0.5.1

func (t BusBuilder) SendForReply(tm time.Duration, from Topic, replyGroup string, data ...Message) *nthen.Future

type BusRelay

type BusRelay struct {
	Relay *PbRelay
	Bus   MessageBus
	// contains filtered or unexported fields
}

func BusWithRelay

func BusWithRelay(relay *PbRelay, bus MessageBus) *BusRelay

func NewBusRelay

func NewBusRelay(ctx context.Context, logger Logger, bus MessageBus) *BusRelay

func (*BusRelay) Group

func (br *BusRelay) Group(topic string, grp string) *PbGroup

func (*BusRelay) Handle

func (tm *BusRelay) Handle(ctx context.Context, message Message, transport Transport) MessageErr

type BytesDecoder

type BytesDecoder interface {
	Decode([]byte) (*Message, error)
}

BytesDecoder transforms a slice of bytes into a Message to be delivered.

type BytesSplitter

type BytesSplitter interface {
	SplitBytes(data []byte) (chan<- Message, error)
}

BytesSplitter takes a large block of bytes and returns a chan of messages, each of which is a part of the whole. This allows larger messages to be sent across message channels with ease.
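To make the idea of part messages concrete, here is a rough sketch of chunking a payload. It is not sabuhp's implementation: `part`, `splitBytes` and `join` are hypothetical names, plain integers stand in for the `nxid.ID` values of `PartId` and `EndPartId`, and a slice is returned instead of a channel to keep the example short.

```go
package main

import "fmt"

// part is a hypothetical, simplified stand-in for a sabuhp.Message fragment.
// Plain ints replace the nxid.ID values used by the real PartId/EndPartId fields.
type part struct {
	PartID    int
	EndPartID int
	Bytes     []byte
}

// splitBytes cuts data into chunks of at most size bytes. Every part carries
// the id of the final part, so a receiver knows the stream is complete when
// PartID == EndPartID.
func splitBytes(data []byte, size int) []part {
	if len(data) == 0 || size <= 0 {
		return nil
	}
	total := (len(data) + size - 1) / size
	parts := make([]part, 0, total)
	for i := 0; i < total; i++ {
		start := i * size
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		parts = append(parts, part{PartID: i + 1, EndPartID: total, Bytes: data[start:end]})
	}
	return parts
}

// join reassembles the payload from parts that arrive in order.
func join(parts []part) []byte {
	var out []byte
	for _, p := range parts {
		out = append(out, p.Bytes...)
	}
	return out
}

func main() {
	parts := splitBytes([]byte("abcdefgh"), 3)
	for _, p := range parts {
		fmt.Printf("part %d of %d: %s\n", p.PartID, p.EndPartID, p.Bytes)
	}
	fmt.Println(string(join(parts)))
}
```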

type Channel

type Channel interface {
	Topic() string
	Group() string
	Close()
	Err() error
}

Channel represents a generated subscription on a topic. It gives the subscriber a handle for deciding when the channel should be closed and stop receiving updates.

type Client

type Client interface {
	Send(data []byte, timeout time.Duration) error
}

type Codec

type Codec interface {
	Encode(msg Message) ([]byte, error)
	Decode(b []byte) (Message, error)
}

Codec embodies implementation for the serialization of a message into bytes and vice-versa.
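As an illustration of the encode/decode contract, the sketch below implements a JSON codec. It is self-contained, so it uses a hypothetical, trimmed-down `message` struct and a local `codec` interface with the same method shapes rather than the real `sabuhp.Message` and `sabuhp.Codec`; the JSON field names are also assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical, trimmed-down stand-in for sabuhp.Message.
type message struct {
	Topic    string `json:"topic"`
	FromAddr string `json:"from_addr"`
	Bytes    []byte `json:"bytes"`
}

// codec mirrors the method shapes of the Codec interface documented above.
type codec interface {
	Encode(msg message) ([]byte, error)
	Decode(b []byte) (message, error)
}

// jsonCodec serializes messages as JSON.
type jsonCodec struct{}

func (jsonCodec) Encode(msg message) ([]byte, error) {
	return json.Marshal(msg)
}

func (jsonCodec) Decode(b []byte) (message, error) {
	var msg message
	err := json.Unmarshal(b, &msg)
	return msg, err
}

// roundTrip encodes msg and decodes the result again with the same codec.
func roundTrip(c codec, msg message) (message, error) {
	encoded, err := c.Encode(msg)
	if err != nil {
		return message{}, err
	}
	return c.Decode(encoded)
}

func main() {
	out, err := roundTrip(jsonCodec{}, message{Topic: "hello", FromAddr: "client-1", Bytes: []byte("hi")})
	fmt.Println(out.Topic, out.FromAddr, string(out.Bytes), err)
}
```

A codec is expected to be symmetric: whatever `Encode` produces, `Decode` should turn back into an equivalent message.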

type CodecWriter

type CodecWriter struct {
	Client Client
	Codec  Codec
	Logger Logger
}

func NewCodecWriter

func NewCodecWriter(client Client, codec Codec, logger Logger) *CodecWriter

func (*CodecWriter) Send

func (c *CodecWriter) Send(msg Message, timeout time.Duration) error

type Conn

type Conn interface{}

Conn defines the connection type which we can retrieve and understand the type.

type Cookie

type Cookie struct {
	Name  string
	Value string

	Path       string    // optional
	Domain     string    // optional
	Expires    time.Time // optional
	RawExpires string    // for reading cookies only

	// MaxAge=0 means no 'Max-Age' attribute specified.
	// MaxAge<0 means delete cookie now, equivalently 'Max-Age: 0'
	// MaxAge>0 means Max-Age attribute present and given in seconds
	MaxAge   int
	Secure   bool
	HttpOnly bool
	SameSite SameSite
	Raw      string
	Unparsed []string // Raw text of unparsed attribute-value pairs
}

A Cookie represents an HTTP cookie as sent in the Set-Cookie header of an HTTP response or the Cookie header of an HTTP request.

See https://tools.ietf.org/html/rfc6265 for details.

func ReadCookies

func ReadCookies(h Header, filter string) []Cookie

ReadCookies parses all "Cookie" values from the header h and returns the successfully parsed Cookies.

If filter isn't empty, only cookies of that name are returned.

func ReadSetCookies

func ReadSetCookies(h Header) []Cookie

ReadSetCookies parses all "Set-Cookie" values from the header h and returns the successfully parsed Cookies.

ReadSetCookies is stricter on the names and values of cookies.

func (*Cookie) String

func (c *Cookie) String() string

String returns the serialization of the cookie for use in a Cookie header (if only Name and Value are set) or a Set-Cookie response header (if other fields are set). If c is nil or c.Name is invalid, the empty string is returned.

type ErrChannel

type ErrChannel struct{ Error error }

ErrChannel implements the Channel interface, but its sole purpose is to always return an error.

func (*ErrChannel) Close

func (n *ErrChannel) Close()

func (*ErrChannel) Err

func (n *ErrChannel) Err() error

type GoLogImpl

type GoLogImpl struct{}

func (GoLogImpl) Log

func (l GoLogImpl) Log(cb *njson.JSON)

type Handler

type Handler interface {
	Handle(http.ResponseWriter, *http.Request, Params)
}

type HandlerFunc

type HandlerFunc func(http.ResponseWriter, *http.Request, Params)

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(rw http.ResponseWriter, r *http.Request, p Params)

type Header

type Header map[string][]string

func (Header) Add

func (h Header) Add(k string, v string)

func (Header) Delete

func (h Header) Delete(k string)

func (Header) Get

func (h Header) Get(k string) string

func (Header) Set

func (h Header) Set(k string, v string)

func (Header) Values

func (h Header) Values(k string) []string

type HeaderModifications

type HeaderModifications func(header http.Header)

type HttpClient added in v0.6.3

type HttpClient interface {
	Do(req *http.Request) (*http.Response, error)
}

type HttpDecoder

type HttpDecoder interface {
	Decode(req *http.Request, params Params) (Message, error)
}

HttpDecoder transforms an HTTP request into a Message to be delivered.

type HttpDecoderImpl

type HttpDecoderImpl struct {
	Codec       Codec
	Logger      Logger
	MaxBodySize int64
}

func NewHttpDecoderImpl

func NewHttpDecoderImpl(codec Codec, logger Logger, maxBody int64) *HttpDecoderImpl

func (*HttpDecoderImpl) Decode

func (r *HttpDecoderImpl) Decode(req *http.Request, params Params) (Message, error)

type HttpEncoder

type HttpEncoder interface {
	Encode(req http.ResponseWriter, message Message) error
}

HttpEncoder transforms a message into an appropriate response written to an HTTP response object.

type HttpEncoderImpl added in v0.4.1

type HttpEncoderImpl struct {
	Codec  Codec
	Logger Logger
}

func NewHttpEncoderImpl added in v0.4.1

func NewHttpEncoderImpl(codec Codec, logger Logger) *HttpEncoderImpl

func (*HttpEncoderImpl) Encode added in v0.4.1

func (r *HttpEncoderImpl) Encode(res http.ResponseWriter, m Message) error

type HttpMatcher

type HttpMatcher interface {
	Handler

	Match(http.ResponseWriter, *http.Request, Params)
}

HttpMatcher embodies a matcher which indicates whether a request belongs exclusively to it, allowing it to divert a given request to itself.

type HttpWrapper

type HttpWrapper func(Handler) Handler

type HttpWrappers

type HttpWrappers []HttpWrapper

func (HttpWrappers) For

func (w HttpWrappers) For(main Handler) Handler

For registers the wrappers for a specific handler and returns a handler that can be passed via the `UseHandle` function.

func (HttpWrappers) ForFunc

func (w HttpWrappers) ForFunc(mainFunc HandlerFunc) Handler

ForFunc registers the wrappers for a specific raw handler function and returns a handler that can be passed via the `UseHandle` function.

type LocationService

type LocationService interface {
	Get(ipAddress string) (nnet.Location, error)
}

type LogHandler

type LogHandler func([]*Message)

type Logger

type Logger interface {
	njson.Logger
}

type Matcher

type Matcher interface {
	Match(message *Message) bool
}

Matcher is the interface that all matchers should implement in order to be registered into the Mux via the `Mux#AddRequestHandler/Match/MatchFunc` functions.

See `Mux#AddRequestHandler` for more.
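A matcher only has to answer whether a message is one it cares about. The sketch below shows one possible implementation that matches on a topic prefix. It uses a hypothetical local `message` type with a plain string topic and a local `matcher` interface of the same shape, so it is an illustration under those assumptions and not code taken from sabuhp.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for sabuhp.Message with a string topic.
type message struct {
	Topic string
}

// matcher mirrors the shape of the Matcher interface documented above.
type matcher interface {
	Match(m *message) bool
}

// topicPrefix matches every message whose topic starts with the prefix.
type topicPrefix string

func (p topicPrefix) Match(m *message) bool {
	return m != nil && strings.HasPrefix(m.Topic, string(p))
}

func main() {
	var users matcher = topicPrefix("users.")
	fmt.Println(users.Match(&message{Topic: "users.created"}))
	fmt.Println(users.Match(&message{Topic: "orders.created"}))
}
```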

type Message

type Message struct {
	// Optional future which will indicate if message delivery should
	// notify attached future on result.
	Future *nthen.Future

	// Path of the request producing this if from http.
	Path string

	// IP of the request producing this if from http.
	IP string

	// LocalIP of the request producing this if from http.
	LocalIP string

	// ExpectReply indicates if the receiver of said message should
	// handle this as a SendReply operation.
	ExpectReply bool

	// Optional reply error send to indicate message is an error reply
	// and the error that occurred.
	ReplyErr error

	// SuggestedStatusCode is an optional field settable by the
	// creator to suggest possible status code of a message.
	SuggestedStatusCode int

	// ContentType is a required value that defaults to MessageContentType.
	// It is important in the translators, where it is the deciding factor
	// in whether a message is written as a whole or just its payload is
	// written into the response object.
	ContentType string

	// FormName is optional attached form name which represents this data.
	FormName string

	// FileName is optional attached file name which represents this data.
	FileName string

	// Headers are related facts attached to a message.
	Headers Header

	// Cookies are the cookies attached to a message.
	//
	// Only available when set, so it's very optional
	Cookies []Cookie

	// Form contains the parsed form data, including both the URL
	// field's query parameters and the PATCH, POST, or PUT form data.
	//
	// Only available when set, so it's very optional
	Form url.Values

	// Query contains the parsed form data, including both the URL
	// field's query parameters and the PATCH, POST, or PUT form data.
	//
	// Only available when set, so it's very optional
	Query url.Values

	// Within indicates senders intent on how long they are
	// willing to wait for message delivery. Usually this should end
	// with error resolution of attached future if present.
	Within time.Duration

	// Id is the unique id attached to a given message
	// for tracking its delivery and tracing the different touch
	// points where it was handled.
	Id nxid.ID

	// EndPartId is the unique id attached to giving messages which
	// indicate the expected end id which when seen as the Id
	// should consider a part stream as completed.
	//
	// This will be created from the start and then tagged to the final
	// message as both the EndPartId and PartId fields, which will identify
	// that a series of broken messages have been completed.
	EndPartId nxid.ID

	// PartId is the unique id attached to giving messages when
	// they are a shared part of a larger messages. There are cases
	// when a message may become sent as broken parts for recollection
	// at the other end.
	PartId nxid.ID

	// SubscribeGroup for subscribe/unsubscribe message types which
	// allow to indicate which group a topic should fall into.
	SubscribeGroup string

	// SubscribeTo for subscribe/unsubscribe message types which
	// allow to indicate which topic should a subscribe or unsubscribe
	// be applied to.
	SubscribeTo string

	// Topic for giving message (serving as to address).
	Topic Topic

	// ReplyGroup, when provided, is the suggested topic for the receiving party to reply to.
	ReplyGroup string

	// FromAddr is the logical address of the sender of message.
	FromAddr string

	// Bytes is the payload for giving message.
	Bytes []byte

	// Metadata are related facts attached to a message.
	Metadata Params

	// Params are related facts attached to a message based on some route or
	// sender and provide some values to keyed expectation, unlike metadata
	// it has specific input in the message.
	Params Params

	// Parts are possible fragments collected of a message which was split into
	// multiple parts to send over the wire and have been collected through the use
	// of the PartId.
	//
	// We do this because we do not let handlers handle a list of messages but one
	// and to accommodate large messages split in parts or messages which are logical
	// parts of themselves, this field is an option, generally.
	// Codecs should never read this
	Parts []Message
}

func BasicMsg

func BasicMsg(topic Topic, message string, fromAddr string) Message

func NOTOK

func NOTOK(message string, fromAddr string) Message

func NewMessage

func NewMessage(topic Topic, fromAddr string, payload []byte) Message

func OK

func OK(message string, fromAddr string) Message

func SubscribeMessage

func SubscribeMessage(topic string, grp string, fromAddr string) Message

func UnsubscribeMessage

func UnsubscribeMessage(topic string, grp string, fromAddr string) Message

func (Message) Copy

func (m Message) Copy() Message

Copy returns a copy of this message with the underlying data copied across.

func (Message) EncodeObject

func (m Message) EncodeObject(encoder npkg.ObjectEncoder)

func (Message) ReplyTo

func (m Message) ReplyTo() Message

ReplyTo returns a new instance of a Message using the FromAddr as the topic.

func (Message) ReplyToWith

func (m Message) ReplyToWith(params Params, meta Params, payload []byte) Message

ReplyToWith returns a new instance of a Message using the FromAddr as the topic.

func (Message) ReplyWithTopic added in v0.4.8

func (m Message) ReplyWithTopic(t Topic) Message

ReplyWithTopic returns a new message with provided topic.

func (Message) String

func (m Message) String() string

func (*Message) WithId added in v0.4.8

func (m *Message) WithId(t nxid.ID)

func (*Message) WithMetadata

func (m *Message) WithMetadata(meta map[string]string)

func (*Message) WithParams

func (m *Message) WithParams(params map[string]string)

func (*Message) WithPayload

func (m *Message) WithPayload(lp []byte)

func (*Message) WithTopic added in v0.4.8

func (m *Message) WithTopic(t Topic)

type MessageBus

type MessageBus interface {
	Send(data ...Message)
	Listen(topic string, grp string, handler TransportResponse) Channel
	SendForReply(tm time.Duration, fromTopic Topic, replyGroup string, data ...Message) *nthen.Future
}

MessageBus defines an underlying message transport implementation, such as a message bus or RPC connection, that can deliver messages according to the required one-to-one and one-to-many semantics.
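One way to picture the one-to-one and one-to-many semantics is an in-memory bus in which every group subscribed to a topic receives each message, while only one member of a group handles it. The sketch below follows that reading. It is an assumption about how groups behave, it uses plain string payloads and hypothetical names (`memBus`, `handler`, `demo`), and it is not sabuhp's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// handler receives the payload of a delivered message.
type handler func(payload string)

// memBus is a hypothetical in-memory bus used only for illustration.
type memBus struct {
	mu     sync.Mutex
	groups map[string]map[string][]handler // topic -> group -> handlers
	turn   map[string]int                  // round-robin cursor per topic/group
}

func newMemBus() *memBus {
	return &memBus{groups: map[string]map[string][]handler{}, turn: map[string]int{}}
}

// Listen registers h as a member of grp on topic.
func (b *memBus) Listen(topic, grp string, h handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.groups[topic] == nil {
		b.groups[topic] = map[string][]handler{}
	}
	b.groups[topic][grp] = append(b.groups[topic][grp], h)
}

// Send delivers payload to every group on topic (one-to-many), picking a
// single member per group in round-robin order (one-to-one within a group).
func (b *memBus) Send(topic, payload string) {
	b.mu.Lock()
	var targets []handler
	for grp, hs := range b.groups[topic] {
		key := topic + "/" + grp
		targets = append(targets, hs[b.turn[key]%len(hs)])
		b.turn[key]++
	}
	b.mu.Unlock()
	for _, h := range targets {
		h(payload)
	}
}

// demo counts deliveries for two workers sharing a group and one auditor.
func demo() (workerA, workerB, audit int) {
	bus := newMemBus()
	bus.Listen("jobs", "workers", func(string) { workerA++ })
	bus.Listen("jobs", "workers", func(string) { workerB++ })
	bus.Listen("jobs", "audit", func(string) { audit++ })
	bus.Send("jobs", "first")
	bus.Send("jobs", "second")
	return workerA, workerB, audit
}

func main() {
	a, b, audit := demo()
	fmt.Println(a, b, audit) // 1 1 2
}
```

The two workers split the messages between them, while the audit group sees both.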

type MessageErr

type MessageErr interface {
	error
	StatusCode() int
	ShouldAck() bool
}

func WrapErr

func WrapErr(err error, shouldAck bool) MessageErr

func WrapErrWithStatusCode added in v0.5.0

func WrapErrWithStatusCode(err error, code int, shouldAck bool) MessageErr

type MessageRouter

type MessageRouter interface {
	TransportResponse
	Matcher
}

type NoopChannel

type NoopChannel struct{}

NoopChannel implements the Channel interface but has no internal operational capacity. It represents a non functioning channel.

func (NoopChannel) Close

func (n NoopChannel) Close()

func (NoopChannel) Err

func (n NoopChannel) Err() error

type Params

type Params map[string]string

func (Params) Delete

func (h Params) Delete(k string)

func (Params) EncodeObject

func (h Params) EncodeObject(encoder npkg.ObjectEncoder)

func (Params) Get

func (h Params) Get(k string) string

func (Params) Set

func (h Params) Set(k string, v string)

type PbGroup

type PbGroup struct {
	// contains filtered or unexported fields
}

func (*PbGroup) Add

func (sc *PbGroup) Add(id nxid.ID, tr TransportResponse) error

func (*PbGroup) IsEmpty

func (sc *PbGroup) IsEmpty() bool

func (*PbGroup) Link

func (sc *PbGroup) Link(bus MessageBus)

func (*PbGroup) Listen

func (sc *PbGroup) Listen(handler TransportResponse) Channel

func (*PbGroup) Notify

func (sc *PbGroup) Notify(ctx context.Context, msg Message, transport Transport) MessageErr

Notify will notify the group's handlers and return the first message error seen from any of them. This means that if one handler returns an error, the publisher will be notified as if all handlers failed to handle the message. If you don't want that, don't return an error from any of your registered handlers; handle the error appropriately inside the handler instead.
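The first-error behaviour and the suggested workaround can be sketched as follows. All names here (`handler`, `notify`, `swallow`, `okHandler`, `failingHandler`) are hypothetical, messages are plain strings, and the sketch assumes the remaining handlers still run after one of them fails, which the documentation does not state.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical, simplified stand-in for a TransportResponse.
type handler func(msg string) error

// notify calls every handler and returns the first error it saw, so a single
// failing handler makes the whole notification look failed to the publisher.
func notify(handlers []handler, msg string) error {
	var first error
	for _, h := range handlers {
		if err := h(msg); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// swallow wraps h so that its error is handled locally through onErr and
// never reaches the publisher.
func swallow(h handler, onErr func(error)) handler {
	return func(msg string) error {
		if err := h(msg); err != nil {
			onErr(err)
		}
		return nil
	}
}

func okHandler(string) error      { return nil }
func failingHandler(string) error { return errors.New("handler failed") }

func main() {
	// The failing handler's error is reported to the publisher.
	fmt.Println(notify([]handler{okHandler, failingHandler}, "hello"))

	// With the error handled inside the handler, the publisher sees success.
	logErr := func(err error) { fmt.Println("handled locally:", err) }
	fmt.Println(notify([]handler{okHandler, swallow(failingHandler, logErr)}, "hello"))
}
```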

func (*PbGroup) Remove

func (sc *PbGroup) Remove(id nxid.ID)

func (*PbGroup) Run

func (sc *PbGroup) Run()

type PbRelay

type PbRelay struct {
	// contains filtered or unexported fields
}

PbRelay, aka MultiSubscriberSingleTopicManager, wraps a transport object with a subscription management core that allows multiple subscribers to listen to messages for one topic while keeping only one listener on that topic (the manager itself).

This is useful when your underlying transport bears a cost with too many subscriptions or does not support multiple subscriptions to a single topic.

func NewPbRelay

func NewPbRelay(ctx context.Context, logger Logger) *PbRelay

func (*PbRelay) Group

func (tm *PbRelay) Group(topic string, group string) *PbGroup

func (*PbRelay) Handle

func (tm *PbRelay) Handle(ctx context.Context, message Message, transport Transport) MessageErr

func (*PbRelay) UnlistenAllWithId

func (tm *PbRelay) UnlistenAllWithId(id nxid.ID)

UnlistenAllWithId sends a signal to remove any handler with the given id from all topics.

func (*PbRelay) UnlistenWithId

func (tm *PbRelay) UnlistenWithId(topic string, id nxid.ID)

UnlistenWithId sends a signal to remove any handler with the given id from a specific topic.

func (*PbRelay) Wait

func (tm *PbRelay) Wait()

type RetryFunc

type RetryFunc func(last int) time.Duration

type SameSite

type SameSite int

SameSite allows a server to define a cookie attribute making it impossible for the browser to send this cookie along with cross-site requests. The main goal is to mitigate the risk of cross-origin information leakage, and provide some protection against cross-site request forgery attacks.

See https://tools.ietf.org/html/draft-ietf-httpbis-cookie-same-site-00 for details.

const (
	SameSiteDefaultMode SameSite = iota + 1
	SameSiteLaxMode
	SameSiteStrictMode
	SameSiteNoneMode
)

type Sock

type Sock struct {
	// contains filtered or unexported fields
}

func NewSock

func NewSock(handler SocketMessageHandler) *Sock

func (*Sock) Notify

func (sh *Sock) Notify(b Message, from Socket) error

func (*Sock) Use

func (sh *Sock) Use(handler SocketMessageHandler)

type Socket

type Socket interface {
	ID() nxid.ID
	Send(...Message)
	Stat() SocketStat
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Listen(SocketMessageHandler)
}

Socket defines an underlying connection handler which handles delivery of messages to the underlying stream.

type SocketByteHandler

type SocketByteHandler func(b []byte, from Socket) error

SocketByteHandler defines the function contract a manager uses to handle a message.

Be aware that returning an error from the handler to the Gorilla socket will cause the immediate closure of that socket, ending communication with the client, and the error will be logged. So unless your intention is to end the connection, handle the error yourself.

type SocketHandler

type SocketHandler func(from Socket)

SocketHandler defines the function contract to be called for a socket instance.

type SocketMessageHandler

type SocketMessageHandler func(b Message, from Socket) error

SocketMessageHandler defines the function contract a sabuhp.Socket uses to handle a message.

Be aware that returning an error from the handler to the Gorilla sabuhp.Socket will cause the immediate closure of that socket, ending communication with the client, and the error will be logged. So unless your intention is to end the connection, handle the error yourself.

type SocketNotification

type SocketNotification func(socket Socket)

type SocketServer

type SocketServer interface {
	Stream(SocketService)
}

type SocketServers

type SocketServers struct {
	// contains filtered or unexported fields
}

func NewSocketServers

func NewSocketServers() *SocketServers

func (*SocketServers) SocketClosed

func (htp *SocketServers) SocketClosed(socket Socket)

func (*SocketServers) SocketOpened

func (htp *SocketServers) SocketOpened(socket Socket)

func (*SocketServers) Stream

func (htp *SocketServers) Stream(server SocketService)

type SocketService

type SocketService interface {
	SocketOpened(Socket)
	SocketClosed(Socket)
}

type SocketStat

type SocketStat struct {
	Addr       net.Addr
	RemoteAddr net.Addr
	Id         string
	Sent       int64
	Received   int64
	Handled    int64
}

func (SocketStat) EncodeObject

func (g SocketStat) EncodeObject(encoder npkg.ObjectEncoder)

type StreamBus

type StreamBus struct {
	Logger Logger
	Bus    MessageBus
	// contains filtered or unexported fields
}

func NewStreamBus

func NewStreamBus(logger Logger, bus MessageBus) *StreamBus

func WithBus

func WithBus(logger Logger, socketServer SocketServer, bus MessageBus) *StreamBus

WithBus returns the instance of a StreamBus which will be connected to the provided SocketServer and handle delivery of messages to a message bus, as well as subscriptions and unsubscriptions.

func (*StreamBus) SocketBusSend

func (st *StreamBus) SocketBusSend(b Message, sock Socket) MessageErr

func (*StreamBus) SocketClosed

func (st *StreamBus) SocketClosed(socket Socket)

func (*StreamBus) SocketOpened

func (st *StreamBus) SocketOpened(socket Socket)

func (*StreamBus) SocketSubscribe

func (st *StreamBus) SocketSubscribe(b Message, socket Socket) MessageErr

func (*StreamBus) SocketUnsubscribe

func (st *StreamBus) SocketUnsubscribe(b Message, socket Socket) MessageErr

type StreamBusRelay

type StreamBusRelay struct {
	Logger   Logger
	BusRelay *BusRelay
	Bus      MessageBus
}

func NewStreamBusRelay

func NewStreamBusRelay(logger Logger, bus MessageBus, relay *BusRelay) *StreamBusRelay

func WithBusRelay

func WithBusRelay(logger Logger, socketServer SocketServer, bus MessageBus, relay *BusRelay) *StreamBusRelay

WithBusRelay returns the instance of a StreamBusRelay which will be connected to the provided SocketServer and handle delivery of messages to a message bus and subscription to a target relay.

func (*StreamBusRelay) SocketBusSend

func (st *StreamBusRelay) SocketBusSend(b Message, sock Socket) MessageErr

func (*StreamBusRelay) SocketClosed

func (st *StreamBusRelay) SocketClosed(socket Socket)

func (*StreamBusRelay) SocketOpened

func (st *StreamBusRelay) SocketOpened(socket Socket)

func (*StreamBusRelay) SocketSubscribe

func (st *StreamBusRelay) SocketSubscribe(b Message, socket Socket) MessageErr

func (*StreamBusRelay) SocketUnsubscribe

func (st *StreamBusRelay) SocketUnsubscribe(b Message, socket Socket) MessageErr

type StreamFunc

type StreamFunc struct {
	Listen      func(Message, Socket) error
	Subscribe   func(Message, Socket) error
	Unsubscribe func(Message, Socket) error
	Closed      func(Socket)
}

func (StreamFunc) SocketClosed

func (st StreamFunc) SocketClosed(socket Socket)

func (StreamFunc) SocketOpened

func (st StreamFunc) SocketOpened(socket Socket)

type StreamRelay

type StreamRelay struct {
	Logger Logger
	Relay  *PbRelay
	Bus    MessageBus
}

func NewStreamRelay

func NewStreamRelay(logger Logger, bus MessageBus, relay *PbRelay) *StreamRelay

func WithRelay

func WithRelay(logger Logger, socketServer SocketServer, bus MessageBus, relay *PbRelay) *StreamRelay

WithRelay returns the instance of a StreamRelay which will be connected to the provided SocketServer and handle delivery of messages to a message bus and subscription to a target relay.

func (*StreamRelay) SocketBusSend

func (st *StreamRelay) SocketBusSend(b Message, sock Socket) MessageErr

func (*StreamRelay) SocketClosed

func (st *StreamRelay) SocketClosed(socket Socket)

func (*StreamRelay) SocketOpened

func (st *StreamRelay) SocketOpened(socket Socket)

func (*StreamRelay) SocketSubscribe

func (st *StreamRelay) SocketSubscribe(b Message, socket Socket) MessageErr

func (*StreamRelay) SocketUnsubscribe

func (st *StreamRelay) SocketUnsubscribe(b Message, socket Socket) MessageErr

type Topic added in v0.5.1

type Topic struct {
	T string
	R string
}

func NewTopic added in v0.5.1

func NewTopic(t string, r string) Topic

func T added in v0.5.1

func T(t string) Topic

T creates a topic with a random string suffix of length 20.

func TR added in v0.6.1

func TR(env string, t string) Topic

TR creates a topic with an environment prefix and a random string suffix of length 20.

func TRS added in v0.6.1

func TRS(env string, service string, t string) Topic

TRS creates a topic with an environment and service prefix and a random string suffix of length 20.

func (Topic) ReplyTopic added in v0.5.1

func (t Topic) ReplyTopic() Topic

func (Topic) String added in v0.5.1

func (t Topic) String() string

type TopicPartial added in v0.6.2

type TopicPartial func(topicName string) Topic

func CreateTopicPartial added in v0.6.2

func CreateTopicPartial(env string, service string) TopicPartial

CreateTopicPartial returns a TopicPartial which will always generate a topic matching the target TRS topic naming format: env.service.topic_name.
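The partial-application pattern behind this can be sketched in a few lines. The sketch uses hypothetical local types (`topic`, `topicPartial`) and only models the `env.service.topic_name` name in the `T` field; the random suffix that the real topic constructors add is left out.

```go
package main

import "fmt"

// topic is a hypothetical stand-in for sabuhp.Topic that keeps only the name.
type topic struct {
	T string
}

// topicPartial mirrors the shape of TopicPartial documented above.
type topicPartial func(topicName string) topic

// createTopicPartial fixes the environment and service once and returns a
// function that completes the name as env.service.topic_name.
func createTopicPartial(env, service string) topicPartial {
	prefix := env + "." + service + "."
	return func(topicName string) topic {
		return topic{T: prefix + topicName}
	}
}

func main() {
	userTopic := createTopicPartial("prod", "users")
	fmt.Println(userTopic("created").T) // prod.users.created
	fmt.Println(userTopic("deleted").T) // prod.users.deleted
}
```

Binding the environment and service once keeps topic names consistent across a service without repeating the prefix at every call site.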

type Transport

type Transport struct {
	Bus    MessageBus
	Socket Socket
}

func (Transport) ToBoth added in v0.4.7

func (t Transport) ToBoth(msg ...Message)

func (Transport) ToBusElseSocket added in v0.4.7

func (t Transport) ToBusElseSocket(msg ...Message)

func (Transport) ToBusOnly added in v0.4.7

func (t Transport) ToBusOnly(msg ...Message)

func (Transport) ToSocketElseBus added in v0.4.7

func (t Transport) ToSocketElseBus(msg ...Message)

func (Transport) ToSocketOnly added in v0.4.7

func (t Transport) ToSocketOnly(msg ...Message)

type TransportResponse

type TransportResponse interface {
	Handle(context.Context, Message, Transport) MessageErr
}

type TransportResponseFunc

type TransportResponseFunc func(context.Context, Message, Transport) MessageErr

func (TransportResponseFunc) Handle

func (t TransportResponseFunc) Handle(ctx context.Context, message Message, tr Transport) MessageErr

type Wrapper

type Wrapper func(response TransportResponse) TransportResponse

Wrapper is just a type of `func(TransportResponse) TransportResponse` which is a common type definition for net/http middlewares.

type Wrappers

type Wrappers []Wrapper

Wrappers contains `Wrapper`s that can be registered and used by a "main route handler". See the `Pre` and `For/ForFunc` functions too.

func Pre

func Pre(middleware ...Wrapper) Wrappers

Pre starts a chain of handlers for wrapping a "main route handler"; the registered "middleware" will run before the main handler (see `Wrappers#For/ForFunc`).

Usage:

	mux := muxie.NewMux()
	myMiddlewares := muxie.Pre(myFirstMiddleware, mySecondMiddleware)
	mux.UseHandle("/", myMiddlewares.ForFunc(myMainRouteTransportResponse))

func (Wrappers) For

For registers the wrappers for a specific handler and returns a handler that can be passed via the `UseHandle` function.

func (Wrappers) ForFunc

func (w Wrappers) ForFunc(mainFunc TransportResponseFunc) TransportResponse

ForFunc registers the wrappers for a specific raw handler function and returns a handler that can be passed via the `UseHandle` function.
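The wrapping order is the part of middleware chains that is easiest to get wrong, so here is a small sketch of it. It uses hypothetical local types (`handlerFunc`, `wrapper`, `wrappers`) with a simplified signature, and it assumes that the first registered wrapper runs first, as is usual for this style of chain; it is not sabuhp's code.

```go
package main

import (
	"fmt"
	"strings"
)

// handlerFunc is a hypothetical, simplified stand-in for TransportResponseFunc.
// The trace slice records the order in which the chain runs.
type handlerFunc func(msg string, trace *[]string)

// wrapper and wrappers mirror the shapes of Wrapper and Wrappers.
type wrapper func(next handlerFunc) handlerFunc
type wrappers []wrapper

// pre collects middleware in registration order.
func pre(middleware ...wrapper) wrappers {
	return wrappers(middleware)
}

// forFunc wraps target from the last wrapper to the first, so that the first
// registered wrapper ends up outermost and therefore runs first.
func (w wrappers) forFunc(target handlerFunc) handlerFunc {
	for i := len(w) - 1; i >= 0; i-- {
		target = w[i](target)
	}
	return target
}

// named builds a wrapper that records its name before calling the next handler.
func named(name string) wrapper {
	return func(next handlerFunc) handlerFunc {
		return func(msg string, trace *[]string) {
			*trace = append(*trace, name)
			next(msg, trace)
		}
	}
}

// runChain sends one message through two wrappers and a main handler.
func runChain() string {
	var trace []string
	h := pre(named("first"), named("second")).forFunc(func(msg string, trace *[]string) {
		*trace = append(*trace, "main:"+msg)
	})
	h("hello", &trace)
	return strings.Join(trace, " -> ")
}

func main() {
	fmt.Println(runChain()) // first -> second -> main:hello
}
```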

type WriterToSplitter

type WriterToSplitter interface {
	Split(w io.WriterTo) (chan<- Message, error)
}

WriterToSplitter takes a desired io.WriterTo object and transforms the stream into a series of message parts which will be reassembled on the receiving end.
