wirenettransport

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2020 License: MIT Imports: 8 Imported by: 0

README

go-wirenet-gokit

Transport layer for go-kit

See the examples in the test files at the following links:

StreamEndpoint, Endpoint, TestService

Examples (Key points)

Stream Endpoint

transport/wirenet.go

// server side
//
// MakeWirenetHandlers registers the handler for the "uploadFile" stream on
// wire. The handler is a stream server built from
// endpoints.UploadFileEndpoint and uploadFileServerSideCodec, with no options.
func MakeWirenetHandlers(wire wirenet.Wire, endpoints Set) {
	opts := []wirenettransport.StreamServerOption{}
	server := wirenettransport.NewStreamServer(
		endpoints.UploadFileEndpoint,
		uploadFileServerSideCodec,
		opts...,
	)
	wire.Stream("uploadFile", server.Handle)
}

transport/wirenet.go

// client side
//
// MakeWirenetClient returns a Service whose UploadFileEndpoint is the endpoint
// of a stream client bound to the "uploadFile" stream on wire and driven by
// uploadFileClientSideCodec, with no options.
func MakeWirenetClient(wire wirenet.Wire) Service {
	opts := []wirenettransport.StreamClientOption{}
	uploadClient := wirenettransport.NewStreamClient(
		wire,
		"uploadFile",
		uploadFileClientSideCodec,
		opts...,
	)
	return &Set{
		UploadFileEndpoint: uploadClient.Endpoint(),
	}
}

transport/wirenet_codec.go

// uploadFileServerSideCodec serves one "uploadFile" stream on the server side.
//
// It decodes a JSON UploadFileRequest from the stream reader, writes the rest
// of the stream into a file inside os.TempDir(), and then encodes a JSON
// UploadFileResponse to the stream writer. The stream is closed on return.
//
// It returns the response that was sent. An error is returned when the request
// cannot be decoded, the requested file name is unusable, the file cannot be
// created, or the response cannot be encoded. An error from s.WriteTo is not
// returned; it is carried to the peer in the response's Err field.
func uploadFileServerSideCodec(_ context.Context, s wirenet.Stream) (interface{}, error) {
	defer s.Close()

	w := s.Writer()
	r := s.Reader()

	// read fileInfo
	var req UploadFileRequest
	if err := json.NewDecoder(r).Decode(&req); err != nil {
		return nil, err
	}
	r.Close()

	// req.Name is supplied by the remote peer. Keep only its final path
	// element so a name such as "../../etc/passwd" cannot escape the temp
	// directory, and reject names that do not denote a file at all.
	name := filepath.Base(req.Name)
	if name == "." || name == ".." || name == string(filepath.Separator) {
		return nil, os.ErrInvalid
	}

	// read data
	fp := filepath.Join(os.TempDir(), name)
	file, err := os.Create(fp)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	_, err = s.WriteTo(file)

	// A transfer failure is reported to the client inside the response rather
	// than aborting the exchange.
	resp := &UploadFileResponse{
		Err: err,
	}

	// write data
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		return nil, err
	}
	return resp, nil
}

transport/wirenet_codec.go

// uploadFileClientSideCodec drives one "uploadFile" stream on the client side.
//
// It opens the file named by the request's Filepath, encodes the request as
// JSON to the stream writer, sends the file through s.ReadFrom, and finally
// decodes a JSON UploadFileResponse from the stream reader. Both the file and
// the stream are closed on return.
//
// request must be an UploadFileRequest; any other dynamic type makes the type
// assertion panic. On success the decoded UploadFileResponse is returned. On
// any failure the response value is nil and the error is returned.
func uploadFileClientSideCodec(_ context.Context, request interface{}, s wirenet.Stream) (interface{}, error) {
	req := request.(UploadFileRequest)
	file, err := os.Open(req.Filepath)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	defer s.Close()

	w := s.Writer()
	r := s.Reader()

	// write fileInfo
	if err := json.NewEncoder(w).Encode(&req); err != nil {
		return nil, err
	}
	w.Close()

	// write data
	if _, err = s.ReadFrom(file); err != nil {
		return nil, err
	}

	// read data
	var resp UploadFileResponse
	if err := json.NewDecoder(r).Decode(&resp); err != nil {
		// The error belongs in the error result; returning it as the response
		// with a nil error would make the caller treat the failure as success.
		return nil, err
	}
	r.Close()

	return resp, nil
}

cmd/client.go

// Build the client-side Service on top of the wire.
client := MakeWirenetClient(wire)
...
// Pick the session the request should go through (sessionHub is application
// code that is not shown in this example).
sess := sessionHub.findSessionByUser("123")
...
// Sets the current wirenet session id
ctxWithSess := wirenettransport.InjectSessionID(sess.ID(), ctx)
// Call the service with the context that carries the session id.
err = client.UploadFile(ctxWithSess, "./test/testdata/data.db", 1024, "data.db")

cmd/server.go

// Create the service, wrap it in an endpoint set, and register the wirenet
// stream handlers for those endpoints on the wire.
svc := NewService()
endpoints := NewEndpointSet(svc)
MakeWirenetHandlers(wire, endpoints)

Endpoint

transport/wirenet.go

// server side
//
// MakeWirenetHandlers registers the handler for the "updateBalance" stream on
// wire. The handler is a server built from endpoints.UpdateBalanceEndpoint
// with the JSON request decoder and response encoder, and an empty option
// list.
func MakeWirenetHandlers(wire wirenet.Wire, endpoints Set) {
	options := []wirenettransport.ServerOption{}
	server := wirenettransport.NewServer(
		endpoints.UpdateBalanceEndpoint,
		decodeWirenetUpdateBalanceRequest,
		encodeWirenetUpdateBalanceResponse,
		options...,
	)
	wire.Stream("updateBalance", server.Handle)
}

transport/wirenet.go

// client side
//
// MakeWirenetClient returns a Service whose UpdateBalanceEndpoint is the
// endpoint of a client bound to the "updateBalance" stream on wire, using the
// JSON request encoder and response decoder and an empty option list.
func MakeWirenetClient(wire wirenet.Wire) Service {
	options := []wirenettransport.ClientOption{}
	balanceClient := wirenettransport.NewClient(
		wire,
		"updateBalance",
		encodeWirenetUpdateBalanceRequest,
		decodeWirenetUpdateBalanceResponse,
		options...,
	)
	return &Set{
		UpdateBalanceEndpoint: balanceClient.Endpoint(),
	}
}

transport/wirenet_encode_decode.go

// decodeWirenetUpdateBalanceRequest reads one JSON-encoded
// UpdateBalanceRequest from r and returns it by value. r is closed before the
// function returns. If decoding fails, the error is returned together with
// whatever the decoder left in the request value.
func decodeWirenetUpdateBalanceRequest(_ context.Context, r io.ReadCloser) (request interface{}, err error) {
	defer r.Close()

	payload := UpdateBalanceRequest{}
	if decodeErr := json.NewDecoder(r).Decode(&payload); decodeErr != nil {
		return payload, decodeErr
	}
	return payload, nil
}

// encodeWirenetUpdateBalanceRequest writes request to w as a single JSON
// value followed by a newline and closes w before returning. It returns the
// encoder's error, if any.
func encodeWirenetUpdateBalanceRequest(_ context.Context, request interface{}, w io.WriteCloser) error {
	defer w.Close()
	// Pass the value itself: a pointer to an interface adds nothing for the
	// JSON encoder and differs from how the response encoder is written.
	return json.NewEncoder(w).Encode(request)
}

// decodeWirenetUpdateBalanceResponse reads one JSON-encoded
// UpdateBalanceResponse from r and returns it by value. r is closed before
// the function returns. If decoding fails, the error is returned together
// with whatever the decoder left in the response value.
func decodeWirenetUpdateBalanceResponse(_ context.Context, r io.ReadCloser) (response interface{}, err error) {
	defer r.Close()

	payload := UpdateBalanceResponse{}
	if decodeErr := json.NewDecoder(r).Decode(&payload); decodeErr != nil {
		return payload, decodeErr
	}
	return payload, nil
}

// encodeWirenetUpdateBalanceResponse writes response to w as a single JSON
// value followed by a newline and closes w before returning. It returns the
// encoder's error, if any.
func encodeWirenetUpdateBalanceResponse(_ context.Context, response interface{}, w io.WriteCloser) error {
	defer w.Close()

	encoder := json.NewEncoder(w)
	if encodeErr := encoder.Encode(response); encodeErr != nil {
		return encodeErr
	}
	return nil
}

cmd/server.go

// go-kit
// Create the service, wrap it in an endpoint set, and register the wirenet
// stream handlers for those endpoints on the wire.
svc := NewService()
endpoints := NewEndpointSet(svc)
MakeWirenetHandlers(wire, endpoints)

cmd/client.go

// Build the client-side Service on top of the wire.
client := MakeWirenetClient(wire)
...
// Pick the session the request should go through (sessionHub is application
// code that is not shown in this example).
sess := sessionHub.findSessionByUser("123")
...
// Sets the current wirenet session id
ctxWithSess := wirenettransport.InjectSessionID(sess.ID(), ctx)
// Call the service with the context that carries the session id.
sum, err := client.UpdateBalance(ctxWithSess, 1, 4)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrSessionIDNotDefined is returned when no session id is defined in the context.
	ErrSessionIDNotDefined = errors.New("session id not defined")
)

Functions

func InjectSessionID

func InjectSessionID(sid uuid.UUID, ctx context.Context) context.Context

InjectSessionID returns a new context with session id.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps a wirenet connection and provides a method that implements endpoint.Endpoint.

func NewClient

func NewClient(
	wire wirenet.Wire,
	streamName string,
	enc EncodeRequestFunc,
	dec DecodeResponseFunc,
	options ...ClientOption,
) *Client

NewClient constructs a usable Client for a single remote endpoint.

func (Client) Endpoint

func (c Client) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint that will invoke the wirenet specified by the client.

type ClientCodec

type ClientCodec func(context.Context, interface{}, wirenet.Stream) (interface{}, error)

ClientCodec encodes and decodes the byte stream in the user-domain.

type ClientOption

type ClientOption func(*Client)

ClientOption sets an optional parameter for clients.

func ClientBefore

func ClientBefore(before ...ClientRequestFunc) ClientOption

ClientBefore sets the RequestFuncs that are applied to the outgoing request before it's invoked.

type ClientRequestFunc

type ClientRequestFunc func(context.Context) context.Context

ClientRequestFunc can take information from the context and use it to build the stream.

func SetSessionID

func SetSessionID(sid uuid.UUID) ClientRequestFunc

SetSessionID returns a ClientRequestFunc that sets the session id.

type ContextKeySessionID

type ContextKeySessionID struct{}

ContextKeySessionID is the context key type used to inject the session id into a context.

type DecodeRequestFunc

type DecodeRequestFunc func(context.Context, io.ReadCloser) (request interface{}, err error)

DecodeRequestFunc extracts a user-domain request object from a wirenet request.

type DecodeResponseFunc

type DecodeResponseFunc func(context.Context, io.ReadCloser) (response interface{}, err error)

DecodeResponseFunc extracts a user-domain response object from a wirenet response object.

type EncodeRequestFunc

type EncodeRequestFunc func(context.Context, interface{}, io.WriteCloser) error

EncodeRequestFunc encodes the passed request object into the wirenet request object.

type EncodeResponseFunc

type EncodeResponseFunc func(context.Context, interface{}, io.WriteCloser) error

EncodeResponseFunc encodes the passed response object to the wirenet response message.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server wraps an endpoint and implements wirenet.Handler.

func NewServer

func NewServer(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	enc EncodeResponseFunc,
	options ...ServerOption,
) *Server

NewServer constructs a new server, which wraps the provided endpoint and implements the Handler interface.

func (Server) Handle

func (s Server) Handle(ctx context.Context, stream wirenet.Stream)

Handle implements the Handler interface.

type ServerCodec

type ServerCodec func(context.Context, wirenet.Stream) (interface{}, error)

ServerCodec encodes and decodes the byte stream in the user-domain.

type ServerOption

type ServerOption func(*Server)

ServerOption sets an optional parameter for servers.

func ServerBefore

func ServerBefore(before ...ServerRequestFunc) ServerOption

ServerBefore functions are executed on the stream request object before the request is decoded.

func ServerErrorHandler

func ServerErrorHandler(errorHandler transport.ErrorHandler) ServerOption

ServerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure.

type ServerRequestFunc

type ServerRequestFunc func(context.Context) context.Context

ServerRequestFunc can take information from the context and use it to handle the stream.

type StreamClient

type StreamClient struct {
	// contains filtered or unexported fields
}

StreamClient wraps a wirenet connection and provides a method that implements endpoint.Endpoint.

func NewStreamClient

func NewStreamClient(
	wire wirenet.Wire,
	streamName string,
	codec ClientCodec,
	options ...StreamClientOption,
) *StreamClient

NewStreamClient constructs a usable StreamClient for a single remote endpoint.

func (StreamClient) Endpoint

func (c StreamClient) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint that will invoke the wirenet specified by the client.

type StreamClientOption

type StreamClientOption func(*StreamClient)

StreamClientOption sets an optional parameter for clients.

func StreamClientBefore

func StreamClientBefore(before ...ClientRequestFunc) StreamClientOption

StreamClientBefore sets the RequestFuncs that are applied to the outgoing request before it's invoked.

type StreamServer

type StreamServer struct {
	// contains filtered or unexported fields
}

StreamServer wraps an endpoint and implements wirenet.Handler.

func NewStreamServer

func NewStreamServer(
	e endpoint.Endpoint,
	codec ServerCodec,
	options ...StreamServerOption,
) *StreamServer

NewStreamServer constructs a new server, which wraps the provided endpoint and implements the Handler interface.

func (StreamServer) Handle

func (s StreamServer) Handle(ctx context.Context, stream wirenet.Stream)

Handle implements the Handler interface.

type StreamServerOption

type StreamServerOption func(*StreamServer)

StreamServerOption sets an optional parameter for servers.

func StreamServerBefore

func StreamServerBefore(before ...ServerRequestFunc) StreamServerOption

StreamServerBefore functions are executed on the stream request object before the request is decoded.

func StreamServerErrorHandler

func StreamServerErrorHandler(errorHandler transport.ErrorHandler) StreamServerOption

StreamServerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL