Documentation
¶
Index ¶
- Constants
- Variables
- func DoBatch(ctx context.Context, r ReadRing, keys []uint32, ...) error
- func GenerateTokens(numTokens int, takenTokens []uint32) []uint32
- func ProtoDescFactory() proto.Message
- type ByToken
- type CASCallback
- type Codec
- type Config
- type ConsulConfig
- type Desc
- func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState, normaliseTokens bool)
- func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32
- func (*Desc) Descriptor() ([]byte, []int)
- func (this *Desc) Equal(that interface{}) bool
- func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc
- func (m *Desc) GetIngesters() map[string]IngesterDesc
- func (m *Desc) GetTokens() []TokenDesc
- func (this *Desc) GoString() string
- func (m *Desc) Marshal() (dAtA []byte, err error)
- func (m *Desc) MarshalTo(dAtA []byte) (int, error)
- func (*Desc) ProtoMessage()
- func (d *Desc) Ready(heartbeatTimeout time.Duration) error
- func (d *Desc) RemoveIngester(id string)
- func (m *Desc) Reset()
- func (m *Desc) Size() (n int)
- func (this *Desc) String() string
- func (d *Desc) TokensFor(id string) (tokens, other []uint32)
- func (m *Desc) Unmarshal(dAtA []byte) error
- func (m *Desc) XXX_DiscardUnknown()
- func (m *Desc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Desc) XXX_Merge(src proto.Message)
- func (m *Desc) XXX_Size() int
- func (m *Desc) XXX_Unmarshal(b []byte) error
- type FlushTransferer
- type IngesterDesc
- func (*IngesterDesc) Descriptor() ([]byte, []int)
- func (this *IngesterDesc) Equal(that interface{}) bool
- func (m *IngesterDesc) GetAddr() string
- func (m *IngesterDesc) GetState() IngesterState
- func (m *IngesterDesc) GetTimestamp() int64
- func (m *IngesterDesc) GetTokens() []uint32
- func (this *IngesterDesc) GoString() string
- func (m *IngesterDesc) Marshal() (dAtA []byte, err error)
- func (m *IngesterDesc) MarshalTo(dAtA []byte) (int, error)
- func (*IngesterDesc) ProtoMessage()
- func (m *IngesterDesc) Reset()
- func (m *IngesterDesc) Size() (n int)
- func (this *IngesterDesc) String() string
- func (m *IngesterDesc) Unmarshal(dAtA []byte) error
- func (m *IngesterDesc) XXX_DiscardUnknown()
- func (m *IngesterDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *IngesterDesc) XXX_Merge(src proto.Message)
- func (m *IngesterDesc) XXX_Size() int
- func (m *IngesterDesc) XXX_Unmarshal(b []byte) error
- type IngesterState
- type KVClient
- type KVConfig
- type Lifecycler
- type LifecyclerConfig
- type Operation
- type ProtoCodec
- type ReadRing
- type ReplicationSet
- type Ring
- func (r *Ring) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error)
- func (r *Ring) Collect(ch chan<- prometheus.Metric)
- func (r *Ring) Describe(ch chan<- *prometheus.Desc)
- func (r *Ring) Get(key uint32, op Operation) (ReplicationSet, error)
- func (r *Ring) GetAll() (ReplicationSet, error)
- func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool
- func (r *Ring) ReplicationFactor() int
- func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (r *Ring) Stop()
- type TokenDesc
- func (*TokenDesc) Descriptor() ([]byte, []int)
- func (this *TokenDesc) Equal(that interface{}) bool
- func (m *TokenDesc) GetIngester() string
- func (m *TokenDesc) GetToken() uint32
- func (this *TokenDesc) GoString() string
- func (m *TokenDesc) Marshal() (dAtA []byte, err error)
- func (m *TokenDesc) MarshalTo(dAtA []byte) (int, error)
- func (*TokenDesc) ProtoMessage()
- func (m *TokenDesc) Reset()
- func (m *TokenDesc) Size() (n int)
- func (this *TokenDesc) String() string
- func (m *TokenDesc) Unmarshal(dAtA []byte) error
- func (m *TokenDesc) XXX_DiscardUnknown()
- func (m *TokenDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TokenDesc) XXX_Merge(src proto.Message)
- func (m *TokenDesc) XXX_Size() int
- func (m *TokenDesc) XXX_Unmarshal(b []byte) error
Constants ¶
const (
// ConsulKey is the key under which we store the ring in consul.
ConsulKey = "ring"
)
Variables ¶
var ( ErrInvalidLengthRing = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowRing = fmt.Errorf("proto: integer overflow") )
var ErrEmptyRing = errors.New("empty ring")
ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
var ( // ErrNotFound is returned by ConsulClient.Get. ErrNotFound = fmt.Errorf("Not found") )
var IngesterState_name = map[int32]string{
0: "ACTIVE",
1: "LEAVING",
2: "PENDING",
3: "JOINING",
}
var IngesterState_value = map[string]int32{
"ACTIVE": 0,
"LEAVING": 1,
"PENDING": 2,
"JOINING": 3,
}
Functions ¶
func DoBatch ¶
func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error) error
DoBatch request against a set of keys in the ring, handling replication and failures. For example if we want to write N items where they may all hit different ingesters, and we want them all replicated R ways with quorum writes, we track the relationship between batch RPCs and the items within them.
Callback is passed the ingester to target, and the indexes of the keys to send to that ingester.
Not implemented as a method on Ring so we can test separately.
func GenerateTokens ¶
GenerateTokens make numTokens random tokens, none of which clash with takenTokens. Assumes takenTokens is sorted.
Types ¶
type CASCallback ¶ added in v1.0.0
CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil.
type Config ¶
type Config struct { KVStore KVConfig `yaml:"kvstore,omitempty"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout,omitempty"` ReplicationFactor int `yaml:"replication_factor,omitempty"` }
Config for a Ring
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
type ConsulConfig ¶ added in v1.0.0
type ConsulConfig struct { Host string Prefix string ACLToken string HTTPClientTimeout time.Duration ConsistentReads bool }
ConsulConfig to create a ConsulClient
func (*ConsulConfig) RegisterFlags ¶ added in v1.0.0
func (cfg *ConsulConfig) RegisterFlags(f *flag.FlagSet, prefix string)
RegisterFlags adds the flags required to config this to the given FlagSet If prefix is not an empty string it should end with a period.
type Desc ¶
type Desc struct { Ingesters map[string]IngesterDesc `` /* 149-byte string literal not displayed */ Tokens []TokenDesc `protobuf:"bytes,2,rep,name=tokens,proto3" json:"tokens"` }
func (*Desc) AddIngester ¶
func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState, normaliseTokens bool)
AddIngester adds the given ingester to the ring.
func (*Desc) ClaimTokens ¶
ClaimTokens transfers all the tokens from one ingester to another, returning the claimed token.
func (*Desc) Descriptor ¶
func (*Desc) FindIngestersByState ¶
func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc
FindIngestersByState returns the list of ingesters in the given state
func (*Desc) GetIngesters ¶
func (m *Desc) GetIngesters() map[string]IngesterDesc
func (*Desc) ProtoMessage ¶
func (*Desc) ProtoMessage()
func (*Desc) RemoveIngester ¶
RemoveIngester removes the given ingester and all its tokens.
func (*Desc) TokensFor ¶
TokensFor partitions the tokens into those for the given ID, and those for others.
func (*Desc) XXX_DiscardUnknown ¶
func (m *Desc) XXX_DiscardUnknown()
func (*Desc) XXX_Unmarshal ¶
type FlushTransferer ¶
type FlushTransferer interface { StopIncomingRequests() Flush() TransferOut(ctx context.Context) error }
FlushTransferer controls the shutdown of an ingester.
type IngesterDesc ¶
type IngesterDesc struct { Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"` Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` State IngesterState `protobuf:"varint,3,opt,name=state,proto3,enum=ring.IngesterState" json:"state,omitempty"` Tokens []uint32 `protobuf:"varint,6,rep,packed,name=tokens,proto3" json:"tokens,omitempty"` }
func (*IngesterDesc) Descriptor ¶
func (*IngesterDesc) Descriptor() ([]byte, []int)
func (*IngesterDesc) Equal ¶
func (this *IngesterDesc) Equal(that interface{}) bool
func (*IngesterDesc) GetAddr ¶
func (m *IngesterDesc) GetAddr() string
func (*IngesterDesc) GetState ¶
func (m *IngesterDesc) GetState() IngesterState
func (*IngesterDesc) GetTimestamp ¶
func (m *IngesterDesc) GetTimestamp() int64
func (*IngesterDesc) GetTokens ¶
func (m *IngesterDesc) GetTokens() []uint32
func (*IngesterDesc) GoString ¶
func (this *IngesterDesc) GoString() string
func (*IngesterDesc) Marshal ¶
func (m *IngesterDesc) Marshal() (dAtA []byte, err error)
func (*IngesterDesc) ProtoMessage ¶
func (*IngesterDesc) ProtoMessage()
func (*IngesterDesc) Reset ¶
func (m *IngesterDesc) Reset()
func (*IngesterDesc) Size ¶
func (m *IngesterDesc) Size() (n int)
func (*IngesterDesc) String ¶
func (this *IngesterDesc) String() string
func (*IngesterDesc) Unmarshal ¶
func (m *IngesterDesc) Unmarshal(dAtA []byte) error
func (*IngesterDesc) XXX_DiscardUnknown ¶
func (m *IngesterDesc) XXX_DiscardUnknown()
func (*IngesterDesc) XXX_Marshal ¶
func (m *IngesterDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*IngesterDesc) XXX_Merge ¶
func (m *IngesterDesc) XXX_Merge(src proto.Message)
func (*IngesterDesc) XXX_Size ¶
func (m *IngesterDesc) XXX_Size() int
func (*IngesterDesc) XXX_Unmarshal ¶
func (m *IngesterDesc) XXX_Unmarshal(b []byte) error
type IngesterState ¶
type IngesterState int32
const ( ACTIVE IngesterState = 0 LEAVING IngesterState = 1 PENDING IngesterState = 2 JOINING IngesterState = 3 )
func (IngesterState) EnumDescriptor ¶
func (IngesterState) EnumDescriptor() ([]byte, []int)
func (IngesterState) String ¶
func (x IngesterState) String() string
type KVClient ¶ added in v1.0.0
type KVClient interface { CAS(ctx context.Context, key string, f CASCallback) error WatchKey(ctx context.Context, key string, f func(interface{}) bool) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) Get(ctx context.Context, key string) (interface{}, error) PutBytes(ctx context.Context, key string, buf []byte) error }
KVClient is a high-level client for Consul, that exposes operations such as CAS and Watch which take callbacks. It also deals with serialisation by having an instance factory passed in to methods and deserialising into that.
func NewConsulClient ¶ added in v1.0.0
func NewConsulClient(cfg ConsulConfig, codec Codec) (KVClient, error)
NewConsulClient returns a new ConsulClient.
func NewInMemoryKVClient ¶ added in v1.0.0
NewInMemoryKVClient makes a new mock consul client.
func NewKVStore ¶ added in v1.0.0
NewKVStore creates a new KVstore client (inmemory or consul) based on the config, encodes and decodes data for storage using the codec.
func PrefixClient ¶ added in v1.0.0
PrefixClient takes a ConsulClient and forces a prefix on all its operations.
type KVConfig ¶ added in v1.0.0
type KVConfig struct { Store string `yaml:"store,omitempty"` Consul ConsulConfig `yaml:"consul,omitempty"` Mock KVClient }
KVConfig is config for a KVStore currently used by ring and HA tracker, where store can be consul or inmemory.
func (*KVConfig) RegisterFlagsWithPrefix ¶ added in v1.0.0
RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet. If prefix is an empty string we will register consul flags with no prefix and the store flag with the prefix ring, so ring.store. For everything else we pass the prefix to the Consul flags. If prefix is not an empty string it should end with a period.
type Lifecycler ¶
type Lifecycler struct { KVStore KVClient // These values are initialised at startup, and never change ID string Addr string RingName string // contains filtered or unexported fields }
Lifecycler is responsible for managing the lifecycle of entries in the ring.
func NewLifecycler ¶
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, name string) (*Lifecycler, error)
NewLifecycler makes and starts a new Lifecycler.
func (*Lifecycler) ChangeState ¶
func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
ChangeState of the ingester, for use off of the loop() goroutine.
func (*Lifecycler) CheckReady ¶
func (i *Lifecycler) CheckReady(ctx context.Context) error
CheckReady is used to rate limit the number of ingesters that can be coming or going at any one time, by only returning true if all ingesters are active. The state latches: once we have gone ready we don't go un-ready
func (*Lifecycler) ClaimTokensFor ¶
func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error
ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
func (*Lifecycler) GetState ¶
func (i *Lifecycler) GetState() IngesterState
GetState returns the state of this ingester.
func (*Lifecycler) Shutdown ¶
func (i *Lifecycler) Shutdown()
Shutdown the lifecycle. It will: - send chunks to another ingester, if it can. - otherwise, flush chunks to the chunk store. - remove config from Consul. - block until we've successfully shutdown.
type LifecyclerConfig ¶
type LifecyclerConfig struct { RingConfig Config `yaml:"ring,omitempty"` // Config for the ingester lifecycle control ListenPort *int NumTokens int `yaml:"num_tokens,omitempty"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period,omitempty"` JoinAfter time.Duration `yaml:"join_after,omitempty"` MinReadyDuration time.Duration `yaml:"min_ready_duration,omitempty"` ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"` NormaliseTokens bool `yaml:"normalise_tokens,omitempty"` InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address"` Port int ID string SkipUnregister bool }
LifecyclerConfig is the config to build a Lifecycler.
func (*LifecyclerConfig) RegisterFlags ¶
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*LifecyclerConfig) RegisterFlagsWithPrefix ¶
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
type ProtoCodec ¶ added in v1.0.0
ProtoCodec is a Codec for proto/snappy
func (ProtoCodec) Decode ¶ added in v1.0.0
func (p ProtoCodec) Decode(bytes []byte) (interface{}, error)
Decode implements Codec
func (ProtoCodec) Encode ¶ added in v1.0.0
func (p ProtoCodec) Encode(msg interface{}) ([]byte, error)
Encode implements Codec
type ReadRing ¶
type ReadRing interface { prometheus.Collector Get(key uint32, op Operation) (ReplicationSet, error) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error) GetAll() (ReplicationSet, error) ReplicationFactor() int }
ReadRing represents the read inferface to the ring.
type ReplicationSet ¶
type ReplicationSet struct { Ingesters []IngesterDesc MaxErrors int }
ReplicationSet describes the ingesters to talk to for a given key, and how many errors to tolerate.
func (ReplicationSet) Do ¶
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(*IngesterDesc) (interface{}, error)) ([]interface{}, error)
Do function f in parallel for all replicas in the set, erroring is we exceed MaxErrors and returning early otherwise.
type Ring ¶
type Ring struct { KVClient KVClient // contains filtered or unexported fields }
Ring holds the information about the members of the consistent hash ring.
func (*Ring) BatchGet ¶
func (r *Ring) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error)
BatchGet returns ReplicationFactor (or more) ingesters which form the replicas for the given keys. The order of the result matches the order of the input.
func (*Ring) Collect ¶
func (r *Ring) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector.
func (*Ring) Describe ¶
func (r *Ring) Describe(ch chan<- *prometheus.Desc)
Describe implements prometheus.Collector.
func (*Ring) Get ¶
func (r *Ring) Get(key uint32, op Operation) (ReplicationSet, error)
Get returns n (or more) ingesters which form the replicas for the given key.
func (*Ring) GetAll ¶
func (r *Ring) GetAll() (ReplicationSet, error)
GetAll returns all available ingesters in the ring.
func (*Ring) IsHealthy ¶
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool
IsHealthy checks whether an ingester appears to be alive and heartbeating
func (*Ring) ReplicationFactor ¶
ReplicationFactor of the ring.
type TokenDesc ¶
type TokenDesc struct { Token uint32 `protobuf:"varint,1,opt,name=token,proto3" json:"token,omitempty"` Ingester string `protobuf:"bytes,2,opt,name=ingester,proto3" json:"ingester,omitempty"` }
func (*TokenDesc) Descriptor ¶
func (*TokenDesc) GetIngester ¶
func (*TokenDesc) ProtoMessage ¶
func (*TokenDesc) ProtoMessage()
func (*TokenDesc) XXX_DiscardUnknown ¶
func (m *TokenDesc) XXX_DiscardUnknown()