goduck

package module
v0.15.2
Published: Dec 13, 2023 License: BSD-3-Clause Imports: 1 Imported by: 0

README

goDuck

This project provides an engine that abstracts message dispatching for workers that deal with either streams or pools. In other words, it reads a message from a stream or a pool, delivers that message through the Processor interface, and interprets its return value.

It is important to note that if the Process function returns an error, the engine won't Ack the message, so it is not removed from the queue or stream. The main idea is that the engine guarantees every message will be processed successfully at least once.
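Because a message may be delivered more than once under these at-least-once semantics, Process implementations should be idempotent. A minimal self-contained sketch (the in-memory deduplication store and the use of the payload as an ID are illustrative assumptions, not part of goduck):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// seenStore is a hypothetical in-memory deduplication store; a real
// deployment would use something durable, keyed by a message ID.
type seenStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

// markOnce returns true the first time it sees an ID, false afterwards.
func (s *seenStore) markOnce(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[id] {
		return false
	}
	s.seen[id] = true
	return true
}

type processor struct{ store *seenStore }

// Process is idempotent: a redelivered message is acknowledged (nil return)
// without repeating its side effects.
func (p processor) Process(ctx context.Context, message []byte) error {
	id := string(message) // assumption: the payload carries a unique ID
	if !p.store.markOnce(id) {
		return nil // already handled; ack without reprocessing
	}
	fmt.Printf("handling %s\n", id)
	return nil
}

func main() {
	p := processor{store: &seenStore{seen: map[string]bool{}}}
	_ = p.Process(context.Background(), []byte("msg-1"))
	_ = p.Process(context.Background(), []byte("msg-1")) // duplicate: no-op
}
```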

Sample of a stream processor

import (
	"context"

	"github.com/arquivei/goduck"
	"github.com/arquivei/goduck/engine/streamengine"
)

// The engine requires a type that implements the Process function.
type processor struct{}

// Process receives each message pulled by the engine.
func (p processor) Process(ctx context.Context, message []byte) error {
	// ...
	err := serviceCall(args)
	// ...
	return err
}

func main() {
	// The call below returns a Kafka abstraction (a goduck.Stream).
	kafka := NewKafkaStream(<your-config>)
	engine := streamengine.New(processor{}, []goduck.Stream{kafka})
	engine.Run(context.Background())
}

Sample of a pool processor

import (
	"context"

	"github.com/arquivei/goduck/engine/jobpoolengine"
)

// The engine requires a type that implements the Process function.
type processor struct{}

// Process receives each message pulled by the engine.
func (p processor) Process(ctx context.Context, message []byte) error {
	// ...
	err := serviceCall(args)
	// ...
	return err
}

func main() {
	// The call below returns a pubsub abstraction (a goduck.MessagePool).
	pubsub, err := NewPubsubQueue(<your-config>)
	if err != nil {
		<handle err>
	}
	engine := jobpoolengine.New(pubsub, processor{}, 1)
	engine.Run(context.Background())
}

Important configuration

Kafka

  • Commit interval: set this to allow asynchronous message processing between commits. If unset, every message must be acknowledged before a new one is retrieved. Keep in mind the trade-off: if the engine stops executing mid-processing, every message since the last commit may be redelivered, so the larger the commit interval, the higher the chance of duplicated messages.

To terminate the engine, simply cancel the context passed to Run; the engine will then shut down the application.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AnyProcessor added in v0.3.0

type AnyProcessor interface {
	Processor
	BatchProcessor
}

AnyProcessor refers to structs that can behave as both a Processor and a BatchProcessor.

type BatchProcessor

type BatchProcessor interface {
	// BatchProcess handles multiple messages in their raw form, exactly as
	// provided from the source. If the return is an error, the engine is
	// responsible for retrying or marking the whole batch as failed. If the
	// return is nil, the engine will mark all the messages as complete.
	//
	// Exact guarantees depend on the engine/stream/queue implementation, but
	// typically this method will be called at least once per message.
	// Therefore, the implementation should be idempotent.
	//
	// Depending on the engine, this method may be called concurrently.
	BatchProcess(ctx context.Context, messages [][]byte) error
}

BatchProcessor is a basic low level message processor, that is able to handle multiple messages at once. It should be wrapped with middlewares such as logging, instrumenting, and so on.

type EndpointBatchDecoder added in v0.3.1

type EndpointBatchDecoder func(context.Context, [][]byte) (interface{}, error)

EndpointBatchDecoder decodes a batch of messages into an endpoint request. See go-kit's endpoint.Endpoint.

type EndpointDecoder added in v0.3.1

type EndpointDecoder func(context.Context, []byte) (interface{}, error)

EndpointDecoder decodes a message into an endpoint request. See go-kit's endpoint.Endpoint.

type MessagePool

type MessagePool interface {
	// Next should return the next message available in the queue. If the
	// queue is permanently closed, it should return io.EOF error. Any
	// other error will be retried.
	Next(ctx context.Context) (RawMessage, error)
	// Done marks a specific message as completed, so that it shouldn't
	// appear on subsequent "Next" calls.
	Done(ctx context.Context, msg RawMessage) error
	// Failed marks a specific message as failed, so that it appears
	// on subsequent "Next" calls.
	Failed(ctx context.Context, msg RawMessage) error
	// Close closes the Queue. After calling this function, "Next" should
	// return io.EOF
	Close() error
}

MessagePool represents a pool of unordered messages, where each one can be individually marked as complete.

type Processor

type Processor interface {
	// Process handles a single message in its raw form, exactly as provided
	// from the source. If the return is an error, the engine is responsible
	// for retrying or marking the message as failed. If the return is nil,
	// the engine will mark the message as complete.
	//
	// Exact guarantees depend on the engine/stream/queue implementation, but
	// typically this method will be called at least once per message.
	// Therefore, the implementation should be idempotent.
	//
	// Depending on the engine, this method may be called concurrently.
	Process(ctx context.Context, message []byte) error
}

Processor is a basic low level message processor. It should be wrapped with middlewares such as logging, instrumenting, and so on.

type RawMessage

type RawMessage interface {
	Bytes() []byte
}

RawMessage is the interface that Queue/Stream sources should provide. It can contain some internal control variables, such as MessageID, useful for marking the message as complete.

type Stream

type Stream interface {
	// Next should return the next message available in the queue. If the
	// queue is permanently closed, it should return io.EOF error. Any
	// other error will be retried.
	Next(ctx context.Context) (RawMessage, error)
	// Done marks all messages polled so far as complete.
	Done(ctx context.Context) error
	// Close closes the Stream. After calling this function, "Next" should
	// return io.EOF
	Close() error
}

Stream represents an (un)bounded ordered list of messages. Typical usage would require one Stream for each goroutine.
