hh

package
v0.0.0-...-592d533 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2017 License: MIT Imports: 18 Imported by: 0

README

Hinted handoff

Hinted Handoff is an optional part of writes in cluster, enabled by default, with two purposes:

  1. Hinted handoff allows influxdb to offer full write availability when consistency is not required.
  2. Hinted handoff dramatically improves response consistency after temporary outages such as network failures.

How it works

When a write is performed and a replica node for the row is either known to be down ahead of time, or does not respond to the write request, the coordinator will store a hint locally, in the hh directory. This hint is basically a wrapper around the mutation indicating that it needs to be replayed to the unavailable node(s).

Once a node discovers via gossip that a node for which it holds hints has recovered, it will send the data(known as Point in inlfluxdb) corresponding to each hint to the target data node.

How data stroed inside hinted handoff

each segment has the following structue in disk.

 ┌──────────────────────────┐ ┌──────────────────────────┐ ┌────────────┐
 │         Block 1          │ │         Block 2          │ │   Footer   │
 └──────────────────────────┘ └──────────────────────────┘ └────────────┘
 ┌────────────┐┌────────────┐ ┌────────────┐┌────────────┐ ┌────────────┐
 │Block 1 Len ││Block 1 Body│ │Block 2 Len ││Block 2 Body│ │Head Offset │
 │  8 bytes   ││  N bytes   │ │  8 bytes   ││  N bytes   │ │  8 bytes   │
 └────────────┘└────────────┘ └────────────┘└────────────┘ └────────────┘

queue is a bounded, disk-backed, append-only type that combines queue and log semantics. byte slices can be appended and read back in-order. The queue maintains a pointer to the current head byte slice and can re-read from the head until it has been advanced.

 ┌─────┐
 │Head │
 ├─────┘
 │
 ▼
 ┌─────────────────┐ ┌─────────────────┐┌─────────────────┐
 │Segment 1 - 10MB │ │Segment 2 - 10MB ││Segment 3 - 10MB │
 └─────────────────┘ └─────────────────┘└─────────────────┘
                                                          ▲
                                                          │
                                                          │
                                                     ┌─────┐
                                                     │Tail │
                                                     └─────┘

How hinted handoff perfrom write requests

type shardWriter interface {
	WriteShard(shardID, ownerID uint64, points []models.Point) error
}

type metaClient interface {
	DataNode(id uint64) (ni *meta.NodeInfo, err error)
}

shardWriter will write points into a data node whose node is is owner id. This must be done via rpc call. This requires tcp host infor which can be achieved from metaClient.

How data stored in dish in hinted handoff system

For any incoming write request, if coordinator detect some request failed to write to some node(this is realted with consistency level), corrdinator node will forward such request into hinted handoff service. When hinted handoff receieved that request, it will write shardID and all Points into disk file(known as segment). ownerID is the name of the directory in hh.

Elements in hinted handoff system

Segment
Queue
Node Processor

Each node processor is recognized by nodeID. In addition, nodeID can be used for sedning pending write into these offline node storage. Inside node processor, there are two interface:

  1. metaClient
  2. shardWriter

metaClient has only one method DataNode(id uint64). This can is the way to determine current node is online or offline. If such node is online, then this method will return normally. If not, then such node is offline.

shardWriter has only one method:

	WriteShard(shardID, ownerID uint64, points []models.Point) error

This method is used when node processor trying to empty pending write.

Documentation

Overview

Package hh implements a hinted handoff for writes

Index

Constants

View Source
const (
	// DefaultMaxSize is the default maximum size of all hinted handoff queues in bytes.
	DefaultMaxSize = 1024 * 1024 * 1024

	// DefaultMaxAge is the default maximum amount of time that a hinted handoff write
	// can stay in the queue.  After this time, the write will be purged.
	DefaultMaxAge = 7 * 24 * time.Hour

	// DefaultConcurrency is the default number of queued writes to process at a time .
	DefaultRetryConcurrency = 20

	// DefaultRetryRateLimit is the default rate that hinted handoffs will be retried.
	// The rate is in bytes per second and applies across all nodes when retried.   A
	// value of 0 disables the rate limit.
	DefaultRetryRateLimit = 0

	// DefaultRetryInterval is the default amount of time the system waits before
	// attempting to flush hinted handoff queues. With each failure of a hinted
	// handoff write, this retry interval increases exponentially until it reaches
	// the maximum
	DefaultRetryInterval = time.Second

	// DefaultRetryMaxInterval is the maximum the hinted handoff retry interval
	// will ever be.
	DefaultRetryMaxInterval = time.Minute

	// DefaultPurgeInterval is the amount of time the system waits before attempting
	// to purge hinted handoff data due to age or inactive nodes.
	DefaultPurgeInterval = time.Hour
)

Variables

View Source
var (
	ErrNotOpen     = fmt.Errorf("queue not open")
	ErrQueueFull   = fmt.Errorf("queue is full")
	ErrSegmentFull = fmt.Errorf("segment is full")
)

Possible errors returned by a hinted handoff queue.

View Source
var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")

ErrHintedHandoffDisabled is returned when attempting to use a disabled hinted handoff service.

Functions

func NewRateLimiter

func NewRateLimiter(limit int64) *limiter

NewRateLimiter returns a new limiter configured to restrict a process to the limit per second. limit is the maximum amount that can be used per second. The limit should be > 0. A limit <= 0, will not limit the processes.

Types

type Config

type Config struct {
	Enabled          bool          `toml:"enabled"`
	Dir              string        `toml:"dir"`
	MaxSize          int64         `toml:"max-size"`
	MaxAge           toml.Duration `toml:"max-age"`
	RetryConcurrency int64         `toml:"retry-concurrency"`
	RetryRateLimit   int64         `toml:"retry-rate-limit"`
	RetryInterval    toml.Duration `toml:"retry-interval"`
	RetryMaxInterval toml.Duration `toml:"retry-max-interval"`
	PurgeInterval    toml.Duration `toml:"purge-interval"`
}

Config is a hinted handoff configuration.

func NewConfig

func NewConfig() Config

NewConfig returns a new Config.

func (*Config) Validate

func (c *Config) Validate() error

type HHStatistics

type HHStatistics struct {
	NodeProcessorCreated int64
	NodeProcessorOpened  int64
}

HHStatistics keeps all statistcs realted with hinted handoff service

type NodeProcessor

type NodeProcessor struct {
	PurgeInterval    time.Duration // Interval between periodic purge checks
	RetryInterval    time.Duration // Interval between periodic write-to-node attempts.
	RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
	MaxSize          int64         // Maximum size an underlying queue can get.
	MaxAge           time.Duration // Maximum age queue data can get before purging.
	RetryRateLimit   int64         // Limits the rate data is sent to node.

	Logger zap.Logger
	// contains filtered or unexported fields
}

NodeProcessor encapsulates a queue of hinted-handoff data for a node, and the transmission of the data to the node.

func NewNodeProcessor

func NewNodeProcessor(nodeID uint64, dir string, w shardWriter, m metaClient) *NodeProcessor

NewNodeProcessor returns a new NodeProcessor for the given node, using dir for the hinted-handoff data.

func (*NodeProcessor) Active

func (n *NodeProcessor) Active() (bool, error)

Active returns whether this node processor is for a currently active node.

func (*NodeProcessor) Close

func (n *NodeProcessor) Close() error

Close closes the NodeProcessor, terminating all data tranmission to the node. When closed it will not accept hinted-handoff data.

func (*NodeProcessor) Closed

func (n *NodeProcessor) Closed() bool

Closed will return true if node processor is currently closed

func (*NodeProcessor) Empty

func (n *NodeProcessor) Empty() bool

func (*NodeProcessor) Head

func (n *NodeProcessor) Head() string

Head returns the head of the processor's queue.

func (*NodeProcessor) LastModified

func (n *NodeProcessor) LastModified() (time.Time, error)

LastModified returns the time the NodeProcessor last receieved hinted-handoff data.

func (*NodeProcessor) Open

func (n *NodeProcessor) Open() error

Open opens the NodeProcessor. It will read and write data present in dir, and start transmitting data to the node. A NodeProcessor must be opened before it can accept hinted data.

func (*NodeProcessor) Purge

func (n *NodeProcessor) Purge() error

Purge deletes all hinted-handoff data under management by a NodeProcessor. The NodeProcessor should be in the closed state before calling this function.

func (*NodeProcessor) SendWrite

func (n *NodeProcessor) SendWrite() (int, error)

SendWrite attempts to sent the current block of hinted data to the target node. If successful, it returns the number of bytes it sent and advances to the next block. Otherwise returns EOF when there is no more data or the node is inactive.

func (*NodeProcessor) Statistics

func (n *NodeProcessor) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*NodeProcessor) Tail

func (n *NodeProcessor) Tail() string

Tail returns the tail of the processor's queue.

func (*NodeProcessor) WriteShard

func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error

WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate hinted-handoff queues, and be called concurrently, it takes a lock during queue access.

type Service

type Service struct {
	Logger zap.Logger

	MetaClient metaClient

	Monitor interface {
		RegisterDiagnosticsClient(name string, client diagnostics.Client)
		DeregisterDiagnosticsClient(name string)
	}
	// contains filtered or unexported fields
}

Service represents a hinted handoff service.

func NewService

func NewService(c Config, w shardWriter, m metaClient) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

Close closes the hinted handoff service.

func (*Service) Diagnostics

func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns diagnostic information.

func (*Service) Empty

func (s *Service) Empty(id uint64) bool

func (*Service) Open

func (s *Service) Open() error

Open opens the hinted handoff service.

func (*Service) Statistics

func (s *Service) Statistics(tags map[string]string) []models.Statistic

func (*Service) WithLogger

func (s *Service) WithLogger(log zap.Logger)

WithLogger sets the internal logger to the logger passed in

func (*Service) WriteShard

func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error

WriteShard queues the points write for shardID to node ownerID to handoff queue

type Statistics

type Statistics struct {
	WriteShardReq                int64
	WriteShardReqPoints          int64
	WriteNodeReqFail             int64
	WriteNodeReqPoints           int64
	WriteNodeReq                 int64
	WriteShardConcurrentlyReq    int64
	WriteShardConcurrentlyFail   int64
	WriteShardConcurrentlyPoints int64
	WriteDiskBytes               int64
	WriteDiskSegments            int64
}

Statistics keeps statistics related to the hinted handoff

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL