kafka

package
v0.35.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AckNone   = "none"
	AckAll    = "all"
	AckLeader = "leader"

	HeaderKeyEncoding = "cerbos.audit.encoding"
	HeaderKeyKind     = "cerbos.audit.kind"

	CompressionNone   = "none"
	CompressionGzip   = "gzip"
	CompressionSnappy = "snappy"
	CompressionLZ4    = "lz4"
	CompressionZstd   = "zstd"
)
View Source
const Backend = "kafka"

Variables

This section is empty.

Functions

func NewTLSConfig added in v0.29.0

func NewTLSConfig(ctx context.Context, reloadInterval time.Duration, insecureSkipVerify bool, caPath, certPath, keyPath string) (*tls.Config, error)

Types

type Authentication added in v0.29.0

type Authentication struct {
	TLS *TLS `yaml:"tls"`
}

type Client

type Client interface {
	Close()
	Flush(context.Context) error
	Produce(context.Context, *kgo.Record, func(*kgo.Record, error))
	ProduceSync(context.Context, ...*kgo.Record) kgo.ProduceResults
}

type Conf

type Conf struct {
	// Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
	Ack string `yaml:"ack" conf:",example=all"`
	// Authentication
	Authentication Authentication `yaml:"authentication"`
	// Topic to write audit entries to.
	Topic string `yaml:"topic" conf:"required,example=cerbos.audit.log"`
	// Encoding format. Valid values are "json" (default) or "protobuf".
	Encoding Encoding `yaml:"encoding" conf:",example=json"`
	// ClientID reported in Kafka connections.
	ClientID string `yaml:"clientID" conf:",example=cerbos"`
	// Brokers list to seed the Kafka client.
	Brokers []string `yaml:"brokers" conf:"required,example=['localhost:9092']"`
	// Compression sets the compression algorithm to use in order of priority. Valid values are "none", "gzip", "snappy","lz4", "zstd". Default is ["snappy", "none"].
	Compression []string `yaml:"compression" conf:",example=['snappy']"`
	// CloseTimeout sets how long when closing the client to wait for any remaining messages to be flushed.
	CloseTimeout time.Duration `yaml:"closeTimeout" conf:",example=30s"`
	// MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
	MaxBufferedRecords int `yaml:"maxBufferedRecords" conf:",example=1000"`
	// ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
	ProduceSync bool `yaml:"produceSync" conf:",example=false"`
}

Conf is optional configuration for kafka Audit.

func (*Conf) Key

func (c *Conf) Key() string

func (*Conf) SetDefaults

func (c *Conf) SetDefaults()

func (*Conf) Validate

func (c *Conf) Validate() error

type Encoding

type Encoding string
const (
	EncodingJSON     Encoding = "json"
	EncodingProtobuf Encoding = "protobuf"
)

type Kind

type Kind []byte
var (
	// reallocate once ahead of time to avoid allocations in the hot path.
	KindAccess   Kind = []byte(audit.KindAccess)
	KindDecision Kind = []byte(audit.KindDecision)
)

type Publisher

type Publisher struct {
	Client Client
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(ctx context.Context, conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Publisher, error)

func (*Publisher) Backend

func (p *Publisher) Backend() string

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Enabled

func (p *Publisher) Enabled() bool

func (*Publisher) WriteAccessLogEntry

func (p *Publisher) WriteAccessLogEntry(ctx context.Context, record audit.AccessLogEntryMaker) error

func (*Publisher) WriteDecisionLogEntry

func (p *Publisher) WriteDecisionLogEntry(ctx context.Context, record audit.DecisionLogEntryMaker) error

type TLS added in v0.29.0

type TLS struct {
	// CAPath is the path to the CA certificate.
	CAPath string `yaml:"caPath" conf:"required,example=/path/to/ca.crt"`
	// CertPath is the path to the client certificate.
	CertPath string `yaml:"certPath" conf:",example=/path/to/tls.cert"`
	// KeyPath is the path to the client key.
	KeyPath string `yaml:"keyPath" conf:",example=/path/to/tls.key"`
	// ReloadInterval is the interval at which the TLS certificates are reloaded. The default is 0 (no reload).
	ReloadInterval time.Duration `yaml:"reloadInterval" conf:",example=5m"`
	// InsecureSkipVerify controls whether the server's certificate chain and host name are verified. Default is false.
	InsecureSkipVerify bool `yaml:"insecureSkipVerify" conf:",example=true"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL