buffer

package
v0.52.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2024 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Fibonacci added in v0.51.6

func Fibonacci(n int) int

func SetDefaults

func SetDefaults(flushPeriodMilliseconds int, flushItemsThreshold int)

Types

type BuffStrategy added in v0.51.6

type BuffStrategy string
const (
	Dynamic BuffStrategy = "DYNAMIC"
	Static  BuffStrategy = "STATIC"
)

type BulkEventWriter

type BulkEventWriter struct {
	*TenantBufferManager[*repository.CreateStepRunEventOpts, int]
	// contains filtered or unexported fields
}

func (*BulkEventWriter) BulkWriteStepRunEvents

func (w *BulkEventWriter) BulkWriteStepRunEvents(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)

func (*BulkEventWriter) Cleanup

func (w *BulkEventWriter) Cleanup() error

func (*BulkEventWriter) SerialWriteStepRunEvent added in v0.50.2

func (w *BulkEventWriter) SerialWriteStepRunEvent(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)

type BulkQueueStepRunOpts

type BulkQueueStepRunOpts struct {
	*dbsqlc.GetStepRunForEngineRow

	Priority int
	IsRetry  bool
	Input    []byte
}

type BulkSemaphoreReleaser

type BulkSemaphoreReleaser struct {
	*TenantBufferManager[SemaphoreReleaseOpts, pgtype.UUID]
	// contains filtered or unexported fields
}

func (*BulkSemaphoreReleaser) BulkReleaseSemaphores

func (w *BulkSemaphoreReleaser) BulkReleaseSemaphores(ctx context.Context, opts []SemaphoreReleaseOpts) ([]pgtype.UUID, error)

func (*BulkSemaphoreReleaser) Cleanup

func (w *BulkSemaphoreReleaser) Cleanup() error

type BulkStepRunQueuer

type BulkStepRunQueuer struct {
	*TenantBufferManager[BulkQueueStepRunOpts, pgtype.UUID]
	// contains filtered or unexported fields
}

func (*BulkStepRunQueuer) BulkQueueStepRuns

func (w *BulkStepRunQueuer) BulkQueueStepRuns(ctx context.Context, opts []BulkQueueStepRunOpts) ([]pgtype.UUID, error)

func (*BulkStepRunQueuer) Cleanup

func (w *BulkStepRunQueuer) Cleanup() error

type ConfigFileBuffer

type ConfigFileBuffer struct {

	// WaitForFlush is the time to wait for the buffer to flush used for backpressure on writers
	WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"`

	// MaxConcurrent is the maximum number of concurrent flushes
	MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"`

	// FlushPeriodMilliseconds is the number of milliseconds before flush
	FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`

	// FlushItemsThreshold is the number of items to hold in memory until flushing to the database
	FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`

	// SerialBuffer is a flag to determine if the buffer should be serial or bulk
	SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`

	// FlushStrategy is the strategy to use for flushing the buffer
	FlushStrategy BuffStrategy `mapstructure:"flushStrategy" json:"flushStrategy" default:"DYNAMIC"`
}

ConfigFileBuffer is the configuration for the buffer. We store it here to prevent circular dependencies

type FlushResponse

type FlushResponse[U any] struct {
	Result U
	Err    error
}

type IngestBuf

type IngestBuf[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewIngestBuffer

func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]

NewIngestBuffer creates a new buffer for any type T

func (*IngestBuf[T, U]) BuffItem

func (b *IngestBuf[T, U]) BuffItem(item T) (chan *FlushResponse[U], error)

func (*IngestBuf[T, U]) Start

func (b *IngestBuf[T, U]) Start() (func() error, error)

func (*IngestBuf[T, U]) StartDebugLoop

func (b *IngestBuf[T, U]) StartDebugLoop()

type IngestBufOpts

type IngestBufOpts[T any, U any] struct {
	Name string `validate:"required"`
	// MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush
	MaxCapacity        int                                               `validate:"required,gt=0"`
	FlushPeriod        time.Duration                                     `validate:"required,gt=0"`
	MaxDataSizeInQueue int                                               `validate:"required,gt=0"`
	OutputFunc         func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	SizeFunc           func(T) int                                       `validate:"required"`
	L                  *zerolog.Logger                                   `validate:"required"`
	MaxConcurrent      int                                               `validate:"omitempty,gt=0"`
	WaitForFlush       time.Duration                                     `validate:"omitempty,gt=0"`
	FlushStrategy      BuffStrategy                                      `validate:"required"`
}

type SemaphoreReleaseOpts

type SemaphoreReleaseOpts struct {
	StepRunId pgtype.UUID
	TenantId  pgtype.UUID
}

type TenantBufManagerOpts

type TenantBufManagerOpts[T any, U any] struct {
	Name       string                                            `validate:"required"`
	OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	SizeFunc   func(T) int                                       `validate:"required"`
	L          *zerolog.Logger                                   `validate:"required"`
	V          validator.Validator                               `validate:"required"`
	Config     ConfigFileBuffer                                  `validate:"required"`
}

type TenantBufferManager

type TenantBufferManager[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewTenantBufManager

func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)

Create a new TenantBufferManager with generic types T for input and U for output

func (*TenantBufferManager[T, U]) BuffItem

func (t *TenantBufferManager[T, U]) BuffItem(tenantKey string, eventOps T) (chan *FlushResponse[U], error)

func (*TenantBufferManager[T, U]) Cleanup

func (t *TenantBufferManager[T, U]) Cleanup() error

cleanup all tenant buffers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL