danube

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

Danube-go client

The Go Client library for interacting with Danube Messaging Broker platform.

Danube is an open-source distributed Messaging platform written in Rust. Consult the documentation for supported concepts and the platform architecture.

Example usage

Check out the example files.

Start the Danube server

Use the instructions from the documentation to run the Danube broker/cluster.

Create Producer
client, err := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()
if err != nil {
    log.Fatalf("Failed to create client: %v", err)
}

ctx := context.Background()
topic := "/default/test_topic"
producerName := "test_producer"

producer, err := client.NewProducer().
    WithName(producerName).
    WithTopic(topic).
    Build()
if err != nil {
    log.Fatalf("unable to initialize the producer: %v", err)
}

if err := producer.Create(ctx); err != nil {
    log.Fatalf("Failed to create producer: %v", err)
}
log.Printf("The Producer %s was created", producerName)

payload := fmt.Sprintln("Hello Danube")

// Convert string to bytes
bytes_payload := []byte(payload)

// You can send the payload along with the user defined attributes, in this case is nil
messageID, err := producer.Send(ctx, bytes_payload, nil)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}
log.Printf("The Message with id %v was sent", messageID)
Reliable Dispatch (optional)

Reliable dispatch can be enabled when creating the producer, the broker will stream the messages to the consumer from WAL and cloud storage.

reliableStrategy := danube.NewReliableDispatchStrategy()

producer, err := client.NewProducer().
    WithName(producerName).
    WithTopic(topic).
    WithDispatchStrategy(reliableStrategy).
    Build()
Create Consumer
client, err := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()
if err != nil {
    log.Fatalf("Failed to create client: %v", err)
}

ctx := context.Background()
topic := "/default/test_topic"
consumerName := "test_consumer"
subscriptionName := "test_subscription"
subType := danube.Exclusive

consumer, err := client.NewConsumer().
    WithConsumerName(consumerName).
    WithTopic(topic).
    WithSubscription(subscriptionName).
    WithSubscriptionType(subType).
    Build()
if err != nil {
    log.Fatalf("Failed to initialize the consumer: %v", err)
}

// Request to subscribe to the topic and create the resources on the Danube Broker
if err := consumer.Subscribe(ctx); err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}
log.Printf("The Consumer %s was created", consumerName)

// Request to receive messages
stream, err := consumer.Receive(ctx)
if err != nil {
    log.Fatalf("Failed to receive messages: %v", err)
}

// consume the messages from the go channel
for msg := range stream {
    fmt.Printf("Received message: %+v\n", string(msg.GetPayload()))

    // Acknowledge the message
    if _, err := consumer.Ack(ctx, msg); err != nil {
        log.Fatalf("Failed to acknowledge message: %v", err)
    }
}

Contribution

Working on improving and adding new features. Please feel free to contribute or report any issues you encounter.

Running Integration Tests

Before submitting a PR, start the test cluster and run the integration tests:

# 1. Start the cluster
cd docker/
docker compose up -d

# 2. Wait for the broker to be healthy
docker compose ps

# 3. Run the integration tests from the repository root
cd ..
go test ./integration_tests/ -v -count=1

# 4. Stop the cluster when done
cd docker/
docker compose down -v
Use latest DanubeApi.proto and SchemaRegistry.proto files

Make sure the proto/DanubeApi.proto is the latest from Danube project.

If not replace the file and add at the top of the file

option go_package = "github.com/danube-messaging/danube-go/proto";

right after the package danube;

In order to generate the Go grpc code you need the following packages installed:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

And generate the Go code from the proto file:

protoc --proto_path=./proto --go_out=./proto --go-grpc_out=./proto --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative proto/DanubeApi.proto
protoc --proto_path=./proto --go_out=./proto --go-grpc_out=./proto --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative proto/SchemaRegistry.proto

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CompatibilityMode added in v0.4.0

type CompatibilityMode int

CompatibilityMode defines schema evolution rules.

const (
	CompatibilityNone CompatibilityMode = iota
	CompatibilityBackward
	CompatibilityForward
	CompatibilityFull
)

func ParseCompatibilityMode added in v0.4.0

func ParseCompatibilityMode(value string) (CompatibilityMode, error)

ParseCompatibilityMode parses a compatibility mode string.

func (CompatibilityMode) AsString added in v0.4.0

func (c CompatibilityMode) AsString() string

AsString returns the wire representation of the compatibility mode.

type ConfigDispatchStrategy

type ConfigDispatchStrategy struct {
	// contains filtered or unexported fields
}

ConfigDispatchStrategy represents the dispatch strategy for a topic. It now directly maps to the proto.DispatchStrategy enum.

func NewConfigDispatchStrategy

func NewConfigDispatchStrategy() *ConfigDispatchStrategy

NewConfigDispatchStrategy creates a new ConfigDispatchStrategy instance with NonReliable as default.

func NewReliableDispatchStrategy

func NewReliableDispatchStrategy() *ConfigDispatchStrategy

NewReliableDispatchStrategy creates a new reliable ConfigDispatchStrategy instance.

type ConnectionOptions added in v0.4.0

type ConnectionOptions struct {
	DialOptions    []DialOption  // Optional gRPC dial options.
	TLSConfig      *tls.Config   // TLS configuration (required when UseTLS is true).
	UseTLS         bool          // Enable TLS/mTLS for the connection.
	Token          string        // Static JWT token for authentication.
	TokenSupplier  TokenSupplier // Dynamic token supplier called per-request.
	InternalBroker string        // Broker-internal identity header (broker-to-broker only).
}

ConnectionOptions configures how the client connects to the broker.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a message consumer that subscribes to a topic and receives messages. It handles communication with the message broker and manages the consumer's state.

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, message *proto.StreamMessage) (*proto.AckResponse, error)

Ack acknowledges a received message.

func (*Consumer) Close added in v0.4.0

func (c *Consumer) Close()

Close stops receive loops and signals topic consumers to stop.

func (*Consumer) Nack added in v0.5.0

func (c *Consumer) Nack(ctx context.Context, message *proto.StreamMessage, delayMs *uint64, reason *string) (*proto.NackResponse, error)

Nack sends a negative acknowledgement for a received message.

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) (chan *proto.StreamMessage, error)

Receive starts receiving messages from the subscribed partitioned or non-partitioned topic. It continuously polls for new messages and handles them as long as the stopSignal has not been set to true.

Parameters: - ctx: The context for managing the receive operation.

Returns: - StreamMessage channel for receiving messages from the broker. - error: An error if the receive client cannot be created or if other issues occur.

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(ctx context.Context) error

Subscribe initializes the subscription to the non-partitioned or partitioned topic and starts the health check service. It establishes a gRPC connection with the brokers and requests to subscribe to the topic.

Parameters: - ctx: The context for managing the subscription lifecycle.

Returns: - error: An error if the subscription fails or if initialization encounters issues.

type ConsumerBuilder

type ConsumerBuilder struct {
	// contains filtered or unexported fields
}

ConsumerBuilder is a builder for creating a new Consumer instance. It allows setting various properties for the consumer such as topic, name, subscription, subscription type, and options.

func (*ConsumerBuilder) Build

func (b *ConsumerBuilder) Build() (*Consumer, error)

Build creates a new Consumer instance using the settings configured in the ConsumerBuilder. It performs validation to ensure that all required fields are set before creating the consumer.

Returns: - *Consumer: A pointer to the newly created Consumer instance if successful. - error: An error if required fields are missing or if consumer creation fails.

func (*ConsumerBuilder) WithConsumerName

func (b *ConsumerBuilder) WithConsumerName(name string) *ConsumerBuilder

WithConsumerName sets the name of the consumer. This is a required field.

Parameters: - name: The name assigned to the consumer instance.

func (*ConsumerBuilder) WithOptions added in v0.4.0

func (b *ConsumerBuilder) WithOptions(options ConsumerOptions) *ConsumerBuilder

WithOptions sets the configuration options for the consumer.

func (*ConsumerBuilder) WithSubscription

func (b *ConsumerBuilder) WithSubscription(subscription string) *ConsumerBuilder

WithSubscription sets the name of the subscription for the consumer. This is a required field.

Parameters: - subscription: The name of the subscription for the consumer.

func (*ConsumerBuilder) WithSubscriptionType

func (b *ConsumerBuilder) WithSubscriptionType(subType SubType) *ConsumerBuilder

WithSubscriptionType sets the type of subscription for the consumer. This field is optional.

Parameters: - subType: The type of subscription (e.g., EXCLUSIVE, SHARED, FAILOVER).

func (*ConsumerBuilder) WithTopic

func (b *ConsumerBuilder) WithTopic(topic string) *ConsumerBuilder

WithTopic sets the topic name for the consumer. This is a required field.

Parameters: - topic: The name of the topic for the consumer.

type ConsumerOptions

type ConsumerOptions struct {
	MaxRetries    int   // Maximum retry attempts for receive/subscribe operations.
	BaseBackoffMs int64 // Base backoff in milliseconds.
	MaxBackoffMs  int64 // Maximum backoff in milliseconds.
}

ConsumerOptions configures retry behavior for consumers.

type DanubeClient

type DanubeClient struct {
	URI string
	// contains filtered or unexported fields
}

DanubeClient is the main client for interacting with the Danube messaging system. It provides methods to create producers and consumers, and retrieve schema information.

func (*DanubeClient) NewConsumer

func (dc *DanubeClient) NewConsumer() *ConsumerBuilder

NewConsumer returns a new ConsumerBuilder, which is used to configure and create a Consumer instance.

func (*DanubeClient) NewProducer

func (dc *DanubeClient) NewProducer() *ProducerBuilder

NewProducer returns a new ProducerBuilder, which is used to configure and create a Producer instance.

func (*DanubeClient) Schema added in v0.4.0

func (dc *DanubeClient) Schema() *SchemaRegistryClient

Schema returns a SchemaRegistryClient for schema operations.

type DanubeClientBuilder

type DanubeClientBuilder struct {
	URI string
	// contains filtered or unexported fields
}

DanubeClientBuilder is used for configuring and creating a DanubeClient instance. It provides methods for setting various options, including the service URL, connection options, and logger.

Fields: - URI: The base URI for the Danube service. This is required for constructing the client. - ConnectionOptions: Optional connection settings for configuring how the client connects to the service.

func NewClient

func NewClient() *DanubeClientBuilder

NewClient initializes a new DanubeClientBuilder. The builder pattern allows for configuring and constructing a DanubeClient instance with optional settings and options.

Returns: - *DanubeClientBuilder: A new instance of DanubeClientBuilder for configuring and building a DanubeClient.

func (*DanubeClientBuilder) Build

func (b *DanubeClientBuilder) Build() (*DanubeClient, error)

Build constructs and returns a DanubeClient instance based on the configuration specified in the builder.

Returns: - *DanubeClient: A new instance of DanubeClient configured with the specified options. - error: An error if the configuration is invalid or incomplete.

func (*DanubeClientBuilder) ServiceURL

func (b *DanubeClientBuilder) ServiceURL(url string) *DanubeClientBuilder

ServiceURL sets the base URI for the Danube service in the builder.

Parameters: - url: The base URI to use for connecting to the Danube service.

Returns: - *DanubeClientBuilder: The updated builder instance with the new service URL.

func (*DanubeClientBuilder) WithConnectionOptions

func (b *DanubeClientBuilder) WithConnectionOptions(options ConnectionOptions) *DanubeClientBuilder

WithConnectionOptions sets connection settings for the client in the builder.

func (*DanubeClientBuilder) WithDialOptions added in v0.4.0

func (b *DanubeClientBuilder) WithDialOptions(options ...DialOption) *DanubeClientBuilder

WithDialOptions appends gRPC dial options to the connection options.

func (*DanubeClientBuilder) WithMTLS added in v0.4.0

func (b *DanubeClientBuilder) WithMTLS(caCertPath, clientCertPath, clientKeyPath string) (*DanubeClientBuilder, error)

WithMTLS enables mutual TLS using CA, client certificate, and client key.

func (*DanubeClientBuilder) WithTLS added in v0.4.0

func (b *DanubeClientBuilder) WithTLS(caCertPath string) (*DanubeClientBuilder, error)

WithTLS enables TLS using a custom CA certificate.

func (*DanubeClientBuilder) WithToken added in v0.6.0

func (b *DanubeClientBuilder) WithToken(token string) *DanubeClientBuilder

WithToken sets the authentication token (JWT) for the client.

Use `danube-admin security tokens create` to generate a token. Automatically enables TLS. If no TLS config has been set via WithTLS() or WithMTLS(), a default TLS config using system root certificates is applied.

For tokens that expire, consider WithTokenSupplier instead, which allows runtime token refresh.

func (*DanubeClientBuilder) WithTokenSupplier added in v0.6.0

func (b *DanubeClientBuilder) WithTokenSupplier(supplier func() string) *DanubeClientBuilder

WithTokenSupplier sets a dynamic token supplier for the client.

The supplier function is called on every gRPC request to obtain the current token, enabling runtime token refresh without restarting the client. This is useful for:

  • File-based tokens: Read from a file updated by infrastructure (e.g., K8s projected volumes, sidecar token refreshers)
  • Environment-based tokens: Read from an environment variable
  • Custom refresh logic: Implement your own token rotation

Automatically enables TLS (same as WithToken).

type DialOption

type DialOption func(*[]grpc.DialOption)

DialOption is a function that configures gRPC dial options.

func WithConnectionTimeout

func WithConnectionTimeout(timeout time.Duration) DialOption

WithConnectionTimeout configures the connection timeout for the connection.

func WithKeepAliveInterval

func WithKeepAliveInterval(interval time.Duration) DialOption

WithKeepAliveInterval configures the keepalive interval for the connection.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a message producer that is responsible for sending messages to a specific partitioned or non-partitioned topic on a message broker. It handles producer creation, message sending, and maintains the producer's state.

func (*Producer) Create

func (p *Producer) Create(ctx context.Context) error

Create initializes the producer and registers it with the message brokers.

Parameters: - ctx: The context for managing request lifecycle and cancellation.

Returns: - error: An error if producer creation fails.

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, data []byte, attributes map[string]string) (uint64, error)

Send sends a message to the topic associated with this producer.

It constructs a message request and sends it to the broker. The method handles payload and error reporting. It assumes that the producer has been successfully created and is ready to send messages.

Parameters: - ctx: The context for managing request lifecycle and cancellation. - data: The message payload to be sent. - attributes: user-defined properties or attributes associated with the message

Returns: - uint64: The sequence ID of the sent message if successful. - error: An error if message sending fail

type ProducerBuilder

type ProducerBuilder struct {
	// contains filtered or unexported fields
}

ProducerBuilder is a builder for creating a new Producer instance. It allows setting various properties for the producer such as topic, name, schema, and options.

func (*ProducerBuilder) Build

func (pb *ProducerBuilder) Build() (*Producer, error)

Build creates a new Producer instance using the settings configured in the ProducerBuilder. It performs validation to ensure that required fields are set before creating the producer.

Returns: - *Producer: A pointer to the newly created Producer instance if successful. - error: An error if required fields are missing or if producer creation fails.

func (*ProducerBuilder) WithDispatchStrategy

func (pb *ProducerBuilder) WithDispatchStrategy(dispatch_strategy *ConfigDispatchStrategy) *ProducerBuilder

WithDispatchStrategy sets the dispatch strategy for the producer. This method configures the retention strategy for the producer, which determines how messages are stored and managed.

Parameters: - dispatch_strategy: The dispatch strategy for the producer.

func (*ProducerBuilder) WithName

func (pb *ProducerBuilder) WithName(producerName string) *ProducerBuilder

WithName sets the name of the producer. This is a required field.

Parameters: - producerName: The name assigned to the producer instance.

func (*ProducerBuilder) WithOptions

func (pb *ProducerBuilder) WithOptions(options ProducerOptions) *ProducerBuilder

WithOptions sets the configuration options for the producer. This allows for customization of producer behavior.

Parameters: - options: Configuration options for the producer.

func (*ProducerBuilder) WithPartitions

func (pb *ProducerBuilder) WithPartitions(partitions int32) *ProducerBuilder

WithPartitions sets the number of topic partitions.

Parameters: - partitions: The number of partitions for a new topic.

func (*ProducerBuilder) WithSchemaMinVersion added in v0.4.0

func (pb *ProducerBuilder) WithSchemaMinVersion(subject string, minVersion uint32) *ProducerBuilder

WithSchemaMinVersion enforces a minimum schema version.

func (*ProducerBuilder) WithSchemaReference added in v0.4.0

func (pb *ProducerBuilder) WithSchemaReference(schemaRef *proto.SchemaReference) *ProducerBuilder

WithSchemaReference sets a custom SchemaReference (advanced use).

func (*ProducerBuilder) WithSchemaSubject added in v0.4.0

func (pb *ProducerBuilder) WithSchemaSubject(subject string) *ProducerBuilder

WithSchemaSubject sets the schema registry subject (uses latest version).

func (*ProducerBuilder) WithSchemaVersion added in v0.4.0

func (pb *ProducerBuilder) WithSchemaVersion(subject string, version uint32) *ProducerBuilder

WithSchemaVersion pins a schema subject to a specific version.

func (*ProducerBuilder) WithTopic

func (pb *ProducerBuilder) WithTopic(topic string) *ProducerBuilder

WithTopic sets the topic name for the producer. This is a required field.

Parameters: - topic: The name of the topic for the producer.

type ProducerOptions

type ProducerOptions struct {
	MaxRetries    int   // Maximum retry attempts for create/send operations.
	BaseBackoffMs int64 // Base backoff in milliseconds.
	MaxBackoffMs  int64 // Maximum backoff in milliseconds.
}

ProducerOptions configures retry behavior for producers.

type SchemaInfo added in v0.4.0

type SchemaInfo struct {
	SchemaID         uint64
	Subject          string
	Version          uint32
	SchemaType       string
	SchemaDefinition []byte
	Fingerprint      string
}

SchemaInfo is a user-friendly wrapper around GetSchemaResponse.

func (SchemaInfo) SchemaDefinitionAsString added in v0.4.0

func (s SchemaInfo) SchemaDefinitionAsString() (string, error)

SchemaDefinitionAsString returns the schema definition as a string.

type SchemaRegistrationBuilder added in v0.4.0

type SchemaRegistrationBuilder struct {
	// contains filtered or unexported fields
}

SchemaRegistrationBuilder provides a fluent API for schema registration.

func (*SchemaRegistrationBuilder) Execute added in v0.4.0

Execute registers the schema and returns the schema ID.

func (*SchemaRegistrationBuilder) WithCreatedBy added in v0.4.0

func (b *SchemaRegistrationBuilder) WithCreatedBy(createdBy string) *SchemaRegistrationBuilder

WithCreatedBy sets the creator metadata for the schema.

func (*SchemaRegistrationBuilder) WithDescription added in v0.4.0

func (b *SchemaRegistrationBuilder) WithDescription(description string) *SchemaRegistrationBuilder

WithDescription sets the optional schema description.

func (*SchemaRegistrationBuilder) WithSchemaData added in v0.4.0

func (b *SchemaRegistrationBuilder) WithSchemaData(data []byte) *SchemaRegistrationBuilder

WithSchemaData sets the schema definition payload.

func (*SchemaRegistrationBuilder) WithTags added in v0.4.0

WithTags sets optional tags for the schema.

func (*SchemaRegistrationBuilder) WithType added in v0.4.0

WithType sets the schema type for registration.

type SchemaRegistryClient added in v0.4.0

type SchemaRegistryClient struct {
	// contains filtered or unexported fields
}

SchemaRegistryClient provides access to the schema registry APIs.

func (*SchemaRegistryClient) CheckCompatibility added in v0.4.0

func (c *SchemaRegistryClient) CheckCompatibility(ctx context.Context, subject string, schemaData []byte, schemaType SchemaType, mode *CompatibilityMode) (*proto.CheckCompatibilityResponse, error)

CheckCompatibility validates new schema data against existing versions.

func (*SchemaRegistryClient) ConfigureTopicSchema added in v0.4.0

func (c *SchemaRegistryClient) ConfigureTopicSchema(ctx context.Context, topicName, schemaSubject, validationPolicy string, enablePayloadValidation bool) (*proto.ConfigureTopicSchemaResponse, error)

ConfigureTopicSchema assigns a schema subject and validation policy to a topic (admin-only).

func (*SchemaRegistryClient) DeleteSchemaVersion added in v0.4.0

func (c *SchemaRegistryClient) DeleteSchemaVersion(ctx context.Context, subject string, version uint32) (*proto.DeleteSchemaVersionResponse, error)

DeleteSchemaVersion removes a specific schema version.

func (*SchemaRegistryClient) GetLatestSchema added in v0.4.0

func (c *SchemaRegistryClient) GetLatestSchema(ctx context.Context, subject string) (SchemaInfo, error)

GetLatestSchema fetches the latest schema for a subject.

func (*SchemaRegistryClient) GetSchemaByID added in v0.4.0

func (c *SchemaRegistryClient) GetSchemaByID(ctx context.Context, schemaID uint64) (SchemaInfo, error)

GetSchemaByID fetches schema information for a schema ID (latest version).

func (*SchemaRegistryClient) GetSchemaVersion added in v0.4.0

func (c *SchemaRegistryClient) GetSchemaVersion(ctx context.Context, schemaID uint64, version *uint32) (SchemaInfo, error)

GetSchemaVersion fetches a specific version for a schema ID.

func (*SchemaRegistryClient) GetTopicSchemaConfig added in v0.4.0

func (c *SchemaRegistryClient) GetTopicSchemaConfig(ctx context.Context, topicName string) (*proto.GetTopicSchemaConfigResponse, error)

GetTopicSchemaConfig returns schema configuration for a topic.

func (*SchemaRegistryClient) ListVersions added in v0.4.0

func (c *SchemaRegistryClient) ListVersions(ctx context.Context, subject string) ([]uint32, error)

ListVersions returns all versions for a subject.

func (*SchemaRegistryClient) RegisterSchema added in v0.4.0

func (c *SchemaRegistryClient) RegisterSchema(subject string) *SchemaRegistrationBuilder

RegisterSchema returns a builder for schema registration.

func (*SchemaRegistryClient) SetCompatibilityMode added in v0.4.0

func (c *SchemaRegistryClient) SetCompatibilityMode(ctx context.Context, subject string, mode CompatibilityMode) (*proto.SetCompatibilityModeResponse, error)

SetCompatibilityMode sets the compatibility mode for a subject.

func (*SchemaRegistryClient) UpdateTopicValidationPolicy added in v0.4.0

func (c *SchemaRegistryClient) UpdateTopicValidationPolicy(ctx context.Context, topicName, validationPolicy string, enablePayloadValidation bool) (*proto.UpdateTopicValidationPolicyResponse, error)

UpdateTopicValidationPolicy updates validation policy for a topic (admin-only).

type SchemaType

type SchemaType int

SchemaType defines supported schema formats.

const (
	SchemaTypeBytes SchemaType = iota
	SchemaTypeString
	SchemaTypeNumber
	SchemaTypeAvro
	SchemaTypeJSONSchema
	SchemaTypeProtobuf
)

func ParseSchemaType added in v0.4.0

func ParseSchemaType(value string) (SchemaType, error)

ParseSchemaType parses a schema type string.

func (SchemaType) AsString added in v0.4.0

func (s SchemaType) AsString() string

AsString returns the wire representation of the schema type.

type SubType

type SubType int

SubType is the type of subscription (e.g., EXCLUSIVE, SHARED, FAILOVER).

const (
	// Exclusive - only one consumer can subscribe to a specific subscription
	Exclusive SubType = iota
	//  Shared - multiple consumers can subscribe, messages are delivered round-robin
	Shared
	// FailOver - similar to exclusive subscriptions, but multiple consumers can subscribe, and one actively receives messages
	FailOver
)

type TokenSupplier added in v0.6.0

type TokenSupplier func() string

TokenSupplier is a function that returns a token string, called on every request. This enables dynamic token refresh (e.g., reading from a file that is periodically updated by infrastructure like K8s projected volumes).

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL