Documentation
¶
Index ¶
- type CompatibilityMode
- type ConfigDispatchStrategy
- type ConnectionOptions
- type Consumer
- func (c *Consumer) Ack(ctx context.Context, message *proto.StreamMessage) (*proto.AckResponse, error)
- func (c *Consumer) Close()
- func (c *Consumer) Nack(ctx context.Context, message *proto.StreamMessage, delayMs *uint64, ...) (*proto.NackResponse, error)
- func (c *Consumer) Receive(ctx context.Context) (chan *proto.StreamMessage, error)
- func (c *Consumer) Subscribe(ctx context.Context) error
- type ConsumerBuilder
- func (b *ConsumerBuilder) Build() (*Consumer, error)
- func (b *ConsumerBuilder) WithConsumerName(name string) *ConsumerBuilder
- func (b *ConsumerBuilder) WithOptions(options ConsumerOptions) *ConsumerBuilder
- func (b *ConsumerBuilder) WithSubscription(subscription string) *ConsumerBuilder
- func (b *ConsumerBuilder) WithSubscriptionType(subType SubType) *ConsumerBuilder
- func (b *ConsumerBuilder) WithTopic(topic string) *ConsumerBuilder
- type ConsumerOptions
- type DanubeClient
- type DanubeClientBuilder
- func (b *DanubeClientBuilder) Build() (*DanubeClient, error)
- func (b *DanubeClientBuilder) ServiceURL(url string) *DanubeClientBuilder
- func (b *DanubeClientBuilder) WithConnectionOptions(options ConnectionOptions) *DanubeClientBuilder
- func (b *DanubeClientBuilder) WithDialOptions(options ...DialOption) *DanubeClientBuilder
- func (b *DanubeClientBuilder) WithMTLS(caCertPath, clientCertPath, clientKeyPath string) (*DanubeClientBuilder, error)
- func (b *DanubeClientBuilder) WithTLS(caCertPath string) (*DanubeClientBuilder, error)
- func (b *DanubeClientBuilder) WithToken(token string) *DanubeClientBuilder
- func (b *DanubeClientBuilder) WithTokenSupplier(supplier func() string) *DanubeClientBuilder
- type DialOption
- type Producer
- type ProducerBuilder
- func (pb *ProducerBuilder) Build() (*Producer, error)
- func (pb *ProducerBuilder) WithDispatchStrategy(dispatch_strategy *ConfigDispatchStrategy) *ProducerBuilder
- func (pb *ProducerBuilder) WithName(producerName string) *ProducerBuilder
- func (pb *ProducerBuilder) WithOptions(options ProducerOptions) *ProducerBuilder
- func (pb *ProducerBuilder) WithPartitions(partitions int32) *ProducerBuilder
- func (pb *ProducerBuilder) WithSchemaMinVersion(subject string, minVersion uint32) *ProducerBuilder
- func (pb *ProducerBuilder) WithSchemaReference(schemaRef *proto.SchemaReference) *ProducerBuilder
- func (pb *ProducerBuilder) WithSchemaSubject(subject string) *ProducerBuilder
- func (pb *ProducerBuilder) WithSchemaVersion(subject string, version uint32) *ProducerBuilder
- func (pb *ProducerBuilder) WithTopic(topic string) *ProducerBuilder
- type ProducerOptions
- type SchemaInfo
- type SchemaRegistrationBuilder
- func (b *SchemaRegistrationBuilder) Execute(ctx context.Context) (uint64, error)
- func (b *SchemaRegistrationBuilder) WithCreatedBy(createdBy string) *SchemaRegistrationBuilder
- func (b *SchemaRegistrationBuilder) WithDescription(description string) *SchemaRegistrationBuilder
- func (b *SchemaRegistrationBuilder) WithSchemaData(data []byte) *SchemaRegistrationBuilder
- func (b *SchemaRegistrationBuilder) WithTags(tags []string) *SchemaRegistrationBuilder
- func (b *SchemaRegistrationBuilder) WithType(schemaType SchemaType) *SchemaRegistrationBuilder
- type SchemaRegistryClient
- func (c *SchemaRegistryClient) CheckCompatibility(ctx context.Context, subject string, schemaData []byte, schemaType SchemaType, ...) (*proto.CheckCompatibilityResponse, error)
- func (c *SchemaRegistryClient) ConfigureTopicSchema(ctx context.Context, topicName, schemaSubject, validationPolicy string, ...) (*proto.ConfigureTopicSchemaResponse, error)
- func (c *SchemaRegistryClient) DeleteSchemaVersion(ctx context.Context, subject string, version uint32) (*proto.DeleteSchemaVersionResponse, error)
- func (c *SchemaRegistryClient) GetLatestSchema(ctx context.Context, subject string) (SchemaInfo, error)
- func (c *SchemaRegistryClient) GetSchemaByID(ctx context.Context, schemaID uint64) (SchemaInfo, error)
- func (c *SchemaRegistryClient) GetSchemaVersion(ctx context.Context, schemaID uint64, version *uint32) (SchemaInfo, error)
- func (c *SchemaRegistryClient) GetTopicSchemaConfig(ctx context.Context, topicName string) (*proto.GetTopicSchemaConfigResponse, error)
- func (c *SchemaRegistryClient) ListVersions(ctx context.Context, subject string) ([]uint32, error)
- func (c *SchemaRegistryClient) RegisterSchema(subject string) *SchemaRegistrationBuilder
- func (c *SchemaRegistryClient) SetCompatibilityMode(ctx context.Context, subject string, mode CompatibilityMode) (*proto.SetCompatibilityModeResponse, error)
- func (c *SchemaRegistryClient) UpdateTopicValidationPolicy(ctx context.Context, topicName, validationPolicy string, ...) (*proto.UpdateTopicValidationPolicyResponse, error)
- type SchemaType
- type SubType
- type TokenSupplier
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CompatibilityMode ¶ added in v0.4.0
type CompatibilityMode int
CompatibilityMode defines schema evolution rules.
const ( CompatibilityNone CompatibilityMode = iota CompatibilityBackward CompatibilityForward CompatibilityFull )
func ParseCompatibilityMode ¶ added in v0.4.0
func ParseCompatibilityMode(value string) (CompatibilityMode, error)
ParseCompatibilityMode parses a compatibility mode string.
func (CompatibilityMode) AsString ¶ added in v0.4.0
func (c CompatibilityMode) AsString() string
AsString returns the wire representation of the compatibility mode.
type ConfigDispatchStrategy ¶
type ConfigDispatchStrategy struct {
// contains filtered or unexported fields
}
ConfigDispatchStrategy represents the dispatch strategy for a topic. It now directly maps to the proto.DispatchStrategy enum.
func NewConfigDispatchStrategy ¶
func NewConfigDispatchStrategy() *ConfigDispatchStrategy
NewConfigDispatchStrategy creates a new ConfigDispatchStrategy instance with NonReliable as default.
func NewReliableDispatchStrategy ¶
func NewReliableDispatchStrategy() *ConfigDispatchStrategy
NewReliableDispatchStrategy creates a new reliable ConfigDispatchStrategy instance.
type ConnectionOptions ¶ added in v0.4.0
type ConnectionOptions struct {
DialOptions []DialOption // Optional gRPC dial options.
TLSConfig *tls.Config // TLS configuration (required when UseTLS is true).
UseTLS bool // Enable TLS/mTLS for the connection.
Token string // Static JWT token for authentication.
TokenSupplier TokenSupplier // Dynamic token supplier called per-request.
InternalBroker string // Broker-internal identity header (broker-to-broker only).
}
ConnectionOptions configures how the client connects to the broker.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a message consumer that subscribes to a topic and receives messages. It handles communication with the message broker and manages the consumer's state.
func (*Consumer) Ack ¶
func (c *Consumer) Ack(ctx context.Context, message *proto.StreamMessage) (*proto.AckResponse, error)
Ack acknowledges a received message.
func (*Consumer) Close ¶ added in v0.4.0
func (c *Consumer) Close()
Close stops receive loops and signals topic consumers to stop.
func (*Consumer) Nack ¶ added in v0.5.0
func (c *Consumer) Nack(ctx context.Context, message *proto.StreamMessage, delayMs *uint64, reason *string) (*proto.NackResponse, error)
Nack sends a negative acknowledgement for a received message.
func (*Consumer) Receive ¶
Receive starts receiving messages from the subscribed partitioned or non-partitioned topic. It continuously polls for new messages and handles them as long as the stopSignal has not been set to true.
Parameters: - ctx: The context for managing the receive operation.
Returns: - StreamMessage channel for receiving messages from the broker. - error: An error if the receive client cannot be created or if other issues occur.
func (*Consumer) Subscribe ¶
Subscribe initializes the subscription to the non-partitioned or partitioned topic and starts the health check service. It establishes a gRPC connection with the brokers and requests to subscribe to the topic.
Parameters: - ctx: The context for managing the subscription lifecycle.
Returns: - error: An error if the subscription fails or if initialization encounters issues.
type ConsumerBuilder ¶
type ConsumerBuilder struct {
// contains filtered or unexported fields
}
ConsumerBuilder is a builder for creating a new Consumer instance. It allows setting various properties for the consumer such as topic, name, subscription, subscription type, and options.
func (*ConsumerBuilder) Build ¶
func (b *ConsumerBuilder) Build() (*Consumer, error)
Build creates a new Consumer instance using the settings configured in the ConsumerBuilder. It performs validation to ensure that all required fields are set before creating the consumer.
Returns: - *Consumer: A pointer to the newly created Consumer instance if successful. - error: An error if required fields are missing or if consumer creation fails.
func (*ConsumerBuilder) WithConsumerName ¶
func (b *ConsumerBuilder) WithConsumerName(name string) *ConsumerBuilder
WithConsumerName sets the name of the consumer. This is a required field.
Parameters: - name: The name assigned to the consumer instance.
func (*ConsumerBuilder) WithOptions ¶ added in v0.4.0
func (b *ConsumerBuilder) WithOptions(options ConsumerOptions) *ConsumerBuilder
WithOptions sets the configuration options for the consumer.
func (*ConsumerBuilder) WithSubscription ¶
func (b *ConsumerBuilder) WithSubscription(subscription string) *ConsumerBuilder
WithSubscription sets the name of the subscription for the consumer. This is a required field.
Parameters: - subscription: The name of the subscription for the consumer.
func (*ConsumerBuilder) WithSubscriptionType ¶
func (b *ConsumerBuilder) WithSubscriptionType(subType SubType) *ConsumerBuilder
WithSubscriptionType sets the type of subscription for the consumer. This field is optional.
Parameters: - subType: The type of subscription (e.g., EXCLUSIVE, SHARED, FAILOVER).
func (*ConsumerBuilder) WithTopic ¶
func (b *ConsumerBuilder) WithTopic(topic string) *ConsumerBuilder
WithTopic sets the topic name for the consumer. This is a required field.
Parameters: - topic: The name of the topic for the consumer.
type ConsumerOptions ¶
type ConsumerOptions struct {
MaxRetries int // Maximum retry attempts for receive/subscribe operations.
BaseBackoffMs int64 // Base backoff in milliseconds.
MaxBackoffMs int64 // Maximum backoff in milliseconds.
}
ConsumerOptions configures retry behavior for consumers.
type DanubeClient ¶
type DanubeClient struct {
URI string
// contains filtered or unexported fields
}
DanubeClient is the main client for interacting with the Danube messaging system. It provides methods to create producers and consumers, and retrieve schema information.
func (*DanubeClient) NewConsumer ¶
func (dc *DanubeClient) NewConsumer() *ConsumerBuilder
NewConsumer returns a new ConsumerBuilder, which is used to configure and create a Consumer instance.
func (*DanubeClient) NewProducer ¶
func (dc *DanubeClient) NewProducer() *ProducerBuilder
NewProducer returns a new ProducerBuilder, which is used to configure and create a Producer instance.
func (*DanubeClient) Schema ¶ added in v0.4.0
func (dc *DanubeClient) Schema() *SchemaRegistryClient
Schema returns a SchemaRegistryClient for schema operations.
type DanubeClientBuilder ¶
type DanubeClientBuilder struct {
URI string
// contains filtered or unexported fields
}
DanubeClientBuilder is used for configuring and creating a DanubeClient instance. It provides methods for setting various options, including the service URL, connection options, and logger.
Fields: - URI: The base URI for the Danube service. This is required for constructing the client. - ConnectionOptions: Optional connection settings for configuring how the client connects to the service.
func NewClient ¶
func NewClient() *DanubeClientBuilder
NewClient initializes a new DanubeClientBuilder. The builder pattern allows for configuring and constructing a DanubeClient instance with optional settings and options.
Returns: - *DanubeClientBuilder: A new instance of DanubeClientBuilder for configuring and building a DanubeClient.
func (*DanubeClientBuilder) Build ¶
func (b *DanubeClientBuilder) Build() (*DanubeClient, error)
Build constructs and returns a DanubeClient instance based on the configuration specified in the builder.
Returns: - *DanubeClient: A new instance of DanubeClient configured with the specified options. - error: An error if the configuration is invalid or incomplete.
func (*DanubeClientBuilder) ServiceURL ¶
func (b *DanubeClientBuilder) ServiceURL(url string) *DanubeClientBuilder
ServiceURL sets the base URI for the Danube service in the builder.
Parameters: - url: The base URI to use for connecting to the Danube service.
Returns: - *DanubeClientBuilder: The updated builder instance with the new service URL.
func (*DanubeClientBuilder) WithConnectionOptions ¶
func (b *DanubeClientBuilder) WithConnectionOptions(options ConnectionOptions) *DanubeClientBuilder
WithConnectionOptions sets connection settings for the client in the builder.
func (*DanubeClientBuilder) WithDialOptions ¶ added in v0.4.0
func (b *DanubeClientBuilder) WithDialOptions(options ...DialOption) *DanubeClientBuilder
WithDialOptions appends gRPC dial options to the connection options.
func (*DanubeClientBuilder) WithMTLS ¶ added in v0.4.0
func (b *DanubeClientBuilder) WithMTLS(caCertPath, clientCertPath, clientKeyPath string) (*DanubeClientBuilder, error)
WithMTLS enables mutual TLS using CA, client certificate, and client key.
func (*DanubeClientBuilder) WithTLS ¶ added in v0.4.0
func (b *DanubeClientBuilder) WithTLS(caCertPath string) (*DanubeClientBuilder, error)
WithTLS enables TLS using a custom CA certificate.
func (*DanubeClientBuilder) WithToken ¶ added in v0.6.0
func (b *DanubeClientBuilder) WithToken(token string) *DanubeClientBuilder
WithToken sets the authentication token (JWT) for the client.
Use `danube-admin security tokens create` to generate a token. Automatically enables TLS. If no TLS config has been set via WithTLS() or WithMTLS(), a default TLS config using system root certificates is applied.
For tokens that expire, consider WithTokenSupplier instead, which allows runtime token refresh.
func (*DanubeClientBuilder) WithTokenSupplier ¶ added in v0.6.0
func (b *DanubeClientBuilder) WithTokenSupplier(supplier func() string) *DanubeClientBuilder
WithTokenSupplier sets a dynamic token supplier for the client.
The supplier function is called on every gRPC request to obtain the current token, enabling runtime token refresh without restarting the client. This is useful for:
- File-based tokens: Read from a file updated by infrastructure (e.g., K8s projected volumes, sidecar token refreshers)
- Environment-based tokens: Read from an environment variable
- Custom refresh logic: Implement your own token rotation
Automatically enables TLS (same as WithToken).
type DialOption ¶
type DialOption func(*[]grpc.DialOption)
DialOption is a function that configures gRPC dial options.
func WithConnectionTimeout ¶
func WithConnectionTimeout(timeout time.Duration) DialOption
WithConnectionTimeout configures the connection timeout for the connection.
func WithKeepAliveInterval ¶
func WithKeepAliveInterval(interval time.Duration) DialOption
WithKeepAliveInterval configures the keepalive interval for the connection.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer represents a message producer that is responsible for sending messages to a specific partitioned or non-partitioned topic on a message broker. It handles producer creation, message sending, and maintains the producer's state.
func (*Producer) Create ¶
Create initializes the producer and registers it with the message brokers.
Parameters: - ctx: The context for managing request lifecycle and cancellation.
Returns: - error: An error if producer creation fails.
func (*Producer) Send ¶
func (p *Producer) Send(ctx context.Context, data []byte, attributes map[string]string) (uint64, error)
Send sends a message to the topic associated with this producer.
It constructs a message request and sends it to the broker. The method handles payload and error reporting. It assumes that the producer has been successfully created and is ready to send messages.
Parameters: - ctx: The context for managing request lifecycle and cancellation. - data: The message payload to be sent. - attributes: user-defined properties or attributes associated with the message
Returns: - uint64: The sequence ID of the sent message if successful. - error: An error if message sending fail
type ProducerBuilder ¶
type ProducerBuilder struct {
// contains filtered or unexported fields
}
ProducerBuilder is a builder for creating a new Producer instance. It allows setting various properties for the producer such as topic, name, schema, and options.
func (*ProducerBuilder) Build ¶
func (pb *ProducerBuilder) Build() (*Producer, error)
Build creates a new Producer instance using the settings configured in the ProducerBuilder. It performs validation to ensure that required fields are set before creating the producer.
Returns: - *Producer: A pointer to the newly created Producer instance if successful. - error: An error if required fields are missing or if producer creation fails.
func (*ProducerBuilder) WithDispatchStrategy ¶
func (pb *ProducerBuilder) WithDispatchStrategy(dispatch_strategy *ConfigDispatchStrategy) *ProducerBuilder
WithDispatchStrategy sets the dispatch strategy for the producer. This method configures the retention strategy for the producer, which determines how messages are stored and managed.
Parameters: - dispatch_strategy: The dispatch strategy for the producer.
func (*ProducerBuilder) WithName ¶
func (pb *ProducerBuilder) WithName(producerName string) *ProducerBuilder
WithName sets the name of the producer. This is a required field.
Parameters: - producerName: The name assigned to the producer instance.
func (*ProducerBuilder) WithOptions ¶
func (pb *ProducerBuilder) WithOptions(options ProducerOptions) *ProducerBuilder
WithOptions sets the configuration options for the producer. This allows for customization of producer behavior.
Parameters: - options: Configuration options for the producer.
func (*ProducerBuilder) WithPartitions ¶
func (pb *ProducerBuilder) WithPartitions(partitions int32) *ProducerBuilder
WithPartitions sets the number of topic partitions.
Parameters: - partitions: The number of partitions for a new topic.
func (*ProducerBuilder) WithSchemaMinVersion ¶ added in v0.4.0
func (pb *ProducerBuilder) WithSchemaMinVersion(subject string, minVersion uint32) *ProducerBuilder
WithSchemaMinVersion enforces a minimum schema version.
func (*ProducerBuilder) WithSchemaReference ¶ added in v0.4.0
func (pb *ProducerBuilder) WithSchemaReference(schemaRef *proto.SchemaReference) *ProducerBuilder
WithSchemaReference sets a custom SchemaReference (advanced use).
func (*ProducerBuilder) WithSchemaSubject ¶ added in v0.4.0
func (pb *ProducerBuilder) WithSchemaSubject(subject string) *ProducerBuilder
WithSchemaSubject sets the schema registry subject (uses latest version).
func (*ProducerBuilder) WithSchemaVersion ¶ added in v0.4.0
func (pb *ProducerBuilder) WithSchemaVersion(subject string, version uint32) *ProducerBuilder
WithSchemaVersion pins a schema subject to a specific version.
func (*ProducerBuilder) WithTopic ¶
func (pb *ProducerBuilder) WithTopic(topic string) *ProducerBuilder
WithTopic sets the topic name for the producer. This is a required field.
Parameters: - topic: The name of the topic for the producer.
type ProducerOptions ¶
type ProducerOptions struct {
MaxRetries int // Maximum retry attempts for create/send operations.
BaseBackoffMs int64 // Base backoff in milliseconds.
MaxBackoffMs int64 // Maximum backoff in milliseconds.
}
ProducerOptions configures retry behavior for producers.
type SchemaInfo ¶ added in v0.4.0
type SchemaInfo struct {
SchemaID uint64
Subject string
Version uint32
SchemaType string
SchemaDefinition []byte
Fingerprint string
}
SchemaInfo is a user-friendly wrapper around GetSchemaResponse.
func (SchemaInfo) SchemaDefinitionAsString ¶ added in v0.4.0
func (s SchemaInfo) SchemaDefinitionAsString() (string, error)
SchemaDefinitionAsString returns the schema definition as a string.
type SchemaRegistrationBuilder ¶ added in v0.4.0
type SchemaRegistrationBuilder struct {
// contains filtered or unexported fields
}
SchemaRegistrationBuilder provides a fluent API for schema registration.
func (*SchemaRegistrationBuilder) Execute ¶ added in v0.4.0
func (b *SchemaRegistrationBuilder) Execute(ctx context.Context) (uint64, error)
Execute registers the schema and returns the schema ID.
func (*SchemaRegistrationBuilder) WithCreatedBy ¶ added in v0.4.0
func (b *SchemaRegistrationBuilder) WithCreatedBy(createdBy string) *SchemaRegistrationBuilder
WithCreatedBy sets the creator metadata for the schema.
func (*SchemaRegistrationBuilder) WithDescription ¶ added in v0.4.0
func (b *SchemaRegistrationBuilder) WithDescription(description string) *SchemaRegistrationBuilder
WithDescription sets the optional schema description.
func (*SchemaRegistrationBuilder) WithSchemaData ¶ added in v0.4.0
func (b *SchemaRegistrationBuilder) WithSchemaData(data []byte) *SchemaRegistrationBuilder
WithSchemaData sets the schema definition payload.
func (*SchemaRegistrationBuilder) WithTags ¶ added in v0.4.0
func (b *SchemaRegistrationBuilder) WithTags(tags []string) *SchemaRegistrationBuilder
WithTags sets optional tags for the schema.
func (*SchemaRegistrationBuilder) WithType ¶ added in v0.4.0
func (b *SchemaRegistrationBuilder) WithType(schemaType SchemaType) *SchemaRegistrationBuilder
WithType sets the schema type for registration.
type SchemaRegistryClient ¶ added in v0.4.0
type SchemaRegistryClient struct {
// contains filtered or unexported fields
}
SchemaRegistryClient provides access to the schema registry APIs.
func (*SchemaRegistryClient) CheckCompatibility ¶ added in v0.4.0
func (c *SchemaRegistryClient) CheckCompatibility(ctx context.Context, subject string, schemaData []byte, schemaType SchemaType, mode *CompatibilityMode) (*proto.CheckCompatibilityResponse, error)
CheckCompatibility validates new schema data against existing versions.
func (*SchemaRegistryClient) ConfigureTopicSchema ¶ added in v0.4.0
func (c *SchemaRegistryClient) ConfigureTopicSchema(ctx context.Context, topicName, schemaSubject, validationPolicy string, enablePayloadValidation bool) (*proto.ConfigureTopicSchemaResponse, error)
ConfigureTopicSchema assigns a schema subject and validation policy to a topic (admin-only).
func (*SchemaRegistryClient) DeleteSchemaVersion ¶ added in v0.4.0
func (c *SchemaRegistryClient) DeleteSchemaVersion(ctx context.Context, subject string, version uint32) (*proto.DeleteSchemaVersionResponse, error)
DeleteSchemaVersion removes a specific schema version.
func (*SchemaRegistryClient) GetLatestSchema ¶ added in v0.4.0
func (c *SchemaRegistryClient) GetLatestSchema(ctx context.Context, subject string) (SchemaInfo, error)
GetLatestSchema fetches the latest schema for a subject.
func (*SchemaRegistryClient) GetSchemaByID ¶ added in v0.4.0
func (c *SchemaRegistryClient) GetSchemaByID(ctx context.Context, schemaID uint64) (SchemaInfo, error)
GetSchemaByID fetches schema information for a schema ID (latest version).
func (*SchemaRegistryClient) GetSchemaVersion ¶ added in v0.4.0
func (c *SchemaRegistryClient) GetSchemaVersion(ctx context.Context, schemaID uint64, version *uint32) (SchemaInfo, error)
GetSchemaVersion fetches a specific version for a schema ID.
func (*SchemaRegistryClient) GetTopicSchemaConfig ¶ added in v0.4.0
func (c *SchemaRegistryClient) GetTopicSchemaConfig(ctx context.Context, topicName string) (*proto.GetTopicSchemaConfigResponse, error)
GetTopicSchemaConfig returns schema configuration for a topic.
func (*SchemaRegistryClient) ListVersions ¶ added in v0.4.0
ListVersions returns all versions for a subject.
func (*SchemaRegistryClient) RegisterSchema ¶ added in v0.4.0
func (c *SchemaRegistryClient) RegisterSchema(subject string) *SchemaRegistrationBuilder
RegisterSchema returns a builder for schema registration.
func (*SchemaRegistryClient) SetCompatibilityMode ¶ added in v0.4.0
func (c *SchemaRegistryClient) SetCompatibilityMode(ctx context.Context, subject string, mode CompatibilityMode) (*proto.SetCompatibilityModeResponse, error)
SetCompatibilityMode sets the compatibility mode for a subject.
func (*SchemaRegistryClient) UpdateTopicValidationPolicy ¶ added in v0.4.0
func (c *SchemaRegistryClient) UpdateTopicValidationPolicy(ctx context.Context, topicName, validationPolicy string, enablePayloadValidation bool) (*proto.UpdateTopicValidationPolicyResponse, error)
UpdateTopicValidationPolicy updates validation policy for a topic (admin-only).
type SchemaType ¶
type SchemaType int
SchemaType defines supported schema formats.
const ( SchemaTypeBytes SchemaType = iota SchemaTypeString SchemaTypeNumber SchemaTypeAvro SchemaTypeJSONSchema SchemaTypeProtobuf )
func ParseSchemaType ¶ added in v0.4.0
func ParseSchemaType(value string) (SchemaType, error)
ParseSchemaType parses a schema type string.
func (SchemaType) AsString ¶ added in v0.4.0
func (s SchemaType) AsString() string
AsString returns the wire representation of the schema type.
type SubType ¶
type SubType int
SubType is the type of subscription (e.g., EXCLUSIVE, SHARED, FAILOVER).
type TokenSupplier ¶ added in v0.6.0
type TokenSupplier func() string
TokenSupplier is a function that returns a token string, called on every request. This enables dynamic token refresh (e.g., reading from a file that is periodically updated by infrastructure like K8s projected volumes).
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
multi_partitions/consumer
command
|
|
|
multi_partitions/producer
command
|
|
|
reliable_dispatch/consumer
command
|
|
|
reliable_dispatch/producer
command
|
|
|
schema_json/consumer
command
|
|
|
schema_json/producer
command
|
|
|
schema_string/consumer
command
|
|
|
schema_string/producer
command
|
|