pbcloudevents

package module
v0.0.0-...-9c63e57 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

README

cloudevents-go-protobuf

NOTE: This code is functional but meant for demonstration until it's decided whether or not this will be contributed to the Go SDK directly or maintained as a 3rd party module. If this ends up forever being a 3rd party module then I will delete this repo and create another under my company's Github org.

Protobuf bindings for CloudEvents

This package implements the interfaces required to add protobuf support to the v2 CloudEvents SDK.

Usage

Registering The Extensions

CloudEvents uses a global registry to map content types to extensions. The easiest way to do this is to import the init modules from this package:

import _ "github.com/kconwayinvision/cloudevents-go-protobuf/init"

Alternatively, if you want finer grained control over the timing then you can put the equivalent code in your project somewhere:

package main

import (
	"github.com/cloudevents/sdk-go/v2/binding/format"
	"github.com/cloudevents/sdk-go/v2/event/datacodec"
	pbcloudevents "github.com/kconwayinvision/cloudevents-go-protobuf"
)

func main() {
    // ...
	format.Add(pbcloudevents.FormatProtobuf{})
	format.Add(pbcloudevents.FormatProtobufJSON{})
	datacodec.AddDecoder(pbcloudevents.ContentTypeProtobuf, pbcloudevents.Decode)
    datacodec.AddEncoder(pbcloudevents.ContentTypeProtobuf, pbcloudevents.Encode)
    // ...
}
Using With HTTP Transport
HTTP Client
package main

import (
	cloudevents "github.com/cloudevents/sdk-go/v2"
    cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
    _ "github.com/kconwayinvision/cloudevents-go-protobuf/init"
    pbcloudevents "github.com/kconwayinvision/cloudevents-go-protobuf"
    mypb "github.com/someone/my-pb-repo"
)

func main() {
    target := "http://localhost"
    p, err := cehttp.New(
        cehttp.WithTarget(target),
        cehttp.WithHeader("Content-Type", pbcloudevents.MediaTypeProtobuf),
        // Optionally swap for the JSON protobuf encoding for human readability.
        // cehttp.WithHeader("Content-Type", pbcloudevents.MediaTypeProtobufJSON),
    )
	if err != nil {
		log.Fatalf("Failed to create protocol, %v", err)
    }
    c, err := cloudevents.NewClient(
        p,
		cloudevents.WithTimeNow(),
	)
	if err != nil {
		log.Fatalf("Failed to create client, %v", err)
    }
    payload := &mypb.Greeting{
        Greeting: "Hello you!"
    }
    event := cloudevents.NewEvent()
    event.SetID("xyz")
    event.SetType("greeting.v1")
    event.SetSource("main")
    if err = event.SetData(pbcloudevents.ContentTypeProtobuf, payload); err != nil {
        log.Fatalf("failed to set protobuf data, %v", err)
    }
    result := c.Send(context.Background(), event)
    if cloudevents.IsUndelivered(result) {
        log.Printf("Failed to deliver request: %v", result)
    } else {
        // Event was delivered, but possibly not accepted and without a response.
        log.Printf("Event delivered at %s, Acknowledged==%t ", time.Now(), cloudevents.IsACK(result))
        var httpResult *cehttp.Result
        if cloudevents.ResultAs(result, &httpResult) {
            log.Printf("Response status code %d", httpResult.StatusCode)
        }
    }
}
HTTP Server
package main

import (
	cloudevents "github.com/cloudevents/sdk-go/v2"
    cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
    _ "github.com/kconwayinvision/cloudevents-go-protobuf/init"
    pbcloudevents "github.com/kconwayinvision/cloudevents-go-protobuf"
    mypb "github.com/someone/my-pb-repo"
)

func main() {
    target := "http://localhost"
    p, err := cehttp.New(
        cehttp.WithPath("/events"),
        cehttp.WithPort(8080),
        cehttp.WithShutdownTimeout(30*time.Second),
        // The server automatically decodes using the incoming Content-Type
        // headers. No special handling is needed to enable protobuf support
        // other than registering the extensions.
    )
	if err != nil {
		log.Fatalf("Failed to create protocol, %v", err)
    }
    c, err := cloudevents.NewClient(p)
	if err != nil {
		log.Fatalf("Failed to create client, %v", err)
    }
    log.Printf("will listen on :8080\n")
	log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), receive))
}

func receive(ctx context.Context, event cloudevents.Event) error {
    expected := &mypb.Greeting{}
    if err := event.DataAs(expected); err != nil {
        return fmt.Errorf("did not get expected greeting event. got %s", event.DataSchema())
    }
    return nil
}

Documentation

Index

Constants

View Source
const (
	// MediaTypeProtobuf is used to indicate that the event is in protobuf
	// encoding. Using a non-standard name to guard against any breaking changes
	// in the official protobuf schema before it becomes 1.0. The short SHA
	// identifies the commit where protobuf was added to the specification.
	MediaTypeProtobuf = "application/cloudevents+protobuf-ad5f142"
	// MediaTypeProtobufJSON is a non-standard encoding that uses the protobuf
	// JSON encoding rather than binary. This is useful for maintaining human
	// readability of messages.
	MediaTypeProtobufJSON = "application/cloudevents+protobuf+json-ad5f142"
)
View Source
const (
	// ContentTypeProtobuf indicates that the data attribute is a protobuf
	// message.
	ContentTypeProtobuf = "application/protobuf-ad5f142"
)

Variables

This section is empty.

Functions

func Decode

func Decode(ctx context.Context, in []byte, out interface{}) error

Decode converts an encoded protobuf message back into the message type (out). The message must be type/wire compatible with whatever was given to Encode. This method will assume that the encoded value was encoded by this package which wraps the content in an any type but will fall back to directly unmarshalling the bytes into the message if that fails.

func Encode

func Encode(ctx context.Context, in interface{}) ([]byte, error)

Encode a protobuf message to bytes. This implementations wraps the given message in an any type before marshaling. This is done because using the any type is a requirement when using the protobuf message format but the type information required to build an any is only available here, before encoding. Wrapping the message in an any allows the protobuf format to detect the encoding and unmarshal the bytes without knowing the underlying type information. The Decode method provided by this package does the inverse so that most users of the SDK are unaware that this happens.

Like the official datacodec implementations, this one returns the given value as-is if it is already a byte slice. Additionally, if the value is an any message already then we preserve it as is since this is the equivalent case for protobuf where the value is already encoded.

Types

type FormatProtobuf

type FormatProtobuf struct{}

func (FormatProtobuf) Marshal

func (FormatProtobuf) Marshal(e *event.Event) ([]byte, error)

func (FormatProtobuf) MediaType

func (FormatProtobuf) MediaType() string

func (FormatProtobuf) Unmarshal

func (FormatProtobuf) Unmarshal(b []byte, e *event.Event) error

type FormatProtobufJSON

type FormatProtobufJSON struct{}

func (FormatProtobufJSON) Marshal

func (FormatProtobufJSON) Marshal(e *event.Event) ([]byte, error)

func (FormatProtobufJSON) MediaType

func (FormatProtobufJSON) MediaType() string

func (FormatProtobufJSON) Unmarshal

func (FormatProtobufJSON) Unmarshal(b []byte, e *event.Event) error

Directories

Path Synopsis
Package init contains an init function that registers all of the protobuf related assets with the CloudEvents SDK.
Package init contains an init function that registers all of the protobuf related assets with the CloudEvents SDK.
internal
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL