binding

package
v1.0.0
Warning

This package is not in the latest version of its module.

Published: Jan 30, 2020 License: Apache-2.0 Imports: 6 Imported by: 9

Documentation

Overview

Package binding defines interfaces for protocol bindings.

NOTE: Most applications that emit or consume events should use the ../client package, which provides a simpler API to the underlying binding.

The interfaces in this package provide extra encoding and protocol information to allow efficient forwarding and end-to-end reliable delivery between a Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events.

Protocol Bindings

A protocol binding implements at least Message, Sender and Receiver, and usually Encoder.

Receiver: receives protocol messages and wraps them to implement the Message interface.

Message: converts to protocol-neutral cloudevents.Event or structured event data. It also provides methods to manage acknowledgment for reliable delivery across bindings.

Sender: converts arbitrary Message implementations to a protocol-specific form and sends them. A protocol Sender should preserve the spec-version and structured/binary mode of sent messages as far as possible. This package provides generic Sender wrappers to pre-process messages into a specific spec-version or structured/binary mode when the user requires that.

Message and ExactlyOnceMessage provide methods that allow acknowledgments to propagate when a reliable message is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported.

Intermediaries

Intermediaries can forward Messages from a Receiver to a Sender without knowledge of the underlying protocols. The Message interface allows structured messages to be forwarded without decoding and re-encoding. It also allows any Message to be fully decoded and examined as needed.

Example (Implementing)

Example of implementing a transport including a simple message type, and a transport sender and receiver.

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"io/ioutil"

	"github.com/cloudevents/sdk-go/pkg/binding"
	"github.com/cloudevents/sdk-go/pkg/binding/event"
	"github.com/cloudevents/sdk-go/pkg/binding/format"
	ce "github.com/cloudevents/sdk-go/pkg/cloudevents"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)

// ExMessage is a json.RawMessage, a byte slice containing a JSON encoded event.
// It implements binding.MockStructuredMessage
//
// Note: a good binding implementation should provide an easy way to convert
// between the Message implementation and the "native" message format.
// In this case it's as simple as:
//
//	impl = ExMessage(native)
//	native = json.RawMessage(impl)
//
// For example in a HTTP binding it should be easy to convert between
// the HTTP binding.Message implementation and net/http.Request and
// Response types.  There are no interfaces for this conversion as it
// requires the use of unknown types.
type ExMessage json.RawMessage

func (m ExMessage) Structured(b binding.StructuredEncoder) error {
	return b.SetStructuredEvent(format.JSON, bytes.NewReader([]byte(m)))
}

func (m ExMessage) Binary(binding.BinaryEncoder) error {
	return binding.ErrNotBinary
}

func (m ExMessage) Event(b binding.EventEncoder) error {
	e := ce.Event{}
	err := json.Unmarshal(m, &e)
	if err != nil {
		return err
	}
	return b.SetEvent(e)
}

func (m ExMessage) Finish(error) error { return nil }

var _ binding.Message = (*ExMessage)(nil)

// ExSender sends by writing JSON encoded events to an io.Writer
// ExSender supports transcoding
// ExSender implements directly StructuredEncoder & EventEncoder
type ExSender struct {
	encoder      *json.Encoder
	transformers binding.TransformerFactories
}

func NewExSender(w io.Writer, factories ...binding.TransformerFactory) binding.Sender {
	return &ExSender{encoder: json.NewEncoder(w), transformers: factories}
}

func (s *ExSender) Send(ctx context.Context, m binding.Message) error {
	// Translate tries the various encodings, starting with provided root encoder factories.
	// If a sender doesn't support a specific encoding, a null root encoder factory could be provided.
	_, _, err := binding.Translate(
		m,
		func() binding.StructuredEncoder {
			return s
		},
		nil,
		func() binding.EventEncoder {
			return s
		},
		s.transformers)

	return err
}

func (s *ExSender) SetStructuredEvent(f format.Format, event io.Reader) error {
	if f == format.JSON {
		b, err := ioutil.ReadAll(event)
		if err != nil {
			return err
		}
		return s.encoder.Encode(json.RawMessage(b))
	}
	return binding.ErrNotStructured
}

func (s *ExSender) SetEvent(event ce.Event) error {
	return s.encoder.Encode(&event)
}

func (s *ExSender) Close(context.Context) error { return nil }

var _ binding.Sender = (*ExSender)(nil)
var _ binding.StructuredEncoder = (*ExSender)(nil)
var _ binding.EventEncoder = (*ExSender)(nil)

// ExReceiver receives by reading JSON encoded events from an io.Reader
type ExReceiver struct{ decoder *json.Decoder }

func NewExReceiver(r io.Reader) binding.Receiver { return &ExReceiver{json.NewDecoder(r)} }

func (r *ExReceiver) Receive(context.Context) (binding.Message, error) {
	var rm json.RawMessage
	err := r.decoder.Decode(&rm) // This is just a byte copy.
	return ExMessage(rm), err
}
func (r *ExReceiver) Close(context.Context) error { return nil }

// NewExTransport returns a transport.Transport which is implemented by
// an ExSender and an ExReceiver
func NewExTransport(r io.Reader, w io.Writer) transport.Transport {
	return event.NewTransportAdapter(NewExSender(w), NewExReceiver(r))
}

// Example of implementing a transport including a simple message type,
// and a transport sender and receiver.
func main() {}

Example (Using)

This example shows how to use a transport in sender, receiver, and intermediary processes.

The sender and receiver use the client.Client API to send and receive messages. Only the intermediary example uses the transport APIs directly, for efficiency and reliability in forwarding events.

package main

import (
	"context"
	"fmt"
	"io"
	"strconv"

	"github.com/cloudevents/sdk-go/pkg/cloudevents"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
)

const count = 3 // Example ends after this many events.

// The sender uses the cloudevents.Client API, not the transport APIs directly.
func runSender(w io.Writer) error {
	c, err := client.New(NewExTransport(nil, w))
	if err != nil {
		return err
	}
	for i := 0; i < count; i++ {
		e := cloudevents.New()
		e.SetType("example.com/event")
		e.SetSource("example.com/source")
		e.SetID(strconv.Itoa(i))
		if err := e.SetData(fmt.Sprintf("hello %d", i)); err != nil {
			return err
		}
		if _, _, err := c.Send(context.TODO(), e); err != nil {
			return err
		}
	}
	return nil
}

// The receiver uses the cloudevents.Client API, not the transport APIs directly.
func runReceiver(r io.Reader) error {
	i := 0
	process := func(e cloudevents.Event) error {
		fmt.Printf("%s\n", e)
		i++
		if i == count {
			return io.EOF
		}
		return nil
	}
	c, err := client.New(NewExTransport(r, nil))
	if err != nil {
		return err
	}
	return c.StartReceiver(context.TODO(), process)
}

// The intermediary receives events and forwards them to another
// process using ExReceiver and ExSender directly.
//
// By forwarding a binding.Message instead of a cloudevents.Event,
// it allows the transports to avoid unnecessary decoding of
// structured events, and to exchange delivery status between reliable
// transports. Even transports using different protocols can ensure
// reliable delivery.
func runIntermediary(r io.Reader, w io.WriteCloser) error {
	defer w.Close()
	for {
		receiver := NewExReceiver(r)
		sender := NewExSender(w)
		for i := 0; i < count; i++ {
			if m, err := receiver.Receive(context.TODO()); err != nil {
				return err
			} else if err := sender.Send(context.TODO(), m); err != nil {
				return err
			}
		}
	}
}

// This example shows how to use a transport in sender, receiver,
// and intermediary processes.
//
// The sender and receiver use the client.Client API to send and
// receive messages. Only the intermediary example uses the transport
// APIs directly, for efficiency and reliability in forwarding events.
func main() {
	r1, w1 := io.Pipe() // The sender-to-intermediary pipe
	r2, w2 := io.Pipe() // The intermediary-to-receiver pipe

	done := make(chan error)
	go func() { done <- runReceiver(r2) }()
	go func() { done <- runIntermediary(r1, w2) }()
	go func() { done <- runSender(w1) }()
	for i := 0; i < 2; i++ {
		if err := <-done; err != nil && err != io.EOF {
			fmt.Println(err)
		}
	}

}

Output:

Validation: valid
Context Attributes,
  specversion: 1.0
  type: example.com/event
  source: example.com/source
  id: 0
Data,
  "hello 0"

Validation: valid
Context Attributes,
  specversion: 1.0
  type: example.com/event
  source: example.com/source
  id: 1
Data,
  "hello 1"

Validation: valid
Context Attributes,
  specversion: 1.0
  type: example.com/event
  source: example.com/source
  id: 2
Data,
  "hello 2"

Variables

var ErrNotBinary = errors.New("message is not in binary mode")

ErrNotBinary is returned by Message.Binary for non-binary messages.

var ErrNotStructured = errors.New("message is not in structured mode")

ErrNotStructured is returned by Message.Structured for non-structured messages.

Functions

func Translate added in v0.10.1

func Translate(
	message Message,
	createRootStructuredEncoder func() StructuredEncoder,
	createRootBinaryEncoder func() BinaryEncoder,
	createRootEventEncoder func() EventEncoder,
	factories TransformerFactories,
) (bool, bool, error)

Translate invokes the encoders. createRootStructuredEncoder and createRootBinaryEncoder may be nil if the protocol doesn't support the corresponding encoding.

Returns:

* true, false, nil if message was structured and correctly translated to Event
* false, true, nil if message was binary and correctly translated to Event
* false, false, nil if message was event and correctly translated to Event
* true, false, err if message was structured but error happened during translation
* false, true, err if message was binary but error happened during translation
* false, false, err if message was event but error happened during translation
* false, false, err in other cases
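The return triple can be hard to read at a call site. As a minimal sketch, the hypothetical helper describeTranslate below (not part of this package) maps the two booleans and the error to a readable label, following the list of return values above. It cannot distinguish a failed event translation from the "other cases" row, because both are reported as false, false, err.

```go
package main

import (
	"errors"
	"fmt"
)

// errExample is a placeholder error used only by this sketch.
var errExample = errors.New("boom")

// describeTranslate is a hypothetical helper (not part of package binding).
// It interprets a (structured, binary, err) triple shaped like the values
// documented for Translate.
func describeTranslate(structured, binary bool, err error) string {
	mode := "event" // false, false: event mode, or one of the "other cases"
	switch {
	case structured:
		mode = "structured"
	case binary:
		mode = "binary"
	}
	if err != nil {
		return fmt.Sprintf("%s translation failed: %v", mode, err)
	}
	return mode + " translation succeeded"
}

func main() {
	fmt.Println(describeTranslate(true, false, nil))
	fmt.Println(describeTranslate(false, true, errExample))
	fmt.Println(describeTranslate(false, false, nil))
}
```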

Types

type BinaryEncoder added in v0.10.0

type BinaryEncoder interface {
	// SetData receives an io.Reader for the data attribute.
	// io.Reader could be empty, meaning that message payload is empty
	SetData(data io.Reader) error

	// Set a standard attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding. See package cloudevents/types
	SetAttribute(attribute spec.Attribute, value interface{}) error

	// Set an extension attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding. See package cloudevents/types
	SetExtension(name string, value interface{}) error
}

BinaryEncoder should generate a new representation of the event starting from a binary message.

Protocols that support binary encoding should implement this interface to implement direct binary -> binary transfer.

type ChanReceiver added in v0.10.0

type ChanReceiver <-chan Message

ChanReceiver implements Receiver by receiving from a channel.

func (ChanReceiver) Close added in v0.10.0

func (r ChanReceiver) Close(ctx context.Context) error

func (ChanReceiver) Receive added in v0.10.0

func (r ChanReceiver) Receive(ctx context.Context) (Message, error)

type ChanSender added in v0.10.0

type ChanSender chan<- Message

ChanSender implements Sender by sending on a channel.

func (ChanSender) Close added in v0.10.0

func (s ChanSender) Close(ctx context.Context) (err error)

func (ChanSender) Send added in v0.10.0

func (s ChanSender) Send(ctx context.Context, m Message) (err error)
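To illustrate how channel-backed senders and receivers can fit together, here is a minimal sketch using local stand-in types (message, chanSender, chanReceiver) rather than the package's own. The handling of a closed channel (returning io.EOF) and of context cancellation is an assumption modelled on the Receiver contract, not a statement about how ChanSender and ChanReceiver are implemented.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// message is a local stand-in for binding.Message, reduced to a payload.
type message string

// chanSender and chanReceiver mirror the shape of ChanSender and ChanReceiver.
type chanSender chan<- message
type chanReceiver <-chan message

// Send blocks until the message is accepted by the channel or ctx is done.
func (s chanSender) Send(ctx context.Context, m message) error {
	select {
	case s <- m:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Receive returns io.EOF once the channel is closed and drained
// (an assumption of this sketch).
func (r chanReceiver) Receive(ctx context.Context) (message, error) {
	select {
	case m, ok := <-r:
		if !ok {
			return "", io.EOF
		}
		return m, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// roundTrip sends msgs through a buffered channel and collects what arrives.
func roundTrip(msgs ...message) ([]message, error) {
	ch := make(chan message, len(msgs))
	s, r := chanSender(ch), chanReceiver(ch)
	ctx := context.Background()
	for _, m := range msgs {
		if err := s.Send(ctx, m); err != nil {
			return nil, err
		}
	}
	close(ch) // signals a clean end of stream to the receiver
	var got []message
	for {
		m, err := r.Receive(ctx)
		if err == io.EOF {
			return got, nil
		}
		if err != nil {
			return got, err
		}
		got = append(got, m)
	}
}

func main() {
	got, err := roundTrip("a", "b")
	fmt.Println(got, err)
}
```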

type Closer added in v0.10.0

type Closer interface {
	Close(ctx context.Context) error
}

Closer is the common interface for things that can be closed

type EventEncoder added in v0.11.0

type EventEncoder interface {
	SetEvent(ce.Event) error
}

EventEncoder should generate a new representation of the event starting from an event message.

Every protocol must implement this interface. If a protocol supports both structured and binary encoding, two EventEncoder implementations could be provided.

type ExactlyOnceMessage

type ExactlyOnceMessage interface {
	Message

	// Received is called by a forwarding QoS2 Sender when it gets
	// acknowledgment of receipt (e.g. AMQP 'accept' or MQTT PUBREC)
	//
	// The receiver must call settle(nil) when it gets the ack-of-ack
	// (e.g. AMQP 'settle' or MQTT PUBCOMP) or settle(err) if the
	// transfer fails.
	//
	// Finally the Sender calls Finish() to indicate the message can be
	// discarded.
	//
	// If sending fails, or if the sender does not support QoS 2, then
	// Finish() may be called without any call to Received()
	Received(settle func(error))
}

ExactlyOnceMessage is implemented by received Messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.
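The call order described in the comments on Received can be sketched as follows. This is one plausible reading, using a local stand-in type (qos2Message) and a hypothetical forwardExactlyOnce function. A real binding would call settle asynchronously when the ack-of-ack arrives; the sketch calls it immediately to stay synchronous.

```go
package main

import (
	"fmt"
	"strings"
)

// qos2Message is a local stand-in for a received message that supports QoS 2.
// It records calls so the order of the handshake can be inspected.
type qos2Message struct {
	trace []string
}

// Received is called by the forwarding sender once its peer has acknowledged
// receipt. A real receiver would call settle when the ack-of-ack arrives;
// this sketch calls it right away.
func (m *qos2Message) Received(settle func(error)) {
	m.trace = append(m.trace, "received")
	settle(nil)
}

// Finish marks the message as safe to discard.
func (m *qos2Message) Finish(err error) error {
	m.trace = append(m.trace, "finished")
	return err
}

// forwardExactlyOnce plays the sender's part: report receipt, wait for the
// settle callback, then finish the message with the settle result.
func forwardExactlyOnce(m *qos2Message) error {
	var settleErr error
	m.Received(func(err error) {
		m.trace = append(m.trace, "settled")
		settleErr = err
	})
	return m.Finish(settleErr)
}

func main() {
	m := &qos2Message{}
	err := forwardExactlyOnce(m)
	fmt.Println(strings.Join(m.trace, " -> "), err)
}
```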

type Message

type Message interface {
	// Structured transfers a structured-mode event to a StructuredEncoder.
	// Returns ErrNotStructured if message is not in structured mode.
	//
	// Returns a different err if something wrong happened while trying to read the structured event
	// In this case, the caller must Finish the message with appropriate error
	//
	// This allows Senders to avoid re-encoding messages that are
	// already in suitable structured form.
	Structured(StructuredEncoder) error

	// Binary transfers a binary-mode event to a BinaryEncoder.
	// Returns ErrNotBinary if message is not in binary mode.
	//
	// Returns a different err if something wrong happened while trying to read the binary event
	// In this case, the caller must Finish the message with appropriate error
	//
	// Allows Senders to forward a binary message without allocating an
	// intermediate Event.
	Binary(BinaryEncoder) error

	// Event transfers an event to an EventEncoder.
	//
	// A message implementation should always provide a conversion to Event data structure.
	// The implementor can use binding.ToEvent for a straightforward implementation, starting
	// from binary or structured representation
	//
	// Returns an err if something wrong happened while trying to read the event
	// In this case, the caller must Finish the message with appropriate error
	//
	// Useful when the Sender can't implement a direct binary/structured to binary/structured conversion,
	// so the intermediate Event representation is required.
	Event(EventEncoder) error

	// Finish *must* be called when message from a Receiver can be forgotten by
	// the receiver. Sender.Send() calls Finish() when the message is sent.  A QoS
	// 1 sender should not call Finish() until it gets an acknowledgment of
	// receipt on the underlying transport.  For QoS 2 see ExactlyOnceMessage.
	//
	// Passing a non-nil err indicates sending or processing failed.
	// A non-nil return indicates that the message was not accepted
	// by the receiver's peer.
	Finish(error) error
}

Message is the interface to a binding-specific message containing an event.

Reliable Delivery

There are 3 qualities of service for message delivery:

0/at-most-once/unreliable: messages can be dropped silently.

1/at-least-once: messages are not dropped without signaling an error to the sender, but they may be duplicated in the event of a re-send.

2/exactly-once: messages are never dropped (without error) or duplicated, as long as both sending and receiving ends maintain some binding-specific delivery state. Whether this is persisted depends on the configuration of the binding implementations.

The Message interface supports QoS 0 and 1; the ExactlyOnceMessage interface supports QoS 2.

The Structured and Binary methods provide optional optimized transfer of an event to a Sender; they may not be implemented by all Message instances. A Sender should try each method of interest and fall back to Event(EventEncoder) if none are supported.
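The fallback order described above can be sketched with local stand-ins. The message interface, the sentinel errors and the binaryOnly and eventOnly types below are simplified assumptions for illustration; the real methods take encoder arguments, which are omitted here to keep the control flow visible.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors.
var (
	errNotStructured = errors.New("message is not in structured mode")
	errNotBinary     = errors.New("message is not in binary mode")
)

// message is a reduced stand-in for binding.Message: each method reports
// whether the message could be transferred in that mode.
type message interface {
	Structured() error
	Binary() error
	Event() error
}

// binaryOnly is a hypothetical message that supports binary transfer.
type binaryOnly struct{}

func (binaryOnly) Structured() error { return errNotStructured }
func (binaryOnly) Binary() error     { return nil }
func (binaryOnly) Event() error      { return nil }

// eventOnly is a hypothetical message with no optimized transfer at all.
type eventOnly struct{}

func (eventOnly) Structured() error { return errNotStructured }
func (eventOnly) Binary() error     { return errNotBinary }
func (eventOnly) Event() error      { return nil }

// transfer tries structured, then binary, then falls back to Event.
// Any error other than the "not in this mode" sentinels stops the attempt.
func transfer(m message) (string, error) {
	if err := m.Structured(); err == nil {
		return "structured", nil
	} else if !errors.Is(err, errNotStructured) {
		return "structured", err
	}
	if err := m.Binary(); err == nil {
		return "binary", nil
	} else if !errors.Is(err, errNotBinary) {
		return "binary", err
	}
	return "event", m.Event()
}

func main() {
	for _, m := range []message{binaryOnly{}, eventOnly{}} {
		mode, err := transfer(m)
		fmt.Println(mode, err)
	}
}
```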

func WithFinish added in v0.10.0

func WithFinish(m Message, finish func(error)) Message

WithFinish returns a wrapper for m that calls finish() and m.Finish() in its Finish(). Allows code to be notified when a message is Finished.
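A wrapper of this kind can be sketched with struct embedding. The finisher interface, noopMessage and withFinishHook below are local stand-ins, and calling the hook before the wrapped Finish is an assumption of this sketch rather than a documented ordering.

```go
package main

import "fmt"

// finisher is a local stand-in for the Finish method of binding.Message.
type finisher interface {
	Finish(error) error
}

// noopMessage is a hypothetical message whose Finish always succeeds.
type noopMessage struct{}

func (noopMessage) Finish(error) error { return nil }

// finishHook wraps a finisher and runs a callback when Finish is called.
type finishHook struct {
	finisher
	hook func(error)
}

// Finish notifies the hook, then delegates to the wrapped message.
func (f finishHook) Finish(err error) error {
	f.hook(err)
	return f.finisher.Finish(err)
}

// withFinishHook returns m wrapped so that hook observes every Finish call.
func withFinishHook(m finisher, hook func(error)) finisher {
	return finishHook{finisher: m, hook: hook}
}

func main() {
	calls := 0
	m := withFinishHook(noopMessage{}, func(error) { calls++ })
	err := m.Finish(nil)
	fmt.Println(calls, err)
}
```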

type ReceiveCloser added in v0.10.0

type ReceiveCloser interface {
	Receiver
	Closer
}

ReceiveCloser is a Receiver that can be closed.

type Receiver

type Receiver interface {
	// Receive blocks till a message is received or ctx expires.
	//
	// A non-nil error means the receiver is closed.
	// io.EOF means it closed cleanly, any other value indicates an error.
	Receive(ctx context.Context) (Message, error)
}

Receiver receives messages.

type Requester added in v0.10.0

type Requester interface {
	// Request sends m like Sender.Send() but also arranges to receive a response.
	// The returned Receiver is used to receive the response.
	Request(ctx context.Context, m Message) (Receiver, error)
}

Requester sends a message and receives a response

Optional interface that may be implemented by protocols that support request/response correlation.

type SendCloser added in v0.10.0

type SendCloser interface {
	Sender
	Closer
}

SendCloser is a Sender that can be closed.

type Sender

type Sender interface {
	// Send a message.
	//
	// Send returns when the "outbound" message has been sent. The Sender may
	// still be expecting acknowledgment or holding other state for the message.
	//
	// m.Finish() is called when sending is finished: expected acknowledgments (or
	// errors) have been received, the Sender is no longer holding any state for
	// the message. m.Finish() may be called during or after Send().
	//
	// To support optimized forwarding of structured-mode messages, Send()
	// should use the encoding returned by m.Structured() if there is one.
	// Otherwise m.Event() can be encoded as per the binding's rules.
	Send(ctx context.Context, m Message) error
}

Sender sends messages.

type StructuredEncoder added in v0.11.0

type StructuredEncoder interface {
	// SetStructuredEvent receives an io.Reader for the whole event.
	SetStructuredEvent(format format.Format, event io.Reader) error
}

StructuredEncoder should generate a new representation of the event starting from a structured message.

Protocols that support structured encoding should implement this interface to implement direct structured -> structured transfer.

type TransformerFactories added in v0.11.0

type TransformerFactories []TransformerFactory

TransformerFactories is a utility type for managing multiple TransformerFactory values.

func (TransformerFactories) BinaryTransformer added in v0.11.0

func (t TransformerFactories) BinaryTransformer(encoder BinaryEncoder) BinaryEncoder

func (TransformerFactories) EventTransformer added in v0.11.0

func (t TransformerFactories) EventTransformer(encoder EventEncoder) EventEncoder

func (TransformerFactories) StructuredTransformer added in v0.11.0

func (t TransformerFactories) StructuredTransformer(encoder StructuredEncoder) StructuredEncoder

type TransformerFactory added in v0.11.0

type TransformerFactory interface {
	// Can return nil if the transformation doesn't support structured encoding directly
	StructuredTransformer(encoder StructuredEncoder) StructuredEncoder

	// Can return nil if the transformation doesn't support binary encoding directly
	BinaryTransformer(encoder BinaryEncoder) BinaryEncoder

	// Cannot return nil
	EventTransformer(encoder EventEncoder) EventEncoder
}

TransformerFactory implements a transformation process while transferring the event from the Message implementation to the provided encoder.

A transformer may omit support for binary and/or structured encodings by returning nil from the respective factory method.
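The wrapping idea behind EventTransformer can be sketched with local stand-ins. The event map, eventEncoder, withAttribute and chain below are hypothetical simplifications, and the order in which chain applies transformers is an assumption of this sketch, not a description of how TransformerFactories composes them.

```go
package main

import "fmt"

// event is a simplified stand-in for a CloudEvent: attribute name to value.
type event map[string]string

// eventEncoder mirrors the shape of EventEncoder using the local event type.
type eventEncoder interface {
	SetEvent(event) error
}

// captureEncoder is a root encoder that stores the event it receives.
type captureEncoder struct{ got event }

func (c *captureEncoder) SetEvent(e event) error { c.got = e; return nil }

// setAttribute wraps an encoder and sets one attribute on every event
// before passing a copy on to the next encoder.
type setAttribute struct {
	next        eventEncoder
	name, value string
}

func (s setAttribute) SetEvent(e event) error {
	out := event{}
	for k, v := range e {
		out[k] = v
	}
	out[s.name] = s.value
	return s.next.SetEvent(out)
}

// eventTransformer plays the role of TransformerFactory.EventTransformer.
type eventTransformer func(eventEncoder) eventEncoder

// withAttribute builds a transformer that sets name to value.
func withAttribute(name, value string) eventTransformer {
	return func(next eventEncoder) eventEncoder {
		return setAttribute{next: next, name: name, value: value}
	}
}

// chain wraps root so that transformers run in the order listed.
func chain(root eventEncoder, ts ...eventTransformer) eventEncoder {
	enc := root
	for i := len(ts) - 1; i >= 0; i-- {
		enc = ts[i](enc)
	}
	return enc
}

func main() {
	root := &captureEncoder{}
	enc := chain(root, withAttribute("source", "a"), withAttribute("source", "b"))
	err := enc.SetEvent(event{"id": "1"})
	fmt.Println(root.got["id"], root.got["source"], err)
}
```

Because each transformer only sees the encoder it wraps, transformers stay independent of the protocol that ultimately encodes the event.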

Directories

Path	Synopsis
format	Package format formats structured events.
spec	Package spec provides spec-version metadata.
test	Package test contains test data and generic tests for testing bindings.
