task

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTaskClosed = errors.New("task is closed")
)

Functions

func NewProcessError added in v0.2.0

func NewProcessError(cause error, node string) error

func NewProductionError added in v0.3.0

func NewProductionError(cause error, node string) error

func NewSerdeError added in v0.3.0

func NewSerdeError(cause error) error

Types

type Factory

type Factory interface {
	CreateTask(partition kafka.TopicPartition, producer kafka.Producer) (Task, error)
}

func NewTopologyTaskFactory

func NewTopologyTaskFactory(t *topology.Topology, logger logger.Logger, opts ...FactoryOption) (Factory, error)

type FactoryOption added in v0.2.0

type FactoryOption func(*topologyTaskFactory)

FactoryOption configures a topologyTaskFactory

func WithTelemetry added in v0.2.0

func WithTelemetry(tel *streamsotel.Telemetry) FactoryOption

WithTelemetry sets the telemetry instance for tasks created by this factory

type Manager

type Manager interface {
	CreateTasks(partitions []kafka.TopicPartition) error
	CloseTasks(partitions []kafka.TopicPartition) error
	DeleteTasks(partitions []kafka.TopicPartition) error

	Tasks() map[kafka.TopicPartition]Task
	TaskFor(partition kafka.TopicPartition) (Task, bool)

	Close() error
}

Manager handles task lifecycle

func NewManager

func NewManager(factory Factory, producer kafka.Producer, logger logger.Logger) Manager

type ProcessError added in v0.2.0

type ProcessError struct {
	Cause error
	Node  string
}

func AsProcessError added in v0.2.0

func AsProcessError(err error) (*ProcessError, bool)

func (*ProcessError) Error added in v0.2.0

func (e *ProcessError) Error() string

func (*ProcessError) Unwrap added in v0.2.0

func (e *ProcessError) Unwrap() error

type ProductionError added in v0.3.0

type ProductionError struct {
	Cause error
	Node  string
}

ProductionError wraps errors that occur during sink production.

func AsProductionError added in v0.3.0

func AsProductionError(err error) (*ProductionError, bool)

func (*ProductionError) Error added in v0.3.0

func (e *ProductionError) Error() string

func (*ProductionError) Unwrap added in v0.3.0

func (e *ProductionError) Unwrap() error

type SerdeError added in v0.3.0

type SerdeError struct {
	Cause error
}

SerdeError wraps errors that occur during key/value serialization or deserialization.

func AsSerdeError added in v0.3.0

func AsSerdeError(err error) (*SerdeError, bool)

func (*SerdeError) Error added in v0.3.0

func (e *SerdeError) Error() string

func (*SerdeError) Unwrap added in v0.3.0

func (e *SerdeError) Unwrap() error

type Task

type Task interface {
	Partition() kafka.TopicPartition
	Process(ctx context.Context, record kafka.ConsumerRecord) error
	Close() error
	IsClosed() bool
}

Task processes records for a single partition

type TopologyTask

type TopologyTask struct {
	// contains filtered or unexported fields
}

func (*TopologyTask) Close

func (t *TopologyTask) Close() error

func (*TopologyTask) IsClosed added in v0.2.0

func (t *TopologyTask) IsClosed() bool

func (*TopologyTask) Partition

func (t *TopologyTask) Partition() kafka.TopicPartition

func (*TopologyTask) Process

func (t *TopologyTask) Process(ctx context.Context, rec kafka.ConsumerRecord) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL