connector

package
v0.0.0-...-83e654d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2017 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level defaults.
const (
	// DefaultWorkers is a default worker count of 1.
	// NOTE(review): presumably the fallback when Config.Workers is unset — confirm against the implementation.
	DefaultWorkers = 1
	// SubstitutePath is the literal path segment "/substitute/".
	// NOTE(review): how it is combined with Config.Prefix / Config.URLPattern is not visible here — confirm.
	SubstitutePath = "/substitute/"
)

Variables

View Source
// Parameter names. These are declared with var rather than const, so
// importing packages are able to reassign them.
var (
	// TopicParam is the parameter name "topic".
	// NOTE(review): presumably a key in the request/route parameter map — confirm.
	TopicParam     = "topic"
	// ConnectorParam is the parameter name "connector".
	// NOTE(review): presumably a key in the request/route parameter map — confirm.
	ConnectorParam = "connector"
)
View Source
// Sentinel errors exported by this package.
// NOTE(review): the messages are capitalized and end with a period, which
// differs from the usual Go error-string convention; left unchanged because
// callers may depend on the exact text.
var (
	// ErrSubscriberExists reports that a subscriber already exists.
	// NOTE(review): presumably returned by Manager.Add / Manager.Create — confirm.
	ErrSubscriberExists       = errors.New("Subscriber exists.")
	// ErrSubscriberDoesNotExist reports that a subscriber does not exist.
	// NOTE(review): presumably returned by Manager.Update / Manager.Remove — confirm.
	ErrSubscriberDoesNotExist = errors.New("Subscriber does not exist.")

	// ErrRouteChannelClosed reports that a subscriber's route channel has been closed.
	// NOTE(review): presumably returned by Subscriber.Loop — confirm.
	ErrRouteChannelClosed = errors.New("Subscriber route channel has been closed.")
)

Functions

func GenerateKey

func GenerateKey(topic string, params map[string]string) string

Types

type Config

// Config holds the settings passed to NewConnector.
type Config struct {
	// Name is the connector's name.
	Name       string
	// Schema is a schema name.
	// NOTE(review): presumably the value forwarded to NewManager(schema, ...) — confirm.
	Schema     string
	// Prefix is a path prefix.
	// NOTE(review): presumably the HTTP path prefix the connector is mounted on — confirm.
	Prefix     string
	// URLPattern is a URL pattern.
	// NOTE(review): presumably the route pattern (relative to Prefix) used to extract parameters — confirm.
	URLPattern string
	// Workers is a worker count.
	// NOTE(review): presumably the nWorkers handed to NewQueue, with DefaultWorkers as fallback — confirm.
	Workers    int
}

type Connector

func NewConnector

func NewConnector(router router.Router, sender Sender, config Config) (Connector, error)

type Manager

// Manager maintains a collection of Subscribers.
// NOTE(review): NewManager takes a kvstore.KVStore, so the collection is
// presumably persisted there; the method notes below are inferred from the
// signatures only — confirm against the implementation.
type Manager interface {
	// Load populates the manager; presumably from the backing store.
	Load() error
	// List returns the managed subscribers.
	List() []Subscriber
	// Filter returns the subscribers selected by the given string map;
	// presumably those whose Subscriber.Filter accepts it.
	Filter(map[string]string) []Subscriber
	// Find returns the subscriber for the given string; presumably a
	// subscriber key as returned by Subscriber.Key.
	Find(string) Subscriber
	// Exists reports whether a subscriber is known for the given string;
	// presumably a subscriber key.
	Exists(string) bool
	// Create builds a subscriber from a topic path and route parameters.
	Create(protocol.Path, router.RouteParams) (Subscriber, error)
	// Add registers the given subscriber.
	Add(Subscriber) error
	// Update stores changes to the given subscriber.
	Update(Subscriber) error
	// Remove unregisters the given subscriber.
	Remove(Subscriber) error
}

func NewManager

func NewManager(schema string, kvstore kvstore.KVStore) Manager

type Metadata

// Metadata carries additional information about a handled request; a
// *Metadata is one of the arguments of ResponseHandler.HandleResponse.
type Metadata struct {
	// Latency is a duration.
	// NOTE(review): presumably the time the Sender took to send the request — confirm.
	Latency time.Duration
}

type Queue

// Queue accepts Requests between Start and Stop.
type Queue interface {
	// ResponseHandlerSetter gives access to the queue's ResponseHandler.
	ResponseHandlerSetter
	// SenderSetter gives access to the queue's Sender.
	SenderSetter

	// Start starts the queue.
	Start() error
	// Push adds a request to the queue.
	Push(request Request) error
	// Stop stops the queue.
	Stop() error
}

Queue is an interface modeling a task queue: it is started, Requests are pushed to it, and it is finally stopped after all requests have been handled.

func NewQueue

func NewQueue(sender Sender, nWorkers int) Queue

NewQueue returns a new Queue (not started).

type Request

// Request exposes a Subscriber together with a protocol message; NewRequest
// builds one from exactly those two values.
type Request interface {
	// Subscriber returns the request's Subscriber.
	Subscriber() Subscriber
	// Message returns the request's protocol message.
	Message() *protocol.Message
}

func NewRequest

func NewRequest(s Subscriber, m *protocol.Message) Request

type ResponseHandler

// ResponseHandler processes the outcome of sending a Request.
type ResponseHandler interface {
	// HandleResponse handles the response+error (returned by a Sender).
	// It receives the originating Request, the response value, a *Metadata
	// and the error, and returns an error of its own.
	HandleResponse(Request, interface{}, *Metadata, error) error
}

type ResponseHandlerSetter

// ResponseHandlerSetter is implemented by types that expose a replaceable
// ResponseHandler.
type ResponseHandlerSetter interface {
	// ResponseHandler returns a ResponseHandler; presumably the one last set.
	ResponseHandler() ResponseHandler
	// SetResponseHandler sets the ResponseHandler.
	SetResponseHandler(ResponseHandler)
}

type ResponsiveConnector

// ResponsiveConnector is a Connector that is also a ResponseHandler.
type ResponsiveConnector interface {
	Connector
	ResponseHandler
}

type Runner

// Runner is implemented by types that can run a Subscriber.
type Runner interface {
	// Run runs the given subscriber.
	// NOTE(review): presumably starts Subscriber.Loop for it — confirm.
	Run(Subscriber)
}

type Sender

// Sender sends Requests.
type Sender interface {
	// Send takes a Request and returns the response or error.
	// The response is an untyped interface{} value.
	Send(Request) (interface{}, error)
}

type SenderSetter

// SenderSetter is implemented by types that expose a replaceable Sender.
type SenderSetter interface {
	// Sender returns a Sender; presumably the one last set.
	Sender() Sender
	// SetSender sets the Sender.
	SetSender(Sender)
}

type Subscriber

// Subscriber represents a single subscription.
// NOTE(review): the method notes below, other than Reset, are inferred from
// the signatures only — confirm against the implementation.
type Subscriber interface {
	// Reset will recreate the route inside the subscriber with the information
	// stored in the subscriber data.
	Reset() error
	// Key returns the subscriber's key; presumably the value produced by GenerateKey.
	Key() string
	// Route returns the subscriber's route.
	Route() *router.Route
	// Filter reports whether the subscriber matches the given string map.
	Filter(map[string]string) bool
	// Loop runs the subscriber with the given context and Queue; presumably
	// it pushes Requests to the queue until the context is done.
	Loop(context.Context, Queue) error
	// SetLastID sets the subscriber's last ID.
	SetLastID(ID uint64)
	// Cancel cancels the subscriber; presumably stopping a running Loop.
	Cancel()
	// Encode returns the subscriber encoded as bytes; presumably the JSON
	// accepted by NewSubscriberFromJSON.
	Encode() ([]byte, error)
}

func NewSubscriber

func NewSubscriber(topic protocol.Path, params router.RouteParams, lastID uint64) Subscriber

func NewSubscriberFromData

func NewSubscriberFromData(data SubscriberData) Subscriber

func NewSubscriberFromJSON

func NewSubscriberFromJSON(data []byte) (Subscriber, error)

type SubscriberData

// SubscriberData is the plain-data description of a subscriber accepted by
// NewSubscriberFromData. Its fields have the same types as the arguments of
// NewSubscriber.
type SubscriberData struct {
	// Topic is the topic path.
	Topic  protocol.Path
	// Params are the route parameters.
	Params router.RouteParams
	// LastID is the last ID.
	// NOTE(review): presumably the ID of the last message handled for this subscriber — confirm.
	LastID uint64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL