Documentation ¶
Overview ¶
Package qp implements the "QP" protocol found at qp.github.io.
The QP protocol allows for agnostic communication with any underlying message queue system. By using QP, you remove technology-dependent code from your application, while gaining additional functionality, such as QP's pipeline concept.
Publish and Subscribe ¶
The pub/sub model is achieved by using the Publisher and Subscriber types, which expose Publish and Subscribe methods respectively.
Request and Response ¶
To make a request and get back a response from a pipeline of handlers, use the Requester type, which offers the Issue method.
Building services that respond to requests can be achieved by using the Responder type, which exposes the Handle method.
Name and instance ID ¶
Most types require a name and instance ID. The name describes the type of capability being provided. Instance ID allows for many things of the same type to coexist. All name and instance ID combinations must be unique within a system.
Index ¶
- Variables
- func Service(name, instanceID string, codec Codec, transport DirectTransport, ...) error
- func ServiceFunc(name, instanceID string, codec Codec, transport DirectTransport, ...) error
- func ServiceLogger(name, instanceID string, codec Codec, transport DirectTransport, ...) error
- func ServiceLoggerFunc(name, instanceID string, codec Codec, transport DirectTransport, ...) error
- type Codec
- type DirectTransport
- type Event
- type EventHandler
- type EventHandlerFunc
- type Future
- type Handler
- type HandlerFunc
- type Message
- type PubSubTransport
- type Publisher
- type RequestID
- type Requester
- type Responder
- type Signal
- type Subscriber
- type Transaction
- type TransactionHandler
- type TransactionHandlerFunc
Variables ¶
var ErrNotRunning = errors.New("transport is not running")
ErrNotRunning is returned when a method is called on a transport that is not running.
var ErrRunning = errors.New("transport is running")
ErrRunning is returned when a method is called on a transport that is running.
var ErrTimeout = errors.New("timed out")
ErrTimeout represents situations when timeouts have occurred.
var JSON = NewCodec(func(object interface{}) ([]byte, error) { return json.Marshal(object) }, func(data []byte, to interface{}) error { return json.Unmarshal(data, to) })
JSON is a Codec that talks JSON.
Functions ¶
func Service ¶
func Service(name, instanceID string, codec Codec, transport DirectTransport, handler TransactionHandler) error
Service is an endpoint that automatically subscribes to its own name, allowing other endpoints to issue requests to it. Multiple services with the same name will automatically draw upon the same channel, creating implicit load balancing.
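The implicit load balancing described here follows the competing-consumers pattern: several receivers draw from one channel, and each message goes to exactly one of them. The sketch below illustrates the idea with a plain Go channel standing in for a transport channel. It does not use the qp API, and the balance function is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// balance starts the given number of workers, all receiving from one
// shared channel, and returns how many jobs each worker handled.
func balance(workers, jobs int) []int {
	queue := make(chan int)
	handled := make([]int, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue {
				handled[id]++ // each worker writes only to its own slot
			}
		}(w)
	}

	for j := 0; j < jobs; j++ {
		queue <- j // delivered to whichever worker is free
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	fmt.Println(balance(3, 30))
}
```

The split between workers varies from run to run, but the counts always sum to the number of jobs, since no job is delivered twice.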
func ServiceFunc ¶
func ServiceFunc(name, instanceID string, codec Codec, transport DirectTransport, handler TransactionHandlerFunc) error
ServiceFunc creates a service with a TransactionHandlerFunc rather than a TransactionHandler.
func ServiceLogger ¶
func ServiceLogger(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger, handler TransactionHandler) error
ServiceLogger does the same thing as Service but also logs to the specified Logger.
func ServiceLoggerFunc ¶
func ServiceLoggerFunc(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger, handler TransactionHandlerFunc) error
ServiceLoggerFunc does the same thing ServiceLogger does but takes a TransactionHandlerFunc rather than a TransactionHandler.
Types ¶
type Codec ¶
type Codec interface {
	// Marshal takes an object and creates a byte slice representation
	// of the object in the underlying data format.
	Marshal(object interface{}) ([]byte, error)
	// Unmarshal takes a byte slice of data in the underlying data format
	// and decodes it into the provided object.
	Unmarshal(data []byte, to interface{}) error
}
Codec defines types that can marshal and unmarshal data to and from bytes.
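Any type with these two methods satisfies the interface. As a rough illustration, the sketch below redeclares Codec locally so it compiles without the package, and implements it with encoding/json. The jsonCodec, greeting, and roundTrip names are hypothetical and exist only for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec is a local copy of the interface above, so the sketch is self-contained.
type Codec interface {
	Marshal(object interface{}) ([]byte, error)
	Unmarshal(data []byte, to interface{}) error
}

// jsonCodec is a hypothetical Codec backed by encoding/json.
type jsonCodec struct{}

func (jsonCodec) Marshal(object interface{}) ([]byte, error) {
	return json.Marshal(object)
}

func (jsonCodec) Unmarshal(data []byte, to interface{}) error {
	return json.Unmarshal(data, to)
}

// greeting is an example payload type.
type greeting struct {
	Name string `json:"name"`
}

// roundTrip encodes in with the codec and decodes the bytes into a new value.
func roundTrip(c Codec, in greeting) (greeting, error) {
	var out greeting
	data, err := c.Marshal(in)
	if err != nil {
		return out, err
	}
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(jsonCodec{}, greeting{Name: "qp"})
	fmt.Println(out.Name, err)
}
```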
type DirectTransport ¶
type DirectTransport interface {
	start.StartStopper
	// Send sends data on the channel.
	Send(channel string, data []byte) error
	// OnMessage binds the handler to the specified channel.
	// Only one handler can be associated with a given channel.
	// Multiple calls to OnMessage with the same channel will replace the previous handler.
	OnMessage(channel string, handler Handler) error
}
DirectTransport represents a transport capable of providing request/response capabilities.
type Event ¶
type Event struct {
	// From is the instance ID of the sender.
	From string `json:"from"`
	// Data is the payload of the event.
	Data interface{} `json:"data"`
}
Event defines all the fields and information included as part of an event.
type EventHandler ¶
type EventHandler interface {
Handle(*Event)
}
EventHandler represents types capable of handling Events.
type EventHandlerFunc ¶
type EventHandlerFunc func(*Event)
EventHandlerFunc represents functions capable of handling Events.
func (EventHandlerFunc) Handle ¶
func (f EventHandlerFunc) Handle(r *Event)
Handle calls the EventHandlerFunc in order to handle the specific Event.
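This is the same function-adapter pattern as http.HandlerFunc in the standard library: a plain function is converted to the func type, which then satisfies the interface. The sketch below redeclares the three types locally, without JSON tags, so that it runs without the package. The dispatch and collectSenders helpers are hypothetical.

```go
package main

import "fmt"

// Local copies of the types above, so the sketch is self-contained.
type Event struct {
	From string
	Data interface{}
}

type EventHandler interface {
	Handle(*Event)
}

type EventHandlerFunc func(*Event)

// Handle calls f itself, which is what lets a plain function act as an EventHandler.
func (f EventHandlerFunc) Handle(e *Event) { f(e) }

// dispatch is a hypothetical consumer that only knows about the interface.
func dispatch(h EventHandler, e *Event) { h.Handle(e) }

// collectSenders adapts a closure to EventHandler and records each sender.
func collectSenders(events []*Event) []string {
	var seen []string
	h := EventHandlerFunc(func(e *Event) {
		seen = append(seen, e.From)
	})
	for _, e := range events {
		dispatch(h, e)
	}
	return seen
}

func main() {
	fmt.Println(collectSenders([]*Event{{From: "a-1"}, {From: "b-2"}}))
}
```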
type Future ¶
type Future struct {
// contains filtered or unexported fields
}
Future implements a future for a response object. It allows execution to continue until the response object is requested from this object, at which point it blocks and waits for the response to come back.
func (*Future) Response ¶
func (r *Future) Response(timeout time.Duration) (*Transaction, error)
Response uses a future mechanism to retrieve the response. Execution continues asynchronously until this method is called, at which point execution blocks until the Response object is available or the timeout is reached. If the Response times out, nil is returned.
type Handler ¶
type Handler interface {
Handle(msg *Message)
}
Handler represents types capable of handling messages from the transports.
type HandlerFunc ¶
type HandlerFunc func(msg *Message)
HandlerFunc represents functions capable of handling messages.
type Message ¶
type Message struct {
	// The channel the Message came from.
	Source string
	// The data of the message.
	Data []byte
}
Message represents a single message of data and its source.
type PubSubTransport ¶
type PubSubTransport interface {
	start.StartStopper
	// Publish publishes data on the specified channel.
	Publish(channel string, data []byte) error
	// Subscribe binds the handler to the specified channel.
	// Only one handler can be associated with a given channel.
	// Multiple calls to Subscribe with the same channel will replace the previous handler.
	Subscribe(channel string, handler Handler) error
}
PubSubTransport represents a transport capable of providing publish/subscribe capabilities.
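To make the one-handler-per-channel rule concrete, here is a minimal in-memory sketch of the Publish and Subscribe half of such a transport. Several things are assumptions for illustration: memTransport and handlerFunc are hypothetical names, Message and Handler are redeclared locally, delivery is synchronous, and the start.StartStopper methods are omitted because their signatures are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of Message and Handler, so the sketch is self-contained.
type Message struct {
	Source string
	Data   []byte
}

type Handler interface {
	Handle(msg *Message)
}

// handlerFunc is a hypothetical adapter from a function to Handler.
type handlerFunc func(msg *Message)

func (f handlerFunc) Handle(msg *Message) { f(msg) }

// memTransport keeps at most one handler per channel name.
type memTransport struct {
	mu       sync.RWMutex
	handlers map[string]Handler
}

func newMemTransport() *memTransport {
	return &memTransport{handlers: make(map[string]Handler)}
}

// Subscribe binds handler to channel, replacing any previous handler.
func (m *memTransport) Subscribe(channel string, handler Handler) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers[channel] = handler
	return nil
}

// Publish delivers data synchronously to the channel's handler, if any.
func (m *memTransport) Publish(channel string, data []byte) error {
	m.mu.RLock()
	h := m.handlers[channel]
	m.mu.RUnlock()
	if h != nil {
		h.Handle(&Message{Source: channel, Data: data})
	}
	return nil
}

func main() {
	tr := newMemTransport()
	_ = tr.Subscribe("news", handlerFunc(func(m *Message) { fmt.Println("first handler") }))
	_ = tr.Subscribe("news", handlerFunc(func(m *Message) {
		fmt.Printf("%s: %s\n", m.Source, m.Data)
	}))
	_ = tr.Publish("news", []byte("hello")) // only the second handler runs
}
```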
type Publisher ¶
type Publisher interface {
	// Publish publishes the object on the specified channel.
	Publish(channel string, obj interface{}) error
}
Publisher represents types capable of publishing events.
func NewPublisher ¶
func NewPublisher(name, instanceID string, codec Codec, transport PubSubTransport) Publisher
NewPublisher makes a new publisher capable of Publishing events.
type Requester ¶
type Requester interface {
	// Issue issues the request and returns a Future from which you can
	// get the response.
	// The pipeline may be one or more endpoints. If it is more than one, each will receive
	// the message, in order, and have an opportunity to mutate it before it is dispatched
	// to the next endpoint in the pipeline.
	// The provided object will be serialized and sent as the "data" field in the message.
	Issue(pipeline []string, obj interface{}) (*Future, error)
}
Requester represents a type capable of issuing requests and getting responses.
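The pipeline behaviour described for Issue can be pictured as a walk over a list of endpoints, each of which may mutate the payload before it moves on. The sketch below simulates that walk in a single process. It is an assumption-laden illustration rather than the protocol's actual routing: the Transaction type is a reduced local copy, and handlerFunc, mapString, demoEndpoints, and runPipeline are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Transaction is a reduced local copy of the documented type.
type Transaction struct {
	To   []string // endpoints still to visit
	From []string // endpoints visited so far
	Data interface{}
}

// handlerFunc is a hypothetical endpoint handler.
type handlerFunc func(*Transaction) *Transaction

// mapString builds a handler that applies fn to a string payload.
func mapString(fn func(string) string) handlerFunc {
	return func(tx *Transaction) *Transaction {
		if s, ok := tx.Data.(string); ok {
			tx.Data = fn(s)
		}
		return tx
	}
}

// demoEndpoints returns two example endpoints keyed by name.
func demoEndpoints() map[string]handlerFunc {
	return map[string]handlerFunc{
		"trim":  mapString(strings.TrimSpace),
		"upper": mapString(strings.ToUpper),
	}
}

// runPipeline visits each endpoint in order, moving its name from To to
// From. Handlers are assumed to return a non-nil Transaction.
func runPipeline(endpoints map[string]handlerFunc, pipeline []string, data interface{}) *Transaction {
	tx := &Transaction{To: append([]string(nil), pipeline...), Data: data}
	for len(tx.To) > 0 {
		next := tx.To[0]
		tx.To = tx.To[1:]
		tx.From = append(tx.From, next)
		if h, ok := endpoints[next]; ok {
			tx = h(tx)
		}
	}
	return tx
}

func main() {
	res := runPipeline(demoEndpoints(), []string{"trim", "upper"}, "  hello ")
	fmt.Printf("%q via %v\n", res.Data, res.From)
}
```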
func NewRequester ¶
func NewRequester(name, instanceID string, codec Codec, transport DirectTransport) (Requester, error)
NewRequester makes a new object capable of making requests and handling responses.
func NewRequesterLogger ¶
func NewRequesterLogger(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger) (Requester, error)
NewRequesterLogger makes a new object capable of making requests and handling responses with logs going to the specified Logger.
type Responder ¶
type Responder interface {
	// Handle binds a TransactionHandler to the specified channel.
	Handle(channel string, handler TransactionHandler) error
	// HandleFunc binds the specified function to the specified channel.
	HandleFunc(channel string, f TransactionHandlerFunc) error
}
Responder represents types capable of responding to requests.
func NewResponder ¶
func NewResponder(name, instanceID string, codec Codec, transport DirectTransport) Responder
NewResponder makes a new object capable of responding to requests.
func NewResponderLogger ¶
func NewResponderLogger(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger) Responder
NewResponderLogger makes a new object capable of responding to requests, which will log errors to the specified Logger.
type Signal ¶
type Signal struct{}
Signal is an empty struct type useful for signalling down channels.
type Subscriber ¶
type Subscriber interface {
	// Subscribe binds the handler to the specified channel.
	Subscribe(channel string, handler EventHandler) error
	// SubscribeFunc binds the EventHandlerFunc to the specified channel.
	SubscribeFunc(channel string, fn EventHandlerFunc) error
}
Subscriber represents types capable of subscribing to events.
func NewSubscriber ¶
func NewSubscriber(codec Codec, transport PubSubTransport) Subscriber
NewSubscriber creates a Subscriber object capable of subscribing to events.
func NewSubscriberLogger ¶
func NewSubscriberLogger(codec Codec, transport PubSubTransport, logger slog.Logger) Subscriber
NewSubscriberLogger creates a Subscriber object capable of subscribing to events, while logging errors to the specified logger.
type Transaction ¶
type Transaction struct {
	// To is an array of destination addresses
	To []string `json:"to"`
	// From is an array of addresses encountered thus far
	From []string `json:"from"`
	// ID is a number identifying this message
	ID RequestID `json:"id"`
	// Data is an arbitrary data payload
	Data interface{} `json:"data"`
}
Transaction defines all the fields and information in the standard qp request object.
func (*Transaction) Abort ¶
func (r *Transaction) Abort()
Abort clears the To slice indicating that the Transaction should be sent back to the originator.
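A handler might call Abort to short-circuit the rest of a pipeline, for example when validation fails. The sketch below shows the effect on a reduced local copy of Transaction. Setting To to nil is one plausible way to clear the slice and may differ from the real implementation. The validate handler and the endpoint names are hypothetical.

```go
package main

import "fmt"

// Transaction is a reduced local copy of the documented type.
type Transaction struct {
	To   []string
	From []string
	Data interface{}
}

// Abort clears To so that no further endpoints are visited.
// Assumption: the real method may clear the slice differently.
func (r *Transaction) Abort() { r.To = nil }

// validate is a hypothetical handler that aborts on an empty payload.
func validate(tx *Transaction) *Transaction {
	if s, _ := tx.Data.(string); s == "" {
		tx.Data = "rejected: empty payload"
		tx.Abort()
	}
	return tx
}

// remainingAfterValidate reports how many endpoints are still pending
// after the validate step has handled the payload.
func remainingAfterValidate(data string) int {
	tx := &Transaction{
		To:   []string{"enrich", "store"},
		From: []string{"validate"},
		Data: data,
	}
	return len(validate(tx).To)
}

func main() {
	fmt.Println("empty payload, endpoints left:", remainingAfterValidate(""))
	fmt.Println("valid payload, endpoints left:", remainingAfterValidate("order-42"))
}
```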
type TransactionHandler ¶
type TransactionHandler interface {
Handle(req *Transaction) *Transaction
}
TransactionHandler represents types capable of handling Requests.
type TransactionHandlerFunc ¶
type TransactionHandlerFunc func(r *Transaction) *Transaction
TransactionHandlerFunc represents functions capable of handling Requests.
func (TransactionHandlerFunc) Handle ¶
func (f TransactionHandlerFunc) Handle(r *Transaction) *Transaction
Handle calls the TransactionHandlerFunc in order to handle the specific Transaction.
Directories ¶
Path | Synopsis |
---|---|
codecs | |
 | Package example is a real multi-service implementation with HTTP endpoint (server) that uses Redis transports. |
 | Package inproc provides transports for in-process operations. |
 | Package redis implements the various qp transports. |
 | Package templates contains template code for writing QP components, such as codecs and transports. |