Versions in this module Expand all Collapse all v0 v0.22.3 Jan 31, 2024 v0.22.2 Jan 31, 2024 v0.22.1 Jan 30, 2024 Changes in this version + const Bytes + const ConcurrentRequests + const Key + const MagicPrefixSize + const RecordNameStrategy + const String + const Timeout + const TopicNameStrategy + const TopicRecordNameStrategy + const Value + var Balancers map[string]kafkago.Balancer + var CompressionCodecs map[string]compress.Compression + var ErrFailedTypeCast = NewXk6KafkaError(failedTypeCast, "Failed to cast type", nil) + var ErrForbiddenInInitContext = NewXk6KafkaError(kafkaForbiddenInInitContext, ...) + var ErrInvalidDataType = NewXk6KafkaError(invalidDataType, "Invalid data type provided for serializer/deserializer", nil) + var ErrInvalidPEMData = errors.New("tls: failed to find any PEM data in certificate input") + var ErrInvalidSchema = NewXk6KafkaError(failedUnmarshalSchema, "Failed to unmarshal schema", nil) + var ErrNoJKSConfig = NewXk6KafkaError(failedConfigureJKS, "Failed to configure JKS", nil) + var ErrNoSchemaRegistryClient = NewXk6KafkaError(failedConfigureSchemaRegistryClient, ...) + var ErrNotEnoughArguments = errors.New("not enough arguments") + var ErrPartitionAndGroupID = NewXk6KafkaError(partitionAndGroupID, "Partition and groupID cannot be set at the same time", ...) + var ErrTopicAndGroupID = NewXk6KafkaError(topicAndGroupID, ...) 
+ var ErrUnknownSerdesType = NewXk6KafkaError(invalidSerdeType, "Unknown serdes type", nil) + var ErrUnsupportedOperation = NewXk6KafkaError(unsupportedOperation, "Operation not supported", nil) + var GroupBalancers map[string]kafkago.GroupBalancer + var HeartbeatInterval = time.Second * 3 + var IsolationLevels map[string]kafkago.IsolationLevel + var JoinGroupBackoff = time.Second * 5 + var PartitionWatchInterval = time.Second * 5 + var RebalanceTimeout = time.Second * 5 + var RetentionTime = time.Hour * 24 + var SessionTimeout = time.Second * 30 + var StartOffsets map[string]int64 + var TLSVersions map[string]uint16 + var TypesRegistry map[srclient.SchemaType]Serdes = map[srclient.SchemaType]Serdes{...} + func GetSerdes(schemaType srclient.SchemaType) (Serdes, *Xk6KafkaError) + type AvroSerde struct + func (*AvroSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + func (*AvroSerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type BasicAuth struct + Password string + Username string + type ByteArraySerde struct + func (*ByteArraySerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + func (*ByteArraySerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type ConnectionConfig struct + Address string + SASL SASLConfig + TLS TLSConfig + type ConsumeConfig struct + Limit int64 + Name string + type Container struct + Data interface{} + Schema *Schema + SchemaType srclient.SchemaType + type Duration struct + func (d *Duration) UnmarshalJSON(b []byte) error + func (d Duration) MarshalJSON() ([]byte, error) + type Element string + type JKS struct + ClientCertsPem []string + ClientKeyPem string + ServerCaPem string + type JKSConfig struct + ClientCertAlias string + ClientKeyAlias string + ClientKeyPassword string + Password string + Path string + ServerCaAlias string + type JSONSerde struct + func (*JSONSerde) Deserialize(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) 
+ func (*JSONSerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type Kafka struct + type Message struct + Headers map[string]interface{} + HighWaterMark int64 + Key []byte + Name string + Offset int64 + Partition int + Time time.Time + Topic string + Value []byte + type Module struct + func (m *Module) Exports() modules.Exports + type ProduceConfig struct + Messages []Message + type ReaderConfig struct + Brokers []string + CommitInterval time.Duration + ConnectLogger bool + GroupBalancers []string + GroupID string + GroupTopics []string + HeartbeatInterval time.Duration + IsolationLevel string + JoinGroupBackoff time.Duration + MaxAttempts int + MaxBytes int + MaxWait Duration + MinBytes int + Offset int64 + OffsetOutOfRangeError bool + Partition int + PartitionWatchInterval time.Duration + QueueCapacity int + ReadBackoffMax time.Duration + ReadBackoffMin time.Duration + ReadBatchTimeout time.Duration + ReadLagInterval time.Duration + RebalanceTimeout time.Duration + RetentionTime time.Duration + SASL SASLConfig + SessionTimeout time.Duration + StartOffset string + TLS TLSConfig + Topic string + WatchPartitionChanges bool + type RootModule struct + func New() *RootModule + func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance + type SASLConfig struct + Algorithm string + Password string + Username string + type Schema struct + EnableCaching bool + ID int + References []srclient.Reference + Schema string + SchemaType *srclient.SchemaType + Subject string + Version int + func (s *Schema) Codec() *goavro.Codec + func (s *Schema) JsonSchema() *jsonschema.Schema + type SchemaRegistryConfig struct + BasicAuth BasicAuth + EnableCaching bool + TLS TLSConfig + URL string + type Serdes interface + Deserialize func(data []byte, schema *Schema) (interface{}, *Xk6KafkaError) + Serialize func(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type StringSerde struct + func (*StringSerde) Deserialize(data []byte, schema 
*Schema) (interface{}, *Xk6KafkaError) + func (*StringSerde) Serialize(data interface{}, schema *Schema) ([]byte, *Xk6KafkaError) + type SubjectNameConfig struct + Element Element + Schema string + SubjectNameStrategy string + Topic string + type TLSConfig struct + ClientCertPem string + ClientKeyPem string + EnableTLS bool + InsecureSkipTLSVerify bool + MinVersion string + ServerCaPem string + type WireFormat struct + Data []byte + SchemaID int + type WriterConfig struct + AutoCreateTopic bool + Balancer string + BatchBytes int + BatchSize int + BatchTimeout time.Duration + Brokers []string + Compression string + ConnectLogger bool + MaxAttempts int + ReadTimeout time.Duration + RequiredAcks int + SASL SASLConfig + TLS TLSConfig + Topic string + WriteTimeout time.Duration + type Xk6KafkaError struct + Code errCode + Message string + OriginalError error + func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError) + func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError) + func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError) + func NewXk6KafkaError(code errCode, msg string, originalErr error) *Xk6KafkaError + func (e Xk6KafkaError) Error() string + func (e Xk6KafkaError) Unwrap() error