quark

package module
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2021 License: MIT Imports: 11 Imported by: 6

README

⚡ Quark GoDoc Build Status Coverage Status Report Status Codebeat Go Version

A Reliable and fully customizable event router for Event-Driven systems written in Go.

Based on reliable mechanisms from companies such as Uber, Quark offers an Event Router with a fine-tuned set of tools to ease messaging communication complexity.

Thread-safe processing, parallelism, concurrency and graceful shutdowns are the elemental principles of Quark.

Furthermore, Quark uses the Cloud Native Computing Foundation (CNCF) CloudEvents specification to compose messages. Quark lets developers use their preferred encoding format (JSON, Apache Avro, etc.) and sets message headers as binary data when possible to reduce computational costs.

Aside basic functionalities, it is worth to mention Quark is fully customizable, so any developer may get the maximum potential out of Quark.

A simple set of examples would be:

  • Override the default Event Writer to apply custom resilience mechanisms.
  • Increasing a Worker pool size for an specific Consumer process.
  • Override the default Publisher (e.g. Apache Kafka) for another provider Publisher (e.g. AWS SNS).
  • Set a tracing context as custom Message header to trace an specific event process.

To conclude, Quark exposes a friendly API based on Go's idiomatic best practices and the net/http + popular HTTP mux (gorilla/mux, gin-gonic/gin, labstack/echo) packages to increase overall usability and productivity.

More information about the internal low-level architecture may be found here.

Supported Infrastructure

  • Apache Kafka
  • In Memory*
  • Redis Pub/Sub*
  • Amazon Web Services Simple Queue Service (SQS)*
  • Amazon Web Services Simple Notification Service (SNS)*
  • Amazon Web Services Kinesis*
  • Amazon Web Services EventBridge*
  • Google Cloud Platform Pub/Sub*
  • Microsoft Azure Service Bus*
  • NATS*
  • RabbitMQ*

* to be implemented

Installation

Since Quark uses Go submodules to decompose specific depenencies for providers, it is required to install concrete implementations (Apache Kafka, In memory, Redis, ...) manually. One may install these using the following command.

go get github.com/neutrinocorp/quark/bus/YOUR_PROVIDER

If one wants to develop its own custom implementations, it is required to install Quark core library. It can be done running the following command.

go get github.com/neutrinocorp/quark

Note that Quark only supports the two most recent minor versions of Go.

Quick Start

Before we set up our consumers, we must define our Broker and its required configuration to work as desired.

The following example demonstrates how to set up an Apache Kafka Broker with an error handler (hook).

// Create error hook
customErrHandler := func(ctx context.Context, err error) {
  log.Print(err)
}

// ...

// Create broker
b := kafka.NewKafkaBroker(
	// ... A Shopify/sarama configuration,
	quark.WithCluster("localhost:9092", "localhost:9093"),
	quark.WithBaseMessageSource("https://neutrinocorp.org/cloudevents"),
	quark.WithBaseMessageContentType("application/cloudevents+json"),
	quark.WithErrorHandler(customErrHandler))
Listen to a Topic

Quark is very straight forward as is based on the net/http and well known Go HTTP mux packages.

This example demonstrates how to listen to a Topic using the Broker.Topic() method.

If no pool-size was specified, Quark will set up to 5 workers for each Consumer.

b.Topic("chat.1").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ...
  return true
})
Publish to Topic(s)

Internally, Quark uses the EventWriter and Publisher components to propagate an event into the provider infrastructure.

Moreover, Quark lets developers publish a message to multiple Topics (fan-out).

This can be done by calling the EventWriter's Write()/WriteMessage() methods.

b.Topic("chat.1").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ...
  _, _ = w.Write(e.Context, encodedMsgBody, "chat.2", "chat.3") // returns how many messages were published
  // or
  // msg is a quark.Message struct, writer will use Message.Type/Topic attribute to publish
  _, _ = w.WriteMessage(e.Context, msgA, msgB)
  return true
})
Retry an event process

Quark is based on reliable mechanisms such as retry-exponential+jitter backoff and sending poison messages to Dead-Letter Queues (DLQ) strategies.

This can be done by calling the EventWriter.WriteRetry() method.

To customize these mechanisms, the developer may use the quark.WithMaxRetries()/Consumer.MaxRetries() and quark.WithRetryBackoff()/Consumer.RetryBackoff() methods/functions.

b.Topic("cosmos.payments").MaxRetries(3).RetryBackoff(time.Second*3).HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ... something failed in our processing
  err := w.WriteRetry(e.Context, e.Body)
  if errors.Is(err, quark.ErrMessageRedeliveredTooMuch) {
       	// calling Write() will set the Message re-delivery delta to 0
	_, _ = w.Write(e.Context, e.Body.Data, "dlq.chat.1")
  }
  return true
})
Failed event processing

If a message processing fails, Quark will use Acknowledgement mechanisms if available.

This can be done by returning a false value from the event handler.

* Only available for specific providers.

b.Topic("cosmos.user_registered").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ...
  return true // this indicates if the consumer should mark the message or not (Ack or NAck)
})
Start Broker and Graceful Shutdown

To conclude, after setting up all of our consumers, the developer must start the Broker component to execute background jobs from registered Consumer(s).

The developer should not forget to shutdown gracefully the Broker -like an net/http server-.

// graceful shutdown
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
go func() {
	if err := b.ListenAndServe(); err != nil && err != quark.ErrBrokerClosed {
		log.Fatal(err)
	}
}()

<-stop

log.Printf("stopping %d supervisor(s) and %d worker(s)", b.ActiveSupervisors(), b.ActiveWorkers())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

if err := b.Shutdown(ctx); err != nil {
	log.Fatal(err)
}

log.Print(b.ActiveSupervisors(), b.ActiveWorkers()) // should be 0,0

Advanced techniques

Increase/Decrease Worker pool for a Consumer process

Quark parallelize message-processing jobs by creating a pool of Worker(s) for each Consumer using goroutines.

The pool size can be defined to an specific Consumer calling the quark.WithPoolSize()/Consumer.PoolSize() method/function.

b.Topic("chat.1").PoolSize(10).HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ...
  return true
})
Grouping Consumer jobs

When processing in parallel, every Worker in a Consumer pool will read from a Queue/Offset independently.

Even though this is intended by Quark, the developer might want to balance processing load from the Worker(s) by treating the Consumer pool as a whole.

This can be done calling the Consumer.Group() method.

* Only available for specific providers (e.g. Apache Kafka).

b.Topic("chat.1").Group("awesome-group").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ...
  return true
})
Listening N-Topics within a single Consumer (Fan-in)

A Quark Consumer accepts up to N topics by default.

This feature can be implemented by calling the Consumer.Topics() method.

b.Topics("chat.0", "chat.1").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ...
  return true
})
Event header read and manipulation

Like HTTP, Quark defines a set of headers for each Event and decodes/encodes them by default.

These headers may contain useful metadata from the current Broker, Consumer, provider (e.g. an Apache Kafka Offset or Partition) and/or Message (CloudEvents attributes).

Moreover, Quark lets developers read or manipulate these headers. Thus, modified headers will be published when EventWriter's Write methods are called.

This can be done by calling the EventWriter.Header().Get() and EventWriter.Header().Set() methods.

b.Topic("chat.1").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ...
  partition := w.Header().Get(kafka.HeaderPartition)
  w.Header().Set(quark.HeaderMessageDataContentType, "application/avro")
  _, _ = w.Write(e.Context, e.Body.Data, "dlq.chat.1") // will use new Content-Type header
  return true
})
Using a different Publisher for a Consumer process

As part of the fully customizable principle, a Quark Consumer may use a different Publisher component if desired.

This feature can be implemented by using a different provider Publisher implementation and by calling the Consumer.Publisher() method.

// on quark/bus/aws package

type SNSPublisher struct{}

func (a SNSPublisher) Publish(ctx context.Context, msgs ...*quark.Message) error {
	// ...
	return nil
}

// on developer application

// ...

// Listening from Google Cloud Platform Pub/Sub

b.Topic("alex.trades").Publisher(aws.SNSPublisher{}).
  HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
    // Write() will publish the message to Amazon Web Services Simple Notification Service (SNS)
    _, _ = w.Write(e.Context, []byte("alex has traded in a new index fund"),
      "aws.alex.trades", "aws.analytics.trades")
    return true
  })

See the documentation, examples and FAQ for more details.

Performance

As measured by its own benchmarking suite, not only is Quark more performant than comparable messaging processing packages. Like all benchmarks, take these with a grain of salt.1

Maintenance

This library is currently maintained by

Development Status: Alpha

All APIs are under development, yet no breaking changes will be made in the 1.x.x series of releases. Users of semver-aware dependency management systems should pin Quark to ^1.

Contributing

We encourage and support an active, healthy community of contributors — including you! Details are in the contribution guide and the code of conduct. The Quark maintainers keep an eye on issues and pull requests, but you can also report any negative conduct to oss-conduct@neutrinocorp.org. That email list is a private, safe space; even the Quark maintainers don't have access, so don't hesitate to hold us to a high standard.


Released under the MIT License.

1 In particular, keep in mind that we may be benchmarking against slightly older versions of other packages. Versions are pinned in the benchmarks/go.mod file.

Documentation

Overview

Package quark Reliable Event-Driven mechanisms for reactive ecosystems written in Go.

Based on reliable mechanisms from companies such as Uber who serve 1M+ requests per-hour, Quark offers a fine-tuned set of tools to ease overall complexity.

In Addition, Quark fans-out processes per-consumer to parallelize blocking I/O tasks (as consuming from a queue/topic would be) and isolate them. Thread-safe and graceful shutdown are a very important part of Quark.

Furthermore, Quark stores specific data (e.g. event id, correlation id, span context, etc) into messages headers in binary format to ease disk consumption on the infrastructure and it lets users use their own encoding preferred library. (JSON, Apache Avro, etc.)

Therefore, Quark lets developers take advantage of those mechanisms with its default configuration and a gorilla/mux-like router/mux to keep them in ease and get benefits without complex configurations and handling. You can either choose use global configurations specified in the broker or use an specific configuration for an specific consumer.

Index

Constants

View Source
const (
	// Command Message type. Used when dispatching an asynchronous command (CQRS); in other words, this type is widely
	// used when asynchronous processing in an external system is required (e.g. processing a video in a serverless
	// function triggered by a service or simple synchronous REST API)
	Command = "command"
	// DomainEvent Message type. Used when something has happened in an specific aggregate and it must propagate
	// its side-effects in the entire event-driven ecosystem
	DomainEvent = "event"
)
View Source
const (
	// HeaderMessageId Message ID
	HeaderMessageId = "quark-id"
	// HeaderMessageType This attribute contains a value describing the type of event related to the originating occurrence.
	HeaderMessageType = "quark-type"
	// HeaderMessageSpecVersion The version of the CloudEvents specification which the event uses.
	HeaderMessageSpecVersion = "quark-spec-version"
	// HeaderMessageSource Identifies the context in which an event happened.
	HeaderMessageSource = "quark-source"
	// HeaderMessageTime Time when Message was published
	HeaderMessageTime = "quark-time"
	// HeaderMessageDataContentType Content type of data value. This attribute enables data to carry any type of content,
	// whereby format and encoding might differ from that of the chosen event format.
	HeaderMessageDataContentType = "quark-content-type"
	// HeaderMessageDataSchema Identifies the schema that data adheres to.
	HeaderMessageDataSchema = "quark-data-schema"
	// HeaderMessageData Message body encoded bytes
	HeaderMessageData = "quark-data"
	// HeaderMessageSubject This describes the subject of the event in the context of the event producer (identified by source).
	HeaderMessageSubject = "quark-subject"
	// HeaderMessageCorrelationId Message parent (origin)
	HeaderMessageCorrelationId = "quark-metadata-correlation-id"
	// HeaderMessageHost Node IP from deployed cluster in infrastructure
	HeaderMessageHost = "quark-metadata-host"
	// HeaderMessageRedeliveryCount Message total redeliveries
	HeaderMessageRedeliveryCount = "quark-metadata-redelivery-count"
	// HeaderMessageError Message error message from processing pipeline
	HeaderMessageError = "quark-metadata-error"

	// HeaderConsumerGroup Consumer group this message was received by
	HeaderConsumerGroup = "quark-consumer-group"
	// HeaderSpanContext Message span parent, used for distributed tracing mechanisms such as OpenCensus, OpenTracing
	// and/or OpenTelemetry
	HeaderSpanContext = "quark-span-context"
)

using CNCF CloudEvents specification v1.0 ref. https://cloudevents.io/ ref. https://github.com/cloudevents/spec/blob/v1.0.1/spec.md

View Source
const CloudEventsVersion = "1.0"

CloudEventsVersion current Quark's CNCF CloudEvents specification version

Variables

View Source
var (
	// ErrBrokerClosed the broker was already closed
	ErrBrokerClosed = errors.New("broker closed")
	// ErrProviderNotValid the given provider is not accepted by Quark
	ErrProviderNotValid = errors.New("provider is not valid")
	// ErrPublisherNotImplemented the given publisher does not have its concrete implementation
	ErrPublisherNotImplemented = errors.New("publisher is not implemented")
	// ErrNotEnoughTopics no topics where found
	ErrNotEnoughTopics = errors.New("not enough topics")
	// ErrNotEnoughHandlers no consumer handler was found
	ErrNotEnoughHandlers = errors.New("not enough handlers")
	// ErrEmptyMessage no message was found
	ErrEmptyMessage = errors.New("message is empty")
	// ErrEmptyCluster the current cluster does not contains any hosts/addresses
	ErrEmptyCluster = errors.New("consumer cluster is empty")
	// ErrRequiredGroup a consumer group is required
	ErrRequiredGroup = errors.New("consumer group is required")
)
View Source
var ErrMessageRedeliveredTooMuch = errors.New("message has been redelivered too much")

ErrMessageRedeliveredTooMuch the message has been published the number of times of the configuration limit

Functions

func FormatQueueName

func FormatQueueName(service, entity, action, event string) string

FormatQueueName forms an Async API queue name

format e.g. "service.entity.action_on_event

func FormatTopicName

func FormatTopicName(organization, service, kind, entity, action string, version int) string

FormatTopicName forms an Async API topic name

format e.g. "organization.service.version.kind.entity.action"

Types

type Broker

type Broker struct {
	ProviderConfig   interface{}
	Cluster          []string
	ErrorHandler     ErrorHandler
	Publisher        Publisher
	EventMux         EventMux
	EventWriter      EventWriter
	PoolSize         int
	MaxRetries       int
	RetryBackoff     time.Duration
	ConnRetries      int
	ConnRetryBackoff time.Duration

	MessageIDFactory IDFactory
	WorkerFactory    WorkerFactory

	// BaseMessageSource is the default Source of a Message based on the CNCF CloudEvents specification v1
	//
	// It could be a Internet-wide unique URI with a DNS authority, Universally-unique URN with a UUID or
	// Application-specific identifiers
	//
	// e.g. https://github.com/cloudevents, urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66, /cloudevents/spec/pull/123
	BaseMessageSource string
	// BaseMessageContentType is the default Content type of data value. This attribute enables data to carry any type of content,
	// whereby format and encoding might differ from that of the chosen event format.
	//
	// Must adhere to the format specified in RFC 2046
	//
	// e.g. application/avro, application/json, application/cloudevents+json
	BaseMessageContentType string

	BaseContext context.Context
	// contains filtered or unexported fields
}

Broker coordinates every Event operation on the running Event-Driven application.

Administrates Consumer(s) supervisors and their workers wrapped with well-known concurrency and resiliency patterns.

func NewBroker

func NewBroker(opts ...Option) *Broker

NewBroker allocates and returns a Broker

func (*Broker) ActiveSupervisors

func (b *Broker) ActiveSupervisors() int

activeSupervisors returns the current number of running supervisors

func (*Broker) ActiveWorkers

func (b *Broker) ActiveWorkers() int

activeWorkers returns the current number of running workers (inside every Supervisor)

func (*Broker) GetConnRetries

func (b *Broker) GetConnRetries() int

GetConnRetries retrieves the default connection retries

func (*Broker) GetConnRetryBackoff

func (b *Broker) GetConnRetryBackoff() time.Duration

GetConnRetryBackoff retrieves the default connection retry backoff

func (*Broker) ListenAndServe

func (b *Broker) ListenAndServe() error

ListenAndServe starts listening to the given Consumer(s) concurrently-safe

func (*Broker) Serve

func (b *Broker) Serve() error

Serve starts the broker components

func (*Broker) Shutdown

func (b *Broker) Shutdown(ctx context.Context) error

Shutdown starts Broker graceful shutdown of its components

func (*Broker) Topic

func (b *Broker) Topic(topic string) *Consumer

Topic adds new Consumer Supervisor to the given EventMux

func (*Broker) Topics

func (b *Broker) Topics(topics ...string) *Consumer

Topics adds multiple Consumer supervisors to the given EventMux

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer main processing unit. It is intended to subscribe to a Topic or Queue within a worker pool to segregate application load and enable high-concurrency.

A Broker will receive and coordinate all Consumer nodes and will stop them gracefully when desired.

func (*Consumer) Address

func (c *Consumer) Address(addrs ...string) *Consumer

Address ip address(es) with its respective port(s) of the Message Broker/Message Queue system cluster

func (*Consumer) ContentType

func (c *Consumer) ContentType(t string) *Consumer

ContentType is the default Content type of data value. This attribute enables data to carry any type of content, whereby format and encoding might differ from that of the chosen event format.

Must adhere to the format specified in RFC 2046 RFC 2046">¶

e.g. application/avro, application/json, application/cloudevents+json

func (*Consumer) GetGroup

func (c *Consumer) GetGroup() string

GetGroup returns the current Consumer group

func (*Consumer) GetHandle

func (c *Consumer) GetHandle() Handler

GetHandle returns the current consumer Handler component

func (*Consumer) GetHandleFunc

func (c *Consumer) GetHandleFunc() HandlerFunc

GetHandleFunc returns the current consumer Handler function component

func (*Consumer) GetProviderConfig

func (c *Consumer) GetProviderConfig() interface{}

GetProviderConfig returns a custom provider configuration (e.g. sarama config, aws credentials)

func (*Consumer) GetTopics

func (c *Consumer) GetTopics() []string

GetTopics returns the current consumer Topic slice

func (*Consumer) Group

func (c *Consumer) Group(g string) *Consumer

Group set of consumers this specific consumer must be with to consume messages in parallel

Only available in: Apache Kafka

func (*Consumer) Handle

func (c *Consumer) Handle(handler Handler) *Consumer

Handle specific struct Quark will use to send messages

func (*Consumer) HandleFunc

func (c *Consumer) HandleFunc(handlerFunc HandlerFunc) *Consumer

HandleFunc specific func Quark will use to send messages

func (*Consumer) MaxRetries

func (c *Consumer) MaxRetries(n int) *Consumer

MaxRetries total times to retry an Event operation if processing fails

func (*Consumer) PoolSize

func (c *Consumer) PoolSize(s int) *Consumer

PoolSize worker pool size

func (*Consumer) ProviderConfig

func (c *Consumer) ProviderConfig(cfg interface{}) *Consumer

ProviderConfig Custom provider configuration (e.g. sarama config, aws credentials)

func (*Consumer) Publisher

func (c *Consumer) Publisher(p Publisher) *Consumer

Publisher pushes the given Message into the Event-Driven ecosystem.

func (*Consumer) RetryBackoff

func (c *Consumer) RetryBackoff(t time.Duration) *Consumer

RetryBackoff time to wait between each retry

func (*Consumer) Source

func (c *Consumer) Source(s string) *Consumer

Source is the specific Source of a Message based on the CNCF CloudEvents specification v1

It could be a Internet-wide unique URI with a DNS authority, Universally-unique URN with a UUID or Application-specific identifiers

e.g. https://github.com/cloudevents, urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66, /cloudevents/spec/pull/123

func (*Consumer) Topic

func (c *Consumer) Topic(topic string) *Consumer

Topic A Broker will use this topic to subscribe the Consumer

This field can be also used as Queue

func (Consumer) TopicString

func (c Consumer) TopicString() string

TopicString returns every topic registered into the current consumer as string

func (*Consumer) Topics

func (c *Consumer) Topics(topics ...string) *Consumer

Topics A Broker will use these topics to subscribe the Consumer to fan-in processing

These fields can be also used as Queues

func (*Consumer) WorkerFactory

func (c *Consumer) WorkerFactory(f WorkerFactory) *Consumer

WorkerFactory specific Quark Node's concrete worker generator

type ErrorHandler

type ErrorHandler func(context.Context, error)

ErrorHandler is a Hook that may be called when a error occurs inside Quark processes

type Event

type Event struct {
	Context    context.Context
	Topic      string
	Header     Header
	Body       *Message
	RawValue   []byte
	RawSession interface{}
}

Event represents something that has happened or requested to happen within the Event-Driven ecosystem.

For example, this could be either a domain event or an asynchronous command.

type EventMux

type EventMux interface {
	// Topic adds a new topic to the local registry
	Topic(topic string) *Consumer
	// Topics adds multiple topics
	Topics(topics ...string) *Consumer
	// Add manually adds a consumer
	Add(c *Consumer)
	// Get returns an specific consumer by the given topic
	Get(topic string) []*Consumer
	// Del removes an specific topic from the local registry
	Del(topic string)
	// Contains verifies if the given topic exists in the local registry
	Contains(topic string) bool
	// List returns the local registry
	List() map[string][]*Consumer
}

EventMux Consumer node registry, the mux itself contains a local registry of all the given Consumer(s). This process is required by the Broker in order to start all the nodes registered on the mux.

func NewMux

func NewMux() EventMux

NewMux allocates and returns a default EventMux

type EventWriter

type EventWriter interface {
	ReplaceHeader(Header)
	// Publisher actual publisher used to push Event(s)
	Publisher() Publisher
	// Header Event metadata
	Header() Header
	// Write push the given encoded message into the Event-Driven ecosystem.
	//
	// Returns number of messages published and non-nil error if publisher failed to push Event or
	// returns ErrNotEnoughTopics if no topic was specified
	//	Sometimes, the writer might not publish messages to broker since they have passed the maximum redelivery cap
	Write(ctx context.Context, msg []byte, topics ...string) (int, error)
	// WriteMessage push the given message into the Event-Driven ecosystem.
	//
	// Returns number of messages published
	// and non-nil error if publisher failed to push Event
	//	Remember Quark uses Message's "Type" field as topic name, so the developer must specify it either in mentioned field or in response headers
	//	Sometimes, the writer might not publish messages to broker since they have passed the maximum redelivery cap
	WriteMessage(context.Context, ...*Message) (int, error)
	// WriteRetry push the given Event into a retry topic.
	// Publishing aside, WriteRetry method adds a new ID for every message passed using the Broker's ID Generator/Factory.
	//
	// It is recommended to point the Message's Topic/Type to an specific retry topic/queue (e.g. foo.executed -> foo.executed.retry/foo.executed.retry.N).
	//
	// This implementation differs from others because it increments the given Message "redelivery_count" delta field by one
	WriteRetry(ctx context.Context, msg *Message) error
}

EventWriter works as an Event response writer. Lets an Event to respond, fan-out a topic message or to fail properly by sending the failed Event into either a Retry or Dead Letter Queue (DLQ).

Uses a Publisher to write the actual message

type Handler

type Handler interface {
	// ServeEvent handles any Event coming from an specific topic(s)
	//
	//
	// Returned boolean works as Acknowledgement mechanism (Ack or NAck).
	//
	// - If true, Quark will send success to the actual message broker/queue system.
	//
	// - If false, Quark will not send any success response to the actual message broker/queue system.
	//
	//	Ack mechanism is available in specific providers
	ServeEvent(EventWriter, *Event) bool
}

Handler handles any Event coming from an specific topic(s)

Returned boolean works as Acknowledgement mechanism (Ack or NAck).

- If true, Quark will send success to the actual message broker/queue system.

- If false, Quark will not send any success response to the actual message broker/queue system.

Ack mechanism is available in specific providers

type HandlerFunc

type HandlerFunc func(EventWriter, *Event) bool

HandlerFunc handles any Event coming from an specific topic(s).

Returned boolean works as Acknowledgement mechanism (Ack or NAck).

- If true, Quark will send success to the actual message broker/queue system.

- If false, Quark will not send any success response to the actual message broker/queue system.

Ack mechanism is available in specific providers
type Header map[string]string

Header holds relevant metadata about the Message, Consumer or Broker.

For example, a header may contain the following relevant information:

- Broker and Consumer configurations.

- Specific provider data (e.g. offsets, partitions and member id from a consumer group in Kafka).

- An span context from the distributed tracing provider, so it can propagate traces from remote parents.

- Message properties (e.g. total redeliveries, correlation id, origin service ip address).

func (Header) Contains

func (h Header) Contains(k string) bool

Contains verifies if the given record exists

func (Header) Del

func (h Header) Del(k string)

Del removes a record

func (Header) Get

func (h Header) Get(k string) string

Get returns a value from the given key

func (Header) Set

func (h Header) Set(k, v string)

Set attach or override the given fields

type IDFactory

type IDFactory func() string

IDFactory Message unique identifier generator

Default is `google/uuid`

type Message

type Message struct {
	// Id message unique identifier
	Id string `json:"id"`
	// Type message topic. This attribute contains a value describing the type of event related to the originating occurrence.
	//
	// Often this attribute is used for routing, observability, policy enforcement, etc.
	// The format of this is producer defined and might include information such as the version of the type -
	// see Versioning of Attributes in the Primer for more information.
	//
	//	e.g. com.example.object.deleted.v2
	Type string `json:"type"`
	// SpecVersion The version of the CloudEvents specification which the event uses.
	//
	// This enables the interpretation of the context.
	// Compliant event producers MUST use a value of 1.0 when referring to this version of the specification.
	SpecVersion string `json:"specversion"`
	// Source Identifies the context in which an event happened.
	// Often this will include information such as the type of the event source, the organization publishing the event or the process that produced the event.
	// The exact syntax and semantics behind the data encoded in the URI is defined by the event producer.
	//
	// e.g. https://github.com/cloudevents, urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66, /cloudevents/spec/pull/123
	Source string `json:"source"`

	// Data The event payload. This specification does not place any restriction on the type of this information.
	//
	// It is encoded into a media format which is specified by the datacontenttype attribute (e.g. application/json),
	// and adheres to the dataschema format when those respective attributes are present.
	Data []byte `json:"data,omitempty"`
	// ContentType Content type of data value. This attribute enables data to carry any type of content,
	// whereby format and encoding might differ from that of the chosen event format.
	ContentType string `json:"datacontenttype,omitempty"`
	// DataSchema Identifies the schema that data adheres to.
	//
	// Incompatible changes to the schema SHOULD be reflected by a different URI. See Versioning of Attributes
	// in the Primer for more information
	DataSchema string `json:"dataschema,omitempty"`
	// Subject This describes the subject of the event in the context of the event producer (identified by source).
	//
	// In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, but the source
	// identifier alone might not be sufficient as a qualifier for any specific event if the source context has internal sub-structure.
	//
	// e.g: source -> https://example.com/storage/tenant/container,
	// subject -> mynewfile.jpg
	Subject string `json:"subject,omitempty"`
	// Time Timestamp of when the occurrence happened.
	// If the time of the occurrence cannot be determined then this attribute MAY be set to some other time (such as the current time) by the
	// Quark producer, however all producers for the same source MUST be consistent in this respect.
	// In other words, either they all use the actual time of the occurrence or they all use the same algorithm to determine the value used.
	Time time.Time `json:"time,omitempty"`

	// Metadata message volatile information
	Metadata MessageMetadata `json:"metadata,omitempty"`
}

Message an Event's body.

It is intended to be passed into the actual message broker/queue system (e.g. Kafka, AWS SNS/SQS, RabbitMQ) as default message form.

Thus, it keeps struct consistency between messages published in any programming language.

Based on the CNCF CloudEvents specification v1.0

ref. https://cloudevents.io/

ref. https://github.com/cloudevents/spec/blob/v1.0.1/spec.md

func NewMessage

func NewMessage(id, msgType string, data []byte) *Message

NewMessage creates a new message without a parent message

func NewMessageFromParent

func NewMessageFromParent(parentId, id, msgType string, data []byte) *Message

NewMessageFromParent creates a new Message setting as parent (on the CorrelationId field) the given parentId

func (Message) Encode

func (m Message) Encode() ([]byte, error)

Encode returns the encoded data

func (Message) Length

func (m Message) Length() int

Length bytes size of the current message's data

type MessageMetadata

type MessageMetadata struct {
	// CorrelationId root message id
	CorrelationId string `json:"correlation_id"`
	// Host sender node ip address
	Host string `json:"host,omitempty"`
	// RedeliveryCount attempts this specific message tried to get process
	RedeliveryCount int `json:"redelivery_count"`
	// ExternalData non-Quark data may be stored here (e.g. non-Quark headers)
	ExternalData map[string]string `json:"external_data,omitempty"`
}

MessageMetadata a message volatile fields, used to store useful Quark resiliency mechanisms and custom developer-defined fields

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is a unit of configuration of a Broker

func WithBaseContext

func WithBaseContext(ctx context.Context) Option

WithBaseContext defines the context a Broker will use to executes its operations

func WithBaseMessageContentType

func WithBaseMessageContentType(s string) Option

WithBaseMessageContentType defines the global base Message Content Type to comply with the CNCF CloudEvents specification

func WithBaseMessageSource

func WithBaseMessageSource(s string) Option

WithBaseMessageSource defines the global base Message Source to comply with the CNCF CloudEvents specification

func WithCluster

func WithCluster(addr ...string) Option

WithCluster defines a set of addresses a Broker will use

func WithConnRetryBackoff

func WithConnRetryBackoff(backoff time.Duration) Option

WithConnRetryBackoff defines a time duration a Worker will wait to connect to the specified infrastructure

func WithErrorHandler

func WithErrorHandler(handler ErrorHandler) Option

WithErrorHandler defines an error hook/middleware that will be executed when an error occurs inside Quark low-level internals

func WithEventMux

func WithEventMux(mux EventMux) Option

WithEventMux defines the EventMux that will be used for Broker's operations

func WithEventWriter

func WithEventWriter(w EventWriter) Option

WithEventWriter defines the global event writer

func WithMaxConnRetries

func WithMaxConnRetries(t int) Option

WithMaxConnRetries defines the global maximum ammount of times a consumer worker will retry its operations

func WithMaxRetries

func WithMaxRetries(t int) Option

WithMaxRetries defines the global maximum ammount of times a publish operations will retry an Event operation

func WithMessageIDFactory

func WithMessageIDFactory(factory IDFactory) Option

WithMessageIDFactory defines the global Message ID Factory

func WithPoolSize

func WithPoolSize(size int) Option

WithPoolSize defines the global pool size of total Workers

func WithProviderConfiguration

func WithProviderConfiguration(cfg interface{}) Option

WithProviderConfiguration defines an specific Provider configuration

func WithPublisher

func WithPublisher(p Publisher) Option

WithPublisher defines a global Publisher that will be used to push messages to the ecosystem through the EventWriter

func WithRetryBackoff

func WithRetryBackoff(backoff time.Duration) Option

WithRetryBackoff defines a time duration an EventWriter will wait to execute a write operation

func WithWorkerFactory

func WithWorkerFactory(factory WorkerFactory) Option

WithWorkerFactory defines the global Worker Factory Quark's Supervisor(s) will use to schedule background I/O tasks

type Publisher

type Publisher interface {
	Publish(context.Context, ...*Message) error
}

Publisher pushes the given Message into the Event-Driven ecosystem.

type PublisherFactory

type PublisherFactory func(providerCfg interface{}, cluster []string) Publisher

PublisherFactory is a crucial Broker and/or Consumer component which generates the concrete publishers Quark will use to produce data

type Supervisor

type Supervisor struct {
	Broker   *Broker
	Consumer *Consumer
	// contains filtered or unexported fields
}

Supervisor is a Consumer logical unit of work. It acts as a Task scheduler for current jobs.

Distributes blocking I/O operations into different goroutines to enable parallelism with fan-out mechanisms.

func (*Supervisor) Close

func (n *Supervisor) Close() error

Close ends the current Supervisor consuming session

func (*Supervisor) GetCluster

func (n *Supervisor) GetCluster() []string

GetCluster retrieves the default cluster slice

func (*Supervisor) GetEventWriter

func (n *Supervisor) GetEventWriter() EventWriter

GetEventWriter retrieves the default event writer

func (*Supervisor) GetGroup

func (n *Supervisor) GetGroup() string

GetGroup retrieves the default consumer group

func (*Supervisor) ScheduleJobs

func (n *Supervisor) ScheduleJobs(ctx context.Context) error

ScheduleJobs starts the blocking I/O consuming operations from the current Consumer parent

type Worker

type Worker interface {
	// SetID inject the current Worker id
	SetID(i int)
	// Parent returns the parent Supervisor
	Parent() *Supervisor
	// StartJob starts an specific work
	//
	// Panics goroutine if any errors is thrown
	StartJob(context.Context) error
	// Close stop all Blocking I/O operations
	Close() error
}

Worker is a unit of work a Consumer Supervisor.

It handles all the work and should be running inside a goroutine to enable parallelism.

type WorkerFactory

type WorkerFactory func(parent *Supervisor) Worker

WorkerFactory is a crucial Broker and/or Consumer component which generates the concrete workers Quark will use to consume data

Directories

Path Synopsis
bus
kafka Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL