kafkajobs

package
v5.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2024 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Acks

type Acks string
const (
	NoAck     Acks = "NoAck"
	LeaderAck Acks = "LeaderAck"
	AllISRAck Acks = "AllISRAck"
)

type ClientAuthType

type ClientAuthType string
const (
	NoClientCert               ClientAuthType = "no_client_cert"
	RequestClientCert          ClientAuthType = "request_client_cert"
	RequireAnyClientCert       ClientAuthType = "require_any_client_cert"
	VerifyClientCertIfGiven    ClientAuthType = "verify_client_cert_if_given"
	RequireAndVerifyClientCert ClientAuthType = "require_and_verify_client_cert"
)

type CompressionCodec

type CompressionCodec string

type Configurer

type Configurer interface {
	// UnmarshalKey takes a single key and unmarshal it into a Struct.
	UnmarshalKey(name string, out any) error
	// Has checks if a config section exists.
	Has(name string) bool
}

type ConsumerOpts

type ConsumerOpts struct {
	Topics              []string                     `mapstructure:"topics" json:"topics"`
	ConsumeRegexp       bool                         `mapstructure:"consume_regexp" json:"consume_regexp"`
	MaxFetchMessageSize int32                        `mapstructure:"max_fetch_message_size" json:"max_fetch_message_size"`
	MinFetchMessageSize int32                        `mapstructure:"min_fetch_message_size" json:"min_fetch_message_size"`
	ConsumePartitions   map[string]map[int32]*Offset `mapstructure:"consume_partitions" json:"consume_partitions"`
	ConsumerOffset      *Offset                      `mapstructure:"consumer_offset" json:"consumer_offset"`
}

type Driver

type Driver struct {
	// contains filtered or unexported fields
}

func FromConfig

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipeline jobs.Pipeline, pq jobs.Queue) (*Driver, error)

FromConfig initializes kafka pipeline from the configuration

func FromPipeline

func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error)

FromPipeline initializes a pipeline on-the-fly

func (*Driver) Pause

func (d *Driver) Pause(ctx context.Context, p string) error

func (*Driver) Push

func (d *Driver) Push(ctx context.Context, job jobs.Message) error

func (*Driver) Resume

func (d *Driver) Resume(ctx context.Context, p string) error

func (*Driver) Run

func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error

func (*Driver) State

func (d *Driver) State(ctx context.Context) (*jobs.State, error)

func (*Driver) Stop

func (d *Driver) Stop(ctx context.Context) error

type GroupOptions

type GroupOptions struct {
	GroupID              string `mapstructure:"group_id" json:"group_id"`
	BlockRebalanceOnPoll bool   `mapstructure:"block_rebalance_on_poll" json:"block_rebalance_on_poll"`
}

type Item

type Item struct {
	// Job contains the pluginName of job broker (usually PHP class).
	Job string `json:"job"`
	// Ident is a unique identifier of the job, should be provided from outside
	Ident string `json:"id"`
	// Payload is string data (usually JSON) passed to Job broker.
	Payload []byte `json:"payload"`

	// Options contain a set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
	// contains filtered or unexported fields
}

func (*Item) Ack

func (i *Item) Ack() error

func (*Item) Body

func (i *Item) Body() []byte

Body packs job payload into binary payload.

func (*Item) Context

func (i *Item) Context() ([]byte, error)

Context packs job context (job, id) into binary payload. Not used in the amqp, amqp.Table used instead

func (*Item) Copy

func (i *Item) Copy() *Item

func (*Item) GroupID

func (i *Item) GroupID() string

func (*Item) Headers

func (i *Item) Headers() map[string][]string

func (*Item) ID

func (i *Item) ID() string

func (*Item) Nack

func (i *Item) Nack() error

func (*Item) NackWithOptions

func (i *Item) NackWithOptions(requeue bool, _ int) error

func (*Item) Priority

func (i *Item) Priority() int64

func (*Item) Requeue

func (i *Item) Requeue(headers map[string][]string, _ int) error

Requeue with the provided delay, handled by the Nack

func (*Item) Respond

func (i *Item) Respond(_ []byte, _ string) error

Respond is not used and presented to satisfy the Job interface

type Offset

type Offset struct {
	Type  OffsetTypes `mapstructure:"type"`
	Value int64       `mapstructure:"value"`
}

type OffsetTypes

type OffsetTypes string
const (
	At         OffsetTypes = "At"
	AfterMilli OffsetTypes = "AfterMilli"
	AtEnd      OffsetTypes = "AtEnd"
	AtStart    OffsetTypes = "AtStart"
	Relative   OffsetTypes = "Relative"
	WithEpoch  OffsetTypes = "WithEpoch"
)

type Options

type Options struct {
	// Priority is job priority, default - 10
	// pointer to distinguish 0 as a priority and nil as a priority not set
	Priority int64 `json:"priority"`
	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`
	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int64 `json:"delay,omitempty"`
	// AutoAck option
	AutoAck bool `json:"auto_ack"`

	Queue     string
	Metadata  string
	Partition int32
	Offset    int64
}

Options carry information about how to handle a given job.

type Ping

type Ping struct {
	Timeout time.Duration `mapstructure:"timeout" json:"timeout"`
}

type ProducerOpts

type ProducerOpts struct {
	DisableIdempotent  bool             `mapstructure:"disable_idempotent" json:"disable_idempotent"`
	RequiredAcks       Acks             `mapstructure:"required_acks" json:"required_acks"`
	MaxMessageBytes    int32            `mapstructure:"max_message_bytes" json:"max_message_bytes"`
	RequestTimeout     time.Duration    `mapstructure:"request_timeout" json:"request_timeout"`
	DeliveryTimeout    time.Duration    `mapstructure:"delivery_timeout" json:"delivery_timeout"`
	TransactionTimeout time.Duration    `mapstructure:"transaction_timeout" json:"transaction_timeout"`
	CompressionCodec   CompressionCodec `mapstructure:"compression_codec" json:"compression_codec"`
}

type SASL

type SASL struct {
	Type SASLMechanism `mapstructure:"mechanism" json:"mechanism"`

	// plain + SHA
	Username string `mapstructure:"username" json:"username"`
	Password string `mapstructure:"password" json:"password"`
	Zid      string `mapstructure:"zid" json:"zid"`
	Nonce    []byte `mapstructure:"nonce" json:"nonce"`
	IsToken  bool   `mapstructure:"is_token" json:"is_token"`

	// aws_msk_iam
	AccessKey    string `mapstructure:"access_key" json:"access_key"`
	SecretKey    string `mapstructure:"secret_key" json:"secret_key"`
	SessionToken string `mapstructure:"session_token" json:"session_token"`
	UserAgent    string `mapstructure:"user_agent" json:"user_agent"`
}

type SASLMechanism

type SASLMechanism string

type TLS

type TLS struct {
	Timeout  time.Duration  `mapstructure:"timeout" json:"timeout"`
	Key      string         `mapstructure:"key"`
	Cert     string         `mapstructure:"cert"`
	RootCA   string         `mapstructure:"root_ca"`
	AuthType ClientAuthType `mapstructure:"client_auth_type"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL