processor

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2021 License: MIT Imports: 8 Imported by: 1

README

core-processor

The project provides a framework for consuming Kafka.

It aims to simplify the logic of data consumption and transmission, and actively provide a configurable and efficient way.

With core and core-processor, we can do this:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    
    "github.com/DoNewsCode/core"
    processor "github.com/DoNewsCode/core-processor"
    "github.com/DoNewsCode/core/di"
    "github.com/DoNewsCode/core/otkafka"
    "github.com/segmentio/kafka-go"
)

type Handler struct {
}

func NewHandlerOut() processor.Out {
    return processor.NewOut(
        &Handler{},
    )
}

type Data struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

func (h *Handler) Info() *processor.Info {
    return &processor.Info{
        Name:      "default", // the reader name is default
        BatchSize: 3,
    }
}

func (h *Handler) Handle(ctx context.Context, msg *kafka.Message) (interface{}, error) {
    e := &Data{}
    if err := json.Unmarshal(msg.Value, &e); err != nil {
        return nil, err
    }
    return e, nil
}

func (h *Handler) Batch(ctx context.Context, data []interface{}) error {
    for _, e := range data {
        fmt.Println(e.(*Data))
    }
    return nil
}

func main() {
    // prepare config and dependencies
    c := core.New(
        core.WithInline("kafka.reader.default.brokers", []string{"127.0.0.1:9092"}),
        core.WithInline("kafka.reader.default.topic", "test"),
        core.WithInline("kafka.reader.default.groupID", "test"),
        core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),
    )
    defer c.Shutdown()
    c.ProvideEssentials()
    c.Provide(otkafka.Providers())
    c.AddModuleFunc(processor.New)

    // provide your handlers
    c.Provide(di.Deps{
        NewHandlerOut,
    })
    
    // start server
    err := c.Serve(context.Background())
    if err != nil {
        panic(err)
    }
}

After the above, we just need to add handlers and provide new methods for core. We can use processor.Info to flexibly adjust the operation of the processor.

Have fun!

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Provides added in v0.1.0

func Provides(hs ...Handler) di.Deps

Types

type BatchFunc

type BatchFunc func(ctx context.Context, data []interface{}) error

BatchFunc type for BatchHandler.Batch Func.

type BatchHandler

type BatchHandler interface {
	Handler
	// Batch processing the results returned by Handler.Handle.
	Batch(ctx context.Context, data []interface{}) error
}

BatchHandler one more Batch method than Handler.

type HandleFunc

type HandleFunc func(ctx context.Context, msg *kafka.Message) (interface{}, error)

HandleFunc type for Handler.Handle Func.

type Handler

type Handler interface {
	// Info set the topic name and some config.
	Info() *Info
	// Handle for *kafka.Message.
	Handle(ctx context.Context, msg *kafka.Message) (interface{}, error)
}

Handler only include Info and Handle func.

type Info

type Info struct {
	// used to get reader from otkafka.ReaderMaker.
	// default: "default"
	Name string
	// reader workers count.
	// default: 1
	ReadWorker int
	// batch workers count.
	// default: 1
	BatchWorker int
	// data size for batch processing.
	// default: 1
	BatchSize int
	// handler workers count.
	HandleWorker int
	// the size of the data channel.
	// default: 100
	ChanSize int
	// run the batchFunc automatically at specified intervals, avoid not executing without reaching BatchSize
	// default: 30s
	AutoBatchInterval time.Duration
}

Info the info of BatchHandler.

Note:

If sequence is necessary, make sure that per worker count is one.
Multiple goroutines cannot guarantee the order in which data is processed.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor dispatch Handler.

func New

func New(i in) (*Processor, error)

New create *Processor Module.

func (*Processor) ProvideCloser

func (e *Processor) ProvideCloser()

ProvideCloser implements container.CloserProvider for the Module.

func (*Processor) ProvideRunGroup

func (e *Processor) ProvideRunGroup(group *run.Group)

ProvideRunGroup run workers:

  1. Fetch message from *kafka.Reader.
  2. Handle message by Handler.Handle.
  3. Batch data by BatchHandler.Batch. If batch success then commit message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL