Documentation
¶
Index ¶
- Constants
- func GetHeadersBytes(headers []HeaderEntry) []byte
- type ChangePasswordRequest
- type Client
- type ClientInfo
- type ClientInfoDetails
- type ClusterMetadata
- type ClusterNode
- type ClusterNodeRole
- type ClusterNodeStatus
- type Command
- type CommandCode
- type CompressionAlgorithm
- type ConnectionInfo
- type Consumer
- type ConsumerGroup
- type ConsumerGroupDetails
- type ConsumerGroupInfo
- type ConsumerGroupMember
- type ConsumerKind
- type ConsumerOffsetInfo
- type CreateConsumerGroupRequest
- type CreatePartitionsRequest
- type CreatePersonalAccessTokenRequest
- type CreateStreamRequest
- type CreateTopicRequest
- type CreateUserRequest
- type DeleteConsumerGroupRequest
- type DeleteConsumerOffsetRequest
- type DeletePartitionsRequest
- type DeletePersonalAccessTokenRequest
- type Duration
- type GetClusterMetadata
- type GetConsumerOffsetRequest
- type GetStreamRequest
- type GlobalPermissions
- type HeaderEntry
- type HeaderKey
- type HeaderKind
- type HeaderValue
- type IdKind
- type Identifier
- type IdentityInfo
- type IggyMessage
- type IggyMessageCompression
- type IggyMessageOpt
- type JoinConsumerGroupRequest
- type LeaderRedirectionState
- type LeaveConsumerGroupRequest
- type LoginUserRequest
- type LoginWithPersonalAccessTokenRequest
- type MessageHeader
- type MessageID
- type MessagePolling
- type PartitionContract
- type Partitioning
- func EntityIdBytes(value []byte) (Partitioning, error)
- func EntityIdGuid(value uuid.UUID) Partitioning
- func EntityIdInt(value int) Partitioning
- func EntityIdString(value string) (Partitioning, error)
- func EntityIdUlong(value uint64) Partitioning
- func None() Partitioning
- func PartitionId(value uint32) Partitioning
- type PartitioningKind
- type Permissions
- type PersonalAccessTokenInfo
- type PollMessageRequest
- type PolledMessage
- type PollingStrategy
- func FirstPollingStrategy() PollingStrategy
- func LastPollingStrategy() PollingStrategy
- func NewPollingStrategy(kind MessagePolling, value uint64) PollingStrategy
- func NextPollingStrategy() PollingStrategy
- func OffsetPollingStrategy(value uint64) PollingStrategy
- func TimestampPollingStrategy(value uint64) PollingStrategy
- type Protocol
- type RawPersonalAccessToken
- type ReceivedMessage
- type SendMessagesRequest
- type State
- type Stats
- type StoreConsumerOffsetRequest
- type Stream
- type StreamDetails
- type StreamPermissions
- type Topic
- type TopicDetails
- type TopicPermissions
- type TransportEndpoints
- type UpdatePermissionsRequest
- type UpdateTopicRequest
- type UpdateUserRequest
- type UserInfo
- type UserInfoDetails
- type UserStatus
Constants ¶
const ( Microsecond Duration = 1 Millisecond = 1000 * Microsecond Second = 1000 * Millisecond Minute = 60 * Second Hour = 60 * Minute )
const ( // MaxPayloadSize is maximum allowed size in bytes for a message payload. // // This constant defines the upper limit for the size of an IggyMessage payload. Attempting to create a message // with a payload larger than this value will result // in an ierror.TooBigUserMessagePayload error. // // Constraints // - Minimum payload size: 1 byte (empty payloads are not allowed) // - Maximum payload size: 10 MB MaxPayloadSize = 10 * 1000 * 1000 // MaxUserHeadersSize is maximum allowed size in bytes for user-defined headers. // // This constant defines the upper limit for the combined size of all user headers in an IggyMessage. Attempting to // create a message with user headers larger than this value will result in an ierror.TooBigUserHeaders error. // // Constraints // - Maximum headers size: 100 KB // - Each individual header key is limited to 255 bytes // - Each individual header value is limited to 255 bytes MaxUserHeadersSize = 100 * 1000 )
const MaxLeaderRedirects uint8 = 3
const MaxNodesPerCluster = 64
const MessageHeaderSize = 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8
const Version = "0.7.0"
TODO: Wire Version into binary_serialization/log_in_request_serializer.go to send the actual SDK version during login instead of an empty string.
Variables ¶
This section is empty.
Functions ¶
func GetHeadersBytes ¶
func GetHeadersBytes(headers []HeaderEntry) []byte
Types ¶
type ChangePasswordRequest ¶
type ChangePasswordRequest struct {
UserID Identifier `json:"-"`
CurrentPassword string `json:"CurrentPassword"`
NewPassword string `json:"NewPassword"`
}
type Client ¶ added in v0.7.0
type Client interface {
// Close closes the client and releases all the resources.
Close() error
// GetConnectionInfo returns the current connection information including protocol and server address
GetConnectionInfo() *ConnectionInfo
// GetClusterMetadata get the metadata of the cluster including node information, roles, and status.
// Authentication is required.
GetClusterMetadata() (*ClusterMetadata, error)
// GetStream get the info about a specific stream by unique ID or name.
// Authentication is required, and the permission to read the streams.
GetStream(streamId Identifier) (*StreamDetails, error)
// GetStreams get the info about all the streams.
// Authentication is required, and the permission to read the streams.
GetStreams() ([]Stream, error)
// CreateStream create a new stream.
// Authentication is required, and the permission to manage the streams.
CreateStream(name string) (*StreamDetails, error)
// UpdateStream update a stream by unique ID or name.
// Authentication is required, and the permission to manage the streams.
UpdateStream(streamId Identifier, name string) error
// DeleteStream delete a topic by unique ID or name.
// Authentication is required, and the permission to manage the topics.
DeleteStream(id Identifier) error
// GetTopic Get the info about a specific topic by unique ID or name.
// Authentication is required, and the permission to read the topics.
GetTopic(streamId, topicId Identifier) (*TopicDetails, error)
// GetTopics get the info about all the topics.
// Authentication is required, and the permission to read the topics.
GetTopics(streamId Identifier) ([]Topic, error)
// CreateTopic create a new topic.
// Authentication is required, and the permission to manage the topics.
CreateTopic(
streamId Identifier,
name string,
partitionsCount uint32,
compressionAlgorithm CompressionAlgorithm,
messageExpiry Duration,
maxTopicSize uint64,
replicationFactor *uint8,
) (*TopicDetails, error)
// UpdateTopic update a topic by unique ID or name.
// Authentication is required, and the permission to manage the topics.
UpdateTopic(
streamId Identifier,
topicId Identifier,
name string,
compressionAlgorithm CompressionAlgorithm,
messageExpiry Duration,
maxTopicSize uint64,
replicationFactor *uint8,
) error
// DeleteTopic delete a topic by unique ID or name.
// Authentication is required, and the permission to manage the topics.
DeleteTopic(streamId, topicId Identifier) error
// SendMessages sends messages using specified partitioning strategy to the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to send the messages.
SendMessages(
streamId Identifier,
topicId Identifier,
partitioning Partitioning,
messages []IggyMessage,
) error
// PollMessages poll given amount of messages using the specified consumer and strategy from the specified stream and topic by unique IDs or names.
// Authentication is required, and the permission to poll the messages.
PollMessages(
streamId Identifier,
topicId Identifier,
consumer Consumer,
strategy PollingStrategy,
count uint32,
autoCommit bool,
partitionId *uint32,
) (*PolledMessage, error)
// StoreConsumerOffset store the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to poll the messages.
StoreConsumerOffset(
consumer Consumer,
streamId Identifier,
topicId Identifier,
offset uint64,
partitionId *uint32,
) error
// GetConsumerOffset get the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to poll the messages.
GetConsumerOffset(
consumer Consumer,
streamId Identifier,
topicId Identifier,
partitionId *uint32,
) (*ConsumerOffsetInfo, error)
// GetConsumerGroups get the info about all the consumer groups for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to read the streams or topics.
GetConsumerGroups(streamId Identifier, topicId Identifier) ([]ConsumerGroup, error)
// DeleteConsumerOffset delete the consumer offset for a specific consumer or consumer group for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to poll the messages.
DeleteConsumerOffset(
consumer Consumer,
streamId Identifier,
topicId Identifier,
partitionId *uint32,
) error
// GetConsumerGroup get the info about a specific consumer group by unique ID or name for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to read the streams or topics.
GetConsumerGroup(
streamId Identifier,
topicId Identifier,
groupId Identifier,
) (*ConsumerGroupDetails, error)
// CreateConsumerGroup create a new consumer group for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to manage the streams or topics.
CreateConsumerGroup(
streamId Identifier,
topicId Identifier,
name string,
) (*ConsumerGroupDetails, error)
// DeleteConsumerGroup delete a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to manage the streams or topics.
DeleteConsumerGroup(
streamId Identifier,
topicId Identifier,
groupId Identifier,
) error
// JoinConsumerGroup join a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to read the streams or topics.
JoinConsumerGroup(
streamId Identifier,
topicId Identifier,
groupId Identifier,
) error
// LeaveConsumerGroup leave a consumer group by unique ID or name for the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to read the streams or topics.
LeaveConsumerGroup(
streamId Identifier,
topicId Identifier,
groupId Identifier,
) error
// CreatePartitions create new N partitions for a topic by unique ID or name.
// For example, given a topic with 3 partitions, if you create 2 partitions, the topic will have 5 partitions (from 1 to 5).
// Authentication is required, and the permission to manage the partitions.
CreatePartitions(
streamId Identifier,
topicId Identifier,
partitionsCount uint32,
) error
// DeletePartitions delete last N partitions for a topic by unique ID or name.
// For example, given a topic with 5 partitions, if you delete 2 partitions, the topic will have 3 partitions left (from 1 to 3).
// Authentication is required, and the permission to manage the partitions.
DeletePartitions(
streamId Identifier,
topicId Identifier,
partitionsCount uint32,
) error
// GetUser get the info about a specific user by unique ID or username.
// Authentication is required, and the permission to read the users, unless the provided user ID is the same as the authenticated user.
GetUser(identifier Identifier) (*UserInfoDetails, error)
// GetUsers get the info about all the users.
// Authentication is required, and the permission to read the users.
GetUsers() ([]UserInfo, error)
// CreateUser create a new user.
// Authentication is required, and the permission to manage the users.
CreateUser(
username string,
password string,
status UserStatus,
permissions *Permissions,
) (*UserInfoDetails, error)
// UpdateUser update a user by unique ID or username.
// Authentication is required, and the permission to manage the users.
UpdateUser(
userID Identifier,
username *string,
status *UserStatus,
) error
// UpdatePermissions update the permissions of a user by unique ID or username.
// Authentication is required, and the permission to manage the users.
UpdatePermissions(userID Identifier, permissions *Permissions) error
// ChangePassword change the password of a user by unique ID or username.
// Authentication is required, and the permission to manage the users, unless the provided user ID is the same as the authenticated user.
ChangePassword(
userID Identifier,
currentPassword string,
newPassword string,
) error
// DeleteUser delete a user by unique ID or username.
// Authentication is required, and the permission to manage the users.
DeleteUser(identifier Identifier) error
// CreatePersonalAccessToken create a new personal access token for the currently authenticated user.
CreatePersonalAccessToken(name string, expiry uint32) (*RawPersonalAccessToken, error)
// DeletePersonalAccessToken delete a personal access token of the currently authenticated user by unique token name.
DeletePersonalAccessToken(name string) error
// GetPersonalAccessTokens get the info about all the personal access tokens of the currently authenticated user.
GetPersonalAccessTokens() ([]PersonalAccessTokenInfo, error)
// LoginWithPersonalAccessToken login the user with the provided personal access token.
LoginWithPersonalAccessToken(token string) (*IdentityInfo, error)
// LoginUser login a user by username and password.
LoginUser(username string, password string) (*IdentityInfo, error)
// LogoutUser logout the currently authenticated user.
LogoutUser() error
// GetStats get the stats of the system such as PID, memory usage, streams count etc.
// Authentication is required, and the permission to read the server info.
GetStats() (*Stats, error)
// Ping the server to check if it's alive.
Ping() error
// GetClients get the info about all the currently connected clients (not to be confused with the users).
// Authentication is required, and the permission to read the server info.
GetClients() ([]ClientInfo, error)
// GetClient get the info about a specific client by unique ID (not to be confused with the user).
// Authentication is required, and the permission to read the server info.
GetClient(clientId uint32) (*ClientInfoDetails, error)
}
type ClientInfo ¶
type ClientInfoDetails ¶
type ClientInfoDetails struct {
ClientInfo
ConsumerGroups []ConsumerGroupInfo `json:"consumerGroups,omitempty"`
}
type ClusterMetadata ¶ added in v0.7.0
type ClusterMetadata struct {
Name string
Nodes []ClusterNode
}
func (*ClusterMetadata) BufferSize ¶ added in v0.7.0
func (m *ClusterMetadata) BufferSize() int
BufferSize returns total serialized size.
func (*ClusterMetadata) MarshalBinary ¶ added in v0.7.0
func (m *ClusterMetadata) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler.
func (*ClusterMetadata) UnmarshalBinary ¶ added in v0.7.0
func (m *ClusterMetadata) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler.
type ClusterNode ¶ added in v0.7.0
type ClusterNode struct {
Name string
IP string
Endpoints TransportEndpoints
Role ClusterNodeRole
Status ClusterNodeStatus
}
func (*ClusterNode) BufferSize ¶ added in v0.7.0
func (n *ClusterNode) BufferSize() int
func (*ClusterNode) MarshalBinary ¶ added in v0.7.0
func (n *ClusterNode) MarshalBinary() ([]byte, error)
func (*ClusterNode) String ¶ added in v0.7.0
func (n *ClusterNode) String() string
func (*ClusterNode) UnmarshalBinary ¶ added in v0.7.0
func (n *ClusterNode) UnmarshalBinary(b []byte) error
type ClusterNodeRole ¶ added in v0.7.0
type ClusterNodeRole byte
const ( RoleLeader ClusterNodeRole = 0 RoleFollower ClusterNodeRole = 1 )
func ClusterNodeRoleTryFrom ¶ added in v0.7.0
func ClusterNodeRoleTryFrom(b byte) (ClusterNodeRole, error)
ClusterNodeRoleTryFrom validates a raw byte and returns the corresponding role.
func (*ClusterNodeRole) MarshalBinary ¶ added in v0.7.0
func (r *ClusterNodeRole) MarshalBinary() ([]byte, error)
func (*ClusterNodeRole) String ¶ added in v0.7.0
func (r *ClusterNodeRole) String() string
func (*ClusterNodeRole) UnmarshalBinary ¶ added in v0.7.0
func (r *ClusterNodeRole) UnmarshalBinary(b []byte) error
type ClusterNodeStatus ¶ added in v0.7.0
type ClusterNodeStatus uint8
const ( // Healthy indicates node is healthy and responsive Healthy ClusterNodeStatus = 0 // Starting indicates node is starting up Starting ClusterNodeStatus = 1 // Stopping indicates node is shutting down Stopping ClusterNodeStatus = 2 // Unreachable indicates node is unreachable Unreachable ClusterNodeStatus = 3 // Maintenance indicates node is in maintenance mode Maintenance ClusterNodeStatus = 4 // Unknown indicates node is unknown Unknown ClusterNodeStatus = 5 )
func TryFrom ¶ added in v0.7.0
func TryFrom(b byte) (ClusterNodeStatus, error)
func (*ClusterNodeStatus) MarshalBinary ¶ added in v0.7.0
func (s *ClusterNodeStatus) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler.
func (*ClusterNodeStatus) String ¶ added in v0.7.0
func (s *ClusterNodeStatus) String() string
String implements fmt.Stringer with lowercase names (matches serde/strum settings).
func (*ClusterNodeStatus) UnmarshalBinary ¶ added in v0.7.0
func (s *ClusterNodeStatus) UnmarshalBinary(b []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler.
type Command ¶ added in v0.7.0
type Command interface {
// Code returns the command code associated with this command.
Code() CommandCode
encoding.BinaryMarshaler
}
type CommandCode ¶
type CommandCode int
const ( PingCode CommandCode = 1 GetStatsCode CommandCode = 10 GetSnapshotFileCode CommandCode = 11 GetClusterMetadataCode CommandCode = 12 GetMeCode CommandCode = 20 GetClientCode CommandCode = 21 GetClientsCode CommandCode = 22 GetUserCode CommandCode = 31 GetUsersCode CommandCode = 32 CreateUserCode CommandCode = 33 DeleteUserCode CommandCode = 34 UpdateUserCode CommandCode = 35 UpdatePermissionsCode CommandCode = 36 ChangePasswordCode CommandCode = 37 LoginUserCode CommandCode = 38 LogoutUserCode CommandCode = 39 GetAccessTokensCode CommandCode = 41 CreateAccessTokenCode CommandCode = 42 DeleteAccessTokenCode CommandCode = 43 LoginWithAccessTokenCode CommandCode = 44 PollMessagesCode CommandCode = 100 SendMessagesCode CommandCode = 101 GetOffsetCode CommandCode = 120 StoreOffsetCode CommandCode = 121 DeleteConsumerOffsetCode CommandCode = 122 GetStreamCode CommandCode = 200 GetStreamsCode CommandCode = 201 CreateStreamCode CommandCode = 202 DeleteStreamCode CommandCode = 203 UpdateStreamCode CommandCode = 204 GetTopicCode CommandCode = 300 GetTopicsCode CommandCode = 301 CreateTopicCode CommandCode = 302 DeleteTopicCode CommandCode = 303 UpdateTopicCode CommandCode = 304 CreatePartitionsCode CommandCode = 402 DeletePartitionsCode CommandCode = 403 GetGroupCode CommandCode = 600 GetGroupsCode CommandCode = 601 CreateGroupCode CommandCode = 602 DeleteGroupCode CommandCode = 603 JoinGroupCode CommandCode = 604 LeaveGroupCode CommandCode = 605 )
type CompressionAlgorithm ¶
type CompressionAlgorithm uint8
const ( CompressionAlgorithmNone CompressionAlgorithm = 1 CompressionAlgorithmGzip CompressionAlgorithm = 2 )
type ConnectionInfo ¶ added in v0.7.0
type Consumer ¶
type Consumer struct {
Kind ConsumerKind
Id Identifier
}
func DefaultConsumer ¶
func DefaultConsumer() Consumer
func NewGroupConsumer ¶
func NewGroupConsumer(id Identifier) Consumer
NewGroupConsumer create a new Consumer whose kind is ConsumerKindGroup from the Identifier
func NewSingleConsumer ¶
func NewSingleConsumer(id Identifier) Consumer
NewSingleConsumer create a new Consumer whose kind is ConsumerKindSingle from the Identifier
type ConsumerGroup ¶
type ConsumerGroupDetails ¶
type ConsumerGroupDetails struct {
ConsumerGroup
Members []ConsumerGroupMember
}
type ConsumerGroupInfo ¶
type ConsumerGroupMember ¶
type ConsumerKind ¶
type ConsumerKind uint8
const ( ConsumerKindSingle ConsumerKind = 1 ConsumerKindGroup ConsumerKind = 2 )
type ConsumerOffsetInfo ¶
type CreateConsumerGroupRequest ¶
type CreateConsumerGroupRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
Name string `json:"name"`
}
type CreatePartitionsRequest ¶
type CreatePartitionsRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
PartitionsCount uint32 `json:"partitionsCount"`
}
type CreateStreamRequest ¶
type CreateTopicRequest ¶
type CreateTopicRequest struct {
StreamId Identifier `json:"streamId"`
TopicId uint32 `json:"topicId"`
PartitionsCount int `json:"partitionsCount"`
CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
MessageExpiry time.Duration `json:"messageExpiry"`
MaxTopicSize uint64 `json:"maxTopicSize"`
ReplicationFactor uint8 `json:"replicationFactor"`
Name string `json:"name"`
}
type CreateUserRequest ¶
type CreateUserRequest struct {
Username string `json:"username"`
Password string `json:"Password"`
Status UserStatus `json:"Status"`
Permissions *Permissions `json:"Permissions,omitempty"`
}
type DeleteConsumerGroupRequest ¶
type DeleteConsumerGroupRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
ConsumerGroupId Identifier `json:"consumerGroupId"`
}
type DeleteConsumerOffsetRequest ¶ added in v0.6.0
type DeleteConsumerOffsetRequest struct {
Consumer Consumer
StreamId Identifier
TopicId Identifier
PartitionId *uint32
}
type DeletePartitionsRequest ¶
type DeletePartitionsRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
PartitionsCount uint32 `json:"partitionsCount"`
}
type DeletePersonalAccessTokenRequest ¶
type DeletePersonalAccessTokenRequest struct {
Name string `json:"Name"`
}
type Duration ¶
type Duration uint64
Duration represents the expiration duration in microsecond (µs).
type GetClusterMetadata ¶ added in v0.7.0
type GetClusterMetadata struct {
}
func (*GetClusterMetadata) Code ¶ added in v0.7.0
func (m *GetClusterMetadata) Code() CommandCode
func (*GetClusterMetadata) MarshalBinary ¶ added in v0.7.0
func (m *GetClusterMetadata) MarshalBinary() ([]byte, error)
type GetConsumerOffsetRequest ¶
type GetConsumerOffsetRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
Consumer Consumer `json:"consumer"`
PartitionId *uint32 `json:"partitionId"`
}
type GetStreamRequest ¶
type GetStreamRequest struct {
StreamID Identifier
}
type GlobalPermissions ¶
type GlobalPermissions struct {
ManageServers bool `json:"ManageServers"`
ReadServers bool `json:"ReadServers"`
ManageUsers bool `json:"ManageUsers"`
ReadUsers bool `json:"ReadUsers"`
ManageStreams bool `json:"ManageStreams"`
ReadStreams bool `json:"ReadStreams"`
ManageTopics bool `json:"ManageTopics"`
ReadTopics bool `json:"ReadTopics"`
PollMessages bool `json:"PollMessages"`
SendMessages bool `json:"SendMessages"`
}
type HeaderEntry ¶ added in v0.7.0
type HeaderEntry struct {
Key HeaderKey
Value HeaderValue
}
func DeserializeHeaders ¶
func DeserializeHeaders(userHeadersBytes []byte) ([]HeaderEntry, error)
type HeaderKey ¶
type HeaderKey struct {
Kind HeaderKind
Value []byte
}
func NewHeaderKeyInt32 ¶ added in v0.7.0
func NewHeaderKeyRaw ¶ added in v0.7.0
func NewHeaderKeyString ¶ added in v0.7.0
type HeaderKind ¶
type HeaderKind int
const ( Raw HeaderKind = 1 String HeaderKind = 2 Bool HeaderKind = 3 Int8 HeaderKind = 4 Int16 HeaderKind = 5 Int32 HeaderKind = 6 Int64 HeaderKind = 7 Int128 HeaderKind = 8 Uint8 HeaderKind = 9 Uint16 HeaderKind = 10 Uint32 HeaderKind = 11 Uint64 HeaderKind = 12 Uint128 HeaderKind = 13 Float HeaderKind = 14 Double HeaderKind = 15 )
func (HeaderKind) ExpectedSize ¶ added in v0.7.0
func (k HeaderKind) ExpectedSize() int
type HeaderValue ¶
type HeaderValue struct {
Kind HeaderKind
Value []byte
}
type Identifier ¶
func NewIdentifier ¶
func NewIdentifier[T uint32 | string](value T) (Identifier, error)
NewIdentifier create a new identifier
func (Identifier) String ¶
func (id Identifier) String() (string, error)
String returns the string value of the identifier.
func (Identifier) Uint32 ¶
func (id Identifier) Uint32() (uint32, error)
Uint32 returns the numeric value of the identifier.
type IdentityInfo ¶
type IggyMessage ¶
type IggyMessage struct {
Header MessageHeader
Payload []byte
UserHeaders []byte
}
func NewIggyMessage ¶
func NewIggyMessage(payload []byte, opts ...IggyMessageOpt) (IggyMessage, error)
NewIggyMessage Creates a new message with customizable parameters.
type IggyMessageCompression ¶
type IggyMessageCompression string
const ( MESSAGE_COMPRESSION_NONE IggyMessageCompression = "none" MESSAGE_COMPRESSION_S2 IggyMessageCompression = "s2" MESSAGE_COMPRESSION_S2_BETTER IggyMessageCompression = "s2-better" MESSAGE_COMPRESSION_S2_BEST IggyMessageCompression = "s2-best" )
type IggyMessageOpt ¶
type IggyMessageOpt func(message *IggyMessage)
func WithID ¶
func WithID(id [16]byte) IggyMessageOpt
func WithUserHeaders ¶
func WithUserHeaders(userHeaders []HeaderEntry) IggyMessageOpt
type JoinConsumerGroupRequest ¶
type JoinConsumerGroupRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
ConsumerGroupId Identifier `json:"consumerGroupId"`
}
type LeaderRedirectionState ¶ added in v0.7.0
LeaderRedirectionState tracks redirection attempts to avoid loops.
func NewLeaderRedirectionState ¶ added in v0.7.0
func NewLeaderRedirectionState() *LeaderRedirectionState
func (*LeaderRedirectionState) CanRedirect ¶ added in v0.7.0
func (s *LeaderRedirectionState) CanRedirect() bool
func (*LeaderRedirectionState) IncrementRedirect ¶ added in v0.7.0
func (s *LeaderRedirectionState) IncrementRedirect(leaderAddress string)
func (*LeaderRedirectionState) Reset ¶ added in v0.7.0
func (s *LeaderRedirectionState) Reset()
type LeaveConsumerGroupRequest ¶
type LeaveConsumerGroupRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
ConsumerGroupId Identifier `json:"consumerGroupId"`
}
type LoginUserRequest ¶
type LoginWithPersonalAccessTokenRequest ¶
type LoginWithPersonalAccessTokenRequest struct {
Token string `json:"token"`
}
type MessageHeader ¶
type MessageHeader struct {
Checksum uint64 `json:"checksum"`
Id MessageID `json:"id"`
Offset uint64 `json:"offset"`
Timestamp uint64 `json:"timestamp"`
OriginTimestamp uint64 `json:"origin_timestamp"`
UserHeaderLength uint32 `json:"user_header_length"`
PayloadLength uint32 `json:"payload_length"`
Reserved uint64 `json:"reserved"`
}
func MessageHeaderFromBytes ¶
func MessageHeaderFromBytes(data []byte) (*MessageHeader, error)
func NewMessageHeader ¶
func NewMessageHeader(id MessageID, payloadLength uint32, userHeaderLength uint32) MessageHeader
func (*MessageHeader) ToBytes ¶
func (mh *MessageHeader) ToBytes() []byte
type MessagePolling ¶
type MessagePolling byte
const ( POLLING_OFFSET MessagePolling = 1 POLLING_TIMESTAMP MessagePolling = 2 POLLING_FIRST MessagePolling = 3 POLLING_LAST MessagePolling = 4 POLLING_NEXT MessagePolling = 5 )
type PartitionContract ¶
type Partitioning ¶
type Partitioning struct {
Kind PartitioningKind
Length int
Value []byte
}
func EntityIdBytes ¶
func EntityIdBytes(value []byte) (Partitioning, error)
func EntityIdGuid ¶
func EntityIdGuid(value uuid.UUID) Partitioning
func EntityIdInt ¶
func EntityIdInt(value int) Partitioning
func EntityIdString ¶
func EntityIdString(value string) (Partitioning, error)
func EntityIdUlong ¶
func EntityIdUlong(value uint64) Partitioning
func None ¶
func None() Partitioning
func PartitionId ¶
func PartitionId(value uint32) Partitioning
type PartitioningKind ¶
type PartitioningKind int
const ( Balanced PartitioningKind = 1 PartitionIdKind PartitioningKind = 2 MessageKey PartitioningKind = 3 )
type Permissions ¶
type Permissions struct {
Global GlobalPermissions `json:"Global"`
Streams map[int]*StreamPermissions `json:"Streams,omitempty"`
}
type PersonalAccessTokenInfo ¶
type PollMessageRequest ¶
type PollMessageRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
Consumer Consumer `json:"consumer"`
PartitionId uint32 `json:"partitionId"`
PollingStrategy PollingStrategy `json:"pollingStrategy"`
Count int `json:"count"`
AutoCommit bool `json:"autoCommit"`
}
type PolledMessage ¶
type PolledMessage struct {
PartitionId uint32
CurrentOffset uint64
MessageCount uint32
Messages []IggyMessage
}
type PollingStrategy ¶
type PollingStrategy struct {
Kind MessagePolling
Value uint64
}
func FirstPollingStrategy ¶
func FirstPollingStrategy() PollingStrategy
func LastPollingStrategy ¶
func LastPollingStrategy() PollingStrategy
func NewPollingStrategy ¶
func NewPollingStrategy(kind MessagePolling, value uint64) PollingStrategy
func NextPollingStrategy ¶
func NextPollingStrategy() PollingStrategy
func OffsetPollingStrategy ¶
func OffsetPollingStrategy(value uint64) PollingStrategy
func TimestampPollingStrategy ¶
func TimestampPollingStrategy(value uint64) PollingStrategy
type RawPersonalAccessToken ¶
type RawPersonalAccessToken struct {
Token string `json:"token"`
}
type ReceivedMessage ¶
type ReceivedMessage struct {
Message IggyMessage
CurrentOffset uint64
PartitionId uint32
}
type SendMessagesRequest ¶
type SendMessagesRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
Partitioning Partitioning `json:"partitioning"`
Messages []IggyMessage `json:"messages"`
}
type Stats ¶
type Stats struct {
ProcessId uint32 `json:"process_id"`
CpuUsage float32 `json:"cpu_usage"`
TotalCpuUsage float32 `json:"total_cpu_usage"`
MemoryUsage uint64 `json:"memory_usage"`
TotalMemory uint64 `json:"total_memory"`
AvailableMemory uint64 `json:"available_memory"`
RunTime uint64 `json:"run_time"`
StartTime uint64 `json:"start_time"`
ReadBytes uint64 `json:"read_bytes"`
WrittenBytes uint64 `json:"written_bytes"`
MessagesSizeBytes uint64 `json:"messages_size_bytes"`
StreamsCount uint32 `json:"streams_count"`
TopicsCount uint32 `json:"topics_count"`
PartitionsCount uint32 `json:"partitions_count"`
SegmentsCount uint32 `json:"segments_count"`
MessagesCount uint64 `json:"messages_count"`
ClientsCount uint32 `json:"clients_count"`
ConsumerGroupsCount uint32 `json:"consumer_groups_count"`
Hostname string `json:"hostname"`
OsName string `json:"os_name"`
OsVersion string `json:"os_version"`
KernelVersion string `json:"kernel_version"`
}
type StoreConsumerOffsetRequest ¶
type StoreConsumerOffsetRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
Consumer Consumer `json:"consumer"`
PartitionId *uint32 `json:"partitionId"`
Offset uint64 `json:"offset"`
}
type StreamDetails ¶
type StreamPermissions ¶
type StreamPermissions struct {
ManageStream bool `json:"ManageStream"`
ReadStream bool `json:"ReadStream"`
ManageTopics bool `json:"ManageTopics"`
ReadTopics bool `json:"ReadTopics"`
PollMessages bool `json:"PollMessages"`
SendMessages bool `json:"SendMessages"`
Topics map[int]*TopicPermissions `json:"Topics,omitempty"`
}
type Topic ¶
type Topic struct {
Id uint32 `json:"id"`
CreatedAt uint64 `json:"createdAt"`
Name string `json:"name"`
Size uint64 `json:"size"`
MessageExpiry Duration `json:"messageExpiry"`
CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
MaxTopicSize uint64 `json:"maxTopicSize"`
ReplicationFactor uint8 `json:"replicationFactor"`
MessagesCount uint64 `json:"messagesCount"`
PartitionsCount uint32 `json:"partitionsCount"`
}
type TopicDetails ¶
type TopicDetails struct {
Topic
Partitions []PartitionContract `json:"partitions,omitempty"`
}
type TopicPermissions ¶
type TransportEndpoints ¶ added in v0.7.0
TransportEndpoints represents four 16-bit ports.
func NewTransportEndpoints ¶ added in v0.7.0
func NewTransportEndpoints(tcp, quic, http, websocket uint16) TransportEndpoints
NewTransportEndpoints constructs a TransportEndpoints value.
func (*TransportEndpoints) GetBufferSize ¶ added in v0.7.0
func (t *TransportEndpoints) GetBufferSize() int
func (*TransportEndpoints) MarshalBinary ¶ added in v0.7.0
func (t *TransportEndpoints) MarshalBinary() ([]byte, error)
func (*TransportEndpoints) String ¶ added in v0.7.0
func (t *TransportEndpoints) String() string
func (*TransportEndpoints) UnmarshalBinary ¶ added in v0.7.0
func (t *TransportEndpoints) UnmarshalBinary(b []byte) error
type UpdatePermissionsRequest ¶
type UpdatePermissionsRequest struct {
UserID Identifier `json:"-"`
Permissions *Permissions `json:"Permissions,omitempty"`
}
type UpdateTopicRequest ¶
type UpdateTopicRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
CompressionAlgorithm uint8 `json:"compressionAlgorithm"`
MessageExpiry time.Duration `json:"messageExpiry"`
MaxTopicSize uint64 `json:"maxTopicSize"`
ReplicationFactor uint8 `json:"replicationFactor"`
Name string `json:"name"`
}
type UpdateUserRequest ¶
type UpdateUserRequest struct {
UserID Identifier `json:"-"`
Username *string `json:"username"`
Status *UserStatus `json:"userStatus"`
}
type UserInfo ¶
type UserInfo struct {
Id uint32 `json:"Id"`
CreatedAt uint64 `json:"CreatedAt"`
Status UserStatus `json:"Status"`
Username string `json:"Username"`
}
type UserInfoDetails ¶
type UserInfoDetails struct {
UserInfo
Permissions *Permissions `json:"Permissions"`
}
Source Files
¶
- access_tokens.go
- client.go
- client_info.go
- cluster.go
- cluster_node.go
- command.go
- command_codes.go
- compression_algorithm.go
- config.go
- connection_info.go
- consumer.go
- consumer_groups.go
- duration.go
- identifier.go
- leader_redirection_state.go
- login.go
- message_header.go
- message_polling.go
- messages.go
- meta_data.go
- node_status.go
- offets.go
- partitions.go
- role.go
- state.go
- stats.go
- stream.go
- topics.go
- transport_endpoints.go
- user_headers.go
- users.go
- version.go