Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrTaskClosed = errors.New("task is closed")
)
Functions ¶
func NewProcessError ¶ added in v0.2.0
func NewProductionError ¶ added in v0.3.0
func NewSerdeError ¶ added in v0.3.0
Types ¶
type Factory ¶
type Factory interface {
CreateTask(partition kafka.TopicPartition, producer kafka.Producer) (Task, error)
}
func NewTopologyTaskFactory ¶
type FactoryOption ¶ added in v0.2.0
type FactoryOption func(*topologyTaskFactory)
FactoryOption configures a topologyTaskFactory
func WithTelemetry ¶ added in v0.2.0
func WithTelemetry(tel *streamsotel.Telemetry) FactoryOption
WithTelemetry sets the telemetry instance for tasks created by this factory
type Manager ¶
type Manager interface {
CreateTasks(partitions []kafka.TopicPartition) error
CloseTasks(partitions []kafka.TopicPartition) error
DeleteTasks(partitions []kafka.TopicPartition) error
Tasks() map[kafka.TopicPartition]Task
TaskFor(partition kafka.TopicPartition) (Task, bool)
Close() error
}
Manager handles task lifecycle
type ProcessError ¶ added in v0.2.0
func AsProcessError ¶ added in v0.2.0
func AsProcessError(err error) (*ProcessError, bool)
func (*ProcessError) Error ¶ added in v0.2.0
func (e *ProcessError) Error() string
func (*ProcessError) Unwrap ¶ added in v0.2.0
func (e *ProcessError) Unwrap() error
type ProductionError ¶ added in v0.3.0
ProductionError wraps errors that occur during sink production.
func AsProductionError ¶ added in v0.3.0
func AsProductionError(err error) (*ProductionError, bool)
func (*ProductionError) Error ¶ added in v0.3.0
func (e *ProductionError) Error() string
func (*ProductionError) Unwrap ¶ added in v0.3.0
func (e *ProductionError) Unwrap() error
type SerdeError ¶ added in v0.3.0
type SerdeError struct {
Cause error
}
SerdeError wraps errors that occur during key/value serialization or deserialization.
func AsSerdeError ¶ added in v0.3.0
func AsSerdeError(err error) (*SerdeError, bool)
func (*SerdeError) Error ¶ added in v0.3.0
func (e *SerdeError) Error() string
func (*SerdeError) Unwrap ¶ added in v0.3.0
func (e *SerdeError) Unwrap() error
type Task ¶
type Task interface {
Partition() kafka.TopicPartition
Process(ctx context.Context, record kafka.ConsumerRecord) error
Close() error
IsClosed() bool
}
Task processes records for a single partition
type TopologyTask ¶
type TopologyTask struct {
// contains filtered or unexported fields
}
func (*TopologyTask) Close ¶
func (t *TopologyTask) Close() error
func (*TopologyTask) IsClosed ¶ added in v0.2.0
func (t *TopologyTask) IsClosed() bool
func (*TopologyTask) Partition ¶
func (t *TopologyTask) Partition() kafka.TopicPartition
func (*TopologyTask) Process ¶
func (t *TopologyTask) Process(ctx context.Context, rec kafka.ConsumerRecord) error
Click to show internal directories.
Click to hide internal directories.