gated

package
v0.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2024 License: MPL-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package gated implements a Filter that provides the ability to buffer events based on their IDs until an event is flushed. When an individual gated event is flushed, the filter will build and emit a composite event for the flushed event using it's ID to identify all the related gated events up until that point in time.

Index

Examples

Constants

View Source
const DefaultEventTimeout = time.Second * 10

DefaultEventTimeout defines a default expiry for events processed by a gated.Filter

Variables

This section is empty.

Functions

This section is empty.

Types

type EventPayload

type EventPayload struct {
	ID      string                 `json:"id"`
	Header  map[string]interface{} `json:"header,omitempty"`
	Details []EventPayloadDetails  `json:"details,omitempty"`
}

EventPayload defines the resulting Event.Payload from gated Payload.ComposeFrom

type EventPayloadDetails

type EventPayloadDetails struct {
	Type      string                 `json:"type"`
	CreatedAt string                 `json:"created_at"`
	Payload   map[string]interface{} `json:"payload,omitempty"`
}

EventPayloadDetails defines the struct used in the gated EventPayload.Details slice.

type Filter

type Filter struct {
	// Broker used to send along expired gated events
	Broker Sender

	// Expiration for gated events.  It's important because without an
	// expiration gated events that aren't flushed/processed could consume all
	// available memory.  Expired events will be sent along if there's a Broker
	// or deleted if there's no Broker. If no expiration is set the
	// DefaultGatedEventTimeout will be used.
	Expiration time.Duration

	// NowFunc is a func that returns the current time and the Filter and
	// if unset, it will default to time.Now()
	NowFunc func() time.Time
	// contains filtered or unexported fields
}

Filter provides the ability to buffer events identified by a Gateable.GetID() until an event is processed that returns true for Gateable.FlushEvent().

When a Gateable Event returns true for FlushEvent(), the filter will call Gateable.ComposedOf(...) with the list of gated events with the coresponding Gateable.GetID() up to that point in time and return the resulting composed event. There is no dependency on Filter.Broker to handle an event that returns true for FlushEvent() since the Filter simply needs to return the flushed event from Filter.Process(...)

Filter.Broker is only used when handling expired events or when handling calls to Filter.FlushAll(). If Filter.Broker is nil, expired gated events will simply be deleted. If the Broker is NOT nil, then the expiring gated events will be flushed using Gateable.ComposedOf(...) and the resulting composed event is sent using the Broker. If the Broker is nil when Filter.FlushAll() is called then the gated events will just be deleted. If the Broker is not nil when Filter.FlushAll() is called, then all the gated events will be sent using the Broker.

Example
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/hashicorp/eventlogger"
	"github.com/hashicorp/eventlogger/filters/gated"
	"github.com/hashicorp/eventlogger/sinks/writer"
)

func main() {
	then := time.Date(
		2009, 11, 17, 20, 34, 58, 651387237, time.UTC)
	// Create a broker
	b, _ := eventlogger.NewBroker()

	b.StopTimeAt(then) // setting this so the output timestamps are predictable for testing.

	// A gated.Filter for events
	gf := &gated.Filter{
		Broker:  b,
		NowFunc: func() time.Time { return then }, // setting this so the output timestamps are predictable for testing.
	}
	// Marshal to JSON
	jsonFmt := &eventlogger.JSONFormatter{}

	// Send the output to stdout
	stdoutSink := &writer.Sink{
		Writer: os.Stdout,
	}

	// Register the nodes with the broker
	nodes := []eventlogger.Node{gf, jsonFmt, stdoutSink}
	nodeIDs := make([]eventlogger.NodeID, len(nodes))
	for i, node := range nodes {
		id := eventlogger.NodeID(fmt.Sprintf("node-%d", i))
		err := b.RegisterNode(id, node)
		if err != nil {
			// handle error
		}
		nodeIDs[i] = id
	}

	et := eventlogger.EventType("test-event")
	// Register a pipeline for our event type
	err := b.RegisterPipeline(eventlogger.Pipeline{
		EventType:  et,
		PipelineID: "gated-filter-pipeline",
		NodeIDs:    nodeIDs,
	})
	if err != nil {
		// handle error
	}

	// define a common event ID for a set of events we want gated together.
	eventID := "event-1"
	payloads := []*gated.Payload{
		{
			// our first event
			ID: eventID,
			Header: map[string]interface{}{
				"tmz":  "EST",
				"user": "alice",
			},
			Detail: map[string]interface{}{
				"file_name":   "file1.txt",
				"total_bytes": 1024,
			},
		},
		{
			// our 2nd event
			ID: eventID,
			Header: map[string]interface{}{
				"roles": []string{"admin", "individual-contributor"},
			},
		},
		// the last event
		{
			ID:    eventID,
			Flush: true,
			Detail: map[string]interface{}{
				"file_name":   "file2.txt",
				"total_bytes": 512,
			},
		},
	}

	ctx := context.Background()
	for _, p := range payloads {
		// Send our gated event payloads
		if status, err := b.Send(ctx, et, p); err != nil {
			// handle err and status.Warnings
			fmt.Println("err: ", err)
			fmt.Println("warnings: ", status.Warnings)
		}
	}

}
Output:

{"created_at":"2009-11-17T20:34:58.651387237Z","event_type":"test-event","payload":{"id":"event-1","header":{"roles":["admin","individual-contributor"],"tmz":"EST","user":"alice"},"details":[{"type":"test-event","created_at":"2009-11-17 20:34:58.651387237 +0000 UTC","payload":{"file_name":"file1.txt","total_bytes":1024}},{"type":"test-event","created_at":"2009-11-17 20:34:58.651387237 +0000 UTC","payload":{"file_name":"file2.txt","total_bytes":512}}]}}

func (*Filter) Close added in v0.2.0

func (w *Filter) Close(ctx context.Context) error

Close implements eventlogger.Closer interface so the gated.Filter will call FlushAll() when asked to close.

func (*Filter) FlushAll

func (w *Filter) FlushAll(ctx context.Context) error

FlushAll will flush all events that have been gated and is useful for circumstances where the system is shuting down and you need to flush everything that's been gated.

If the Broker is nil when Filter.FlushAll() is called then the gated events will just be deleted. If the Broker is not nil when Filter.FlushAll() is called, then all the gated events will be sent using the Broker.

func (*Filter) Now

func (w *Filter) Now() time.Time

Now returns the current time. If Filter.NowFunc is unset, then time.Now() is used as a default.

func (*Filter) Process

func (w *Filter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error)

Process will determine if an Event is Gateable. Events that are not not Gateable are immediately returned. If the Event is Gateable, it's added to a list of Events using it's Gateable.ID() as an index, until an event with a matching Gateable.ID() is processed where Gateable.Flush() returns true. If Gateable.Flush(), then Gateable.ComposedFrom([]*Event) is called with all the gated events for the ID.

func (*Filter) Reopen

func (w *Filter) Reopen() error

Reopen is a no op for Filter.

func (*Filter) Type

func (w *Filter) Type() eventlogger.NodeType

Type describes the type of the node as a Filter.

type Gateable

type Gateable interface {
	// GetID returns an ID which allows the gated.Filter to determine that the
	// payload is part of a group of Gateable payloads.
	GetID() string

	// FlushEvent returns true when the Gateable event payload includes a Flush
	// indicator.
	FlushEvent() bool

	// ComposeFrom creates one payload which is a composition of a list events.
	// When ComposeFrom(...) is called by a gated.Filter the receiver will
	// always be nil. The payload returned must not have a Gateable payload.
	ComposeFrom(events []*eventlogger.Event) (t eventlogger.EventType, payload interface{}, err error)
}

Gateable defines an interface for Event payloads which are "gateable" by the gated.Filter

type Payload

type Payload struct {
	// ID must be a unique ID
	ID string `json:"id"`

	// Flush value is returned from FlushEvent()
	Flush bool `json:"-"`

	// Header is top level header info
	Header map[string]interface{} `json:"header,omitempty"`

	// Detail is detail info
	Detail map[string]interface{} `json:"detail,omitempty"`
}

Payload implements the Gateable interface for an Event payload and can be used when sending events with a Broker.

func (*Payload) ComposeFrom

func (s *Payload) ComposeFrom(events []*eventlogger.Event) (eventlogger.EventType, interface{}, error)

ComposedFrom will build a single event payload which will be Flushed/Processed from a collection of gated events. The payload returned is not a Gateable payload intentionally. Note: the Payload receiver is always nil when this function is called.

func (*Payload) FlushEvent

func (s *Payload) FlushEvent() bool

FlushEvent tells the Filter to flush/process the events associated with the Gateable ID

func (*Payload) GetID

func (s *Payload) GetID() string

GetID returns the unique ID

type Sender

type Sender interface {
	Send(ctx context.Context, t eventlogger.EventType, payload interface{}) (eventlogger.Status, error)
}

Sender defines an interface for sending events via broker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL