pool

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2025 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package pool provides unified high-performance object pooling for Nebula This is the SINGLE pool implementation that replaces all other pool packages

Index

Constants

This section is empty.

Variables

View Source
var (
	// RecordChannelPool for single record channels
	RecordChannelPool = NewChannelPool[*Record](10000)

	// BatchChannelPool for batch channels
	BatchChannelPool = NewChannelPool[[]*Record](100)

	// SmallRecordChannelPool for smaller buffers
	SmallRecordChannelPool = NewChannelPool[*Record](100)

	// ErrorChannelPool for error channels
	ErrorChannelPool = NewChannelPool[error](10)
)

Global channel pools for common types

View Source
var (
	// RecordPool provides optimized pooling for Record objects
	RecordPool = New(
		func() *Record {
			return &Record{
				Data: make(map[string]interface{}, 16),
			}
		},
		func(r *Record) {
			r.ID = ""
			r.Schema = nil
			r.RawData = nil

			for k := range r.Data {
				delete(r.Data, k)
			}
			if r.Metadata.Custom != nil {
				for k := range r.Metadata.Custom {
					delete(r.Metadata.Custom, k)
				}
			}
			if r.Metadata.Before != nil {
				for k := range r.Metadata.Before {
					delete(r.Metadata.Before, k)
				}
			}

			r.Metadata = RecordMetadata{}
		},
	)

	// MapPool for map[string]interface{} objects
	MapPool = New(
		func() map[string]interface{} {
			return make(map[string]interface{}, 16)
		},
		func(m map[string]interface{}) {
			for k := range m {
				delete(m, k)
			}
		},
	)

	// StringSlicePool for []string objects
	StringSlicePool = New(
		func() []string {
			return make([]string, 0, 32)
		},
		func(s []string) {
			for i := range s {
				s[i] = ""
			}
			s = s[:0]
		},
	)

	// ByteSlicePool for []byte objects
	ByteSlicePool = New(
		func() []byte {
			return make([]byte, 0, 1024)
		},
		func(b []byte) {
			b = b[:0]
		},
	)

	// IDBufferPool for ID generation
	IDBufferPool = New(
		func() []byte {
			return make([]byte, 0, 64)
		},
		func(b []byte) {
			b = b[:0]
		},
	)

	// BatchSlicePool for []*Record objects (used in pipeline batching)
	BatchSlicePool = New(
		func() []*Record {
			return make([]*Record, 0, 1000)
		},
		func(s []*Record) {

			for i := range s {
				s[i] = nil
			}
			s = s[:0]
		},
	)
)

Global unified pools for the entire system

View Source
var (
	// GlobalBufferPool for byte buffer pooling
	GlobalBufferPool = NewBufferPool()

	// GlobalArenaPool for arena allocation
	GlobalArenaPool = NewArenaPool(16*1024*1024, 10) // 16MB chunks, 10 arenas
)

Global pools for advanced use cases

View Source
var (
	// Pool for [][]string batches (CSV rows)
	StringBatchPool = &sync.Pool{
		New: func() interface{} {
			return make([][]string, 0, 5000)
		},
	}
)

String batch pools for CSV and similar operations

Functions

func GenerateID

func GenerateID(prefix string) string

GenerateID generates a unique ID using pooled buffers

func GetBatchChannel

func GetBatchChannel() chan []*Record

GetBatchChannel gets a batch channel from the pool

func GetByteSlice

func GetByteSlice() []byte

GetByteSlice gets a byte slice from the global pool

func GetCSVRow

func GetCSVRow(capacity int) []string

GetCSVRow gets a []string from the pool for CSV row operations

func GetColumnName

func GetColumnName(index int) string

GetColumnName returns an interned column name Optimized for CSV and similar columnar data

func GetErrorChannel

func GetErrorChannel() chan error

GetErrorChannel gets an error channel from the pool

func GetErrorSlice

func GetErrorSlice(expectedSize int) []error

GetErrorSlice gets an error slice from the pool based on expected size

func GetFieldName

func GetFieldName(prefix string, index int) string

GetFieldName returns an interned field name for common patterns This avoids allocations for frequently used field names

func GetGlobalStats

func GetGlobalStats() map[string]Stats

GetGlobalStats returns statistics for all global pools

func GetInternStats

func GetInternStats() (size, hits, misses int64)

GetInternStats returns global intern pool statistics

func GetMap

func GetMap() map[string]interface{}

GetMap gets a map from the global pool

func GetRecordChannel

func GetRecordChannel(size int) chan *Record

GetRecordChannel gets a record channel from the pool

func GetRecordID

func GetRecordID(index int) string

GetRecordID returns an interned record ID Optimized for sequential access patterns

func GetStringBatch

func GetStringBatch(capacity int) [][]string

GetStringBatch gets a [][]string from the pool

func GetStringSlice

func GetStringSlice() []string

GetStringSlice gets a string slice from the global pool

func InternBytes

func InternBytes(b []byte) string

InternBytes interns a byte slice as a string using the global pool

func InternString

func InternString(s string) string

InternString interns a string using the global pool

func PreInternFieldNames

func PreInternFieldNames(names []string)

PreInternFieldNames pre-interns a batch of field names Useful for known schemas or repeated patterns

func PutBatchChannel

func PutBatchChannel(ch chan []*Record)

PutBatchChannel returns a batch channel to the pool

func PutBatchSlice

func PutBatchSlice(batch []*Record)

PutBatchSlice returns a batch slice to the global pool

func PutByteSlice

func PutByteSlice(b []byte)

PutByteSlice returns a byte slice to the global pool

func PutCSVRow

func PutCSVRow(slice []string)

PutCSVRow returns a []string to the pool

func PutErrorChannel

func PutErrorChannel(ch chan error)

PutErrorChannel returns an error channel to the pool

func PutErrorSlice

func PutErrorSlice(errs []error)

PutErrorSlice returns an error slice to the appropriate pool

func PutMap

func PutMap(m map[string]interface{})

PutMap returns a map to the global pool

func PutRecord

func PutRecord(record *Record)

PutRecord returns a record to the global pool

func PutRecordChannel

func PutRecordChannel(ch chan *Record)

PutRecordChannel returns a record channel to the pool

func PutStringBatch

func PutStringBatch(batch [][]string)

PutStringBatch returns a [][]string to the pool

func PutStringSlice

func PutStringSlice(s []string)

PutStringSlice returns a string slice to the global pool

func PutTypedRecord

func PutTypedRecord(tr *TypedRecord)

PutTypedRecord returns a typed record to the pool

Types

type Arena

type Arena struct {
	// contains filtered or unexported fields
}

Arena represents a memory arena

type ArenaPool

type ArenaPool struct {
	// contains filtered or unexported fields
}

ArenaPool provides arena-style allocation

func NewArenaPool

func NewArenaPool(chunkSize, maxArenas int) *ArenaPool

NewArenaPool creates a new arena pool

func (*ArenaPool) Alloc

func (p *ArenaPool) Alloc(size int) []byte

Alloc allocates memory from the arena

func (*ArenaPool) Reset

func (p *ArenaPool) Reset()

Reset resets all arenas

type BufferPool

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool manages byte buffer pooling with size-based buckets

func NewBufferPool

func NewBufferPool() *BufferPool

NewBufferPool creates a new buffer pool with predefined sizes

func (*BufferPool) Get

func (p *BufferPool) Get(size int) []byte

Get returns a buffer of at least the requested size

func (*BufferPool) Put

func (p *BufferPool) Put(buf []byte)

Put returns a buffer to the pool

type ChannelPool

type ChannelPool[T any] struct {
	// contains filtered or unexported fields
}

ChannelPool provides pooling for channels to reduce allocations

func NewChannelPool

func NewChannelPool[T any](size int) *ChannelPool[T]

NewChannelPool creates a new channel pool with specified buffer size

func (*ChannelPool[T]) Get

func (p *ChannelPool[T]) Get() chan T

Get retrieves a channel from the pool

func (*ChannelPool[T]) Put

func (p *ChannelPool[T]) Put(ch chan T)

Put returns a channel to the pool after draining it

type ErrorSlicePool

type ErrorSlicePool struct {
	// contains filtered or unexported fields
}

ErrorSlicePool provides pooling for error slices

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool represents a generic object pool with type safety

func New

func New[T any](new func() T, reset func(T)) *Pool[T]

New creates a new typed pool

func (*Pool[T]) Get

func (p *Pool[T]) Get() T

Get retrieves an object from the pool

func (*Pool[T]) Put

func (p *Pool[T]) Put(obj T)

Put returns an object to the pool

func (*Pool[T]) Stats

func (p *Pool[T]) Stats() (allocated, inUse, hits, misses int64)

Stats returns pool statistics

type Record

type Record struct {
	ID       string                 `json:"id"`
	Data     map[string]interface{} `json:"data"`
	Metadata RecordMetadata         `json:"metadata"`
	Schema   interface{}            `json:"schema,omitempty"`
	RawData  []byte                 `json:"-"`
}

Record represents the unified record type (compatible with models.Record)

func GetBatchSlice

func GetBatchSlice(capacity int) []*Record

GetBatchSlice gets a batch slice from the global pool with specified capacity

func GetRecord

func GetRecord() *Record

GetRecord gets a record from the global pool

func NewCDCRecord

func NewCDCRecord(database, table, operation string, before, after map[string]interface{}) *Record

NewCDCRecord creates a new record for Change Data Capture

func NewRecord

func NewRecord(source string, data map[string]interface{}) *Record

NewRecord creates a new record with the given source and data

func NewRecordFromPool

func NewRecordFromPool(source string) *Record

NewRecordFromPool creates a new record using pooled resources

func NewStreamingRecord

func NewStreamingRecord(streamID string, offset int64, data map[string]interface{}) *Record

NewStreamingRecord creates a new record for streaming

func (*Record) GetCDCBefore

func (r *Record) GetCDCBefore() map[string]interface{}

GetCDCBefore returns the before state for UPDATE/DELETE operations

func (*Record) GetCDCDatabase

func (r *Record) GetCDCDatabase() string

GetCDCDatabase returns the database name for CDC records

func (*Record) GetCDCOperation

func (r *Record) GetCDCOperation() string

GetCDCOperation returns the CDC operation type

func (*Record) GetCDCPosition

func (r *Record) GetCDCPosition() string

GetCDCPosition returns the replication position

func (*Record) GetCDCTable

func (r *Record) GetCDCTable() string

GetCDCTable returns the table name for CDC records

func (*Record) GetCDCTransaction

func (r *Record) GetCDCTransaction() string

GetCDCTransaction returns the transaction ID

func (*Record) GetData

func (r *Record) GetData(key string) (interface{}, bool)

GetData efficiently gets a data field

func (*Record) GetMetadata

func (r *Record) GetMetadata(key string) (interface{}, bool)

GetMetadata efficiently gets a custom metadata field

func (*Record) GetOffset

func (r *Record) GetOffset() int64

GetOffset returns the stream offset

func (*Record) GetStreamID

func (r *Record) GetStreamID() string

GetStreamID returns the stream identifier

func (*Record) GetTimestamp

func (r *Record) GetTimestamp() time.Time

GetTimestamp returns the timestamp

func (*Record) IsCDCRecord

func (r *Record) IsCDCRecord() bool

IsCDCRecord returns true if this is a CDC record

func (*Record) IsDelete

func (r *Record) IsDelete() bool

IsDelete returns true if this is a CDC DELETE operation

func (*Record) IsInsert

func (r *Record) IsInsert() bool

IsInsert returns true if this is a CDC INSERT operation

func (*Record) IsStreamingRecord

func (r *Record) IsStreamingRecord() bool

IsStreamingRecord returns true if this is a streaming record

func (*Record) IsUpdate

func (r *Record) IsUpdate() bool

IsUpdate returns true if this is a CDC UPDATE operation

func (*Record) Release

func (r *Record) Release()

Release returns the record and its resources to the pool

func (*Record) SetCDCBefore

func (r *Record) SetCDCBefore(before map[string]interface{})

SetCDCBefore sets the before state for UPDATE/DELETE operations

func (*Record) SetCDCDatabase

func (r *Record) SetCDCDatabase(database string)

SetCDCDatabase sets the database name for CDC records

func (*Record) SetCDCOperation

func (r *Record) SetCDCOperation(operation string)

SetCDCOperation sets the CDC operation type (INSERT, UPDATE, DELETE)

func (*Record) SetCDCPosition

func (r *Record) SetCDCPosition(position string)

SetCDCPosition sets the replication position

func (*Record) SetCDCTable

func (r *Record) SetCDCTable(table string)

SetCDCTable sets the table name for CDC records

func (*Record) SetCDCTransaction

func (r *Record) SetCDCTransaction(txID string)

SetCDCTransaction sets the transaction ID

func (*Record) SetData

func (r *Record) SetData(key string, value interface{})

SetData efficiently sets a data field

func (*Record) SetMetadata

func (r *Record) SetMetadata(key string, value interface{})

SetMetadata efficiently sets a custom metadata field

func (*Record) SetOffset

func (r *Record) SetOffset(offset int64)

SetOffset sets the stream offset

func (*Record) SetStreamID

func (r *Record) SetStreamID(streamID string)

SetStreamID sets the stream identifier

func (*Record) SetTimestamp

func (r *Record) SetTimestamp(t time.Time)

SetTimestamp sets the timestamp

type RecordMetadata

type RecordMetadata struct {
	Source      string                 `json:"source,omitempty"`
	Table       string                 `json:"table,omitempty"`
	Operation   string                 `json:"operation,omitempty"`
	Offset      int64                  `json:"offset,omitempty"`
	StreamID    string                 `json:"stream_id,omitempty"`
	Timestamp   time.Time              `json:"timestamp"`
	Database    string                 `json:"database,omitempty"`
	Schema      string                 `json:"schema,omitempty"`
	Position    string                 `json:"position,omitempty"`
	Transaction string                 `json:"transaction,omitempty"`
	Before      map[string]interface{} `json:"before,omitempty"`
	Custom      map[string]interface{} `json:"custom,omitempty"`
}

RecordMetadata contains structured metadata (forward declaration for models.Record compatibility)

type Stats

type Stats struct {
	Allocated int64
	InUse     int64
	Hits      int64
	Misses    int64
}

Stats represents pool statistics

type StringInternPool

type StringInternPool struct {
	// contains filtered or unexported fields
}

StringInternPool provides string interning to reduce memory allocations for frequently used strings (like field names, map keys, etc.)

func (*StringInternPool) Clear

func (p *StringInternPool) Clear()

Clear clears the intern pool (useful for tests)

func (*StringInternPool) Intern

func (p *StringInternPool) Intern(s string) string

Intern returns an interned version of the string

func (*StringInternPool) InternBytes

func (p *StringInternPool) InternBytes(b []byte) string

InternBytes interns a byte slice as a string

func (*StringInternPool) Stats

func (p *StringInternPool) Stats() (size, hits, misses int64)

Stats returns intern pool statistics

type TypedRecord

type TypedRecord struct {
	*Record

	// Typed fields for common data types
	StringFields map[string]string
	IntFields    map[string]int64
	FloatFields  map[string]float64
	BoolFields   map[string]bool
	TimeFields   map[string]time.Time
	BytesFields  map[string][]byte
}

TypedRecord provides specialized record types to avoid interface{} boxing

func GetTypedRecord

func GetTypedRecord() *TypedRecord

GetTypedRecord gets a typed record from the pool

func (*TypedRecord) GetBool

func (tr *TypedRecord) GetBool(key string) (bool, bool)

GetBool gets a bool field without unboxing

func (*TypedRecord) GetBytes

func (tr *TypedRecord) GetBytes(key string) ([]byte, bool)

GetBytes gets a bytes field without unboxing

func (*TypedRecord) GetFloat

func (tr *TypedRecord) GetFloat(key string) (float64, bool)

GetFloat gets a float field without unboxing

func (*TypedRecord) GetInt

func (tr *TypedRecord) GetInt(key string) (int64, bool)

GetInt gets an int field without unboxing

func (*TypedRecord) GetString

func (tr *TypedRecord) GetString(key string) (string, bool)

GetString gets a string field without unboxing

func (*TypedRecord) GetTime

func (tr *TypedRecord) GetTime(key string) (time.Time, bool)

GetTime gets a time field without unboxing

func (*TypedRecord) Release

func (tr *TypedRecord) Release()

Release releases the typed record

func (*TypedRecord) SetBool

func (tr *TypedRecord) SetBool(key string, value bool)

SetBool sets a bool field without boxing

func (*TypedRecord) SetBytes

func (tr *TypedRecord) SetBytes(key string, value []byte)

SetBytes sets a bytes field without boxing

func (*TypedRecord) SetFloat

func (tr *TypedRecord) SetFloat(key string, value float64)

SetFloat sets a float field without boxing

func (*TypedRecord) SetInt

func (tr *TypedRecord) SetInt(key string, value int64)

SetInt sets an int field without boxing

func (*TypedRecord) SetString

func (tr *TypedRecord) SetString(key, value string)

SetString sets a string field without boxing

func (*TypedRecord) SetTime

func (tr *TypedRecord) SetTime(key string, value time.Time)

SetTime sets a time field without boxing

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL