benthosx

package
v0.0.0-...-1987b95 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2019 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TypeServerless selects the serverless response option.
	TypeServerless = "serverless_response"
)

Variables

This section is empty.

Functions

func NewConfig

func NewConfig(b []byte) (*config.Type, error)

NewConfig parses a Benthos YAML config file and replaces all environment variables.

func NewLambdaHandler

func NewLambdaHandler(conf *config.Type) (lambda.Handler, func() error, error)

NewLambdaHandler is a convenience function for converting the LambdaFunc output from NewLambdaFunc into a lambda.Handler type.

func NewResponseContext

func NewResponseContext(ctx context.Context) context.Context

NewResponseContext injects a ResponseMap for tracking serverless responses.

func NewServerlessResponse

func NewServerlessResponse(
	conf ServerlessResponseConfig,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (output.Type, error)

NewServerlessResponse creates a new plugin output type.

Types

type BenthosProducer

type BenthosProducer struct {
	// PipelineInput will be sent the raw message received from the call to
	// Produce().
	PipelineInput chan<- types.Transaction
	// CloseFn will be called when the producer is closed. This is used to bind
	// shutdown behavior for any long lived resources used to power the producer.
	CloseFn func() error
}

BenthosProducer uses a set of Benthos transaction channels to coordinate processing and outputting an event.

func NewProducer

func NewProducer(conf *config.Type) (*BenthosProducer, error)

NewProducer uses the given Benthos configuration to create a Producer instance that may be used as either a client in other code or as input for constructing a Lambda function.

func (*BenthosProducer) Close

func (p *BenthosProducer) Close() error

Close the producer. It is not valid to call Produce after calling Close.

func (*BenthosProducer) Produce

func (p *BenthosProducer) Produce(ctx context.Context, in interface{}) (interface{}, error)

Produce an event to one or more outputs. The return is the final version of the event produces after being processed or an error if something went wrong. The input may be any type that can be marshaled to JSON.

type FormatFn

type FormatFn func(in interface{}) (interface{}, error)

FormatFn is used by the Lambda function to convert a Producer response into a version that it returns to the caller.

type LambdaFunc

type LambdaFunc struct {
	Producer Producer
}

LambdaFunc coordinates between a Producer and FormatFN to implement the behavior that will run in Lambda. The HandleEvent method is compatible with the lambda.NewHandler() method from the Go SDK for Lambda.

func NewLambdaFunc

func NewLambdaFunc(conf *config.Type) (*LambdaFunc, func() error, error)

NewLambdaFunc generates a LambdaFunc from a given Benthos configuratino. The input and buffer sections of the configuration are ignored since they are not relevant in a Lambda setting where the only input is the function call. If any of the outputs are STDOUT then the resulting LambdaFunc will return the processed event to the Lambda caller in JSON form. If none of outputs are STDOUT then a static message will be returned instead.

func (*LambdaFunc) HandleEvent

func (f *LambdaFunc) HandleEvent(ctx context.Context, in interface{}) (interface{}, error)

HandleEvent implements a function signature that is compatible with the Go Lambda SDK.

type Producer

type Producer interface {
	Produce(ctx context.Context, in interface{}) (interface{}, error)
	Close() error
}

Producer is referenced by components of this package as an entry point for sending messages to an output. The output is expected to be the final form of the message(s) sent to the underlying streams. In the case that the input results in multiple outputs then the response should be a slice of messages.

type ResponseMap

type ResponseMap struct {
	// contains filtered or unexported fields
}

ResponseMap is a thread-safe map for storing serverless responses. Each key of the map points to a slice of messages.

func ResponseFromContext

func ResponseFromContext(ctx context.Context) *ResponseMap

ResponseFromContext fetches the ResponseMap. If one is not installed then an empty map is returned.

func (*ResponseMap) Append

func (r *ResponseMap) Append(key string, response types.Message)

Append a response to a key.

func (*ResponseMap) Delete

func (r *ResponseMap) Delete(key string)

Delete an entire key from the map.

func (*ResponseMap) Len

func (r *ResponseMap) Len() int

Len returns the number of keys in the map.

func (*ResponseMap) Load

func (r *ResponseMap) Load(key string) ([]types.Message, bool)

Load a response set. Returns false if the key is not found.

func (*ResponseMap) Range

func (r *ResponseMap) Range(f func(key string, response []types.Message) bool)

Range over the values in the map.

type ServerlessResponse

type ServerlessResponse struct {
	// contains filtered or unexported fields
}

ServerlessResponse captures the final message value and writes it to a store where it can be retrieved by the serverless function.

func (*ServerlessResponse) CloseAsync

func (e *ServerlessResponse) CloseAsync()

CloseAsync shuts down the output and stops processing requests.

func (*ServerlessResponse) Connected

func (e *ServerlessResponse) Connected() bool

Connected returns true if this output is currently connected to its target.

func (*ServerlessResponse) Consume

func (e *ServerlessResponse) Consume(tChan <-chan types.Transaction) error

Consume starts this output consuming from a transaction channel.

func (*ServerlessResponse) WaitForClose

func (e *ServerlessResponse) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the output has closed down.

type ServerlessResponseConfig

type ServerlessResponseConfig struct {
	Name string
}

ServerlessResponseConfig contains configuration fields for the ServerlessResponse output.

func NewServerlessResponseConfig

func NewServerlessResponseConfig() ServerlessResponseConfig

NewServerlessResponseConfig returns a ServerlessResponseConfig with default values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL