Documentation ¶
Overview ¶
Package quark provides reliable event-driven mechanisms for reactive ecosystems written in Go.
Based on reliable mechanisms from companies such as Uber, which serve 1M+ requests per hour, Quark offers a fine-tuned set of tools to ease overall complexity.
In addition, Quark fans out processes per consumer to parallelize blocking I/O tasks (such as consuming from a queue/topic) and isolate them. Thread safety and graceful shutdown are a very important part of Quark.
Furthermore, Quark stores specific data (e.g. event id, correlation id, span context) in message headers in binary format to reduce disk consumption on the infrastructure, and it lets users choose their preferred encoding library (JSON, Apache Avro, etc.).
Therefore, Quark lets developers take advantage of those mechanisms through its default configuration and a gorilla/mux-like router/mux, so they get the benefits without complex configuration and handling. You can either use the global configuration specified in the broker or use a specific configuration for a specific consumer.
Index ¶
- Constants
- Variables
- func FormatQueueName(service, entity, action, event string) string
- func FormatTopicName(organization, service, kind, entity, action string, version int) string
- type Broker
- func (b *Broker) ActiveSupervisors() int
- func (b *Broker) ActiveWorkers() int
- func (b *Broker) GetConnRetries() int
- func (b *Broker) GetConnRetryBackoff() time.Duration
- func (b *Broker) ListenAndServe() error
- func (b *Broker) Serve() error
- func (b *Broker) Shutdown(ctx context.Context) error
- func (b *Broker) Topic(topic string) *Consumer
- func (b *Broker) Topics(topics ...string) *Consumer
- type Consumer
- func (c *Consumer) Address(addrs ...string) *Consumer
- func (c *Consumer) ContentType(t string) *Consumer
- func (c *Consumer) GetGroup() string
- func (c *Consumer) GetHandle() Handler
- func (c *Consumer) GetHandleFunc() HandlerFunc
- func (c *Consumer) GetProviderConfig() interface{}
- func (c *Consumer) GetTopics() []string
- func (c *Consumer) Group(g string) *Consumer
- func (c *Consumer) Handle(handler Handler) *Consumer
- func (c *Consumer) HandleFunc(handlerFunc HandlerFunc) *Consumer
- func (c *Consumer) MaxRetries(n int) *Consumer
- func (c *Consumer) PoolSize(s int) *Consumer
- func (c *Consumer) ProviderConfig(cfg interface{}) *Consumer
- func (c *Consumer) Publisher(p Publisher) *Consumer
- func (c *Consumer) RetryBackoff(t time.Duration) *Consumer
- func (c *Consumer) Source(s string) *Consumer
- func (c *Consumer) Topic(topic string) *Consumer
- func (c Consumer) TopicString() string
- func (c *Consumer) Topics(topics ...string) *Consumer
- func (c *Consumer) WorkerFactory(f WorkerFactory) *Consumer
- type ErrorHandler
- type Event
- type EventMux
- type EventWriter
- type Handler
- type HandlerFunc
- type Header
- type IDFactory
- type Message
- type MessageMetadata
- type Option
- func WithBaseContext(ctx context.Context) Option
- func WithBaseMessageContentType(s string) Option
- func WithBaseMessageSource(s string) Option
- func WithCluster(addr ...string) Option
- func WithConnRetryBackoff(backoff time.Duration) Option
- func WithErrorHandler(handler ErrorHandler) Option
- func WithEventMux(mux EventMux) Option
- func WithEventWriter(w EventWriter) Option
- func WithMaxConnRetries(t int) Option
- func WithMaxRetries(t int) Option
- func WithMessageIDFactory(factory IDFactory) Option
- func WithPoolSize(size int) Option
- func WithProviderConfiguration(cfg interface{}) Option
- func WithPublisher(p Publisher) Option
- func WithRetryBackoff(backoff time.Duration) Option
- func WithWorkerFactory(factory WorkerFactory) Option
- type Publisher
- type PublisherFactory
- type Supervisor
- type Worker
- type WorkerFactory
Constants ¶
const (
	// Command Message type. Used when dispatching an asynchronous command (CQRS); in other words, this type is widely
	// used when asynchronous processing in an external system is required (e.g. processing a video in a serverless
	// function triggered by a service or simple synchronous REST API)
	Command = "command"
	// DomainEvent Message type. Used when something has happened in an specific aggregate and it must propagate
	// its side-effects in the entire event-driven ecosystem
	DomainEvent = "event"
)
const (
	// HeaderMessageId Message ID
	HeaderMessageId = "quark-id"
	// HeaderMessageType This attribute contains a value describing the type of event related to the originating occurrence.
	HeaderMessageType = "quark-type"
	// HeaderMessageSpecVersion The version of the CloudEvents specification which the event uses.
	HeaderMessageSpecVersion = "quark-spec-version"
	// HeaderMessageSource Identifies the context in which an event happened.
	HeaderMessageSource = "quark-source"
	// HeaderMessageTime Time when Message was published
	HeaderMessageTime = "quark-time"
	// HeaderMessageDataContentType Content type of data value. This attribute enables data to carry any type of content,
	// whereby format and encoding might differ from that of the chosen event format.
	HeaderMessageDataContentType = "quark-content-type"
	// HeaderMessageDataSchema Identifies the schema that data adheres to.
	HeaderMessageDataSchema = "quark-data-schema"
	// HeaderMessageData Message body encoded bytes
	HeaderMessageData = "quark-data"
	// HeaderMessageSubject This describes the subject of the event in the context of the event producer (identified by source).
	HeaderMessageSubject = "quark-subject"
	// HeaderMessageCorrelationId Message parent (origin)
	HeaderMessageCorrelationId = "quark-metadata-correlation-id"
	// HeaderMessageHost Node IP from deployed cluster in infrastructure
	HeaderMessageHost = "quark-metadata-host"
	// HeaderMessageRedeliveryCount Message total redeliveries
	HeaderMessageRedeliveryCount = "quark-metadata-redelivery-count"
	// HeaderMessageError Message error message from processing pipeline
	HeaderMessageError = "quark-metadata-error"
	// HeaderConsumerGroup Consumer group this message was received by
	HeaderConsumerGroup = "quark-consumer-group"
	// HeaderSpanContext Message span parent, used for distributed tracing mechanisms such as OpenCensus, OpenTracing
	// and/or OpenTelemetry
	HeaderSpanContext = "quark-span-context"
)
using CNCF CloudEvents specification v1.0 ref. https://cloudevents.io/ ref. https://github.com/cloudevents/spec/blob/v1.0.1/spec.md
const CloudEventsVersion = "1.0"
CloudEventsVersion is the version of the CNCF CloudEvents specification that Quark currently uses
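The header keys listed in the constants above can be pictured as entries in a key/value map. The following is only a minimal sketch: the `header` type and the `buildHeader` helper are hypothetical stand-ins (the real Header type is not shown on this page), and values are kept as plain strings for readability even though Quark is described as storing them in binary format.

```go
package main

import (
	"fmt"
	"strconv"
)

// Header key names copied from the constants documented above.
const (
	headerMessageID              = "quark-id"
	headerMessageType            = "quark-type"
	headerMessageSpecVersion     = "quark-spec-version"
	headerMessageCorrelationID   = "quark-metadata-correlation-id"
	headerMessageRedeliveryCount = "quark-metadata-redelivery-count"
)

// header is a hypothetical stand-in; the real Header type is not shown on this page.
type header map[string]string

// buildHeader fills a header with the identifying fields of a message.
func buildHeader(id, msgType, correlationID string, redeliveries int) header {
	return header{
		headerMessageID:              id,
		headerMessageType:            msgType,
		headerMessageSpecVersion:     "1.0", // value of CloudEventsVersion
		headerMessageCorrelationID:   correlationID,
		headerMessageRedeliveryCount: strconv.Itoa(redeliveries),
	}
}

func main() {
	h := buildHeader("42", "com.example.object.deleted.v2", "41", 2)
	fmt.Println(h[headerMessageType], h[headerMessageRedeliveryCount])
}
```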
Variables ¶
var (
	// ErrBrokerClosed the broker was already closed
	ErrBrokerClosed = errors.New("broker closed")
	// ErrProviderNotValid the given provider is not accepted by Quark
	ErrProviderNotValid = errors.New("provider is not valid")
	// ErrPublisherNotImplemented the given publisher does not have its concrete implementation
	ErrPublisherNotImplemented = errors.New("publisher is not implemented")
	// ErrNotEnoughTopics no topics were found
	ErrNotEnoughTopics = errors.New("not enough topics")
	// ErrNotEnoughHandlers no consumer handler was found
	ErrNotEnoughHandlers = errors.New("not enough handlers")
	// ErrEmptyMessage no message was found
	ErrEmptyMessage = errors.New("message is empty")
	// ErrEmptyCluster the current cluster does not contain any hosts/addresses
	ErrEmptyCluster = errors.New("consumer cluster is empty")
	// ErrRequiredGroup a consumer group is required
	ErrRequiredGroup = errors.New("consumer group is required")
)
var ErrMessageRedeliveredTooMuch = errors.New("message has been redelivered too much")
ErrMessageRedeliveredTooMuch the message has already been redelivered as many times as the configured limit allows
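Because these variables are sentinel errors created with errors.New, callers can match them with errors.Is even after they have been wrapped. The sketch below uses local copies of two of the errors; the `publish` and `shouldDeadLetter` functions are hypothetical and only illustrate the matching technique, not how Quark reports the error internally.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the sentinel errors listed above.
var (
	errBrokerClosed              = errors.New("broker closed")
	errMessageRedeliveredTooMuch = errors.New("message has been redelivered too much")
)

// publish is a hypothetical operation that wraps a sentinel error with context.
func publish(redeliveries, limit int) error {
	if redeliveries >= limit {
		return fmt.Errorf("topic foo.executed: %w", errMessageRedeliveredTooMuch)
	}
	return nil
}

// shouldDeadLetter reports whether err means the message exhausted its redeliveries.
func shouldDeadLetter(err error) bool {
	return errors.Is(err, errMessageRedeliveredTooMuch)
}

func main() {
	err := publish(3, 3)
	// The wrapped error still matches its own sentinel, but not an unrelated one.
	fmt.Println(shouldDeadLetter(err), errors.Is(err, errBrokerClosed))
}
```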
Functions ¶
func FormatQueueName ¶
FormatQueueName forms an Async API queue name
format e.g. "service.entity.action_on_event"
func FormatTopicName ¶
FormatTopicName forms an Async API topic name
format e.g. "organization.service.version.kind.entity.action"
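The two naming formats above can be reproduced with plain string joins. This is a sketch under assumptions, not the package's implementation: `formatTopicName` and `formatQueueName` are local functions, and rendering the version as a bare integer is a guess, since the page does not show how the version segment is formatted.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatTopicName joins the parts in the documented order:
// organization.service.version.kind.entity.action
// Rendering the version as a bare integer is an assumption.
func formatTopicName(organization, service, kind, entity, action string, version int) string {
	parts := []string{organization, service, strconv.Itoa(version), kind, entity, action}
	return strings.Join(parts, ".")
}

// formatQueueName follows the documented "service.entity.action_on_event" shape.
func formatQueueName(service, entity, action, event string) string {
	return service + "." + entity + "." + action + "_on_" + event
}

func main() {
	fmt.Println(formatTopicName("acme", "orders", "event", "order", "created", 1))
	fmt.Println(formatQueueName("billing", "invoice", "create", "order_created"))
}
```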
Types ¶
type Broker ¶
type Broker struct {
	ProviderConfig   interface{}
	Cluster          []string
	ErrorHandler     ErrorHandler
	Publisher        Publisher
	EventMux         EventMux
	EventWriter      EventWriter
	PoolSize         int
	MaxRetries       int
	RetryBackoff     time.Duration
	ConnRetries      int
	ConnRetryBackoff time.Duration
	MessageIDFactory IDFactory
	WorkerFactory    WorkerFactory
	// BaseMessageSource is the default Source of a Message based on the CNCF CloudEvents specification v1
	//
	// It could be a Internet-wide unique URI with a DNS authority, Universally-unique URN with a UUID or
	// Application-specific identifiers
	//
	// e.g. https://github.com/cloudevents, urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66, /cloudevents/spec/pull/123
	BaseMessageSource string
	// BaseMessageContentType is the default Content type of data value. This attribute enables data to carry any type of content,
	// whereby format and encoding might differ from that of the chosen event format.
	//
	// Must adhere to the format specified in RFC 2046
	//
	// e.g. application/avro, application/json, application/cloudevents+json
	BaseMessageContentType string
	BaseContext            context.Context
	// contains filtered or unexported fields
}
Broker coordinates every Event operation on the running Event-Driven application.
Administrates Consumer(s) supervisors and their workers wrapped with well-known concurrency and resiliency patterns.
func (*Broker) ActiveSupervisors ¶
ActiveSupervisors returns the current number of running supervisors
func (*Broker) ActiveWorkers ¶
ActiveWorkers returns the current number of running workers (inside every Supervisor)
func (*Broker) GetConnRetries ¶
GetConnRetries retrieves the default connection retries
func (*Broker) GetConnRetryBackoff ¶
GetConnRetryBackoff retrieves the default connection retry backoff
func (*Broker) ListenAndServe ¶
ListenAndServe starts listening to the given Consumer(s) in a concurrency-safe manner
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is the main processing unit. It is intended to subscribe to a Topic or Queue within a worker pool to segregate application load and enable high concurrency.
A Broker will receive and coordinate all Consumer nodes and will stop them gracefully when desired.
func (*Consumer) Address ¶
Address ip address(es) with its respective port(s) of the Message Broker/Message Queue system cluster
func (*Consumer) ContentType ¶
ContentType is the default Content type of data value. This attribute enables data to carry any type of content, whereby format and encoding might differ from that of the chosen event format.
Must adhere to the format specified in RFC 2046
e.g. application/avro, application/json, application/cloudevents+json
func (*Consumer) GetHandleFunc ¶
func (c *Consumer) GetHandleFunc() HandlerFunc
GetHandleFunc returns the current consumer Handler function component
func (*Consumer) GetProviderConfig ¶
func (c *Consumer) GetProviderConfig() interface{}
GetProviderConfig returns a custom provider configuration (e.g. sarama config, aws credentials)
func (*Consumer) Group ¶
Group is the set of consumers this specific consumer must join to consume messages in parallel
Only available in: Apache Kafka
func (*Consumer) HandleFunc ¶
func (c *Consumer) HandleFunc(handlerFunc HandlerFunc) *Consumer
HandleFunc sets the specific function Quark will pass incoming messages to
func (*Consumer) MaxRetries ¶
MaxRetries total times to retry an Event operation if processing fails
func (*Consumer) ProviderConfig ¶
ProviderConfig Custom provider configuration (e.g. sarama config, aws credentials)
func (*Consumer) RetryBackoff ¶
RetryBackoff time to wait between each retry
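MaxRetries and RetryBackoff together describe a bounded retry loop. The sketch below shows one common way to combine the two values; the `retry` helper is hypothetical, the backoff is fixed rather than exponential, and whether Quark counts the first attempt against MaxRetries is not stated on this page, so here the first attempt is assumed not to count.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a placeholder failure used by the example.
var errTransient = errors.New("transient failure")

// retry runs op once and then up to maxRetries more times on failure,
// sleeping backoff between attempts. It returns the number of attempts made.
func retry(maxRetries int, backoff time.Duration, op func() error) (int, error) {
	var err error
	for attempt := 1; ; attempt++ {
		if err = op(); err == nil {
			return attempt, nil
		}
		if attempt > maxRetries {
			return attempt, err
		}
		time.Sleep(backoff)
	}
}

func main() {
	calls := 0
	attempts, err := retry(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```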
func (*Consumer) Source ¶
Source is the specific Source of a Message based on the CNCF CloudEvents specification v1
It could be an Internet-wide unique URI with a DNS authority, a universally unique URN with a UUID, or an application-specific identifier
e.g. https://github.com/cloudevents, urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66, /cloudevents/spec/pull/123
func (*Consumer) Topic ¶
Topic A Broker will use this topic to subscribe the Consumer
This field can also be used as a Queue
func (Consumer) TopicString ¶
TopicString returns every topic registered into the current consumer as string
func (*Consumer) Topics ¶
Topics A Broker will use these topics to subscribe the Consumer to fan-in processing
These fields can also be used as Queues
func (*Consumer) WorkerFactory ¶
func (c *Consumer) WorkerFactory(f WorkerFactory) *Consumer
WorkerFactory specific Quark Node's concrete worker generator
type ErrorHandler ¶
ErrorHandler is a hook that may be called when an error occurs inside Quark processes
type Event ¶
type Event struct {
	Context    context.Context
	Topic      string
	Header     Header
	Body       *Message
	RawValue   []byte
	RawSession interface{}
}
Event represents something that has happened or is requested to happen within the Event-Driven ecosystem.
For example, this could be either a domain event or an asynchronous command.
type EventMux ¶
type EventMux interface {
	// Topic adds a new topic to the local registry
	Topic(topic string) *Consumer
	// Topics adds multiple topics
	Topics(topics ...string) *Consumer
	// Add manually adds a consumer
	Add(c *Consumer)
	// Get returns an specific consumer by the given topic
	Get(topic string) []*Consumer
	// Del removes an specific topic from the local registry
	Del(topic string)
	// Contains verifies if the given topic exists in the local registry
	Contains(topic string) bool
	// List returns the local registry
	List() map[string][]*Consumer
}
EventMux Consumer node registry, the mux itself contains a local registry of all the given Consumer(s). This process is required by the Broker in order to start all the nodes registered on the mux.
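A registry of this shape can be pictured as a map from topic name to the consumers subscribed to it, with chainable setters on each consumer. The sketch below is a simplified, hypothetical in-memory version: the `mux` and `consumer` types are stand-ins, it is not safe for concurrent use, and registering one shared consumer under every topic passed to Topics is an assumption about the real behaviour.

```go
package main

import "fmt"

// consumer is a minimal stand-in for Consumer with one chainable setter.
type consumer struct {
	topics []string
	group  string
}

// Group sets the consumer group and returns the consumer for chaining.
func (c *consumer) Group(g string) *consumer {
	c.group = g
	return c
}

// mux is a hypothetical in-memory registry keyed by topic name.
type mux struct {
	consumers map[string][]*consumer
}

func newMux() *mux {
	return &mux{consumers: make(map[string][]*consumer)}
}

// Topics registers one consumer under every given topic.
func (m *mux) Topics(topics ...string) *consumer {
	c := &consumer{topics: topics}
	for _, t := range topics {
		m.consumers[t] = append(m.consumers[t], c)
	}
	return c
}

// Topic registers a consumer under a single topic.
func (m *mux) Topic(topic string) *consumer { return m.Topics(topic) }

// Get returns the consumers registered for a topic.
func (m *mux) Get(topic string) []*consumer { return m.consumers[topic] }

// Contains reports whether the topic exists in the registry.
func (m *mux) Contains(topic string) bool {
	_, ok := m.consumers[topic]
	return ok
}

// Del removes a topic from the registry.
func (m *mux) Del(topic string) { delete(m.consumers, topic) }

func main() {
	m := newMux()
	m.Topic("foo.executed").Group("billing")
	m.Topics("a", "b").Group("audit")
	fmt.Println(m.Contains("a"), len(m.Get("b")))
	m.Del("a")
	fmt.Println(m.Contains("a"))
}
```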
type EventWriter ¶
type EventWriter interface {
	ReplaceHeader(Header)
	// Publisher actual publisher used to push Event(s)
	Publisher() Publisher
	// Header Event metadata
	Header() Header
	// Write push the given encoded message into the Event-Driven ecosystem.
	//
	// Returns number of messages published and non-nil error if publisher failed to push Event or
	// returns ErrNotEnoughTopics if no topic was specified
	// Sometimes, the writer might not publish messages to broker since they have passed the maximum redelivery cap
	Write(ctx context.Context, msg []byte, topics ...string) (int, error)
	// WriteMessage push the given message into the Event-Driven ecosystem.
	//
	// Returns number of messages published
	// and non-nil error if publisher failed to push Event
	// Remember Quark uses Message's "Type" field as topic name, so the developer must specify it either in mentioned field or in response headers
	// Sometimes, the writer might not publish messages to broker since they have passed the maximum redelivery cap
	WriteMessage(context.Context, ...*Message) (int, error)
	// WriteRetry push the given Event into a retry topic.
	// Publishing aside, WriteRetry method adds a new ID for every message passed using the Broker's ID Generator/Factory.
	//
	// It is recommended to point the Message's Topic/Type to an specific retry topic/queue (e.g. foo.executed -> foo.executed.retry/foo.executed.retry.N).
	//
	// This implementation differs from others because it increments the given Message "redelivery_count" delta field by one
	WriteRetry(ctx context.Context, msg *Message) error
}
EventWriter works as an Event response writer. It lets an Event handler respond, fan out a topic message, or fail properly by sending the failed Event into either a Retry or Dead Letter Queue (DLQ).
Uses a Publisher to write the actual message
type Handler ¶
type Handler interface {
	// ServeEvent handles any Event coming from an specific topic(s)
	//
	//
	// Returned boolean works as Acknowledgement mechanism (Ack or NAck).
	//
	// - If true, Quark will send success to the actual message broker/queue system.
	//
	// - If false, Quark will not send any success response to the actual message broker/queue system.
	//
	// Ack mechanism is available in specific providers
	ServeEvent(EventWriter, *Event) bool
}
Handler handles any Event coming from a specific topic(s)
Returned boolean works as Acknowledgement mechanism (Ack or NAck).
- If true, Quark will send success to the actual message broker/queue system.
- If false, Quark will not send any success response to the actual message broker/queue system.
Ack mechanism is available in specific providers
type HandlerFunc ¶
type HandlerFunc func(EventWriter, *Event) bool
HandlerFunc handles any Event coming from a specific topic(s).
Returned boolean works as Acknowledgement mechanism (Ack or NAck).
- If true, Quark will send success to the actual message broker/queue system.
- If false, Quark will not send any success response to the actual message broker/queue system.
Ack mechanism is available in specific providers
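The pairing of a Handler interface with a HandlerFunc type resembles the adapter used by net/http. The sketch below illustrates that adapter together with the Ack/NAck boolean. All types are simplified local stand-ins (the writer's Write method omits the context parameter), and the ServeEvent method on `handlerFunc` is an assumption, since this page does not state that HandlerFunc implements Handler.

```go
package main

import "fmt"

// event and eventWriter are simplified stand-ins for Event and EventWriter.
type event struct {
	Topic    string
	RawValue []byte
}

type eventWriter interface {
	Write(msg []byte, topics ...string) (int, error)
}

// handler mirrors the shape of the documented Handler interface.
type handler interface {
	ServeEvent(eventWriter, *event) bool
}

// handlerFunc mirrors HandlerFunc; the ServeEvent method is an assumption.
type handlerFunc func(eventWriter, *event) bool

func (f handlerFunc) ServeEvent(w eventWriter, e *event) bool { return f(w, e) }

// discardWriter counts writes without publishing anything.
type discardWriter struct{ written int }

func (d *discardWriter) Write(msg []byte, topics ...string) (int, error) {
	d.written += len(topics)
	return len(topics), nil
}

// ackNonEmpty acknowledges events that carry a payload and forwards them to a
// hypothetical audit topic; empty events are not acknowledged.
func ackNonEmpty(w eventWriter, e *event) bool {
	if len(e.RawValue) == 0 {
		return false // NAck
	}
	_, err := w.Write(e.RawValue, e.Topic+".audit")
	return err == nil // Ack only if the response was written
}

func main() {
	var h handler = handlerFunc(ackNonEmpty)
	w := &discardWriter{}
	fmt.Println(h.ServeEvent(w, &event{Topic: "foo.executed", RawValue: []byte("hi")}))
	fmt.Println(h.ServeEvent(w, &event{Topic: "foo.executed"}))
}
```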
type Header ¶
Header holds relevant metadata about the Message, Consumer or Broker.
For example, a header may contain the following relevant information:
- Broker and Consumer configurations.
- Specific provider data (e.g. offsets, partitions and member id from a consumer group in Kafka).
- A span context from the distributed tracing provider, so it can propagate traces from remote parents.
- Message properties (e.g. total redeliveries, correlation id, origin service ip address).
type IDFactory ¶
type IDFactory func() string
IDFactory Message unique identifier generator
Default is `google/uuid`
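Since an IDFactory is just a function returning a string, any generator with that signature can replace the default. The sketch below declares a local `idFactory` type with the same signature and a hypothetical `randomHexID` generator built on the standard library; it does not produce RFC 4122 UUIDs.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// idFactory mirrors the documented IDFactory signature.
type idFactory func() string

// randomHexID returns 16 random bytes encoded as a 32-character hex string.
func randomHexID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err) // the system random source is unavailable
	}
	return hex.EncodeToString(b)
}

func main() {
	var factory idFactory = randomHexID
	fmt.Println(len(factory()))
}
```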
type Message ¶
type Message struct {
	// Id message unique identifier
	Id string `json:"id"`
	// Type message topic. This attribute contains a value describing the type of event related to the originating occurrence.
	//
	// Often this attribute is used for routing, observability, policy enforcement, etc.
	// The format of this is producer defined and might include information such as the version of the type -
	// see Versioning of Attributes in the Primer for more information.
	//
	// e.g. com.example.object.deleted.v2
	Type string `json:"type"`
	// SpecVersion The version of the CloudEvents specification which the event uses.
	//
	// This enables the interpretation of the context.
	// Compliant event producers MUST use a value of 1.0 when referring to this version of the specification.
	SpecVersion string `json:"specversion"`
	// Source Identifies the context in which an event happened.
	// Often this will include information such as the type of the event source, the organization publishing the event or the process that produced the event.
	// The exact syntax and semantics behind the data encoded in the URI is defined by the event producer.
	//
	// e.g. https://github.com/cloudevents, urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66, /cloudevents/spec/pull/123
	Source string `json:"source"`
	// Data The event payload. This specification does not place any restriction on the type of this information.
	//
	// It is encoded into a media format which is specified by the datacontenttype attribute (e.g. application/json),
	// and adheres to the dataschema format when those respective attributes are present.
	Data []byte `json:"data,omitempty"`
	// ContentType Content type of data value. This attribute enables data to carry any type of content,
	// whereby format and encoding might differ from that of the chosen event format.
	ContentType string `json:"datacontenttype,omitempty"`
	// DataSchema Identifies the schema that data adheres to.
	//
	// Incompatible changes to the schema SHOULD be reflected by a different URI. See Versioning of Attributes
	// in the Primer for more information
	DataSchema string `json:"dataschema,omitempty"`
	// Subject This describes the subject of the event in the context of the event producer (identified by source).
	//
	// In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, but the source
	// identifier alone might not be sufficient as a qualifier for any specific event if the source context has internal sub-structure.
	//
	// e.g: source -> https://example.com/storage/tenant/container,
	// subject -> mynewfile.jpg
	Subject string `json:"subject,omitempty"`
	// Time Timestamp of when the occurrence happened.
	// If the time of the occurrence cannot be determined then this attribute MAY be set to some other time (such as the current time) by the
	// Quark producer, however all producers for the same source MUST be consistent in this respect.
	// In other words, either they all use the actual time of the occurrence or they all use the same algorithm to determine the value used.
	Time time.Time `json:"time,omitempty"`
	// Metadata message volatile information
	Metadata MessageMetadata `json:"metadata,omitempty"`
}
Message is an Event's body.
It is intended to be passed into the actual message broker/queue system (e.g. Kafka, AWS SNS/SQS, RabbitMQ) as default message form.
Thus, it keeps struct consistency between messages published in any programming language.
Based on the CNCF CloudEvents specification v1.0
ref. https://github.com/cloudevents/spec/blob/v1.0.1/spec.md
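The JSON tags on Message follow the CloudEvents attribute names. The sketch below copies a subset of the fields into a local `message` type and round-trips it through encoding/json, purely to illustrate the resulting wire shape; the `encode` and `decode` helpers are hypothetical, and JSON is only one of the encodings Quark is said to allow. Note that encoding/json renders the []byte Data field as a base64 string.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message copies a subset of the documented Message fields and JSON tags.
type message struct {
	Id          string `json:"id"`
	Type        string `json:"type"`
	SpecVersion string `json:"specversion"`
	Source      string `json:"source"`
	Data        []byte `json:"data,omitempty"`
	ContentType string `json:"datacontenttype,omitempty"`
}

// encode marshals the message to a JSON string.
func encode(m message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

// decode parses a JSON string back into a message.
func decode(s string) (message, error) {
	var m message
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	out, err := encode(message{
		Id:          "1",
		Type:        "com.example.object.deleted.v2",
		SpecVersion: "1.0",
		Source:      "/cloudevents/spec/pull/123",
		Data:        []byte(`{"ok":true}`),
		ContentType: "application/json",
	})
	fmt.Println(out, err)
}
```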
func NewMessage ¶
NewMessage creates a new message without a parent message
func NewMessageFromParent ¶
NewMessageFromParent creates a new Message setting as parent (on the CorrelationId field) the given parentId
type MessageMetadata ¶
type MessageMetadata struct {
	// CorrelationId root message id
	CorrelationId string `json:"correlation_id"`
	// Host sender node ip address
	Host string `json:"host,omitempty"`
	// RedeliveryCount number of attempts made to process this specific message
	RedeliveryCount int `json:"redelivery_count"`
	// ExternalData non-Quark data may be stored here (e.g. non-Quark headers)
	ExternalData map[string]string `json:"external_data,omitempty"`
}
MessageMetadata holds a message's volatile fields, used to store data for Quark resiliency mechanisms and custom developer-defined fields
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is a unit of configuration of a Broker
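An interface with only unexported methods, paired with a set of With... constructors, is the usual shape of the functional options pattern. The sketch below shows that pattern on a local `broker` type. The `apply` method name, the `newBroker` constructor, and the default values are all hypothetical, since none of them appear on this page.

```go
package main

import (
	"fmt"
	"time"
)

// broker holds a few of the documented Broker fields.
type broker struct {
	PoolSize     int
	MaxRetries   int
	RetryBackoff time.Duration
}

// option mirrors the documented shape: an interface with an unexported method.
type option interface {
	apply(*broker)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*broker)

func (f optionFunc) apply(b *broker) { f(b) }

func withPoolSize(size int) option {
	return optionFunc(func(b *broker) { b.PoolSize = size })
}

func withMaxRetries(n int) option {
	return optionFunc(func(b *broker) { b.MaxRetries = n })
}

func withRetryBackoff(d time.Duration) option {
	return optionFunc(func(b *broker) { b.RetryBackoff = d })
}

// newBroker is hypothetical; the defaults are chosen only for illustration.
func newBroker(opts ...option) *broker {
	b := &broker{PoolSize: 1, MaxRetries: 3, RetryBackoff: time.Second}
	for _, o := range opts {
		o.apply(b) // later options override earlier ones and the defaults
	}
	return b
}

func main() {
	b := newBroker(withPoolSize(5), withRetryBackoff(200*time.Millisecond))
	fmt.Println(b.PoolSize, b.MaxRetries, b.RetryBackoff)
}
```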
func WithBaseContext ¶
WithBaseContext defines the context a Broker will use to execute its operations
func WithBaseMessageContentType ¶
WithBaseMessageContentType defines the global base Message Content Type to comply with the CNCF CloudEvents specification
func WithBaseMessageSource ¶
WithBaseMessageSource defines the global base Message Source to comply with the CNCF CloudEvents specification
func WithCluster ¶
WithCluster defines a set of addresses a Broker will use
func WithConnRetryBackoff ¶
WithConnRetryBackoff defines a time duration a Worker will wait to connect to the specified infrastructure
func WithErrorHandler ¶
func WithErrorHandler(handler ErrorHandler) Option
WithErrorHandler defines an error hook/middleware that will be executed when an error occurs inside Quark low-level internals
func WithEventMux ¶
WithEventMux defines the EventMux that will be used for Broker's operations
func WithEventWriter ¶
func WithEventWriter(w EventWriter) Option
WithEventWriter defines the global event writer
func WithMaxConnRetries ¶
WithMaxConnRetries defines the global maximum number of times a consumer worker will retry its operations
func WithMaxRetries ¶
WithMaxRetries defines the global maximum number of times a publish operation will retry an Event operation
func WithMessageIDFactory ¶
WithMessageIDFactory defines the global Message ID Factory
func WithPoolSize ¶
WithPoolSize defines the global pool size of total Workers
func WithProviderConfiguration ¶
func WithProviderConfiguration(cfg interface{}) Option
WithProviderConfiguration defines a specific Provider configuration
func WithPublisher ¶
WithPublisher defines a global Publisher that will be used to push messages to the ecosystem through the EventWriter
func WithRetryBackoff ¶
WithRetryBackoff defines a time duration an EventWriter will wait to execute a write operation
func WithWorkerFactory ¶
func WithWorkerFactory(factory WorkerFactory) Option
WithWorkerFactory defines the global Worker Factory Quark's Supervisor(s) will use to schedule background I/O tasks
type PublisherFactory ¶
PublisherFactory is a crucial Broker and/or Consumer component which generates the concrete publishers Quark will use to produce data
type Supervisor ¶
type Supervisor struct {
	Broker   *Broker
	Consumer *Consumer
	// contains filtered or unexported fields
}
Supervisor is a Consumer logical unit of work. It acts as a Task scheduler for current jobs.
Distributes blocking I/O operations into different goroutines to enable parallelism with fan-out mechanisms.
func (*Supervisor) Close ¶
func (n *Supervisor) Close() error
Close ends the current Supervisor consuming session
func (*Supervisor) GetCluster ¶
func (n *Supervisor) GetCluster() []string
GetCluster retrieves the default cluster slice
func (*Supervisor) GetEventWriter ¶
func (n *Supervisor) GetEventWriter() EventWriter
GetEventWriter retrieves the default event writer
func (*Supervisor) GetGroup ¶
func (n *Supervisor) GetGroup() string
GetGroup retrieves the default consumer group
func (*Supervisor) ScheduleJobs ¶
func (n *Supervisor) ScheduleJobs(ctx context.Context) error
ScheduleJobs starts the blocking I/O consuming operations from the current Consumer parent
type Worker ¶
type Worker interface {
	// SetID inject the current Worker id
	SetID(i int)
	// Parent returns the parent Supervisor
	Parent() *Supervisor
	// StartJob starts an specific work
	//
	// Panics goroutine if any errors is thrown
	StartJob(context.Context) error
	// Close stop all Blocking I/O operations
	Close() error
}
Worker is a unit of work of a Consumer Supervisor.
It handles all the work and should be running inside a goroutine to enable parallelism.
type WorkerFactory ¶
type WorkerFactory func(parent *Supervisor) Worker
WorkerFactory is a crucial Broker and/or Consumer component which generates the concrete workers Quark will use to consume data
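The relationship between a Supervisor, its WorkerFactory, and the Workers it starts can be sketched as a small worker pool that fans out goroutines and stops them through context cancellation. Everything here is a simplified stand-in: the `worker` interface drops Parent and Close, the factory takes no parent argument, and `tickWorker`, `runPool`, and `runDemo` are hypothetical names used only to illustrate fan-out with graceful shutdown.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker is a reduced version of the documented Worker interface.
type worker interface {
	SetID(i int)
	StartJob(ctx context.Context) error
}

// workerFactory builds a worker; the documented type also receives a *Supervisor.
type workerFactory func() worker

// tickWorker simulates blocking I/O by ticking until the context is cancelled.
type tickWorker struct {
	id    int
	ticks int
}

func (w *tickWorker) SetID(i int) { w.id = i }

func (w *tickWorker) StartJob(ctx context.Context) error {
	t := time.NewTicker(5 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil // graceful stop
		case <-t.C:
			w.ticks++ // stands in for consuming one message
		}
	}
}

// runPool fans out poolSize workers, blocks until all of them return,
// and reports how many stopped without an error.
func runPool(ctx context.Context, poolSize int, f workerFactory) int {
	var wg sync.WaitGroup
	var stopped int64
	for i := 0; i < poolSize; i++ {
		w := f()
		w.SetID(i)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := w.StartJob(ctx); err == nil {
				atomic.AddInt64(&stopped, 1)
			}
		}()
	}
	wg.Wait()
	return int(stopped)
}

// runDemo starts poolSize workers and cancels them after millis milliseconds.
func runDemo(poolSize, millis int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(millis)*time.Millisecond)
	defer cancel()
	return runPool(ctx, poolSize, func() worker { return &tickWorker{} })
}

func main() {
	fmt.Println("workers stopped:", runDemo(3, 50))
}
```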