iggcon

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2026 License: Apache-2.0 Imports: 10 Imported by: 3

Documentation

Index

Constants

View Source
// Units of Duration. Duration counts microseconds, so Microsecond is the base
// unit (1) and every larger unit is an exact multiple of the one before it.
const (
	Microsecond Duration = 1
	Millisecond          = 1000 * Microsecond
	Second               = 1000 * Millisecond
	Minute               = 60 * Second
	Hour                 = 60 * Minute
)
View Source
// Size limits for a single IggyMessage. Both limits use decimal units
// (1 MB = 1000 * 1000 bytes, 1 KB = 1000 bytes).
const (
	// MaxPayloadSize is the maximum allowed size, in bytes, of a message payload.
	//
	// This constant defines the upper limit for the size of an IggyMessage payload. Attempting to create a message
	// with a payload larger than this value will result
	// in an ierror.TooBigUserMessagePayload error.
	//
	//  Constraints
	//  - Minimum payload size: 1 byte (empty payloads are not allowed)
	//  - Maximum payload size: 10 MB
	MaxPayloadSize = 10 * 1000 * 1000

	// MaxUserHeadersSize is the maximum allowed size, in bytes, of the user-defined headers.
	//
	// This constant defines the upper limit for the combined size of all user headers in an IggyMessage. Attempting to
	// create a message with user headers larger than this value will result in an ierror.TooBigUserHeaders error.
	//
	//  Constraints
	//  - Maximum headers size: 100 KB
	//  - Each individual header key is limited to 255 bytes
	//  - Each individual header value is limited to 255 bytes
	MaxUserHeadersSize = 100 * 1000
)
View Source
// MaxLeaderRedirects is the leader-redirect limit (3).
// NOTE(review): presumably the cap that LeaderRedirectionState.CanRedirect
// compares RedirectCount against — confirm in the implementation.
const MaxLeaderRedirects uint8 = 3
View Source
// MaxNodesPerCluster is the per-cluster node limit (64).
// NOTE(review): presumably bounds the number of entries in
// ClusterMetadata.Nodes — confirm where it is enforced.
const MaxNodesPerCluster = 64
View Source
// MessageHeaderSize is the sum of the fixed widths of the MessageHeader
// fields, in declaration order: Checksum (8) + Id (16) + Offset (8) +
// Timestamp (8) + OriginTimestamp (8) + UserHeaderLength (4) +
// PayloadLength (4) + Reserved (8) = 64 bytes.
// NOTE(review): presumably the exact length produced by MessageHeader.ToBytes
// and expected by MessageHeaderFromBytes — confirm.
const MessageHeaderSize = 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8
View Source
// Version is the version string of this SDK.
// NOTE(review): per the package TODO, the login request serializer does not
// send this value yet (it sends an empty string).
const Version = "0.7.0"

TODO: Wire Version into binary_serialization/log_in_request_serializer.go to send the actual SDK version during login instead of an empty string.

Variables

This section is empty.

Functions

func GetHeadersBytes

func GetHeadersBytes(headers []HeaderEntry) []byte

Types

type ChangePasswordRequest

// ChangePasswordRequest carries the target user together with the current and
// the new password.
//
// UserID is tagged `json:"-"`, so it is never part of the JSON encoding; only
// the two passwords are.
type ChangePasswordRequest struct {
	UserID          Identifier `json:"-"`
	CurrentPassword string     `json:"CurrentPassword"`
	NewPassword     string     `json:"NewPassword"`
}

type Client added in v0.7.0

// Client is the full set of operations exposed by a client: connection and
// cluster introspection, stream/topic/partition management, sending and
// polling messages, consumer offsets and consumer groups, user and personal
// access token management, authentication, and system queries.
//
// Streams, topics, consumer groups and users are addressed with an Identifier,
// i.e. either by numeric ID or by name.
type Client interface {
	// Close closes the client and releases all the resources.
	Close() error

	// GetConnectionInfo returns the current connection information, including the protocol and server address.
	GetConnectionInfo() *ConnectionInfo

	// GetClusterMetadata gets the metadata of the cluster, including node information, roles, and status.
	// Authentication is required.
	GetClusterMetadata() (*ClusterMetadata, error)

	// GetStream gets the info about a specific stream by unique ID or name.
	// Authentication is required, and the permission to read the streams.
	GetStream(streamId Identifier) (*StreamDetails, error)

	// GetStreams gets the info about all the streams.
	// Authentication is required, and the permission to read the streams.
	GetStreams() ([]Stream, error)

	// CreateStream creates a new stream.
	// Authentication is required, and the permission to manage the streams.
	CreateStream(name string) (*StreamDetails, error)

	// UpdateStream updates a stream by unique ID or name.
	// Authentication is required, and the permission to manage the streams.
	UpdateStream(streamId Identifier, name string) error

	// DeleteStream deletes a stream by unique ID or name.
	// Authentication is required, and the permission to manage the streams.
	DeleteStream(id Identifier) error

	// GetTopic gets the info about a specific topic by unique ID or name.
	// Authentication is required, and the permission to read the topics.
	GetTopic(streamId, topicId Identifier) (*TopicDetails, error)

	// GetTopics gets the info about all the topics.
	// Authentication is required, and the permission to read the topics.
	GetTopics(streamId Identifier) ([]Topic, error)

	// CreateTopic creates a new topic.
	// Authentication is required, and the permission to manage the topics.
	CreateTopic(
		streamId Identifier,
		name string,
		partitionsCount uint32,
		compressionAlgorithm CompressionAlgorithm,
		messageExpiry Duration,
		maxTopicSize uint64,
		replicationFactor *uint8,
	) (*TopicDetails, error)

	// UpdateTopic updates a topic by unique ID or name.
	// Authentication is required, and the permission to manage the topics.
	UpdateTopic(
		streamId Identifier,
		topicId Identifier,
		name string,
		compressionAlgorithm CompressionAlgorithm,
		messageExpiry Duration,
		maxTopicSize uint64,
		replicationFactor *uint8,
	) error

	// DeleteTopic deletes a topic by unique ID or name.
	// Authentication is required, and the permission to manage the topics.
	DeleteTopic(streamId, topicId Identifier) error

	// SendMessages sends messages using the specified partitioning strategy to the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to send the messages.
	SendMessages(
		streamId Identifier,
		topicId Identifier,
		partitioning Partitioning,
		messages []IggyMessage,
	) error

	// PollMessages polls the given number of messages using the specified consumer and strategy from the specified stream and topic by unique IDs or names.
	// Authentication is required, and the permission to poll the messages.
	PollMessages(
		streamId Identifier,
		topicId Identifier,
		consumer Consumer,
		strategy PollingStrategy,
		count uint32,
		autoCommit bool,
		partitionId *uint32,
	) (*PolledMessage, error)

	// StoreConsumerOffset stores the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to poll the messages.
	StoreConsumerOffset(
		consumer Consumer,
		streamId Identifier,
		topicId Identifier,
		offset uint64,
		partitionId *uint32,
	) error

	// GetConsumerOffset gets the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to poll the messages.
	GetConsumerOffset(
		consumer Consumer,
		streamId Identifier,
		topicId Identifier,
		partitionId *uint32,
	) (*ConsumerOffsetInfo, error)

	// GetConsumerGroups gets the info about all the consumer groups for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to read the streams or topics.
	GetConsumerGroups(streamId Identifier, topicId Identifier) ([]ConsumerGroup, error)

	// DeleteConsumerOffset deletes the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to poll the messages.
	DeleteConsumerOffset(
		consumer Consumer,
		streamId Identifier,
		topicId Identifier,
		partitionId *uint32,
	) error

	// GetConsumerGroup gets the info about a specific consumer group by unique ID or name for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to read the streams or topics.
	GetConsumerGroup(
		streamId Identifier,
		topicId Identifier,
		groupId Identifier,
	) (*ConsumerGroupDetails, error)

	// CreateConsumerGroup creates a new consumer group for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to manage the streams or topics.
	CreateConsumerGroup(
		streamId Identifier,
		topicId Identifier,
		name string,
	) (*ConsumerGroupDetails, error)

	// DeleteConsumerGroup deletes a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to manage the streams or topics.
	DeleteConsumerGroup(
		streamId Identifier,
		topicId Identifier,
		groupId Identifier,
	) error

	// JoinConsumerGroup joins a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to read the streams or topics.
	JoinConsumerGroup(
		streamId Identifier,
		topicId Identifier,
		groupId Identifier,
	) error

	// LeaveConsumerGroup leaves a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
	// Authentication is required, and the permission to read the streams or topics.
	LeaveConsumerGroup(
		streamId Identifier,
		topicId Identifier,
		groupId Identifier,
	) error

	// CreatePartitions creates N new partitions for a topic by unique ID or name.
	// For example, given a topic with 3 partitions, if you create 2 partitions, the topic will have 5 partitions (from 1 to 5).
	// Authentication is required, and the permission to manage the partitions.
	CreatePartitions(
		streamId Identifier,
		topicId Identifier,
		partitionsCount uint32,
	) error

	// DeletePartitions deletes the last N partitions of a topic by unique ID or name.
	// For example, given a topic with 5 partitions, if you delete 2 partitions, the topic will have 3 partitions left (from 1 to 3).
	// Authentication is required, and the permission to manage the partitions.
	DeletePartitions(
		streamId Identifier,
		topicId Identifier,
		partitionsCount uint32,
	) error

	// GetUser gets the info about a specific user by unique ID or username.
	// Authentication is required, and the permission to read the users, unless the provided user ID is the same as the authenticated user.
	GetUser(identifier Identifier) (*UserInfoDetails, error)

	// GetUsers gets the info about all the users.
	// Authentication is required, and the permission to read the users.
	GetUsers() ([]UserInfo, error)

	// CreateUser creates a new user.
	// Authentication is required, and the permission to manage the users.
	CreateUser(
		username string,
		password string,
		status UserStatus,
		permissions *Permissions,
	) (*UserInfoDetails, error)

	// UpdateUser updates a user by unique ID or username.
	// Authentication is required, and the permission to manage the users.
	UpdateUser(
		userID Identifier,
		username *string,
		status *UserStatus,
	) error

	// UpdatePermissions updates the permissions of a user by unique ID or username.
	// Authentication is required, and the permission to manage the users.
	UpdatePermissions(userID Identifier, permissions *Permissions) error

	// ChangePassword changes the password of a user by unique ID or username.
	// Authentication is required, and the permission to manage the users, unless the provided user ID is the same as the authenticated user.
	ChangePassword(
		userID Identifier,
		currentPassword string,
		newPassword string,
	) error

	// DeleteUser deletes a user by unique ID or username.
	// Authentication is required, and the permission to manage the users.
	DeleteUser(identifier Identifier) error

	// CreatePersonalAccessToken creates a new personal access token for the currently authenticated user.
	CreatePersonalAccessToken(name string, expiry uint32) (*RawPersonalAccessToken, error)

	// DeletePersonalAccessToken deletes a personal access token of the currently authenticated user by unique token name.
	DeletePersonalAccessToken(name string) error

	// GetPersonalAccessTokens gets the info about all the personal access tokens of the currently authenticated user.
	GetPersonalAccessTokens() ([]PersonalAccessTokenInfo, error)

	// LoginWithPersonalAccessToken logs in the user with the provided personal access token.
	LoginWithPersonalAccessToken(token string) (*IdentityInfo, error)

	// LoginUser logs in a user by username and password.
	LoginUser(username string, password string) (*IdentityInfo, error)

	// LogoutUser logs out the currently authenticated user.
	LogoutUser() error

	// GetStats gets the stats of the system such as PID, memory usage, streams count etc.
	// Authentication is required, and the permission to read the server info.
	GetStats() (*Stats, error)

	// Ping pings the server to check if it's alive.
	Ping() error

	// GetClients gets the info about all the currently connected clients (not to be confused with the users).
	// Authentication is required, and the permission to read the server info.
	GetClients() ([]ClientInfo, error)

	// GetClient gets the info about a specific client by unique ID (not to be confused with the user).
	// Authentication is required, and the permission to read the server info.
	GetClient(clientId uint32) (*ClientInfoDetails, error)
}

type ClientInfo

// ClientInfo describes one connected client; Client.GetClients returns a
// slice of these. A client is a connection, not a user: UserID is a separate
// field.
type ClientInfo struct {
	ID                  uint32 `json:"id"`
	Address             string `json:"address"`
	UserID              uint32 `json:"userId"`
	Transport           string `json:"transport"`
	ConsumerGroupsCount uint32 `json:"consumerGroupsCount"`
}

type ClientInfoDetails

// ClientInfoDetails is the ClientInfo of a single client (embedded) plus its
// consumer group memberships; Client.GetClient returns it.
type ClientInfoDetails struct {
	ClientInfo
	// ConsumerGroups is left out of the JSON encoding when empty (omitempty).
	ConsumerGroups []ConsumerGroupInfo `json:"consumerGroups,omitempty"`
}

type ClusterMetadata added in v0.7.0

// ClusterMetadata is the cluster description returned by
// Client.GetClusterMetadata: the cluster name and its nodes. It has
// MarshalBinary/UnmarshalBinary methods for its binary form.
type ClusterMetadata struct {
	Name  string
	Nodes []ClusterNode
}

func (*ClusterMetadata) BufferSize added in v0.7.0

func (m *ClusterMetadata) BufferSize() int

BufferSize returns total serialized size.

func (*ClusterMetadata) MarshalBinary added in v0.7.0

func (m *ClusterMetadata) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (*ClusterMetadata) UnmarshalBinary added in v0.7.0

func (m *ClusterMetadata) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler.

type ClusterNode added in v0.7.0

// ClusterNode describes one node of a cluster: its name, IP, per-transport
// endpoints, role (ClusterNodeRole) and status (ClusterNodeStatus).
type ClusterNode struct {
	Name      string
	IP        string
	Endpoints TransportEndpoints
	Role      ClusterNodeRole
	Status    ClusterNodeStatus
}

func (*ClusterNode) BufferSize added in v0.7.0

func (n *ClusterNode) BufferSize() int

func (*ClusterNode) MarshalBinary added in v0.7.0

func (n *ClusterNode) MarshalBinary() ([]byte, error)

func (*ClusterNode) String added in v0.7.0

func (n *ClusterNode) String() string

func (*ClusterNode) UnmarshalBinary added in v0.7.0

func (n *ClusterNode) UnmarshalBinary(b []byte) error

type ClusterNodeRole added in v0.7.0

// ClusterNodeRole is the role of a node within a cluster, stored as one byte.
// Use ClusterNodeRoleTryFrom to validate a raw byte.
type ClusterNodeRole byte
// Known ClusterNodeRole values. RoleLeader is 0 and therefore also the zero
// value of the type.
const (
	RoleLeader   ClusterNodeRole = 0
	RoleFollower ClusterNodeRole = 1
)

func ClusterNodeRoleTryFrom added in v0.7.0

func ClusterNodeRoleTryFrom(b byte) (ClusterNodeRole, error)

ClusterNodeRoleTryFrom validates a raw byte and returns the corresponding role.

func (*ClusterNodeRole) MarshalBinary added in v0.7.0

func (r *ClusterNodeRole) MarshalBinary() ([]byte, error)

func (*ClusterNodeRole) String added in v0.7.0

func (r *ClusterNodeRole) String() string

func (*ClusterNodeRole) UnmarshalBinary added in v0.7.0

func (r *ClusterNodeRole) UnmarshalBinary(b []byte) error

type ClusterNodeStatus added in v0.7.0

// ClusterNodeStatus is the status of a cluster node, stored as one byte.
type ClusterNodeStatus uint8
// Known ClusterNodeStatus values. Healthy is 0 and therefore also the zero
// value of the type.
const (
	// Healthy indicates node is healthy and responsive
	Healthy ClusterNodeStatus = 0
	// Starting indicates node is starting up
	Starting ClusterNodeStatus = 1
	// Stopping indicates node is shutting down
	Stopping ClusterNodeStatus = 2
	// Unreachable indicates node is unreachable
	Unreachable ClusterNodeStatus = 3
	// Maintenance indicates node is in maintenance mode
	Maintenance ClusterNodeStatus = 4
	// Unknown indicates node is unknown
	Unknown ClusterNodeStatus = 5
)

func TryFrom added in v0.7.0

func TryFrom(b byte) (ClusterNodeStatus, error)

func (*ClusterNodeStatus) MarshalBinary added in v0.7.0

func (s *ClusterNodeStatus) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (*ClusterNodeStatus) String added in v0.7.0

func (s *ClusterNodeStatus) String() string

String implements fmt.Stringer with lowercase names (matches serde/strum settings).

func (*ClusterNodeStatus) UnmarshalBinary added in v0.7.0

func (s *ClusterNodeStatus) UnmarshalBinary(b []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler.

type Command added in v0.7.0

// Command is a request that knows its own command code and can encode itself
// to bytes.
type Command interface {
	// Code returns the command code associated with this command.
	Code() CommandCode

	// MarshalBinary (from encoding.BinaryMarshaler) encodes the command.
	// NOTE(review): presumably the payload sent alongside Code — confirm in
	// the transport code.
	encoding.BinaryMarshaler
}

type CommandCode

// CommandCode is the numeric code identifying a command; Command.Code returns
// one.
type CommandCode int
// Command codes. Going by the names, they are grouped by numeric range:
// 1-12 system, 20-22 clients, 31-39 users, 41-44 personal access tokens,
// 100-101 messages, 120-122 consumer offsets, 200-204 streams, 300-304 topics,
// 402-403 partitions, 600-605 consumer groups.
const (
	PingCode                 CommandCode = 1
	GetStatsCode             CommandCode = 10
	GetSnapshotFileCode      CommandCode = 11
	GetClusterMetadataCode   CommandCode = 12
	GetMeCode                CommandCode = 20
	GetClientCode            CommandCode = 21
	GetClientsCode           CommandCode = 22
	GetUserCode              CommandCode = 31
	GetUsersCode             CommandCode = 32
	CreateUserCode           CommandCode = 33
	DeleteUserCode           CommandCode = 34
	UpdateUserCode           CommandCode = 35
	UpdatePermissionsCode    CommandCode = 36
	ChangePasswordCode       CommandCode = 37
	LoginUserCode            CommandCode = 38
	LogoutUserCode           CommandCode = 39
	GetAccessTokensCode      CommandCode = 41
	CreateAccessTokenCode    CommandCode = 42
	DeleteAccessTokenCode    CommandCode = 43
	LoginWithAccessTokenCode CommandCode = 44
	PollMessagesCode         CommandCode = 100
	SendMessagesCode         CommandCode = 101
	GetOffsetCode            CommandCode = 120
	StoreOffsetCode          CommandCode = 121
	DeleteConsumerOffsetCode CommandCode = 122
	GetStreamCode            CommandCode = 200
	GetStreamsCode           CommandCode = 201
	CreateStreamCode         CommandCode = 202
	DeleteStreamCode         CommandCode = 203
	UpdateStreamCode         CommandCode = 204
	GetTopicCode             CommandCode = 300
	GetTopicsCode            CommandCode = 301
	CreateTopicCode          CommandCode = 302
	DeleteTopicCode          CommandCode = 303
	UpdateTopicCode          CommandCode = 304
	CreatePartitionsCode     CommandCode = 402
	DeletePartitionsCode     CommandCode = 403
	GetGroupCode             CommandCode = 600
	GetGroupsCode            CommandCode = 601
	CreateGroupCode          CommandCode = 602
	DeleteGroupCode          CommandCode = 603
	JoinGroupCode            CommandCode = 604
	LeaveGroupCode           CommandCode = 605
)

type CompressionAlgorithm

// CompressionAlgorithm selects the compression of a topic; it is a parameter
// of Client.CreateTopic and Client.UpdateTopic.
type CompressionAlgorithm uint8
// Known CompressionAlgorithm values. Numbering starts at 1, so the zero value
// of the type is not one of the declared constants.
const (
	CompressionAlgorithmNone CompressionAlgorithm = 1
	CompressionAlgorithmGzip CompressionAlgorithm = 2
)

type ConnectionInfo added in v0.7.0

// ConnectionInfo is the current connection information returned by
// Client.GetConnectionInfo: the protocol in use and the server address.
type ConnectionInfo struct {
	Protocol      Protocol
	ServerAddress string
}

type Consumer

// Consumer identifies who is consuming: a kind (single consumer or consumer
// group) plus an Identifier. NewSingleConsumer and NewGroupConsumer build
// values of each kind.
type Consumer struct {
	Kind ConsumerKind
	Id   Identifier
}

func DefaultConsumer

func DefaultConsumer() Consumer

func NewGroupConsumer

func NewGroupConsumer(id Identifier) Consumer

NewGroupConsumer creates a new Consumer whose kind is ConsumerKindGroup from the given Identifier.

func NewSingleConsumer

func NewSingleConsumer(id Identifier) Consumer

NewSingleConsumer creates a new Consumer whose kind is ConsumerKindSingle from the given Identifier.

type ConsumerGroup

// ConsumerGroup is the summary of a consumer group; Client.GetConsumerGroups
// returns a slice of these.
type ConsumerGroup struct {
	Id              uint32 `json:"id"`
	Name            string `json:"name"`
	PartitionsCount uint32 `json:"partitionsCount"`
	MembersCount    uint32 `json:"membersCount"`
}

type ConsumerGroupDetails

// ConsumerGroupDetails is a ConsumerGroup (embedded) plus its members. It is
// returned by Client.GetConsumerGroup and Client.CreateConsumerGroup.
type ConsumerGroupDetails struct {
	ConsumerGroup
	// Members carries no JSON tag, so encoding/json uses the field name.
	Members []ConsumerGroupMember
}

type ConsumerGroupInfo

// ConsumerGroupInfo names a consumer group by the numeric IDs of its stream,
// its topic and the group itself. It is the element type of
// ClientInfoDetails.ConsumerGroups.
type ConsumerGroupInfo struct {
	StreamId        uint32 `json:"streamId"`
	TopicId         uint32 `json:"topicId"`
	ConsumerGroupId uint32 `json:"consumerGroupId"`
}

type ConsumerGroupMember

// ConsumerGroupMember is one member of a consumer group, as listed in
// ConsumerGroupDetails.Members.
// NOTE(review): PartitionsCount presumably equals len(Partitions), and
// Partitions presumably holds the partition IDs assigned to the member —
// confirm against the deserializer.
type ConsumerGroupMember struct {
	ID              uint32
	PartitionsCount uint32
	Partitions      []uint32
}

type ConsumerKind

// ConsumerKind tells whether a Consumer is a single consumer or a consumer
// group.
type ConsumerKind uint8
// Known ConsumerKind values. Numbering starts at 1, so the zero value of the
// type is not one of the declared constants.
const (
	ConsumerKindSingle ConsumerKind = 1
	ConsumerKindGroup  ConsumerKind = 2
)

type ConsumerOffsetInfo

// ConsumerOffsetInfo is the result of Client.GetConsumerOffset for one
// partition.
// NOTE(review): CurrentOffset is presumably the partition's latest offset and
// StoredOffset the offset saved for the consumer — confirm against the server
// documentation.
type ConsumerOffsetInfo struct {
	PartitionId   uint32 `json:"partitionId"`
	CurrentOffset uint64 `json:"currentOffset"`
	StoredOffset  uint64 `json:"storedOffset"`
}

type CreateConsumerGroupRequest

// CreateConsumerGroupRequest bundles the stream, the topic and the name of a
// consumer group to create.
// NOTE(review): presumably mirrors the arguments of
// Client.CreateConsumerGroup — confirm where it is used.
type CreateConsumerGroupRequest struct {
	StreamId Identifier `json:"streamId"`
	TopicId  Identifier `json:"topicId"`
	Name     string     `json:"name"`
}

type CreatePartitionsRequest

// CreatePartitionsRequest bundles the stream, the topic and the number of
// partitions to add.
// NOTE(review): presumably mirrors the arguments of Client.CreatePartitions —
// confirm where it is used.
type CreatePartitionsRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	PartitionsCount uint32     `json:"partitionsCount"`
}

type CreatePersonalAccessTokenRequest

// CreatePersonalAccessTokenRequest bundles the name and expiry of a personal
// access token to create.
// NOTE(review): presumably mirrors the arguments of
// Client.CreatePersonalAccessToken; the unit of Expiry is not visible here —
// confirm.
type CreatePersonalAccessTokenRequest struct {
	Name   string `json:"Name"`
	Expiry uint32 `json:"Expiry"`
}

type CreateStreamRequest

// CreateStreamRequest bundles a stream ID and a stream name.
// NOTE(review): Client.CreateStream takes only a name, so it is unclear from
// here whether StreamId is still used — confirm.
type CreateStreamRequest struct {
	StreamId int    `json:"streamId"`
	Name     string `json:"name"`
}

type CreateTopicRequest

// CreateTopicRequest bundles the settings of a topic to create.
//
// NOTE(review): several field types differ from the parameters of
// Client.CreateTopic, so converting between the two needs care:
//   - MessageExpiry is a time.Duration (nanoseconds), whereas Client.CreateTopic
//     takes this package's Duration (microseconds).
//   - PartitionsCount is int here and uint32 there.
//   - CompressionAlgorithm is a plain uint8 here and a CompressionAlgorithm there.
//   - ReplicationFactor is uint8 here and *uint8 (optional) there.
//   - TopicId has no counterpart among the Client.CreateTopic parameters.
type CreateTopicRequest struct {
	StreamId             Identifier    `json:"streamId"`
	TopicId              uint32        `json:"topicId"`
	PartitionsCount      int           `json:"partitionsCount"`
	CompressionAlgorithm uint8         `json:"compressionAlgorithm"`
	MessageExpiry        time.Duration `json:"messageExpiry"`
	MaxTopicSize         uint64        `json:"maxTopicSize"`
	ReplicationFactor    uint8         `json:"replicationFactor"`
	Name                 string        `json:"name"`
}

type CreateUserRequest

// CreateUserRequest bundles the username, password, status and optional
// permissions of a user to create.
//
// NOTE(review): the JSON keys are inconsistently cased — "username" is
// lower-case while "Password", "Status" and "Permissions" are capitalised.
// Confirm which casing the server expects before relying on the JSON form.
type CreateUserRequest struct {
	Username    string       `json:"username"`
	Password    string       `json:"Password"`
	Status      UserStatus   `json:"Status"`
	// Permissions is left out of the JSON encoding when nil (omitempty).
	Permissions *Permissions `json:"Permissions,omitempty"`
}

type DeleteConsumerGroupRequest

// DeleteConsumerGroupRequest identifies the consumer group to delete by its
// stream, topic and group identifiers.
// NOTE(review): presumably mirrors the arguments of
// Client.DeleteConsumerGroup — confirm where it is used.
type DeleteConsumerGroupRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	ConsumerGroupId Identifier `json:"consumerGroupId"`
}

type DeleteConsumerOffsetRequest added in v0.6.0

// DeleteConsumerOffsetRequest identifies the consumer offset to delete.
// Unlike most request types in this package it carries no JSON tags.
// NOTE(review): presumably mirrors the arguments of
// Client.DeleteConsumerOffset — confirm where it is used.
type DeleteConsumerOffsetRequest struct {
	Consumer    Consumer
	StreamId    Identifier
	TopicId     Identifier
	// PartitionId is a pointer, so it can be left nil.
	PartitionId *uint32
}

type DeletePartitionsRequest

// DeletePartitionsRequest bundles the stream, the topic and the number of
// partitions to remove.
// NOTE(review): presumably mirrors the arguments of Client.DeletePartitions —
// confirm where it is used.
type DeletePartitionsRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	PartitionsCount uint32     `json:"partitionsCount"`
}

type DeletePersonalAccessTokenRequest

// DeletePersonalAccessTokenRequest names the personal access token to delete.
// NOTE(review): presumably mirrors the argument of
// Client.DeletePersonalAccessToken — confirm where it is used.
type DeletePersonalAccessTokenRequest struct {
	Name string `json:"Name"`
}

type Duration

// Duration is an unsigned count of microseconds. It is not interchangeable
// with time.Duration, which is signed and counts nanoseconds.
type Duration uint64

Duration represents the expiration duration in microseconds (µs).

// Sentinel Duration values for message expiry. They sit at the two ends of
// the uint64 range: 0 and math.MaxUint64.
const (
	// IggyExpiryServerDefault use the default expiry time from the server
	IggyExpiryServerDefault Duration = 0
	// IggyExpiryNeverExpire never expire
	IggyExpiryNeverExpire Duration = math.MaxUint64
)

type GetClusterMetadata added in v0.7.0

// GetClusterMetadata is a command with no fields. Its pointer type has the
// Code and MarshalBinary methods that the Command interface requires.
type GetClusterMetadata struct {
}

func (*GetClusterMetadata) Code added in v0.7.0

func (m *GetClusterMetadata) Code() CommandCode

func (*GetClusterMetadata) MarshalBinary added in v0.7.0

func (m *GetClusterMetadata) MarshalBinary() ([]byte, error)

type GetConsumerOffsetRequest

// GetConsumerOffsetRequest identifies the consumer offset to look up.
// NOTE(review): presumably mirrors the arguments of
// Client.GetConsumerOffset — confirm where it is used.
type GetConsumerOffsetRequest struct {
	StreamId    Identifier `json:"streamId"`
	TopicId     Identifier `json:"topicId"`
	Consumer    Consumer   `json:"consumer"`
	// PartitionId is a pointer, so it can be left nil (encoded as JSON null).
	PartitionId *uint32    `json:"partitionId"`
}

type GetStreamRequest

// GetStreamRequest identifies the stream to look up.
// NOTE(review): presumably mirrors the argument of Client.GetStream — confirm
// where it is used.
type GetStreamRequest struct {
	StreamID Identifier
}

type GlobalPermissions

// GlobalPermissions is the set of boolean permission flags stored in
// Permissions.Global. Each flag is encoded in JSON under its own field name.
type GlobalPermissions struct {
	ManageServers bool `json:"ManageServers"`
	ReadServers   bool `json:"ReadServers"`
	ManageUsers   bool `json:"ManageUsers"`
	ReadUsers     bool `json:"ReadUsers"`
	ManageStreams bool `json:"ManageStreams"`
	ReadStreams   bool `json:"ReadStreams"`
	ManageTopics  bool `json:"ManageTopics"`
	ReadTopics    bool `json:"ReadTopics"`
	PollMessages  bool `json:"PollMessages"`
	SendMessages  bool `json:"SendMessages"`
}

type HeaderEntry added in v0.7.0

// HeaderEntry is one user header: a key paired with its value. Slices of
// HeaderEntry are accepted by GetHeadersBytes and WithUserHeaders and
// returned by DeserializeHeaders.
type HeaderEntry struct {
	Key   HeaderKey
	Value HeaderValue
}

func DeserializeHeaders

func DeserializeHeaders(userHeadersBytes []byte) ([]HeaderEntry, error)

type HeaderKey

// HeaderKey is the key of a user header: a HeaderKind plus the raw bytes of
// the key. NewHeaderKeyInt32, NewHeaderKeyRaw and NewHeaderKeyString build
// keys from typed values.
type HeaderKey struct {
	Kind  HeaderKind
	Value []byte
}

func NewHeaderKeyInt32 added in v0.7.0

func NewHeaderKeyInt32(val int32) HeaderKey

func NewHeaderKeyRaw added in v0.7.0

func NewHeaderKeyRaw(val []byte) (HeaderKey, error)

func NewHeaderKeyString added in v0.7.0

func NewHeaderKeyString(val string) (HeaderKey, error)

type HeaderKind

// HeaderKind is the data type tag of a HeaderKey or HeaderValue.
type HeaderKind int
// Known HeaderKind values. Numbering starts at 1, so the zero value of the
// type is not one of the declared constants.
const (
	Raw     HeaderKind = 1
	String  HeaderKind = 2
	Bool    HeaderKind = 3
	Int8    HeaderKind = 4
	Int16   HeaderKind = 5
	Int32   HeaderKind = 6
	Int64   HeaderKind = 7
	Int128  HeaderKind = 8
	Uint8   HeaderKind = 9
	Uint16  HeaderKind = 10
	Uint32  HeaderKind = 11
	Uint64  HeaderKind = 12
	Uint128 HeaderKind = 13
	Float   HeaderKind = 14
	Double  HeaderKind = 15
)

func (HeaderKind) ExpectedSize added in v0.7.0

func (k HeaderKind) ExpectedSize() int

type HeaderValue

// HeaderValue is the value of a user header: a HeaderKind plus the raw bytes
// of the value.
// NOTE(review): the byte order used for numeric kinds is not visible here —
// confirm against the serializer.
type HeaderValue struct {
	Kind  HeaderKind
	Value []byte
}

type IdKind

// IdKind tells whether an Identifier holds a numeric ID or a string name.
type IdKind uint8
// Known IdKind values. Numbering starts at 1, so the zero value of the type
// is not one of the declared constants.
const (
	NumericId IdKind = 1
	StringId  IdKind = 2
)

type Identifier

// Identifier addresses a resource either by numeric ID or by name. Build one
// with NewIdentifier (from a uint32 or a string); read it back with the
// Uint32 or String method.
// NOTE(review): Length is presumably len(Value) — confirm in NewIdentifier.
type Identifier struct {
	Kind   IdKind
	Length int
	Value  []byte
}

func NewIdentifier

func NewIdentifier[T uint32 | string](value T) (Identifier, error)

NewIdentifier creates a new identifier from a uint32 or string value.

func (Identifier) String

func (id Identifier) String() (string, error)

String returns the string value of the identifier.

func (Identifier) Uint32

func (id Identifier) Uint32() (uint32, error)

Uint32 returns the numeric value of the identifier.

type IdentityInfo

// IdentityInfo is the result of a successful login; Client.LoginUser and
// Client.LoginWithPersonalAccessToken return it.
type IdentityInfo struct {
	// Unique identifier (numeric) of the user.
	UserId uint32 `json:"userId"`
	// The optional tokens, used only by HTTP transport.
	AccessToken *string `json:"accessToken"`
}

type IggyMessage

// IggyMessage is a single message: a fixed header, the payload bytes and the
// user headers as bytes. Build one with NewIggyMessage.
// NOTE(review): UserHeaders presumably holds the encoding produced by
// GetHeadersBytes and read by DeserializeHeaders — confirm.
type IggyMessage struct {
	Header      MessageHeader
	Payload     []byte
	UserHeaders []byte
}

func NewIggyMessage

func NewIggyMessage(payload []byte, opts ...IggyMessageOpt) (IggyMessage, error)

NewIggyMessage creates a new message with customizable parameters.

type IggyMessageCompression

// IggyMessageCompression names a message compression mode as a string.
type IggyMessageCompression string
// Known IggyMessageCompression values.
// NOTE(review): "s2" presumably refers to the S2 compression format and its
// better/best levels — confirm against the code that consumes these values.
const (
	MESSAGE_COMPRESSION_NONE      IggyMessageCompression = "none"
	MESSAGE_COMPRESSION_S2        IggyMessageCompression = "s2"
	MESSAGE_COMPRESSION_S2_BETTER IggyMessageCompression = "s2-better"
	MESSAGE_COMPRESSION_S2_BEST   IggyMessageCompression = "s2-best"
)

type IggyMessageOpt

// IggyMessageOpt is a functional option that mutates an IggyMessage in place.
// NewIggyMessage accepts any number of them; WithID and WithUserHeaders
// return ready-made options.
type IggyMessageOpt func(message *IggyMessage)

func WithID

func WithID(id [16]byte) IggyMessageOpt

func WithUserHeaders

func WithUserHeaders(userHeaders []HeaderEntry) IggyMessageOpt

type JoinConsumerGroupRequest

// JoinConsumerGroupRequest identifies the consumer group to join by its
// stream, topic and group identifiers.
// NOTE(review): presumably mirrors the arguments of
// Client.JoinConsumerGroup — confirm where it is used.
type JoinConsumerGroupRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	ConsumerGroupId Identifier `json:"consumerGroupId"`
}

type LeaderRedirectionState added in v0.7.0

// LeaderRedirectionState tracks redirection attempts to avoid loops: how many
// redirects have happened and the last leader address that was recorded.
// NOTE(review): CanRedirect presumably compares RedirectCount with
// MaxLeaderRedirects — confirm in the implementation.
type LeaderRedirectionState struct {
	RedirectCount     uint8
	LastLeaderAddress string
}

LeaderRedirectionState tracks redirection attempts to avoid loops.

func NewLeaderRedirectionState added in v0.7.0

func NewLeaderRedirectionState() *LeaderRedirectionState

func (*LeaderRedirectionState) CanRedirect added in v0.7.0

func (s *LeaderRedirectionState) CanRedirect() bool

func (*LeaderRedirectionState) IncrementRedirect added in v0.7.0

func (s *LeaderRedirectionState) IncrementRedirect(leaderAddress string)

func (*LeaderRedirectionState) Reset added in v0.7.0

func (s *LeaderRedirectionState) Reset()

type LeaveConsumerGroupRequest

// LeaveConsumerGroupRequest identifies the consumer group to leave by its
// stream, topic and group identifiers.
// NOTE(review): presumably mirrors the arguments of
// Client.LeaveConsumerGroup — confirm where it is used.
type LeaveConsumerGroupRequest struct {
	StreamId        Identifier `json:"streamId"`
	TopicId         Identifier `json:"topicId"`
	ConsumerGroupId Identifier `json:"consumerGroupId"`
}

type LoginUserRequest

// LoginUserRequest carries the credentials for a username/password login,
// plus an optional version and context string.
type LoginUserRequest struct {
	Username string `json:"username"`
	Password string `json:"password"`
	// Version and Context are left out of the JSON encoding when empty
	// (omitempty).
	Version  string `json:"version,omitempty"`
	Context  string `json:"context,omitempty"`
}

type LoginWithPersonalAccessTokenRequest

// LoginWithPersonalAccessTokenRequest carries the personal access token used
// to log in.
// NOTE(review): presumably mirrors the argument of
// Client.LoginWithPersonalAccessToken — confirm where it is used.
type LoginWithPersonalAccessTokenRequest struct {
	Token string `json:"token"`
}

type MessageHeader

// MessageHeader is the fixed-size header of an IggyMessage. The widths of its
// fields (8 + 16 + 8 + 8 + 8 + 4 + 4 + 8) add up to MessageHeaderSize, 64
// bytes. ToBytes and MessageHeaderFromBytes convert to and from bytes.
// NOTE(review): UserHeaderLength and PayloadLength presumably hold the byte
// lengths of IggyMessage.UserHeaders and IggyMessage.Payload — confirm in
// NewIggyMessage.
type MessageHeader struct {
	Checksum         uint64    `json:"checksum"`
	Id               MessageID `json:"id"`
	Offset           uint64    `json:"offset"`
	Timestamp        uint64    `json:"timestamp"`
	OriginTimestamp  uint64    `json:"origin_timestamp"`
	UserHeaderLength uint32    `json:"user_header_length"`
	PayloadLength    uint32    `json:"payload_length"`
	Reserved         uint64    `json:"reserved"`
}

func MessageHeaderFromBytes

func MessageHeaderFromBytes(data []byte) (*MessageHeader, error)

func NewMessageHeader

func NewMessageHeader(id MessageID, payloadLength uint32, userHeaderLength uint32) MessageHeader

func (*MessageHeader) ToBytes

func (mh *MessageHeader) ToBytes() []byte

type MessageID

// MessageID is the 16-byte identifier stored in MessageHeader.Id.
type MessageID [16]byte

type MessagePolling

// MessagePolling is the kind of a PollingStrategy, stored as one byte.
type MessagePolling byte
// Known MessagePolling values. Numbering starts at 1, so the zero value of
// the type is not one of the declared constants. Each kind has a matching
// constructor: OffsetPollingStrategy, TimestampPollingStrategy,
// FirstPollingStrategy, LastPollingStrategy and NextPollingStrategy.
const (
	POLLING_OFFSET    MessagePolling = 1
	POLLING_TIMESTAMP MessagePolling = 2
	POLLING_FIRST     MessagePolling = 3
	POLLING_LAST      MessagePolling = 4
	POLLING_NEXT      MessagePolling = 5
)

type PartitionContract

// PartitionContract describes one partition: its ID, message and segment
// counts, current offset, size in bytes and creation time.
// NOTE(review): the unit and epoch of CreatedAt are not visible here —
// confirm against the server documentation.
type PartitionContract struct {
	Id            uint32 `json:"id"`
	MessagesCount uint64 `json:"messagesCount"`
	CreatedAt     uint64 `json:"createdAt"`
	SegmentsCount uint32 `json:"segmentsCount"`
	CurrentOffset uint64 `json:"currentOffset"`
	SizeBytes     uint64 `json:"sizeBytes"`
}

type Partitioning

// Partitioning is the partitioning strategy passed to Client.SendMessages: a
// kind plus a value held as bytes. Build one with None, PartitionId or one of
// the EntityId* constructors.
// NOTE(review): Length is presumably len(Value) — confirm in the
// constructors.
type Partitioning struct {
	Kind   PartitioningKind
	Length int
	Value  []byte
}

func EntityIdBytes

func EntityIdBytes(value []byte) (Partitioning, error)

func EntityIdGuid

func EntityIdGuid(value uuid.UUID) Partitioning

func EntityIdInt

func EntityIdInt(value int) Partitioning

func EntityIdString

func EntityIdString(value string) (Partitioning, error)

func EntityIdUlong

func EntityIdUlong(value uint64) Partitioning

func None

func None() Partitioning

func PartitionId

func PartitionId(value uint32) Partitioning

type PartitioningKind

// PartitioningKind is the kind of a Partitioning.
type PartitioningKind int
// Known PartitioningKind values. Numbering starts at 1, so the zero value of
// the type is not one of the declared constants.
// NOTE(review): presumably None() yields Balanced, PartitionId() yields
// PartitionIdKind and the EntityId* constructors yield MessageKey — confirm
// in the constructors.
const (
	Balanced        PartitioningKind = 1
	PartitionIdKind PartitioningKind = 2
	MessageKey      PartitioningKind = 3
)

type Permissions

// Permissions is the permission set of a user: global flags plus optional
// per-stream permissions. It is a parameter of Client.CreateUser and
// Client.UpdatePermissions.
type Permissions struct {
	Global  GlobalPermissions          `json:"Global"`
	// Streams is left out of the JSON encoding when empty (omitempty).
	// NOTE(review): the int key is presumably the numeric stream ID — confirm.
	Streams map[int]*StreamPermissions `json:"Streams,omitempty"`
}

type PersonalAccessTokenInfo

// PersonalAccessTokenInfo describes one personal access token;
// Client.GetPersonalAccessTokens returns a slice of these.
type PersonalAccessTokenInfo struct {
	Name   string     `json:"Name"`
	// NOTE(review): a nil Expiry presumably means the token never expires —
	// confirm against the deserializer.
	Expiry *time.Time `json:"Expiry"`
}

type PollMessageRequest

// PollMessageRequest bundles the parameters of a poll: stream, topic,
// consumer, partition, polling strategy, message count and auto-commit flag.
//
// NOTE(review): two field types differ from the parameters of
// Client.PollMessages — PartitionId is a plain uint32 here but an optional
// *uint32 there, and Count is int here but uint32 there. Confirm how the two
// are reconciled.
type PollMessageRequest struct {
	StreamId        Identifier      `json:"streamId"`
	TopicId         Identifier      `json:"topicId"`
	Consumer        Consumer        `json:"consumer"`
	PartitionId     uint32          `json:"partitionId"`
	PollingStrategy PollingStrategy `json:"pollingStrategy"`
	Count           int             `json:"count"`
	AutoCommit      bool            `json:"autoCommit"`
}

type PolledMessage

// PolledMessage is the result of Client.PollMessages: the partition that was
// polled, its current offset and the messages that were read.
// NOTE(review): MessageCount is presumably len(Messages) — confirm against
// the deserializer.
type PolledMessage struct {
	PartitionId   uint32
	CurrentOffset uint64
	MessageCount  uint32
	Messages      []IggyMessage
}

type PollingStrategy

// PollingStrategy tells Client.PollMessages where to start reading: a kind
// plus a numeric value. Only OffsetPollingStrategy, TimestampPollingStrategy
// and NewPollingStrategy take a value; FirstPollingStrategy,
// LastPollingStrategy and NextPollingStrategy take none.
type PollingStrategy struct {
	Kind  MessagePolling
	Value uint64
}

func FirstPollingStrategy

func FirstPollingStrategy() PollingStrategy

func LastPollingStrategy

func LastPollingStrategy() PollingStrategy

func NewPollingStrategy

func NewPollingStrategy(kind MessagePolling, value uint64) PollingStrategy

func NextPollingStrategy

func NextPollingStrategy() PollingStrategy

func OffsetPollingStrategy

func OffsetPollingStrategy(value uint64) PollingStrategy

func TimestampPollingStrategy

func TimestampPollingStrategy(value uint64) PollingStrategy

type Protocol

// Protocol names a transport protocol as a string; ConnectionInfo.Protocol
// holds one.
type Protocol string
// Known Protocol values.
const (
	Http      Protocol = "http"
	Tcp       Protocol = "tcp"
	Quic      Protocol = "quic"
	WebSocket Protocol = "ws"
)

type RawPersonalAccessToken

// RawPersonalAccessToken holds the token string returned by
// Client.CreatePersonalAccessToken.
type RawPersonalAccessToken struct {
	Token string `json:"token"`
}

type ReceivedMessage

// ReceivedMessage pairs a single IggyMessage with the partition ID and
// current offset it was received with.
// NOTE(review): no producer of this type is visible here — confirm where it
// is built.
type ReceivedMessage struct {
	Message       IggyMessage
	CurrentOffset uint64
	PartitionId   uint32
}

type SendMessagesRequest

// SendMessagesRequest bundles the stream, topic, partitioning strategy and
// messages of a send.
// NOTE(review): presumably mirrors the arguments of Client.SendMessages —
// confirm where it is used.
type SendMessagesRequest struct {
	StreamId     Identifier    `json:"streamId"`
	TopicId      Identifier    `json:"topicId"`
	Partitioning Partitioning  `json:"partitioning"`
	Messages     []IggyMessage `json:"messages"`
}

type State added in v0.7.0

// State is a connection lifecycle state.
type State uint8
// Known State values, numbered from 0 by iota in declaration order.
// StateShutdown is 0 and therefore also the zero value of the type.
const (
	StateShutdown State = iota
	StateDisconnected
	StateConnecting
	StateConnected
	StateAuthenticating
	StateAuthenticated
)

func (State) String added in v0.7.0

func (s State) String() string

type Stats

// Stats is a flat record of process, resource-usage and entity-count
// statistics plus host information. Unlike most types in this package, its
// JSON keys are snake_case.
//
// NOTE(review): units are not visible here. Memory and byte counters are
// presumably in bytes, and RunTime/StartTime presumably use the server's
// timestamp unit; confirm before displaying them.
type Stats struct {
	ProcessId           uint32  `json:"process_id"`
	CpuUsage            float32 `json:"cpu_usage"`
	TotalCpuUsage       float32 `json:"total_cpu_usage"`
	MemoryUsage         uint64  `json:"memory_usage"`
	TotalMemory         uint64  `json:"total_memory"`
	AvailableMemory     uint64  `json:"available_memory"`
	RunTime             uint64  `json:"run_time"`
	StartTime           uint64  `json:"start_time"`
	ReadBytes           uint64  `json:"read_bytes"`
	WrittenBytes        uint64  `json:"written_bytes"`
	MessagesSizeBytes   uint64  `json:"messages_size_bytes"`
	StreamsCount        uint32  `json:"streams_count"`
	TopicsCount         uint32  `json:"topics_count"`
	PartitionsCount     uint32  `json:"partitions_count"`
	SegmentsCount       uint32  `json:"segments_count"`
	MessagesCount       uint64  `json:"messages_count"`
	ClientsCount        uint32  `json:"clients_count"`
	ConsumerGroupsCount uint32  `json:"consumer_groups_count"`
	Hostname            string  `json:"hostname"`
	OsName              string  `json:"os_name"`
	OsVersion           string  `json:"os_version"`
	KernelVersion       string  `json:"kernel_version"`
}

type StoreConsumerOffsetRequest

// StoreConsumerOffsetRequest is the request payload for storing a consumer
// offset. It names a stream, a topic and a consumer, optionally a partition,
// and carries the offset value.
type StoreConsumerOffsetRequest struct {
	StreamId    Identifier `json:"streamId"`
	TopicId     Identifier `json:"topicId"`
	Consumer    Consumer   `json:"consumer"`
	// PartitionId is a pointer, so it may be nil. The tag has no omitempty, so
	// a nil value is encoded as JSON null rather than dropped. Presumably nil
	// lets the server choose the partition; TODO confirm.
	PartitionId *uint32    `json:"partitionId"`
	Offset      uint64     `json:"offset"`
}

type Stream

// Stream is the summary record of a stream: its numeric ID, name, size,
// creation time and message/topic counts. JSON keys are camelCase.
type Stream struct {
	Id            uint32 `json:"id"`
	Name          string `json:"name"`
	SizeBytes     uint64 `json:"sizeBytes"`
	// NOTE(review): the timestamp unit of CreatedAt is not visible here; confirm.
	CreatedAt     uint64 `json:"createdAt"`
	MessagesCount uint64 `json:"messagesCount"`
	TopicsCount   uint32 `json:"topicsCount"`
}

type StreamDetails

// StreamDetails extends Stream with the list of its topics.
type StreamDetails struct {
	// Stream is embedded without a tag, so its fields are promoted and appear
	// at the top level of the JSON object.
	Stream
	// Topics is left out of the JSON output when the slice is empty.
	Topics []Topic `json:"topics,omitempty"`
}

type StreamPermissions

// StreamPermissions is a set of boolean permission flags for a stream plus an
// optional map of per-topic permissions. Its JSON keys are PascalCase, unlike
// the camelCase keys used by the request types in this package.
type StreamPermissions struct {
	ManageStream bool                      `json:"ManageStream"`
	ReadStream   bool                      `json:"ReadStream"`
	ManageTopics bool                      `json:"ManageTopics"`
	ReadTopics   bool                      `json:"ReadTopics"`
	PollMessages bool                      `json:"PollMessages"`
	SendMessages bool                      `json:"SendMessages"`
	// Topics is keyed by an int, presumably the topic ID (TODO confirm), and
	// is left out of the JSON output when the map is empty.
	Topics       map[int]*TopicPermissions `json:"Topics,omitempty"`
}

type Topic

// Topic is the summary record of a topic: its numeric ID, name, size, expiry
// and size settings, and message/partition counts. JSON keys are camelCase.
type Topic struct {
	Id                   uint32   `json:"id"`
	// NOTE(review): the timestamp unit of CreatedAt is not visible here; confirm.
	CreatedAt            uint64   `json:"createdAt"`
	Name                 string   `json:"name"`
	Size                 uint64   `json:"size"`
	// MessageExpiry uses this package's Duration type, whose base unit is
	// Microsecond (= 1), not the standard library's time.Duration.
	MessageExpiry        Duration `json:"messageExpiry"`
	// CompressionAlgorithm is a numeric code; the code-to-algorithm mapping is
	// not visible here.
	CompressionAlgorithm uint8    `json:"compressionAlgorithm"`
	// MaxTopicSize is presumably in bytes; TODO confirm.
	MaxTopicSize         uint64   `json:"maxTopicSize"`
	ReplicationFactor    uint8    `json:"replicationFactor"`
	MessagesCount        uint64   `json:"messagesCount"`
	PartitionsCount      uint32   `json:"partitionsCount"`
}

type TopicDetails

// TopicDetails extends Topic with the list of its partitions.
type TopicDetails struct {
	// Topic is embedded without a tag, so its fields are promoted and appear
	// at the top level of the JSON object.
	Topic
	// Partitions is left out of the JSON output when the slice is empty.
	Partitions []PartitionContract `json:"partitions,omitempty"`
}

type TopicPermissions

// TopicPermissions is a set of four boolean permission flags for a topic.
// JSON keys are PascalCase and match the Go field names exactly.
type TopicPermissions struct {
	ManageTopic  bool `json:"ManageTopic"`
	ReadTopic    bool `json:"ReadTopic"`
	PollMessages bool `json:"PollMessages"`
	SendMessages bool `json:"SendMessages"`
}

type TransportEndpoints added in v0.7.0

// TransportEndpoints holds one 16-bit port number per transport. It has no
// JSON tags; the package exposes MarshalBinary and UnmarshalBinary methods on
// it for binary encoding.
type TransportEndpoints struct {
	Tcp       uint16
	Quic      uint16
	Http      uint16
	WebSocket uint16
}

TransportEndpoints holds one 16-bit port number for each of the four transports: TCP, QUIC, HTTP and WebSocket.

func NewTransportEndpoints added in v0.7.0

func NewTransportEndpoints(tcp, quic, http, websocket uint16) TransportEndpoints

NewTransportEndpoints constructs a TransportEndpoints value from the given TCP, QUIC, HTTP and WebSocket ports.

func (*TransportEndpoints) GetBufferSize added in v0.7.0

func (t *TransportEndpoints) GetBufferSize() int

func (*TransportEndpoints) MarshalBinary added in v0.7.0

func (t *TransportEndpoints) MarshalBinary() ([]byte, error)

func (*TransportEndpoints) String added in v0.7.0

func (t *TransportEndpoints) String() string

func (*TransportEndpoints) UnmarshalBinary added in v0.7.0

func (t *TransportEndpoints) UnmarshalBinary(b []byte) error

type UpdatePermissionsRequest

// UpdatePermissionsRequest is the request payload for updating a user's
// permissions.
type UpdatePermissionsRequest struct {
	// UserID is tagged json:"-", so it is never part of the JSON body.
	// Presumably it is sent some other way, e.g. in the URL path; confirm.
	UserID      Identifier   `json:"-"`
	// Permissions is left out of the JSON body when nil.
	Permissions *Permissions `json:"Permissions,omitempty"`
}

type UpdateTopicRequest

// UpdateTopicRequest is the request payload for updating a topic. It names the
// stream and topic and carries the new settings. JSON keys are camelCase.
type UpdateTopicRequest struct {
	StreamId             Identifier    `json:"streamId"`
	TopicId              Identifier    `json:"topicId"`
	// CompressionAlgorithm is a numeric code; the code-to-algorithm mapping is
	// not visible here.
	CompressionAlgorithm uint8         `json:"compressionAlgorithm"`
	// NOTE(review): this is the standard library's time.Duration, whereas
	// Topic.MessageExpiry uses this package's Duration, whose base unit is
	// Microsecond. Confirm that the serializer converts between the two.
	MessageExpiry        time.Duration `json:"messageExpiry"`
	// MaxTopicSize is presumably in bytes; TODO confirm.
	MaxTopicSize         uint64        `json:"maxTopicSize"`
	ReplicationFactor    uint8         `json:"replicationFactor"`
	Name                 string        `json:"name"`
}

type UpdateUserRequest

// UpdateUserRequest is the request payload for updating a user's name and/or
// status.
type UpdateUserRequest struct {
	// UserID is tagged json:"-", so it is never part of the JSON body.
	UserID   Identifier  `json:"-"`
	// Username and Status are pointers without omitempty, so a nil value is
	// encoded as JSON null. Presumably nil means "leave unchanged"; TODO
	// confirm. Note that Status is encoded under the key "userStatus".
	Username *string     `json:"username"`
	Status   *UserStatus `json:"userStatus"`
}

type UserInfo

// UserInfo is the summary record of a user: numeric ID, creation time, status
// and username. JSON keys are PascalCase and match the Go field names.
type UserInfo struct {
	Id        uint32     `json:"Id"`
	// NOTE(review): the timestamp unit of CreatedAt is not visible here; confirm.
	CreatedAt uint64     `json:"CreatedAt"`
	Status    UserStatus `json:"Status"`
	Username  string     `json:"Username"`
}

type UserInfoDetails

// UserInfoDetails extends UserInfo with the user's permissions.
type UserInfoDetails struct {
	// UserInfo is embedded without a tag, so its fields are promoted and
	// appear at the top level of the JSON object.
	UserInfo
	// Permissions has no omitempty, so a nil pointer is encoded as JSON null.
	Permissions *Permissions `json:"Permissions"`
}

type UserStatus

// UserStatus is an enumerated user status stored in an int.
type UserStatus int
// UserStatus values, numbered by iota: Active is 0 and Inactive is 1. The zero
// value of UserStatus is therefore Active.
const (
	Active UserStatus = iota
	Inactive
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL