kfake

package module
v0.0.0-...-6a58760 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2024 License: BSD-3-Clause Imports: 27 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster is a mock Kafka broker cluster.

func MustCluster

func MustCluster(opts ...Opt) *Cluster

MustCluster is like NewCluster, but panics on error.

func NewCluster

func NewCluster(opts ...Opt) (*Cluster, error)

NewCluster returns a new mocked Kafka cluster.

func (*Cluster) AddNode

func (c *Cluster) AddNode(nodeID int32, port int) (int32, int, error)

AddNode adds a node to the cluster. If nodeID is -1, the next node ID is used. If port is 0 or negative, a random port is chosen. This returns the added node ID and the port used, or an error if the node already exists or the port cannot be listened to.

func (*Cluster) Close

func (c *Cluster) Close()

Close shuts down the cluster.

func (*Cluster) Control

func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool))

Control is a function to call on any client request the cluster handles.

If the control function returns true, then either the response is written back to the client or, if there the control function returns an error, the client connection is closed. If both returns are nil, then the cluster will loop continuing to read from the client and the client will likely have a read timeout at some point.

Controlling a request drops the control function from the cluster, meaning that a control function can only control *one* request. To keep the control function handling more requests, you can call KeepControl within your control function. Alternatively, if you want to just run some logic in your control function but then have the cluster handle the request as normal, you can call DropControl to drop a control function that was not handled.

It is safe to add new control functions within a control function.

Control functions are run serially unless you use SleepControl, multiple control functions are "in progress", and you run Cluster.Close. Closing a Cluster awakens all sleeping control functions.

func (*Cluster) ControlKey

func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool))

Control is a function to call on a specific request key that the cluster handles.

If the control function returns true, then either the response is written back to the client or, if there the control function returns an error, the client connection is closed. If both returns are nil, then the cluster will loop continuing to read from the client and the client will likely have a read timeout at some point.

Controlling a request drops the control function from the cluster, meaning that a control function can only control *one* request. To keep the control function handling more requests, you can call KeepControl within your control function. Alternatively, if you want to just run some logic in your control function but then have the cluster handle the request as normal, you can call DropControl to drop a control function that was not handled.

It is safe to add new control functions within a control function.

Control functions are run serially unless you use SleepControl, multiple control functions are "in progress", and you run Cluster.Close. Closing a Cluster awakens all sleeping control functions.

func (*Cluster) CoordinatorFor

func (c *Cluster) CoordinatorFor(key string) int32

CoordinatorFor returns the node ID of the group or transaction coordinator for the given key.

func (*Cluster) CurrentNode

func (c *Cluster) CurrentNode() int32

CurrentNode is solely valid from within a control function; it returns the broker id that the request was received by. If there's no request currently inflight, this returns -1.

func (*Cluster) DropControl

func (c *Cluster) DropControl()

DropControl allows you to drop the current control function. This takes precedence over KeepControl. The use of this function is you can run custom control logic *once*, drop the control function, and return that the function was not handled -- thus allowing other control functions to run, or allowing the kfake cluster to process the request as normal.

func (*Cluster) KeepControl

func (c *Cluster) KeepControl()

KeepControl marks the currently running control function to be kept even if you handle the request and return true. This can be used to continuously control requests without needing to re-add control functions manually.

func (*Cluster) ListenAddrs

func (c *Cluster) ListenAddrs() []string

ListenAddrs returns the hostports that the cluster is listening on.

func (*Cluster) MoveTopicPartition

func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32) error

MoveTopicPartition simulates the rebalancing of a partition to an alternative broker. This returns an error if the topic, partition, or node does not exit.

func (*Cluster) RehashCoordinators

func (c *Cluster) RehashCoordinators()

RehashCoordinators simulates group and transacational ID coordinators moving around. All group and transactional IDs are rekeyed. This forces clients to reload coordinators.

func (*Cluster) RemoveNode

func (c *Cluster) RemoveNode(nodeID int32) error

RemoveNode removes a ndoe from the cluster. This returns an error if the node does not exist.

func (*Cluster) ShufflePartitionLeaders

func (c *Cluster) ShufflePartitionLeaders()

ShufflePartitionLeaders simulates a leader election for all partitions: all partitions have a randomly selected new leader and their internal epochs are bumped.

func (*Cluster) SleepControl

func (c *Cluster) SleepControl(wakeup func())

SleepControl sleeps the current control function until wakeup returns. This yields to run any other connection.

Note that per protocol, requests on the same connection must be replied to in order. Many clients write multiple requests to the same connection, so if you sleep until a different request runs, you may sleep forever -- you must know the semantics of your client to know whether requests run on different connections (or, ensure you are writing to different brokers).

For example, franz-go uses a dedicated connection for:

  • produce requests
  • fetch requests
  • join&sync requests
  • requests with a Timeout field
  • all other request

So, for franz-go, there are up to five separate connections depending on what you are doing.

You can run SleepControl multiple times in the same control function. If you sleep a request you are controlling, and another request of the same key comes in, it will run the same control function and may also sleep (i.e., you must have logic if you want to avoid sleeping on the same request).

type LogLevel

type LogLevel int8

LogLevel designates which level the logger should log at.

const (
	// LogLevelNone disables logging.
	LogLevelNone LogLevel = iota
	// LogLevelError logs all errors. Generally, these should not happen.
	LogLevelError
	// LogLevelWarn logs all warnings, such as request failures.
	LogLevelWarn
	// LogLevelInfo logs informational messages, such as requests. This is
	// usually the default log level.
	LogLevelInfo
	// LogLevelDebug logs verbose information, and is usually not used in
	// production.
	LogLevelDebug
)

func (LogLevel) String

func (l LogLevel) String() string

type Logger

type Logger interface {
	Logf(LogLevel, string, ...any)
}

Logger can be provided to hook into the fake cluster's logs.

func BasicLogger

func BasicLogger(dst io.Writer, level LogLevel) Logger

BasicLogger returns a logger that writes newline delimited messages to dst.

type Opt

type Opt interface {
	// contains filtered or unexported methods
}

Opt is an option to configure a client.

func AllowAutoTopicCreation

func AllowAutoTopicCreation() Opt

AllowAutoTopicCreation allows metadata requests to create topics if the metadata request has its AllowAutoTopicCreation field set to true.

func ClusterID

func ClusterID(clusterID string) Opt

ClusterID sets the cluster ID to return in metadata responses.

func DefaultNumPartitions

func DefaultNumPartitions(n int) Opt

DefaultNumPartitions sets the number of partitions to create by default for auto created topics / CreateTopics with -1 partitions, overriding the default of 10.

func EnableSASL

func EnableSASL() Opt

EnableSASL enables SASL authentication for the cluster. If you do not configure a bootstrap user / pass, the default superuser is "admin" / "admin" with the SCRAM-SHA-256 SASL mechanisms.

func GroupMaxSessionTimeout

func GroupMaxSessionTimeout(d time.Duration) Opt

GroupMaxSessionTimeout sets the cluster's maximum session timeout allowed for groups, overriding the default 5 minutes.

func GroupMinSessionTimeout

func GroupMinSessionTimeout(d time.Duration) Opt

GroupMinSessionTimeout sets the cluster's minimum session timeout allowed for groups, overriding the default 6 seconds.

func NumBrokers

func NumBrokers(n int) Opt

NumBrokers sets the number of brokers to start in the fake cluster.

func Ports

func Ports(ports ...int) Opt

Ports sets the ports to listen on, overriding randomly choosing NumBrokers amount of ports.

func SeedTopics

func SeedTopics(partitions int32, ts ...string) Opt

SeedTopics provides topics to create by default in the cluster. Each topic will use the given partitions and use the default internal replication factor. If you use a non-positive number for partitions, DefaultNumPartitions is used. This option can be provided multiple times if you want to seed topics with different partition counts. If a topic is provided in multiple options, the last specification wins.

func SleepOutOfOrder

func SleepOutOfOrder() Opt

SleepOutOfOrder allows functions to be handled out of order when control functions are sleeping. The functions are be handled internally out of order, but responses still wait for the sleeping requests to finish. This can be used to set up complicated chains of control where functions only advance when you know another request is actively being handled.

func Superuser

func Superuser(method, user, pass string) Opt

Superuser seeds the cluster with a superuser. The method must be either PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. Note that PLAIN superusers cannot be deleted. SCRAM superusers can be modified with AlterUserScramCredentials. If you delete all SASL users, the kfake cluster will be unusable.

func TLS

func TLS(c *tls.Config) Opt

TLS enables TLS for the cluster, using the provided TLS config for listening.

func WithLogger

func WithLogger(logger Logger) Opt

WithLogger sets the logger to use.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL