batch

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2024 License: MIT Imports: 7 Imported by: 0

README

Batch requests and timeout automatically commit.

type myStruct struct {
    A int
    B string
}
process := batch.NewDispatch[myStruct]()
defer process.Release()

index := 10
handle := func(ctx context.Context, payload []*myStruct) bool {
    fmt.Printf("[payload] %v\n", payload)
    return true
}
for i := 0; i < index; i++ {
    process.Register("key#"+strconv.Itoa(i), 10, time.Second, batch.HandleBatch[myStruct](handle))
}
var wg sync.WaitGroup
wg.Add(index)
for i := 0; i < index; i++ {
    go func(i int) {
        defer wg.Done()

        key := "key#" + strconv.Itoa(rand.Intn(index))
        value := myStruct{A: rand.Int(), B: strconv.Itoa(i)}
        task, err := process.Submit(key, value)
        if err != nil {
            fmt.Println("submit err: ", err)
            return
        }
        err = task.Wait()
        if err != nil {
            fmt.Println(err)
            return
        }
    }(i)
}
wg.Wait()

OR

type myStruct struct {
    A int
    B string
}
process := batch.NewDispatch[myStruct]()
defer process.Release()

index := 10
handle := func(ctx context.Context, payload *myStruct) bool {
    fmt.Printf("[payload] %d %s\n", payload.A, payload.B)
    return true
}
for i := 0; i < index; i++ {
    process.Register("key#"+strconv.Itoa(i), 10, time.Second, batch.HandleSingle[myStruct](handle))
}
var wg sync.WaitGroup
wg.Add(index)
for i := 0; i < index; i++ {
    go func(i int) {
        defer wg.Done()

        key := "key#" + strconv.Itoa(rand.Intn(index))
        value := myStruct{A: rand.Int(), B: strconv.Itoa(i)}
        task, err := process.Submit(key, value)
        if err != nil {
            fmt.Println("submit err: ", err)
            return
        }
        err = task.Wait()
        if err != nil {
            fmt.Println(err)
            return
        }
    }(i)
}
wg.Wait()

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidUniqid       = errors.New("invalid key")
	ErrDispatchExit        = errors.New("dispatch exit")
	ErrContextCancel       = errors.New("context cancel")
	ErrWorkerExit          = errors.New("worker exit")
	ErrWorkerShutdown      = errors.New("worker shutdown")
	ErrWorkerContextCancel = errors.New("worker context cancel")
	ErrExceedRetrys        = errors.New("exceed retrys")
)
View Source
var DefaultDispatch = NewDispatch[any]()

Functions

func Exit

func Exit() <-chan struct{}

func NewDispatch

func NewDispatch[T any]() *dispatch[T]

func Register

func Register(uniqID string, batchSize int, AutoCommitDuration time.Duration,
	handle Handler[any], opts ...Option[any]) error

func Release

func Release()

func Submit

func Submit(key string, value any)

func Unregister

func Unregister(uniqID any)

func UnregisterAll

func UnregisterAll()

Types

type DuplicateUniqId

type DuplicateUniqId struct {
	// contains filtered or unexported fields
}

func (DuplicateUniqId) Error

func (e DuplicateUniqId) Error() string

type HandleBatch

type HandleBatch[T any] func(context.Context, []*T) bool

func (HandleBatch[T]) Handle

func (handle HandleBatch[T]) Handle(ctx context.Context, _ *T, tasks []*T) bool

type HandleSingle

type HandleSingle[T any] func(context.Context, *T) bool

func (HandleSingle[T]) Handle

func (handle HandleSingle[T]) Handle(ctx context.Context, task *T, _ []*T) bool

type Handler

type Handler[T any] interface {
	Handle(context.Context, *T, []*T) bool
}

type Option

type Option[T any] interface {
	// contains filtered or unexported methods
}

func WithAutoCommitTimeOut

func WithAutoCommitTimeOut[T any](duration time.Duration) Option[T]

func WithBatchTimeOut

func WithBatchTimeOut[T any](duration time.Duration) Option[T]

func WithGraceDwonDuration

func WithGraceDwonDuration[T any](duration time.Duration) Option[T]

func WithRetrys

func WithRetrys[T any](retrys int) Option[T]

type Task

type Task[T any] struct {
	Id    any
	Value T
	// contains filtered or unexported fields
}

func (*Task[T]) Key

func (task *Task[T]) Key() string

func (*Task[T]) Wait

func (task *Task[T]) Wait() error

type Worker

type Worker[T any] struct {
	Key any

	Retrys int
	// Handle             any
	Handle Handler[T]
	// contains filtered or unexported fields
}

func (*Worker[T]) UniqID

func (p *Worker[T]) UniqID() any

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL