Documentation ¶
Overview ¶
Package streamhub is a toolkit crafted for streaming-powered applications written in Go.
Index ¶
- Constants
- Variables
- func InjectMessageCausationID(ctx context.Context, messageID string) string
- func InjectMessageCorrelationID(ctx context.Context, messageID string) string
- type AvroMarshaler
- type Event
- type FailingMarshalerNoop
- type Hashing64AlgorithmFactory
- type Hub
- func (h *Hub) Listen(message interface{}, opts ...ListenerNodeOption) error
- func (h *Hub) ListenByStreamKey(stream string, opts ...ListenerNodeOption)
- func (h *Hub) RegisterStream(message interface{}, metadata StreamMetadata)
- func (h *Hub) RegisterStreamByString(messageType string, metadata StreamMetadata)
- func (h *Hub) Start(ctx context.Context)
- func (h *Hub) Write(ctx context.Context, message interface{}) error
- func (h *Hub) WriteBatch(ctx context.Context, messages ...interface{}) error
- func (h *Hub) WriteByMessageKey(ctx context.Context, messageKey string, message interface{}) error
- func (h *Hub) WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) error
- func (h *Hub) WriteRawMessage(ctx context.Context, message Message) error
- func (h *Hub) WriteRawMessageBatch(ctx context.Context, messages ...Message) error
- type HubOption
- func WithIDFactory(f IDFactoryFunc) HubOption
- func WithInstanceName(n string) HubOption
- func WithListenerBaseOptions(opts ...ListenerNodeOption) HubOption
- func WithListenerBehaviours(b ...ListenerBehaviour) HubOption
- func WithListenerDriver(d ListenerDriver) HubOption
- func WithMarshaler(m Marshaler) HubOption
- func WithSchemaRegistry(r SchemaRegistry) HubOption
- func WithWriter(p Writer) HubOption
- type IDFactoryFunc
- type InMemorySchemaRegistry
- type JSONMarshaler
- type Listener
- type ListenerBehaviour
- type ListenerDriver
- type ListenerFunc
- type ListenerNode
- type ListenerNodeOption
- func WithConcurrencyLevel(n int) ListenerNodeOption
- func WithDriver(d ListenerDriver) ListenerNodeOption
- func WithGroup(g string) ListenerNodeOption
- func WithListener(l Listener) ListenerNodeOption
- func WithListenerFunc(l ListenerFunc) ListenerNodeOption
- func WithMaxHandlerPoolSize(n int) ListenerNodeOption
- func WithProviderConfiguration(cfg interface{}) ListenerNodeOption
- func WithRetryInitialInterval(d time.Duration) ListenerNodeOption
- func WithRetryMaxInterval(d time.Duration) ListenerNodeOption
- func WithRetryTimeout(d time.Duration) ListenerNodeOption
- type ListenerNoop
- type ListenerTask
- type Marshaler
- type Message
- type MessageContextKey
- type NewMessageArgs
- type NoopSchemaRegistry
- type ProtocolBuffersMarshaler
- type SchemaRegistry
- type StreamMetadata
- type StreamRegistry
- func (r StreamRegistry) Get(message interface{}) (StreamMetadata, error)
- func (r StreamRegistry) GetByStreamName(name string) (StreamMetadata, error)
- func (r StreamRegistry) GetByString(key string) (StreamMetadata, error)
- func (r StreamRegistry) Set(message interface{}, metadata StreamMetadata)
- func (r StreamRegistry) SetByString(key string, metadata StreamMetadata)
- type WriteByMessageKeyBatchItems
- type Writer
Constants ¶
const CloudEventsSpecVersion = "1.0"
CloudEventsSpecVersion the CloudEvents specification version used by streamhub
Variables ¶
var (
    // DefaultConcurrencyLevel default stream-listening jobs to be running concurrently for each ListenerNode.
    DefaultConcurrencyLevel = 1
    // DefaultRetryInitialInterval default initial interval duration between each stream-listening job provisioning on failures.
    DefaultRetryInitialInterval = time.Second * 3
    // DefaultRetryMaxInterval default maximum interval duration between each stream-listening job provisioning on failures.
    DefaultRetryMaxInterval = time.Second * 15
    // DefaultRetryTimeout default duration of each stream-listening job provisioning on failures.
    DefaultRetryTimeout = time.Second * 15
    // DefaultMaxHandlerPoolSize default pool size of goroutines for ListenerNode's Listener(s) / ListenerFunc(s) executions.
    DefaultMaxHandlerPoolSize = 10
)
var DefaultHubInstanceName = "com.streamhub"
DefaultHubInstanceName the default instance name for nameless Hub instances
var (
    // ErrInvalidProtocolBufferFormat the given data is not a valid protocol buffer message
    ErrInvalidProtocolBufferFormat = errors.New("streamhub: Invalid protocol buffer data")
)
var ErrMissingSchemaDefinition = errors.New("streamhub: Missing stream schema definition in schema registry")
ErrMissingSchemaDefinition the requested stream message definition was not found in the SchemaRegistry
var ErrMissingStream = errors.New("streamhub: Missing stream entry in stream registry")
ErrMissingStream the requested stream was not found in the StreamRegistry
var ErrMissingWriterDriver = errors.New("streamhub: Missing writer driver")
ErrMissingWriterDriver no writer driver was found.
var ListenerBaseBehaviours = []ListenerBehaviour{
unmarshalListenerBehaviour,
injectGroupListenerBehaviour,
injectTxIDsListenerBehaviour,
retryListenerBehaviour,
}
ListenerBaseBehaviours default ListenerBehaviours
Behaviours will be executed in descending order
var ListenerBaseBehavioursNoUnmarshal = []ListenerBehaviour{
injectGroupListenerBehaviour,
injectTxIDsListenerBehaviour,
retryListenerBehaviour,
}
ListenerBaseBehavioursNoUnmarshal default ListenerBehaviours without unmarshaling
Behaviours will be executed in descending order
Functions ¶
func InjectMessageCausationID ¶
InjectMessageCausationID injects the causation id from the given context if available. If not, it will use the message id as fallback.
Types ¶
type AvroMarshaler ¶
type AvroMarshaler struct {
    HashingFactory Hashing64AlgorithmFactory
    // contains filtered or unexported fields
}
AvroMarshaler handles data transformation between primitives and Apache Avro format.
Apache Avro REQUIRES a defined SchemaRegistry to decode/encode data.
func NewAvroMarshaler ¶
func NewAvroMarshaler() AvroMarshaler
NewAvroMarshaler allocates a new Apache Avro marshaler with a simple caching system to reduce memory footprint and computational usage when parsing Avro schema definition files.
func (AvroMarshaler) ContentType ¶
func (a AvroMarshaler) ContentType() string
ContentType retrieves the encoding/decoding Apache Avro format using RFC 2046 standard (application/avro).
type Event ¶
type Event interface {
    // GetSubject describes the subject of the event in the context of the event producer (identified by source).
    //
    // In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, but the
    // source identifier alone might not be sufficient as a qualifier for any specific event if the source
    // context has internal sub-structure.
    //
    // Identifying the subject of the event in context metadata (as opposed to only in the data payload) is particularly
    // helpful in generic subscription filtering scenarios where middleware is unable to interpret the data content.
    // For example, a subscriber might only be interested in blobs with names ending in '.jpg' or '.jpeg',
    // and the subject attribute allows for constructing a simple and efficient string-suffix filter for that
    // subset of events.
    GetSubject() string
}
Event is an abstract message unit used by streamhub-based systems to publish messages with a `subject` populated field of a Message
type FailingMarshalerNoop ¶
type FailingMarshalerNoop struct{}
FailingMarshalerNoop the no-operation failing Marshaler
For testing purposes only
func (FailingMarshalerNoop) ContentType ¶
func (f FailingMarshalerNoop) ContentType() string
ContentType the failing content type operation
type Hashing64AlgorithmFactory ¶
Hashing64AlgorithmFactory factory for hash.Hash64 algorithms (used by Apache Avro schema definition caching system)
var DefaultHashing64AlgorithmFactory Hashing64AlgorithmFactory = func() hash.Hash64 {
    return fnv.New64a()
}
DefaultHashing64AlgorithmFactory the default hashing64 algorithm factory for Marshaler schema definition caching layer
type Hub ¶
type Hub struct {
    InstanceName        string
    StreamRegistry      StreamRegistry
    Writer              Writer
    Marshaler           Marshaler
    IDFactory           IDFactoryFunc
    SchemaRegistry      SchemaRegistry
    ListenerDriver      ListenerDriver
    ListenerBehaviours  []ListenerBehaviour
    ListenerBaseOptions []ListenerNodeOption
    // contains filtered or unexported fields
}
Hub is the main component which enables interactions between several systems through the usage of streams.
func (*Hub) Listen ¶
func (h *Hub) Listen(message interface{}, opts ...ListenerNodeOption) error
Listen registers a new stream-listening background job.
If listening to a Google's Protocol Buffer message, DO NOT use a pointer as message schema to avoid marshaling problems
func (*Hub) ListenByStreamKey ¶
func (h *Hub) ListenByStreamKey(stream string, opts ...ListenerNodeOption)
ListenByStreamKey registers a new stream-listening background job using the raw stream identifier (e.g. topic name).
func (*Hub) RegisterStream ¶
func (h *Hub) RegisterStream(message interface{}, metadata StreamMetadata)
RegisterStream creates a relation between a stream message type and metadata.
If registering a Google's Protocol Buffer message, DO NOT use a pointer as message schema to avoid marshaling problems
func (*Hub) RegisterStreamByString ¶
func (h *Hub) RegisterStreamByString(messageType string, metadata StreamMetadata)
RegisterStreamByString creates a relation between a string key and metadata.
func (*Hub) Write ¶ added in v0.1.5
Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func (*Hub) WriteBatch ¶ added in v0.1.5
WriteBatch inserts a set of messages into a stream assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
If an item in the batch fails, the remaining items will fail as well
func (*Hub) WriteByMessageKey ¶ added in v0.1.5
WriteByMessageKey inserts a message into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func (*Hub) WriteByMessageKeyBatch ¶ added in v0.1.5
func (h *Hub) WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) error
WriteByMessageKeyBatch inserts a set of messages into a stream using the custom message key from StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
If an item in the batch fails, the remaining items will fail as well
func (*Hub) WriteRawMessage ¶ added in v0.1.5
WriteRawMessage inserts a raw transport message into a stream in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
func (*Hub) WriteRawMessageBatch ¶ added in v0.1.5
WriteRawMessageBatch inserts a set of raw transport messages into a stream in order to propagate the data to a set of subscribed systems for further processing.
Uses given context to inject correlation and causation IDs.
The whole batch will be passed to the underlying Writer driver implementation as every driver has its own way to deal with batches
type HubOption ¶
type HubOption interface {
// contains filtered or unexported methods
}
HubOption enables configuration of a Hub instance.
func WithIDFactory ¶
func WithIDFactory(f IDFactoryFunc) HubOption
WithIDFactory sets the default unique identifier factory of a Hub instance.
func WithInstanceName ¶
WithInstanceName sets the name of a Hub instance.
func WithListenerBaseOptions ¶
func WithListenerBaseOptions(opts ...ListenerNodeOption) HubOption
WithListenerBaseOptions sets a list of ListenerNodeOption of a Hub instance used as global options for each listener node
func WithListenerBehaviours ¶
func WithListenerBehaviours(b ...ListenerBehaviour) HubOption
WithListenerBehaviours sets a list of ListenerBehaviour of a Hub instance ready to be executed by every stream-listening job's ListenerFunc or Listener component.
func WithListenerDriver ¶
func WithListenerDriver(d ListenerDriver) HubOption
WithListenerDriver sets the default listener driver of a Hub instance.
func WithMarshaler ¶
WithMarshaler sets the default marshaler of a Hub instance.
func WithSchemaRegistry ¶
func WithSchemaRegistry(r SchemaRegistry) HubOption
WithSchemaRegistry sets the schema registry of a Hub instance for stream message schema definitions.
func WithWriter ¶ added in v0.1.5
WithWriter sets the writer of a Hub instance.
If both Writer and WriterFunc are defined, Writer will override WriterFunc.
type IDFactoryFunc ¶
IDFactoryFunc creates an unique identifier.
var RandInt64Factory IDFactoryFunc = func() (string, error) {
    i := rand.Int63()
    return strconv.Itoa(int(i)), nil
}
RandInt64Factory creates a unique identifier using math/rand built-in package with 64-bit signed integer format
var UuidIdFactory IDFactoryFunc = func() (string, error) {
    id, err := uuid.NewUUID()
    return id.String(), err
}
UuidIdFactory creates a unique identifier using the UUID v1 algorithm (uuid.NewUUID).
type InMemorySchemaRegistry ¶
InMemorySchemaRegistry is the in-memory schema registry, crafted specially for basic and/or testing scenarios.
func (InMemorySchemaRegistry) GetSchemaDefinition ¶
func (i InMemorySchemaRegistry) GetSchemaDefinition(name string, version int) (string, error)
GetSchemaDefinition retrieves a schema definition (in string format) from the registry
func (InMemorySchemaRegistry) RegisterDefinition ¶
func (i InMemorySchemaRegistry) RegisterDefinition(name, def string, version int)
RegisterDefinition stores the given schema definition into the registry
type JSONMarshaler ¶
type JSONMarshaler struct{}
JSONMarshaler handles data transformation between primitives and JSON format.
func (JSONMarshaler) ContentType ¶
func (m JSONMarshaler) ContentType() string
ContentType retrieves the encoding/decoding JSON format using RFC 2046 standard (application/json).
type Listener ¶
type Listener interface {
    // Listen starts the execution process triggered when a message is received from a stream.
    //
    // Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff.
    Listen(context.Context, Message) error
}
Listener is a wrapping structure of the ListenerFunc handler for complex data processing scenarios.
type ListenerBehaviour ¶
type ListenerBehaviour func(node *ListenerNode, hub *Hub, next ListenerFunc) ListenerFunc
ListenerBehaviour is a middleware function with extra functionality which will be executed prior a ListenerFunc or Listener component for every stream-listening job instance registered into a Hub.
The middleware receives the contextual ListenerNode (the stream-listening job to be executed), the root Hub instance and the next function in the chain.
Moreover, there are built-in behaviours ready to be used with streamhub:
- Retry backoff
- Correlation and causation ID injection
- Consumer group injection
- Auto-unmarshalling (*only if using reflection-based stream registry or GoType was defined when registering stream)
- Logging*
- Metrics*
- Tracing*
*Manual configuration required
type ListenerDriver ¶
type ListenerDriver interface {
    // ExecuteTask addresses the stream-listening task
    ExecuteTask(_ context.Context, _ ListenerTask) error
}
ListenerDriver defines the underlying implementation of the stream-listening job, which addresses the usage of custom protocols and/or APIs from providers (Apache Kafka, Amazon SQS, ...).
type ListenerFunc ¶
ListenerFunc is the execution process triggered when a message is received from a stream.
Returns an error to indicate the process has failed so Hub will retry the processing using exponential backoff.
type ListenerNode ¶
type ListenerNode struct {
    Stream                string
    HandlerFunc           ListenerFunc
    Group                 string
    ProviderConfiguration interface{}
    ConcurrencyLevel      int
    RetryInitialInterval  time.Duration
    RetryMaxInterval      time.Duration
    RetryTimeout          time.Duration
    ListenerDriver        ListenerDriver
    MaxHandlerPoolSize    int
}
ListenerNode is the worker unit which schedules stream-listening job(s).
Each ListenerNode is independent of other nodes to guarantee resiliency of interleaved processes and avoid cascading failures.
type ListenerNodeOption ¶
type ListenerNodeOption interface {
// contains filtered or unexported methods
}
ListenerNodeOption enables configuration of a ListenerNode.
func WithConcurrencyLevel ¶
func WithConcurrencyLevel(n int) ListenerNodeOption
WithConcurrencyLevel sets the concurrency level of a ListenerNode; that is, the number of jobs to be scheduled by the ListenerNode.
Note: If the given level is less than or equal to 0, the ListenerNode will schedule 1 job
func WithDriver ¶
func WithDriver(d ListenerDriver) ListenerNodeOption
WithDriver sets the driver of a ListenerNode (e.g. Apache Kafka, Apache Pulsar, Amazon SQS).
func WithGroup ¶
func WithGroup(g string) ListenerNodeOption
WithGroup sets the consumer group or queue name of a ListenerNode.
Note: It may not be available for some providers.
func WithListener ¶
func WithListener(l Listener) ListenerNodeOption
WithListener sets the Listener of a ListenerNode.
func WithListenerFunc ¶
func WithListenerFunc(l ListenerFunc) ListenerNodeOption
WithListenerFunc sets the ListenerFunc of a ListenerNode.
func WithMaxHandlerPoolSize ¶
func WithMaxHandlerPoolSize(n int) ListenerNodeOption
WithMaxHandlerPoolSize sets the maximum number of goroutines executed by a ListenerNode's Listener or ListenerFunc.
Note: If the given size is less than or equal to 0, the ListenerNode internal implementations will allocate a semaphore of 10 goroutines per handler.
func WithProviderConfiguration ¶
func WithProviderConfiguration(cfg interface{}) ListenerNodeOption
WithProviderConfiguration sets the custom provider configuration of a ListenerNode (e.g. aws.Config, sarama.Config).
func WithRetryInitialInterval ¶
func WithRetryInitialInterval(d time.Duration) ListenerNodeOption
WithRetryInitialInterval sets the initial interval duration between task retries of a ListenerNode.
func WithRetryMaxInterval ¶
func WithRetryMaxInterval(d time.Duration) ListenerNodeOption
WithRetryMaxInterval sets the maximum interval duration between task retries of a ListenerNode.
func WithRetryTimeout ¶
func WithRetryTimeout(d time.Duration) ListenerNodeOption
WithRetryTimeout sets the maximum duration for retrying tasks of a ListenerNode.
type ListenerNoop ¶
type ListenerNoop struct{}
ListenerNoop the no-operation implementation of Listener
type ListenerTask ¶
type ListenerTask struct {
    Stream             string
    HandlerFunc        ListenerFunc
    Group              string
    Configuration      interface{}
    Timeout            time.Duration
    MaxHandlerPoolSize int
}
ListenerTask contains the job metadata required for execution by the ListenerDriver.
type Marshaler ¶
type Marshaler interface {
    // Marshal transforms a complex data type into a primitive binary array for data transportation.
    Marshal(schemaDef string, data interface{}) ([]byte, error)
    // Unmarshal transforms a primitive binary array to a complex data type for data processing.
    Unmarshal(schemaDef string, data []byte, ref interface{}) error
    // ContentType retrieves the encoding/decoding format using RFC 2046 standard (e.g. application/json).
    ContentType() string
}
Marshaler handles data transformation between primitives and specific codecs/formats (e.g. JSON, Apache Avro).
type Message ¶
type Message struct {
    ID                string `json:"id"`
    Stream            string `json:"stream"`
    Source            string `json:"source"`
    SpecVersion       string `json:"specversion"`
    Type              string `json:"type"`
    Data              []byte `json:"data"`
    DataContentType   string `json:"datacontenttype,omitempty"`
    DataSchema        string `json:"dataschema,omitempty"`
    DataSchemaVersion int    `json:"dataschemaversion,omitempty"`
    Timestamp         string `json:"time,omitempty"`
    Subject           string `json:"subject,omitempty"`

    // Streamhub fields
    CorrelationID string `json:"correlation_id"`
    CausationID   string `json:"causation_id"`

    // consumer-only fields
    DecodedData interface{} `json:"-"`
    GroupName   string      `json:"-"`
}
Message is a unit of information which holds the primitive message (data) in binary format along multiple fields in order to preserve a schema definition within a stream pipeline.
The schema is based on the Cloud Native Computing Foundation (CNCF)'s CloudEvents specification.
For more information, see: https://github.com/cloudevents/spec
func NewMessage ¶
func NewMessage(args NewMessageArgs) Message
NewMessage allocates an immutable Message ready to be transported in a stream.
type MessageContextKey ¶
type MessageContextKey string
MessageContextKey is the streamhub context key to inject data into transport messages.
const (
    // ContextCorrelationID is the main trace of a stream processing. Once generated, it MUST NOT be generated again
    // to keep track of the process from the beginning.
    ContextCorrelationID MessageContextKey = "shub-correlation-id"
    // ContextCausationID is the reference of the last message processed. This helps to know a direct relation between
    // a new process and the past one.
    ContextCausationID MessageContextKey = "shub-causation-id"
)
type NewMessageArgs ¶
type NewMessageArgs struct {
    SchemaVersion        int
    Data                 []byte
    ID                   string
    Source               string
    Stream               string
    SchemaDefinitionName string
    ContentType          string
    GroupName            string
    Subject              string
}
NewMessageArgs arguments required by NewMessage function to operate.
type NoopSchemaRegistry ¶
type NoopSchemaRegistry struct{}
NoopSchemaRegistry is the no-operation implementation of SchemaRegistry
func (NoopSchemaRegistry) GetSchemaDefinition ¶
func (n NoopSchemaRegistry) GetSchemaDefinition(_ string, _ int) (string, error)
GetSchemaDefinition retrieves an empty string and a nil error
type ProtocolBuffersMarshaler ¶
type ProtocolBuffersMarshaler struct{}
ProtocolBuffersMarshaler handles data transformation between primitives and Google Protocol Buffers format
func (ProtocolBuffersMarshaler) ContentType ¶
func (p ProtocolBuffersMarshaler) ContentType() string
ContentType retrieves the encoding/decoding Google Protocol Buffers format using the latest conventions.
More information here: https://github.com/google/protorpc/commit/eb03145a6a7c72ae6cc43867d9635a5b8d8c4545
type SchemaRegistry ¶
type SchemaRegistry interface {
    // GetSchemaDefinition retrieves a schema definition (in string format) from the registry
    GetSchemaDefinition(name string, version int) (string, error)
}
SchemaRegistry is an external storage of stream message schema definitions with proper versioning.
Examples of such schema registries are AWS Glue Schema Registry and Confluent Schema Registry.
type StreamMetadata ¶
type StreamMetadata struct {
    Stream               string
    SchemaDefinitionName string
    SchemaVersion        int
    GoType               reflect2.Type
}
StreamMetadata contains information of stream messages.
type StreamRegistry ¶
type StreamRegistry map[string]StreamMetadata
StreamRegistry is an in-memory storage of streams metadata used by Hub and any external agent to set and retrieve information about a specific stream.
Uses a custom string (or Go's struct type as string) as key.
Note: A message key differs from stream name as the message key COULD be anything the developer sets within the stream registry. Thus, scenarios where multiple data types require publishing messages to the same stream are possible. Moreover, the message key is set by reflection-based registries with the reflect.TypeOf function, so it will differ from the actual stream name.
func (StreamRegistry) Get ¶
func (r StreamRegistry) Get(message interface{}) (StreamMetadata, error)
Get retrieves a stream message metadata from a stream message type.
func (StreamRegistry) GetByStreamName ¶
func (r StreamRegistry) GetByStreamName(name string) (StreamMetadata, error)
GetByStreamName retrieves a stream message metadata from a stream name.
It contains an optimistic lookup mechanism to keep constant time and space complexity.
If no metadata is found under the given key, the lookup falls back to an O(n) scan over the registry. The fallback adds the GetByString base cost to the overall complexity; since GetByString is always constant time (e.g. GetByString = 49.75 ns/op), GetByStreamName's total cost is its original cost plus that constant term.
This optimistic lookup is done in order to keep amortized time and space complexity when using non-reflection based implementations on the root Hub (using only String methods from this very Stream Registry component). Thus, greater performance is achieved for scenarios when reflection-based stream registration is not required by the program.
func (StreamRegistry) GetByString ¶
func (r StreamRegistry) GetByString(key string) (StreamMetadata, error)
GetByString retrieves a stream message metadata from a string key.
func (StreamRegistry) Set ¶
func (r StreamRegistry) Set(message interface{}, metadata StreamMetadata)
Set creates a relation between a stream message type and metadata.
func (StreamRegistry) SetByString ¶
func (r StreamRegistry) SetByString(key string, metadata StreamMetadata)
SetByString creates a relation between a string key and metadata.
type WriteByMessageKeyBatchItems ¶ added in v0.1.5
type WriteByMessageKeyBatchItems map[string]interface{}
WriteByMessageKeyBatchItems items to be written as a batch by the Hub.WriteByMessageKeyBatch() function
type Writer ¶ added in v0.1.5
type Writer interface {
    // Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the
    // data to a set of subscribed systems for further processing.
    Write(ctx context.Context, message Message) error
    // WriteBatch inserts a set of messages into a stream assigned to the message in the StreamRegistry in order to
    // propagate the data to a set of subscribed systems for further processing.
    //
    // Depending on the underlying Writer driver implementation, this function MIGHT return an error if a single
    // operation failed, or it MIGHT return an error if the whole operation failed
    WriteBatch(ctx context.Context, messages ...Message) error
}
Writer inserts messages into streams assigned on the StreamRegistry in order to propagate the data to a set of subscribed systems for further processing.
This type should be provided by a streamhub Driver (e.g. Apache Pulsar, Apache Kafka, Amazon SNS)
var NoopWriter Writer = noopWriter{}
NoopWriter is the no-operation implementation of Writer
Source Files ¶
- context.go
- doc.go
- errors.go
- event.go
- hub.go
- hub_options.go
- id_factory.go
- listener.go
- listener_behaviour.go
- listener_node.go
- listener_node_driver.go
- listener_node_options.go
- listener_registry.go
- listener_supervisor.go
- listener_task.go
- marshaler.go
- message.go
- schema_registry.go
- stream_registry.go
- writer.go
Directories ¶
| Path | Synopsis |
|---|---|
| benchmarks | |
| examples | |
| | Package streamhub-memory is the In-Memory implementation for Streamhub-based programs. |