streamhub

package module · v0.1.5 · Published: Mar 28, 2022 · License: MIT

README

✉ Streamhub


Streamhub is a toolkit crafted for streaming-powered applications written in Go.

Requirements

  • Go version >= 1.17

Overall Architecture

Streamhub is composed of several internal components that collaborate with each other to accomplish basic stream-handling operations (publishing and consuming messages).

Streamhub exposes all of its operational capabilities through a simple, idiomatic API, enabling interaction between the program and the actual live infrastructure via a facade component called Hub.

[Figure: Internal Hub architecture and the flows of basic stream operations. Left: message publishing flow. Right: message consumption flow.]
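As a quick orientation, the following minimal sketch wires a Hub together through its facade API. The import path, the fooMessage type, and the stream name are illustrative assumptions; a concrete Writer / Listener Driver (such as the in-memory driver from the examples directory) would be required for real publishing and listening.

package main

import (
	"context"
	"log"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

// fooMessage is a hypothetical message schema used for illustration.
type fooMessage struct {
	Foo string `json:"foo"`
}

func main() {
	// Allocate a Hub using exported options from the streamhub API.
	hub := streamhub.NewHub(
		streamhub.WithInstanceName("org.example.payments"),
		streamhub.WithMarshaler(streamhub.JSONMarshaler{}),
	)

	// Relate the message type to a stream so the Hub can resolve metadata.
	hub.RegisterStream(fooMessage{}, streamhub.StreamMetadata{
		Stream: "foo-stream",
	})

	// Publish; correlation/causation IDs are injected from the context.
	// Without a Writer driver this returns ErrMissingWriterDriver.
	if err := hub.Write(context.Background(), fooMessage{Foo: "bar"}); err != nil {
		log.Println(err)
	}
}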

Message

The Message type is the unit of information used to interact with multiple systems through live infrastructure.

Streamhub natively implements most of the CNCF CloudEvents specification fields to keep messages passed through a stream consistent.

As the CloudEvents specification states, a message is constructed according to the underlying communication protocol of the event bus or message broker (e.g. MQTT, Apache Kafka, raw JSON, Amazon SNS).

For example, if using Apache Kafka, most of the message fields will be attached as binary headers instead of in the body of the message itself. On the other hand, if using Amazon Simple Notification Service, messages will be encoded into the raw JSON template that AWS specifies in its SNS API definition. These processes are independent of Marshaler operations; hence, the codec of the message's inner data (the actual message content) will not change.
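To illustrate the transport unit itself, a raw Message can be allocated with NewMessage (documented in the reference below); all field values in this sketch are illustrative.

import streamhub "github.com/neutrinocorp/streamhub" // assumed import path

// buildRawMessage crafts a transport Message directly; NewMessage fills the
// CloudEvents-style fields from the given arguments.
func buildRawMessage() streamhub.Message {
	return streamhub.NewMessage(streamhub.NewMessageArgs{
		ID:                   "123e4567-e89b-12d3-a456-426614174000",
		Source:               "org.example.payments",
		Stream:               "foo-stream",
		SchemaDefinitionName: "foo-schema",
		SchemaVersion:        1,
		ContentType:          "application/json",
		Data:                 []byte(`{"foo":"bar"}`),
	})
}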

For more information about CloudEvents, please review the specification repository: https://github.com/cloudevents/spec

Stream Registry

A Stream Registry is an in-memory key-value database used by both the Listener Node(s) and the Publisher; it holds metadata about every stream the program will interact with.

Moreover, stream metadata may contain critical information about the stream, such as the stream name (also called topic), the schema definition version, and/or the schema definition name, so that components such as the Publisher and Listener Node can fetch schema definitions from the Schema Registry and continue their operations normally. The stream name defined here is used by both the Publisher and the Listener Node(s) to interact with live infrastructure.

The Stream Registry accepts reflection-based structs, which are registered under the given struct name as a string (e.g. package_name.struct_name -> main.fooMessage). In addition, the registry also accepts plain strings as keys to increase flexibility (one may use the stream name, e.g. foo-stream).

Note: If using plain strings as keys, remember to populate the GoType metadata field so the Listener Node handler can decode incoming message data. If no GoType is found in the stream metadata while consuming a message, marshaling capabilities are disabled to avoid program panics.

Note: Using reflection-based stream definitions leads to performance degradation when listening to streams. A sketch of both registration styles follows.
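The fragment below, reusing the hypothetical fooMessage type from the earlier sketch, shows both registration styles; the reflect2 import path (github.com/modern-go/reflect2) is assumed from the reflect2.Type field of StreamMetadata.

import (
	"github.com/modern-go/reflect2" // assumed source of reflect2.Type

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

func registerStreams(hub *streamhub.Hub) {
	// Reflection-based registration: the key becomes the struct name,
	// e.g. "main.fooMessage".
	hub.RegisterStream(fooMessage{}, streamhub.StreamMetadata{
		Stream:               "foo-stream",
		SchemaDefinitionName: "foo-schema",
		SchemaVersion:        1,
	})

	// Plain-string registration: populate GoType so the Listener Node
	// handler can decode incoming message data.
	hub.RegisterStreamByString("foo-stream", streamhub.StreamMetadata{
		Stream: "foo-stream",
		GoType: reflect2.TypeOf(fooMessage{}),
	})
}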

Unique Identifier Factory

A Unique Identifier Factory is a component that generates unique identifiers using an underlying concrete implementation of a unique-identifier algorithm (e.g. UUID, NanoID). The Publisher component uses it to construct unique messages.
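A custom factory only needs to satisfy the IDFactoryFunc type documented below; this minimal sketch uses crypto/rand and is illustrative, not one of the package's built-in factories.

import (
	"crypto/rand"
	"encoding/hex"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

// randomHexFactory is a hypothetical IDFactoryFunc producing 16-byte hex IDs.
var randomHexFactory streamhub.IDFactoryFunc = func() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

// Wire it in: streamhub.NewHub(streamhub.WithIDFactory(randomHexFactory))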

Schema Registry

A Schema Registry is a database which holds message schema definitions and their versions. It ensures that every message produced and consumed by the program complies with the specified schema definition.

The registry MIGHT be implemented using either external or internal underlying solutions (e.g. a third-party service such as Amazon Glue, the host's disk, or in-memory storage).

Note: For Apache Avro message formats, a Schema Registry is a MUST for the Marshaler component to encode and decode message data.
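For basic or testing scenarios, the built-in InMemorySchemaRegistry can hold definitions; the Avro record below and the schema/stream names are illustrative.

import streamhub "github.com/neutrinocorp/streamhub" // assumed import path

func newAvroHub() *streamhub.Hub {
	// Store an Avro schema definition under a name/version pair.
	registry := streamhub.InMemorySchemaRegistry{}
	registry.RegisterDefinition("foo-schema", `{
		"type": "record",
		"name": "fooMessage",
		"fields": [{"name": "foo", "type": "string"}]
	}`, 1)

	// Apache Avro REQUIRES a schema registry for the marshaler to work.
	return streamhub.NewHub(
		streamhub.WithSchemaRegistry(registry),
		streamhub.WithMarshaler(streamhub.NewAvroMarshaler()),
	)
}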

Marshaler

A Marshaler is a component in charge of encoding and decoding message data.

Currently, Streamhub ships native Apache Avro and JSON implementations. Nevertheless, the Marshaler interface is exported through the Streamhub API to give developers flexibility, as it allows custom Marshaler implementations.

We are currently considering adding Protocol Buffers and FlatBuffers/FlexBuffers codecs for edge cases where greater performance is required.
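Since the interface is exported, a custom codec is a matter of implementing three methods; the XML marshaler below is a hypothetical sketch, not part of the package.

import (
	"encoding/xml"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

// xmlMarshaler is a hypothetical custom Marshaler; schemaDef is ignored
// since this codec carries no schema definition.
type xmlMarshaler struct{}

func (xmlMarshaler) Marshal(_ string, data interface{}) ([]byte, error) {
	return xml.Marshal(data)
}

func (xmlMarshaler) Unmarshal(_ string, data []byte, ref interface{}) error {
	return xml.Unmarshal(data, ref)
}

func (xmlMarshaler) ContentType() string { return "application/xml" }

// Compile-time assertion that xmlMarshaler satisfies streamhub.Marshaler.
var _ streamhub.Marshaler = xmlMarshaler{}

// Wire it in: streamhub.NewHub(streamhub.WithMarshaler(xmlMarshaler{}))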

Message Broker / Event Bus Driver

The Message Broker / Event Bus Driver is an abstract component which enables interactions between the Hub's internal components and the actual stream-messaging infrastructure (e.g. Apache Kafka, Amazon SNS/SQS, in-memory).

The driver component implements both the Publisher and Listener Node interfaces. By separating behaviours through interfaces, technology heterogeneity and autonomy between processes are achieved, giving the program even greater flexibility of interaction.

For example, the program might contain a Hub which publishes messages to Amazon Simple Notification Service (SNS) while one set of Listener Nodes polls messages from Amazon Simple Queue Service (SQS) queues and another set receives messages from Apache Kafka topics.

Publisher

A Publisher is a high-level component that lets the program publish messages to desired streams defined on the message broker / event bus, so external programs may react to published messages in parallel.

Furthermore, the publisher API is designed to allow chain-of-responsibility pattern implementations (middlewares) to add extra behaviours when publishing messages (e.g. logging, tracing, monitoring, retries).

Streamhub offers native implementations through the use of a Driver. Nevertheless, custom Publisher implementations are possible, as the Streamhub API exposes the publisher interface (the Writer interface as of v0.1.5); a middleware sketch follows.
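The decorator below is a hypothetical middleware wrapping any Writer (the publisher abstraction as of v0.1.5) to log outgoing messages before delegating to the underlying driver.

import (
	"context"
	"log"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

// loggingWriter wraps a streamhub.Writer and logs every outgoing message.
type loggingWriter struct {
	next streamhub.Writer
}

func (w loggingWriter) Write(ctx context.Context, msg streamhub.Message) error {
	log.Printf("writing message %s to stream %s", msg.ID, msg.Stream)
	return w.next.Write(ctx, msg)
}

func (w loggingWriter) WriteBatch(ctx context.Context, msgs ...streamhub.Message) error {
	log.Printf("writing batch of %d messages", len(msgs))
	return w.next.WriteBatch(ctx, msgs...)
}

// Wire it in: streamhub.NewHub(streamhub.WithWriter(loggingWriter{next: driverWriter}))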

Listener Registry

A Stream Listener Registry is an in-memory database which holds information about the workers to be scheduled when the Hub starts.

Workers are also called Listener Nodes.

Listener Supervisor

The Listener Supervisor is an internal Hub component which manages Listener Node(s) lifecycles.

It forks new workers into the Listener Registry queue and schedules them on Hub startup.

In addition, when forking new workers, the supervisor crafts a Listener Task template from the listener node configuration, which is later passed to the Driver's listener node interface implementations on Hub startup. Drivers use this template internally to access critical data for interacting with live infrastructure (e.g. the stream/topic name, the consumer groups/queues to be used, and vendor-specific configuration such as Amazon Web Services settings or Shopify's Sarama library for Apache Kafka).

Listener Node

A Listener Node is an internal Listener Supervisor component which schedules the actual stream-listening jobs. These jobs are mostly I/O-blocking, so the node will run them concurrently if a degree of parallelism was configured for the worker.

It uses the Driver listener node interface implementation to interact with live infrastructure.

Note: In order to stop Listener Node inner processes, a context cancellation MUST be issued through the root Context originally passed on Hub startup. Moreover, every node job has an internal timeout context derived from the root context to avoid stream-listening jobs hanging or waiting for considerable times, which would directly affect throughput.

Note: Every Listener Node inner process runs inside a new goroutine and uses a timeout-scoped context to keep process autonomy and increase overall throughput.

Listener / ListenerFunc

Each Listener Node contains a specific configuration, as previously mentioned. Aside from critical data for Driver implementations, this configuration holds a Listener or ListenerFunc (interface/type respectively), which represents the entry point for the message-processing operations defined by the developer (the handler for each message received from a queue/topic).

These types/interfaces let the program return an error if something failed while processing the message. If no error is returned, the Driver implementation acknowledges the message to the live infrastructure to avoid message re-processing issues. As a side note and recommendation, keep message processors idempotent to cope with the nature of distributed systems (duplicated and out-of-order messages).

Moreover, the Listener and ListenerFunc APIs were designed to enable chain-of-responsibility pattern implementations (middlewares), just like the Publisher API, letting developers add layers of extra behaviour when processing a message.

Note that Streamhub adds layers of behaviour by default for every forked Listener/ListenerFunc (see the registration sketch after this list). These behaviours include:

  • Exponential backoff retrying (fully customizable)
  • Correlation and Causation IDs injection into the handler-scoped context
  • Unmarshaling*
  • Logging*
  • Monitoring/Metrics*
  • Tracing*

* Available if properly configured
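The registration sketch below reuses the hypothetical fooMessage type from the earlier quickstart; whether DecodedData holds a value or a pointer depends on how unmarshaling was configured, so the type assertion is illustrative.

import (
	"context"
	"log"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

func registerListeners(hub *streamhub.Hub) error {
	// Returning nil acknowledges the message to the driver; returning an
	// error triggers the exponential backoff retry behaviour.
	return hub.Listen(fooMessage{},
		streamhub.WithGroup("foo-consumer-group"),
		streamhub.WithListenerFunc(func(_ context.Context, msg streamhub.Message) error {
			data, ok := msg.DecodedData.(fooMessage)
			if !ok {
				return nil // keep handlers defensive and idempotent
			}
			log.Printf("received foo = %s", data.Foo)
			return nil
		}),
	)
}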

Supported infrastructure

  • Apache Kafka (on-premise, Confluent cloud or Amazon Managed Streaming for Apache Kafka/MSK)
  • Amazon Simple Notification Service (SNS) and Simple Queue Service (SQS) with the Topic-Queue chaining pattern implementation
  • Apache Pulsar*
  • MQTT-based buses/brokers (e.g. RabbitMQ, Apache ActiveMQ)*
  • Google Cloud PubSub*
  • Microsoft Azure Service Bus*
  • Redis Streams*

* On Streamhub's roadmap, not yet implemented.

Documentation

Overview

Package streamhub is a toolkit crafted for streaming-powered applications written in Go.


Constants

const CloudEventsSpecVersion = "1.0"

CloudEventsSpecVersion the CloudEvents specification version used by streamhub

Variables

var (
	// DefaultConcurrencyLevel default stream-listening jobs to be running concurrently for each ListenerNode.
	DefaultConcurrencyLevel = 1
	// DefaultRetryInitialInterval default initial interval duration between each stream-listening job provisioning on failures.
	DefaultRetryInitialInterval = time.Second * 3
	// DefaultRetryMaxInterval default maximum interval duration between each stream-listening job provisioning on failures.
	DefaultRetryMaxInterval = time.Second * 15
	// DefaultRetryTimeout default duration of each stream-listening job provisioning on failures.
	DefaultRetryTimeout = time.Second * 15
	// DefaultMaxHandlerPoolSize default pool size of goroutines for ListenerNode's Listener(s) / ListenerFunc(s) executions.
	DefaultMaxHandlerPoolSize = 10
)
var DefaultHubInstanceName = "com.streamhub"

DefaultHubInstanceName the default instance name for nameless Hub instances

var (
	// ErrInvalidProtocolBufferFormat the given data is not a valid protocol buffer message
	ErrInvalidProtocolBufferFormat = errors.New("streamhub: Invalid protocol buffer data")
)
var ErrMissingSchemaDefinition = errors.New("streamhub: Missing stream schema definition in schema registry")

ErrMissingSchemaDefinition the requested stream message definition was not found in the SchemaRegistry

var ErrMissingStream = errors.New("streamhub: Missing stream entry in stream registry")

ErrMissingStream the requested stream was not found in the StreamRegistry

var ErrMissingWriterDriver = errors.New("streamhub: Missing writer driver")

ErrMissingWriterDriver no publisher driver was found.

var ListenerBaseBehaviours = []ListenerBehaviour{
	unmarshalListenerBehaviour,
	injectGroupListenerBehaviour,
	injectTxIDsListenerBehaviour,
	retryListenerBehaviour,
}

ListenerBaseBehaviours default ListenerBehaviours

Behaviours will be executed in descending order

var ListenerBaseBehavioursNoUnmarshal = []ListenerBehaviour{
	injectGroupListenerBehaviour,
	injectTxIDsListenerBehaviour,
	retryListenerBehaviour,
}

ListenerBaseBehavioursNoUnmarshal default ListenerBehaviours without unmarshaling

Behaviours will be executed in descending order

Functions

func InjectMessageCausationID

func InjectMessageCausationID(ctx context.Context, messageID string) string

InjectMessageCausationID injects the causation id from the given context if available. If not, it will use the message id as fallback.

func InjectMessageCorrelationID

func InjectMessageCorrelationID(ctx context.Context, messageID string) string

InjectMessageCorrelationID injects the correlation id from the given context if available. If not, it will use the message id as fallback.
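A small sketch of the fallback behaviour these helpers document; it assumes the helpers read a plain string stored under the MessageContextKey constants listed further below.

import (
	"context"
	"fmt"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

func traceIDs() {
	// With an empty context, the helper falls back to the message ID.
	ctx := context.Background()
	fmt.Println(streamhub.InjectMessageCorrelationID(ctx, "msg-1")) // msg-1

	// With a correlation ID present in the context, that value is used
	// instead (assumption: a plain string is stored under the key).
	ctx = context.WithValue(ctx, streamhub.ContextCorrelationID, "corr-42")
	fmt.Println(streamhub.InjectMessageCorrelationID(ctx, "msg-1")) // corr-42
}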

Types

type AvroMarshaler

type AvroMarshaler struct {
	HashingFactory Hashing64AlgorithmFactory
	// contains filtered or unexported fields
}

AvroMarshaler handles data transformation between primitives and Apache Avro format.

Apache Avro REQUIRES a defined SchemaRegistry to decode/encode data.

func NewAvroMarshaler

func NewAvroMarshaler() AvroMarshaler

NewAvroMarshaler allocates a new Apache Avro marshaler with a simple caching system to reduce memory footprint and computational usage when parsing Avro schema definition files.

func (AvroMarshaler) ContentType

func (a AvroMarshaler) ContentType() string

ContentType retrieves the encoding/decoding Apache Avro format using RFC 2046 standard (application/avro).

func (AvroMarshaler) Marshal

func (a AvroMarshaler) Marshal(schemaDef string, data interface{}) (parsedData []byte, err error)

Marshal transforms a complex data type into a primitive binary array for data transportation using Apache Avro format.

func (AvroMarshaler) Unmarshal

func (a AvroMarshaler) Unmarshal(schemaDef string, data []byte, ref interface{}) (err error)

Unmarshal transforms a primitive binary array to a complex data type for data processing using Apache Avro format.

type Event

type Event interface {
	// GetSubject This describes the subject of the event in the context of the event producer (identified by source).
	// In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, but the
	// source identifier alone might not be sufficient as a qualifier for any specific event if the source
	// context has internal sub-structure.
	//
	// Identifying the subject of the event in context metadata (opposed to only in the data payload) is particularly
	// helpful in generic subscription filtering scenarios where middleware is unable to interpret the data content.
	// In the above example, the subscriber might only be interested in blobs with names ending with '.jpg' or '.jpeg'
	// and the subject attribute allows for constructing a simple and efficient string-suffix filter for that
	// subset of events.
	GetSubject() string
}

Event is an abstract message unit used by streamhub-based systems to publish messages with the `subject` field of a Message populated

type FailingMarshalerNoop

type FailingMarshalerNoop struct{}

FailingMarshalerNoop the no-operation failing Marshaler

For testing purposes only

func (FailingMarshalerNoop) ContentType

func (f FailingMarshalerNoop) ContentType() string

ContentType the failing content type operation

func (FailingMarshalerNoop) Marshal

func (f FailingMarshalerNoop) Marshal(_ string, _ interface{}) ([]byte, error)

Marshal the failing marshal operation

func (FailingMarshalerNoop) Unmarshal

func (f FailingMarshalerNoop) Unmarshal(_ string, _ []byte, _ interface{}) error

Unmarshal the failing unmarshal operation

type Hashing64AlgorithmFactory

type Hashing64AlgorithmFactory func() hash.Hash64

Hashing64AlgorithmFactory factory for hash.Hash64 algorithms (used by Apache Avro schema definition caching system)

var DefaultHashing64AlgorithmFactory Hashing64AlgorithmFactory = func() hash.Hash64 {
	return fnv.New64a()
}

DefaultHashing64AlgorithmFactory the default hashing64 algorithm factory for Marshaler schema definition caching layer

type Hub

type Hub struct {
	InstanceName        string
	StreamRegistry      StreamRegistry
	Writer              Writer
	Marshaler           Marshaler
	IDFactory           IDFactoryFunc
	SchemaRegistry      SchemaRegistry
	ListenerDriver      ListenerDriver
	ListenerBehaviours  []ListenerBehaviour
	ListenerBaseOptions []ListenerNodeOption
	// contains filtered or unexported fields
}

Hub is the main component which enables interactions between several systems through the usage of streams.

func NewHub

func NewHub(opts ...HubOption) *Hub

NewHub allocates a new Hub

func (*Hub) Listen

func (h *Hub) Listen(message interface{}, opts ...ListenerNodeOption) error

Listen registers a new stream-listening background job.

If listening to a Google Protocol Buffers message, DO NOT use a pointer as the message schema, to avoid marshaling problems

func (*Hub) ListenByStreamKey

func (h *Hub) ListenByStreamKey(stream string, opts ...ListenerNodeOption)

ListenByStreamKey registers a new stream-listening background job using the raw stream identifier (e.g. topic name).

func (*Hub) RegisterStream

func (h *Hub) RegisterStream(message interface{}, metadata StreamMetadata)

RegisterStream creates a relation between a stream message type and metadata.

If registering a Google Protocol Buffers message, DO NOT use a pointer as the message schema, to avoid marshaling problems

func (*Hub) RegisterStreamByString

func (h *Hub) RegisterStreamByString(messageType string, metadata StreamMetadata)

RegisterStreamByString creates a relation between a string key and metadata.

func (*Hub) Start

func (h *Hub) Start(ctx context.Context)

Start initiates all daemon processes (e.g. stream-listening jobs)

func (*Hub) Write added in v0.1.5

func (h *Hub) Write(ctx context.Context, message interface{}) error

Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func (*Hub) WriteBatch added in v0.1.5

func (h *Hub) WriteBatch(ctx context.Context, messages ...interface{}) error

WriteBatch inserts a set of messages into a stream assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

If an item from the batch fails, the other items will fail too

func (*Hub) WriteByMessageKey added in v0.1.5

func (h *Hub) WriteByMessageKey(ctx context.Context, messageKey string, message interface{}) error

WriteByMessageKey inserts a message into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func (*Hub) WriteByMessageKeyBatch added in v0.1.5

func (h *Hub) WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) error

WriteByMessageKeyBatch inserts a set of messages into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

If an item from the batch fails, the other items will fail too

func (*Hub) WriteRawMessage added in v0.1.5

func (h *Hub) WriteRawMessage(ctx context.Context, message Message) error

WriteRawMessage inserts a raw transport message into a stream in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

func (*Hub) WriteRawMessageBatch added in v0.1.5

func (h *Hub) WriteRawMessageBatch(ctx context.Context, messages ...Message) error

WriteRawMessageBatch inserts a set of raw transport messages into a stream in order to propagate the data to a set of subscribed systems for further processing.

Uses given context to inject correlation and causation IDs.

The whole batch will be passed to the underlying Writer driver implementation as every driver has its own way to deal with batches

type HubOption

type HubOption interface {
	// contains filtered or unexported methods
}

HubOption enables configuration of a Hub instance.

func WithIDFactory

func WithIDFactory(f IDFactoryFunc) HubOption

WithIDFactory sets the default unique identifier factory of a Hub instance.

func WithInstanceName

func WithInstanceName(n string) HubOption

WithInstanceName sets the name of a Hub instance.

func WithListenerBaseOptions

func WithListenerBaseOptions(opts ...ListenerNodeOption) HubOption

WithListenerBaseOptions sets a list of ListenerNodeOption of a Hub instance used as global options for each listener node

func WithListenerBehaviours

func WithListenerBehaviours(b ...ListenerBehaviour) HubOption

WithListenerBehaviours sets a list of ListenerBehaviour of a Hub instance ready to be executed by every stream-listening job's ListenerFunc or Listener component.

func WithListenerDriver

func WithListenerDriver(d ListenerDriver) HubOption

WithListenerDriver sets the default listener driver of a Hub instance.

func WithMarshaler

func WithMarshaler(m Marshaler) HubOption

WithMarshaler sets the default marshaler of a Hub instance.

func WithSchemaRegistry

func WithSchemaRegistry(r SchemaRegistry) HubOption

WithSchemaRegistry sets the schema registry of a Hub instance for stream message schema definitions.

func WithWriter added in v0.1.5

func WithWriter(p Writer) HubOption

WithWriter sets the writer of a Hub instance.

If both Writer and WriterFunc are defined, Writer will override WriterFunc.

type IDFactoryFunc

type IDFactoryFunc func() (string, error)

IDFactoryFunc creates a unique identifier.

var RandInt64Factory IDFactoryFunc = func() (string, error) {
	i := rand.Int63()
	return strconv.Itoa(int(i)), nil
}

RandInt64Factory creates a unique identifier using the built-in math/rand package, formatted as a 64-bit signed integer

var UuidIdFactory IDFactoryFunc = func() (string, error) {
	id, err := uuid.NewUUID()
	return id.String(), err
}

UuidIdFactory creates a unique identifier using the UUID algorithm (note: uuid.NewUUID generates a version 1, time-based UUID).

type InMemorySchemaRegistry

type InMemorySchemaRegistry map[string]string

InMemorySchemaRegistry is the in-memory schema registry, crafted especially for basic and/or testing scenarios.

func (InMemorySchemaRegistry) GetSchemaDefinition

func (i InMemorySchemaRegistry) GetSchemaDefinition(name string, version int) (string, error)

GetSchemaDefinition retrieves a schema definition (in string format) from the registry

func (InMemorySchemaRegistry) RegisterDefinition

func (i InMemorySchemaRegistry) RegisterDefinition(name, def string, version int)

RegisterDefinition stores the given schema definition into the registry

type JSONMarshaler

type JSONMarshaler struct{}

JSONMarshaler handles data transformation between primitives and JSON format.

func (JSONMarshaler) ContentType

func (m JSONMarshaler) ContentType() string

ContentType retrieves the encoding/decoding JSON format using RFC 2046 standard (application/json).

func (JSONMarshaler) Marshal

func (m JSONMarshaler) Marshal(_ string, data interface{}) ([]byte, error)

Marshal transforms a complex data type into a primitive binary array for data transportation using JSON format.

func (JSONMarshaler) Unmarshal

func (m JSONMarshaler) Unmarshal(_ string, data []byte, ref interface{}) error

Unmarshal transforms a primitive binary array to a complex data type for data processing using JSON format.

type Listener

type Listener interface {
	// Listen starts the execution process triggered when a message is received from a stream.
	//
	// Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff.
	Listen(context.Context, Message) error
}

Listener is a wrapping structure of the ListenerFunc handler for complex data-processing scenarios.

type ListenerBehaviour

type ListenerBehaviour func(node *ListenerNode, hub *Hub, next ListenerFunc) ListenerFunc

ListenerBehaviour is a middleware function with extra functionality which will be executed prior to a ListenerFunc or Listener component, for every stream-listening job instance registered into a Hub.

The middleware is injected with the contextual ListenerNode (the stream-listening job to be executed), the root Hub instance, and the next middleware function.

Moreover, there are built-in behaviours ready to be used with streamhub:

- Retry backoff

- Correlation and causation ID injection

- Consumer group injection

- Auto-unmarshalling (*only if using reflection-based stream registry or GoType was defined when registering stream)

- Logging*

- Metrics*

- Tracing*

* Requires manual specification in the configuration
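A custom behaviour is just a function matching the signature above; the hypothetical sketch below measures handler latency and shows one way to keep the exported defaults alongside it.

import (
	"context"
	"log"
	"time"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

// elapsedTimeBehaviour logs how long each handler execution took.
var elapsedTimeBehaviour streamhub.ListenerBehaviour = func(
	node *streamhub.ListenerNode, _ *streamhub.Hub, next streamhub.ListenerFunc,
) streamhub.ListenerFunc {
	return func(ctx context.Context, msg streamhub.Message) error {
		start := time.Now()
		err := next(ctx, msg)
		log.Printf("stream %s handled message %s in %s",
			node.Stream, msg.ID, time.Since(start))
		return err
	}
}

// Wire it in while keeping the defaults (WithListenerBehaviours sets the list):
// streamhub.NewHub(streamhub.WithListenerBehaviours(
//	append(streamhub.ListenerBaseBehaviours, elapsedTimeBehaviour)...))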

type ListenerDriver

type ListenerDriver interface {
	// ExecuteTask addresses the stream-listening task
	ExecuteTask(_ context.Context, _ ListenerTask) error
}

ListenerDriver defines the underlying implementation of the stream-listening job, which addresses the usage of custom protocols and/or APIs from providers (Apache Kafka, Amazon SQS, ...).

type ListenerFunc

type ListenerFunc func(context.Context, Message) error

ListenerFunc is the execution process triggered when a message is received from a stream.

Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff.

type ListenerNode

type ListenerNode struct {
	Stream                string
	HandlerFunc           ListenerFunc
	Group                 string
	ProviderConfiguration interface{}
	ConcurrencyLevel      int
	RetryInitialInterval  time.Duration
	RetryMaxInterval      time.Duration
	RetryTimeout          time.Duration
	ListenerDriver        ListenerDriver
	MaxHandlerPoolSize    int
}

ListenerNode is the worker unit which schedules stream-listening job(s).

Each ListenerNode is independent of other nodes to guarantee resiliency of interleaved processes and avoid cascading failures.

type ListenerNodeOption

type ListenerNodeOption interface {
	// contains filtered or unexported methods
}

ListenerNodeOption enables configuration of a ListenerNode.

func WithConcurrencyLevel

func WithConcurrencyLevel(n int) ListenerNodeOption

WithConcurrencyLevel sets the concurrency level of a ListenerNode; in other words, the number of jobs to be scheduled by the ListenerNode.

Note: If the level is defined as less than or equal to 0, the ListenerNode will schedule 1 job

func WithDriver

func WithDriver(d ListenerDriver) ListenerNodeOption

WithDriver sets the driver of a ListenerNode (e.g. Apache Kafka, Apache Pulsar, Amazon SQS).

func WithGroup

func WithGroup(g string) ListenerNodeOption

WithGroup sets the consumer group or queue name of a ListenerNode.

Note: It may not be available for some providers.

func WithListener

func WithListener(l Listener) ListenerNodeOption

WithListener sets the Listener of a ListenerNode.

func WithListenerFunc

func WithListenerFunc(l ListenerFunc) ListenerNodeOption

WithListenerFunc sets the ListenerFunc of a ListenerNode.

func WithMaxHandlerPoolSize

func WithMaxHandlerPoolSize(n int) ListenerNodeOption

WithMaxHandlerPoolSize sets the maximum number of goroutines executed by a ListenerNode's Listener or ListenerFunc.

Note: If the size is defined as less than or equal to 0, the ListenerNode internal implementations will allocate a semaphore of 10 goroutines per handler.

func WithProviderConfiguration

func WithProviderConfiguration(cfg interface{}) ListenerNodeOption

WithProviderConfiguration sets the custom provider configuration of a ListenerNode (e.g. aws.Config, sarama.Config).

func WithRetryInitialInterval

func WithRetryInitialInterval(d time.Duration) ListenerNodeOption

WithRetryInitialInterval sets the initial duration interval for each retrying task of a ListenerNode.

func WithRetryMaxInterval

func WithRetryMaxInterval(d time.Duration) ListenerNodeOption

WithRetryMaxInterval sets the maximum duration interval for each retrying task of a ListenerNode.

func WithRetryTimeout

func WithRetryTimeout(d time.Duration) ListenerNodeOption

WithRetryTimeout sets the maximum duration for retrying tasks of a ListenerNode.
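Combining several of these options at registration time might look like the following sketch; hub, fooMessage, and the handler are carried over from the earlier hypothetical snippets, and all values are illustrative.

import (
	"time"

	streamhub "github.com/neutrinocorp/streamhub" // assumed import path
)

func listenWithOptions(hub *streamhub.Hub, handleFoo streamhub.ListenerFunc) error {
	return hub.Listen(fooMessage{},
		streamhub.WithGroup("foo-consumer-group"),
		streamhub.WithConcurrencyLevel(3),    // schedule 3 concurrent jobs
		streamhub.WithMaxHandlerPoolSize(25), // up to 25 handler goroutines
		streamhub.WithRetryInitialInterval(time.Second),
		streamhub.WithRetryMaxInterval(10*time.Second),
		streamhub.WithRetryTimeout(30*time.Second),
		streamhub.WithListenerFunc(handleFoo),
	)
}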

type ListenerNoop

type ListenerNoop struct{}

ListenerNoop the no-operation implementation of Listener

func (ListenerNoop) Listen

func (l ListenerNoop) Listen(_ context.Context, _ Message) error

Listen the no-operation implementation of Listener.Listen()

type ListenerTask

type ListenerTask struct {
	Stream             string
	HandlerFunc        ListenerFunc
	Group              string
	Configuration      interface{}
	Timeout            time.Duration
	MaxHandlerPoolSize int
}

ListenerTask holds job metadata to be executed by the ListenerDriver.

type Marshaler

type Marshaler interface {
	// Marshal transforms a complex data type into a primitive binary array for data transportation.
	Marshal(schemaDef string, data interface{}) ([]byte, error)
	// Unmarshal transforms a primitive binary array to a complex data type for data processing.
	Unmarshal(schemaDef string, data []byte, ref interface{}) error
	// ContentType retrieves the encoding/decoding format using RFC 2046 standard (e.g. application/json).
	ContentType() string
}

Marshaler handles data transformation between primitives and specific codecs/formats (e.g. JSON, Apache Avro).

type Message

type Message struct {
	ID          string `json:"id"`
	Stream      string `json:"stream"`
	Source      string `json:"source"`
	SpecVersion string `json:"specversion"`
	Type        string `json:"type"`
	Data        []byte `json:"data"`

	DataContentType   string `json:"datacontenttype,omitempty"`
	DataSchema        string `json:"dataschema,omitempty"`
	DataSchemaVersion int    `json:"dataschemaversion,omitempty"`
	Timestamp         string `json:"time,omitempty"`
	Subject           string `json:"subject,omitempty"`

	// Streamhub fields
	CorrelationID string `json:"correlation_id"`
	CausationID   string `json:"causation_id"`

	// consumer-only fields
	DecodedData interface{} `json:"-"`
	GroupName   string      `json:"-"`
}

Message is a unit of information which holds the primitive message (data) in binary format, along with multiple fields used to preserve a schema definition within a stream pipeline.

The schema is based on the Cloud Native Computing Foundation (CNCF)'s CloudEvents specification.

For more information, please look: https://github.com/cloudevents/spec

func NewMessage

func NewMessage(args NewMessageArgs) Message

NewMessage allocates an immutable Message ready to be transported in a stream.

type MessageContextKey

type MessageContextKey string

MessageContextKey is the streamhub context key to inject data into transport messages.

const (
	// ContextCorrelationID is the main trace of a stream processing. Once generated, it MUST NOT be generated again
	// to keep track of the process from the beginning.
	ContextCorrelationID MessageContextKey = "shub-correlation-id"
	// ContextCausationID is reference of the last message processed. This helps to know a direct relation between
	// a new process and the past one.
	ContextCausationID MessageContextKey = "shub-causation-id"
)

type NewMessageArgs

type NewMessageArgs struct {
	SchemaVersion        int
	Data                 []byte
	ID                   string
	Source               string
	Stream               string
	SchemaDefinitionName string
	ContentType          string
	GroupName            string
	Subject              string
}

NewMessageArgs arguments required by NewMessage function to operate.

type NoopSchemaRegistry

type NoopSchemaRegistry struct{}

NoopSchemaRegistry is the no-operation implementation of SchemaRegistry

func (NoopSchemaRegistry) GetSchemaDefinition

func (n NoopSchemaRegistry) GetSchemaDefinition(_ string, _ int) (string, error)

GetSchemaDefinition retrieves an empty string and a nil error

type ProtocolBuffersMarshaler

type ProtocolBuffersMarshaler struct{}

ProtocolBuffersMarshaler handles data transformation between primitives and Google Protocol Buffers format

func (ProtocolBuffersMarshaler) ContentType

func (p ProtocolBuffersMarshaler) ContentType() string

ContentType retrieves the encoding/decoding Google Protocol Buffers format using the latest conventions.

More information here: https://github.com/google/protorpc/commit/eb03145a6a7c72ae6cc43867d9635a5b8d8c4545

func (ProtocolBuffersMarshaler) Marshal

func (p ProtocolBuffersMarshaler) Marshal(_ string, data interface{}) ([]byte, error)

Marshal transforms a complex data type into a primitive binary array for data transportation using Google Protocol Buffers format

func (ProtocolBuffersMarshaler) Unmarshal

func (p ProtocolBuffersMarshaler) Unmarshal(_ string, data []byte, ref interface{}) error

Unmarshal transforms a primitive binary array to a complex data type for data processing using Google Protocol Buffers format

type SchemaRegistry

type SchemaRegistry interface {
	// GetSchemaDefinition retrieves a schema definition (in string format) from the registry
	GetSchemaDefinition(name string, version int) (string, error)
}

SchemaRegistry is an external storage of stream message schemas definitions with proper versioning.

Examples of such schema registries are the Amazon Glue Schema Registry and the Confluent Schema Registry.

type StreamMetadata

type StreamMetadata struct {
	Stream               string
	SchemaDefinitionName string
	SchemaVersion        int
	GoType               reflect2.Type
}

StreamMetadata contains information of stream messages.

type StreamRegistry

type StreamRegistry map[string]StreamMetadata

StreamRegistry is an in-memory storage of streams metadata used by Hub and any external agent to set and retrieve information about a specific stream.

Uses a custom string (or Go's struct type as string) as key.

Note: A message key differs from the stream name, as the message key COULD be anything the developer sets within the stream registry. Thus, scenarios where multiple data types require publishing messages to the same stream are possible. Moreover, reflection-based registries set the message key with the reflect.TypeOf function, so it will differ from the actual stream name.

func (StreamRegistry) Get

func (r StreamRegistry) Get(message interface{}) (StreamMetadata, error)

Get retrieves a stream message metadata from a stream message type.

func (StreamRegistry) GetByStreamName

func (r StreamRegistry) GetByStreamName(name string) (StreamMetadata, error)

GetByStreamName retrieves a stream message metadata from a stream name.

It contains an optimistic lookup mechanism to keep constant time and space complexity.

If metadata is not found for the given key, it falls back to an O(n) lookup. This adds the GetByString base complexity to the fallback's time and space complexity. Nevertheless, GetByString always runs in constant time, so it is guaranteed to add only a constant term to the overall GetByStreamName complexity (e.g. if GetByString takes 49.75 ns/op, then GetByStreamName = original ns/op + GetByString ns/op).

This optimistic lookup keeps amortized time and space complexity when using non-reflection-based registrations on the root Hub (using only the String methods of this very Stream Registry component). Thus, greater performance is achieved in scenarios where reflection-based stream registration is not required by the program.

func (StreamRegistry) GetByString

func (r StreamRegistry) GetByString(key string) (StreamMetadata, error)

GetByString retrieves a stream message metadata from a string key.

func (StreamRegistry) Set

func (r StreamRegistry) Set(message interface{}, metadata StreamMetadata)

Set creates a relation between a stream message type and metadata.

func (StreamRegistry) SetByString

func (r StreamRegistry) SetByString(key string, metadata StreamMetadata)

SetByString creates a relation between a string key and metadata.

type WriteByMessageKeyBatchItems added in v0.1.5

type WriteByMessageKeyBatchItems map[string]interface{}

WriteByMessageKeyBatchItems items to be written as a batch by the Hub.WriteByMessageKeyBatch() function

type Writer added in v0.1.5

type Writer interface {
	// Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the
	// data to a set of subscribed systems for further processing.
	Write(ctx context.Context, message Message) error
	// WriteBatch inserts a set of messages into a stream assigned to the message in the StreamRegistry in order to propagate the
	// data to a set of subscribed systems for further processing.
	//
	// Depending on the underlying Writer driver implementation, this function MIGHT return an error if a single operation failed,
	// or it MIGHT return an error if the whole operation failed
	WriteBatch(ctx context.Context, messages ...Message) error
}

Writer inserts messages into streams assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.

This type should be provided by a streamhub Driver (e.g. Apache Pulsar, Apache Kafka, Amazon SNS)

var NoopWriter Writer = noopWriter{}

NoopWriter is the no-operation implementation of Writer

Directories

Path Synopsis
benchmarks
examples
Package streamhub-memory is the In-Memory implementation for Streamhub-based programs.
