processor

package module
v0.0.1 Latest

This package is not in the latest version of its module.

Published: Jul 21, 2021 License: MIT Imports: 8 Imported by: 1

README

core-processor

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchFunc

type BatchFunc func(ctx context.Context, data []interface{}) error

BatchFunc is the function type that matches BatchHandler.Batch.

type BatchHandler

type BatchHandler interface {
	Handler
	// Batch processes the results returned by Handler.Handle.
	Batch(ctx context.Context, data []interface{}) error
}

BatchHandler extends Handler with one additional method, Batch.

type HandleFunc

type HandleFunc func(ctx context.Context, msg *kafka.Message) (interface{}, error)

HandleFunc is the function type that matches Handler.Handle.

type Handler

type Handler interface {
	// Info returns the topic name and related configuration.
	Info() *Info
	// Handle processes a single *kafka.Message.
	Handle(ctx context.Context, msg *kafka.Message) (interface{}, error)
}

Handler includes only the Info and Handle methods.
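As an illustration, a type that satisfies both Handler and BatchHandler might look like the sketch below. Message and Info here are local stand-ins for kafka.Message and processor.Info so that the example compiles without external dependencies; upperHandler and runUpperDemo are hypothetical names, and the sketch says nothing about how the package itself reacts to a Handle error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Message is a local stand-in for kafka.Message (assumption: only the
// Value field is modelled here).
type Message struct {
	Value []byte
}

// Info mirrors a subset of processor.Info for this sketch.
type Info struct {
	Name      string
	BatchSize int
}

// upperHandler is a hypothetical handler: Handle upper-cases one message,
// Batch stores the collected results.
type upperHandler struct {
	stored []string
}

// Info names the reader and sets the batch size.
func (h *upperHandler) Info() *Info {
	return &Info{Name: "default", BatchSize: 2}
}

// Handle converts a single message into a result for Batch.
func (h *upperHandler) Handle(ctx context.Context, msg *Message) (interface{}, error) {
	if len(msg.Value) == 0 {
		return nil, errors.New("empty message")
	}
	return strings.ToUpper(string(msg.Value)), nil
}

// Batch receives the values previously returned by Handle.
func (h *upperHandler) Batch(ctx context.Context, data []interface{}) error {
	for _, d := range data {
		s, ok := d.(string)
		if !ok {
			return fmt.Errorf("unexpected result type %T", d)
		}
		h.stored = append(h.stored, s)
	}
	return nil
}

// runUpperDemo feeds the values through Handle and then Batch by hand,
// which is the job the Processor would normally do.
func runUpperDemo(values ...string) ([]string, error) {
	h := &upperHandler{}
	ctx := context.Background()
	var results []interface{}
	for _, v := range values {
		r, err := h.Handle(ctx, &Message{Value: []byte(v)})
		if err != nil {
			return nil, err
		}
		results = append(results, r)
	}
	if err := h.Batch(ctx, results); err != nil {
		return nil, err
	}
	return h.stored, nil
}

func main() {
	fmt.Println(runUpperDemo("a", "b"))
}
```

Because Batch receives []interface{}, each element needs a type assertion back to whatever Handle returned; the sketch returns an error rather than panicking when the type does not match.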

type Info

type Info struct {
	// used to get reader from otkafka.ReaderMaker.
	// default: "default"
	Name string
	// reader workers count.
	// default: 1
	ReadWorker int
	// batch workers count.
	// default: 1
	BatchWorker int
	// data size for batch processing.
	// default: 1
	BatchSize int
	// handler workers count.
	HandleWorker int
	// the size of the data channel.
	// default: 100
	ChanSize int
	// run the batch function automatically at the specified interval, so that
	// pending data is still processed when BatchSize has not been reached.
	// default: 30s
	AutoBatchInterval time.Duration
}

Info is the configuration of a BatchHandler.

Note:

If processing order matters, make sure that each worker count is one.
Multiple goroutines cannot guarantee the order in which data is processed.
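The documented defaults and the ordering note can be summarised in a small sketch. Info below is a local copy of the struct; withDefaults and preservesOrder are hypothetical helpers, and treating zero or negative values as "unset" is an assumption, not confirmed behaviour of the package. HandleWorker is left untouched because no default is documented for it.

```go
package main

import (
	"fmt"
	"time"
)

// Info is a local copy of processor.Info for this sketch.
type Info struct {
	Name              string
	ReadWorker        int
	BatchWorker       int
	BatchSize         int
	HandleWorker      int
	ChanSize          int
	AutoBatchInterval time.Duration
}

// withDefaults returns a copy of in with unset fields replaced by the
// defaults listed in the field comments. Assumption: "unset" means the
// zero value (or a negative number).
func withDefaults(in Info) Info {
	if in.Name == "" {
		in.Name = "default"
	}
	if in.ReadWorker <= 0 {
		in.ReadWorker = 1
	}
	if in.BatchWorker <= 0 {
		in.BatchWorker = 1
	}
	if in.BatchSize <= 0 {
		in.BatchSize = 1
	}
	if in.ChanSize <= 0 {
		in.ChanSize = 100
	}
	if in.AutoBatchInterval <= 0 {
		in.AutoBatchInterval = 30 * time.Second
	}
	// HandleWorker has no documented default, so it is not modified.
	return in
}

// preservesOrder reports whether every stage runs exactly one worker,
// which is the condition the note gives for ordered processing.
// HandleWorker must be set explicitly for this check to pass.
func preservesOrder(in Info) bool {
	return in.ReadWorker == 1 && in.HandleWorker == 1 && in.BatchWorker == 1
}

func main() {
	cfg := withDefaults(Info{Name: "orders", BatchSize: 50, HandleWorker: 1})
	fmt.Printf("%+v ordered=%v\n", cfg, preservesOrder(cfg))
}
```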

type Out

type Out struct {
	di.Out

	Handlers []Handler `group:"ProcessorHandler,flatten"`
}

Out provides Handlers to in.Handlers.

func NewOut

func NewOut(handlers ...Handler) Out

NewOut creates an Out that provides the given Handlers to in.Handlers.

Usage:
	func newHandlerA(logger log.Logger) processor.Out {
		return processor.NewOut(
			&HandlerA{logger: logger},
		)
	}
Or
	func newHandlers(logger log.Logger) processor.Out {
		return processor.NewOut(
			&HandlerA{logger: logger},
			&HandlerB{logger: logger},
		)
	}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor dispatches messages to the registered Handlers.

func New

func New(i in) (*Processor, error)

New creates the *Processor module.

func (*Processor) ProvideCloser

func (e *Processor) ProvideCloser()

ProvideCloser implements container.CloserProvider for the Module.

func (*Processor) ProvideRunGroup

func (e *Processor) ProvideRunGroup(group *run.Group)

ProvideRunGroup runs the workers:

  1. Fetch messages from the *kafka.Reader.
  2. Handle each message with Handler.Handle.
  3. Batch the results with BatchHandler.Batch. If the batch succeeds, commit the messages.
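The three steps above can be modelled with a single-worker sketch. This is not the package's implementation: message stands in for *kafka.Message, the fetched channel stands in for the reader, commit stands in for committing offsets, and pipeline and runDemo are hypothetical names. It only illustrates flushing when the batch size is reached or the interval elapses, and committing after a successful batch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// message stands in for *kafka.Message; only the offset is modelled.
type message struct{ offset int }

// pipeline is a hypothetical single-worker model of the three steps.
type pipeline struct {
	batchSize int
	interval  time.Duration // role of Info.AutoBatchInterval; must be > 0
	handle    func(ctx context.Context, m message) (interface{}, error)
	batch     func(ctx context.Context, data []interface{}) error
	commit    func(msgs []message) // stands in for committing offsets
}

func (p *pipeline) run(ctx context.Context, fetched <-chan message) error {
	ticker := time.NewTicker(p.interval)
	defer ticker.Stop()

	var pending []message
	var data []interface{}
	// Step 3: batch the collected results and commit only on success.
	flush := func() error {
		if len(data) == 0 {
			return nil
		}
		if err := p.batch(ctx, data); err != nil {
			return err // nothing is committed when the batch fails
		}
		p.commit(pending)
		pending, data = nil, nil
		return nil
	}

	for {
		select {
		case m, ok := <-fetched: // Step 1: receive a fetched message.
			if !ok {
				return flush() // source drained: flush the remainder
			}
			d, err := p.handle(ctx, m) // Step 2: handle one message.
			if err != nil {
				return err
			}
			pending, data = append(pending, m), append(data, d)
			if len(data) >= p.batchSize {
				if err := flush(); err != nil {
					return err
				}
			}
		case <-ticker.C:
			// Interval elapsed: flush even if batchSize was not reached.
			if err := flush(); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// runDemo pushes n messages through the sketch and reports the batch sizes
// seen by batch and the number of committed messages.
func runDemo(n, batchSize int, failBatch bool) (sizes []int, committed int, err error) {
	fetched := make(chan message, n)
	for i := 0; i < n; i++ {
		fetched <- message{offset: i}
	}
	close(fetched)
	p := &pipeline{
		batchSize: batchSize,
		interval:  time.Hour, // long, so only batchSize triggers flushes here
		handle: func(ctx context.Context, m message) (interface{}, error) {
			return m.offset, nil
		},
		batch: func(ctx context.Context, data []interface{}) error {
			if failBatch {
				return errors.New("batch failed")
			}
			sizes = append(sizes, len(data))
			return nil
		},
		commit: func(msgs []message) { committed += len(msgs) },
	}
	err = p.run(context.Background(), fetched)
	return sizes, committed, err
}

func main() {
	fmt.Println(runDemo(5, 2, false))
}
```

With five messages and a batch size of two, the sketch batches two, two, and one item and commits all five; when batch returns an error, nothing is committed. A real reader does not drain like the channel here, which is why the interval-based flush matters for partially filled batches.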
