v0.11.1

This package is not in the latest version of its module.

Published: Mar 31, 2016 License: MIT Imports: 18 Imported by: 0



Package hh implements hinted handoff for writes.



const (
	// DefaultMaxSize is the default maximum size of all hinted handoff queues in bytes.
	DefaultMaxSize = 1024 * 1024 * 1024

	// DefaultMaxAge is the default maximum amount of time that a hinted handoff write
	// can stay in the queue.  After this time, the write will be purged.
	DefaultMaxAge = 7 * 24 * time.Hour

	// DefaultRetryRateLimit is the default rate at which hinted handoffs will be retried.
	// The rate is in bytes per second and applies across all nodes when retried. A
	// value of 0 disables the rate limit.
	DefaultRetryRateLimit = 0

	// DefaultRetryInterval is the default amount of time the system waits before
	// attempting to flush hinted handoff queues. With each failure of a hinted
	// handoff write, this retry interval increases exponentially until it reaches
	// the maximum.
	DefaultRetryInterval = time.Second

	// DefaultRetryMaxInterval is the maximum the hinted handoff retry interval
	// will ever be.
	DefaultRetryMaxInterval = time.Minute

	// DefaultPurgeInterval is the amount of time the system waits before attempting
	// to purge hinted handoff data due to age or inactive nodes.
	DefaultPurgeInterval = time.Hour
)
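
The retry behaviour described for DefaultRetryInterval and DefaultRetryMaxInterval can be sketched as follows. This is a minimal illustration, not the package's code: the documentation only says the interval grows "exponentially", so the doubling factor and the helper name nextInterval are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the documented defaults.
const (
	retryInterval    = time.Second
	retryMaxInterval = time.Minute
)

// nextInterval returns the wait to use after a failed handoff write.
// Doubling is an assumption; the docs only say "exponentially".
func nextInterval(cur, ceiling time.Duration) time.Duration {
	next := cur * 2
	if next > ceiling {
		return ceiling
	}
	return next
}

func main() {
	d := retryInterval
	// Print the wait used before each of eight consecutive failed attempts.
	for i := 0; i < 8; i++ {
		fmt.Println(d)
		d = nextInterval(d, retryMaxInterval)
	}
}
```

Under these assumptions the wait reaches the one-minute ceiling after six consecutive failures and stays there.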


var (
	ErrNotOpen     = fmt.Errorf("queue not open")
	ErrQueueFull   = fmt.Errorf("queue is full")
	ErrSegmentFull = fmt.Errorf("segment is full")
)

Possible errors returned by a hinted handoff queue.
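
To illustrate when a caller might see two of these errors, here is a small self-contained sketch. The memQueue type is hypothetical and kept in memory; the real queue is not shown on this page, and ErrSegmentFull is not modelled at all.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's ErrNotOpen and ErrQueueFull.
var (
	errNotOpen   = errors.New("queue not open")
	errQueueFull = errors.New("queue is full")
)

// memQueue is a hypothetical in-memory queue bounded by total bytes.
type memQueue struct {
	open    bool
	maxSize int64
	size    int64
	blocks  [][]byte
}

// Open marks the queue as ready to accept data.
func (q *memQueue) Open() { q.open = true }

// Append stores b, or reports why it cannot.
func (q *memQueue) Append(b []byte) error {
	if !q.open {
		return errNotOpen
	}
	if q.size+int64(len(b)) > q.maxSize {
		return errQueueFull
	}
	q.blocks = append(q.blocks, b)
	q.size += int64(len(b))
	return nil
}

func main() {
	q := &memQueue{maxSize: 8}
	fmt.Println(q.Append([]byte("abcd"))) // rejected: queue not yet opened
	q.Open()
	fmt.Println(q.Append([]byte("abcd")))     // accepted
	fmt.Println(q.Append([]byte("abcdefgh"))) // rejected: would exceed maxSize
}
```

Callers can compare a returned error against these values to decide whether to open the queue first or to drop or defer the write.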

var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")

ErrHintedHandoffDisabled is returned when attempting to use a disabled hinted handoff service.


func NewRateLimiter

func NewRateLimiter(limit int64) *limiter

NewRateLimiter returns a new limiter configured to restrict a process to limit units per second. The limit should be > 0; a limit <= 0 disables rate limiting.
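
The returned limiter type is unexported and its methods are not listed on this page, so the sketch below does not use it. It only illustrates the arithmetic a per-second limit implies; pauseFor is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// pauseFor returns how long a sender would wait after sending n units so
// that its average rate stays at or below limit units per second.
// A limit <= 0 means "no limit", matching the documented behaviour.
func pauseFor(limit int64, n int) time.Duration {
	if limit <= 0 || n <= 0 {
		return 0
	}
	return time.Duration(n) * time.Second / time.Duration(limit)
}

func main() {
	fmt.Println(pauseFor(1024, 512)) // 500ms: half the budget takes half a second
	fmt.Println(pauseFor(0, 512))    // 0s: limiting disabled
}
```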


type Config

type Config struct {
	Enabled          bool          `toml:"enabled"`
	Dir              string        `toml:"dir"`
	MaxSize          int64         `toml:"max-size"`
	MaxAge           toml.Duration `toml:"max-age"`
	RetryRateLimit   int64         `toml:"retry-rate-limit"`
	RetryInterval    toml.Duration `toml:"retry-interval"`
	RetryMaxInterval toml.Duration `toml:"retry-max-interval"`
	PurgeInterval    toml.Duration `toml:"purge-interval"`
}

Config is a hinted handoff configuration.

func NewConfig

func NewConfig() Config

NewConfig returns a new Config.

func (*Config) Validate added in v0.10.0

func (c *Config) Validate() error
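
Validate carries no description on this page, so the following is only a sketch of how a configuration with the documented defaults might be built and checked. The config type mirrors the documented fields with time.Duration in place of toml.Duration, and both validation rules are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config mirrors the documented Config fields, using time.Duration so the
// sketch has no external dependencies.
type config struct {
	Enabled          bool
	Dir              string
	MaxSize          int64
	MaxAge           time.Duration
	RetryRateLimit   int64
	RetryInterval    time.Duration
	RetryMaxInterval time.Duration
	PurgeInterval    time.Duration
}

// newConfig fills in the documented Default* values. Leaving Enabled and
// Dir at their zero values is an assumption.
func newConfig() config {
	return config{
		MaxSize:          1024 * 1024 * 1024,
		MaxAge:           7 * 24 * time.Hour,
		RetryRateLimit:   0,
		RetryInterval:    time.Second,
		RetryMaxInterval: time.Minute,
		PurgeInterval:    time.Hour,
	}
}

// validate applies hypothetical checks; the real Validate may differ.
func (c *config) validate() error {
	if c.Enabled && c.Dir == "" {
		return errors.New("dir must be set when hinted handoff is enabled")
	}
	if c.RetryInterval > c.RetryMaxInterval {
		return errors.New("retry-interval must not exceed retry-max-interval")
	}
	return nil
}

func main() {
	c := newConfig()
	c.Enabled = true
	fmt.Println(c.validate()) // reports the missing dir
	c.Dir = "/tmp/hh"
	fmt.Println(c.validate()) // passes
}
```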

type NodeProcessor added in v0.9.5

type NodeProcessor struct {
	PurgeInterval    time.Duration // Interval between periodic purge checks
	RetryInterval    time.Duration // Interval between periodic write-to-node attempts.
	RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
	MaxSize          int64         // Maximum size an underlying queue can get.
	MaxAge           time.Duration // Maximum age queue data can get before purging.
	RetryRateLimit   int64         // Limits the rate data is sent to node.

	Logger *log.Logger
	// contains filtered or unexported fields
}

NodeProcessor encapsulates a queue of hinted-handoff data for a node, and the transmission of the data to the node.

func NewNodeProcessor added in v0.9.5

func NewNodeProcessor(nodeID uint64, dir string, w shardWriter, m metaClient) *NodeProcessor

NewNodeProcessor returns a new NodeProcessor for the given node, using dir for the hinted-handoff data.

func (*NodeProcessor) Active added in v0.9.5

func (n *NodeProcessor) Active() (bool, error)

Active returns whether this node processor is for a currently active node.

func (*NodeProcessor) Close added in v0.9.5

func (n *NodeProcessor) Close() error

Close closes the NodeProcessor, terminating all data transmission to the node. Once closed, it will not accept hinted-handoff data.

func (*NodeProcessor) Head added in v0.9.5

func (n *NodeProcessor) Head() string

Head returns the head of the processor's queue.

func (*NodeProcessor) LastModified added in v0.9.5

func (n *NodeProcessor) LastModified() (time.Time, error)

LastModified returns the time the NodeProcessor last received hinted-handoff data.

func (*NodeProcessor) Open added in v0.9.5

func (n *NodeProcessor) Open() error

Open opens the NodeProcessor. It will read and write data present in dir, and start transmitting data to the node. A NodeProcessor must be opened before it can accept hinted data.

func (*NodeProcessor) Purge added in v0.9.5

func (n *NodeProcessor) Purge() error

Purge deletes all hinted-handoff data under management by a NodeProcessor. The NodeProcessor should be in the closed state before calling this function.

func (*NodeProcessor) SendWrite added in v0.9.5

func (n *NodeProcessor) SendWrite() (int, error)

SendWrite attempts to send the current block of hinted data to the target node. If successful, it returns the number of bytes it sent and advances to the next block. Otherwise, it returns EOF when there is no more data or the node is inactive.
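
A caller would typically invoke SendWrite repeatedly until the queue is drained. The loop below is a sketch of that pattern against a one-method interface; fakeProcessor and drain are hypothetical, and treating the documented "EOF" as io.EOF is an assumption.

```go
package main

import (
	"fmt"
	"io"
)

// writer is the one method of NodeProcessor this sketch relies on.
type writer interface {
	SendWrite() (int, error)
}

// fakeProcessor is a hypothetical stand-in that "sends" pre-sized blocks.
type fakeProcessor struct{ blocks []int }

func (f *fakeProcessor) SendWrite() (int, error) {
	if len(f.blocks) == 0 {
		return 0, io.EOF
	}
	n := f.blocks[0]
	f.blocks = f.blocks[1:]
	return n, nil
}

// drain calls SendWrite until it reports io.EOF and returns the bytes sent.
// Any other error is handed back so the caller can back off and retry.
func drain(w writer) (int, error) {
	total := 0
	for {
		n, err := w.SendWrite()
		if err == io.EOF {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		total += n
	}
}

func main() {
	sent, err := drain(&fakeProcessor{blocks: []int{100, 250, 50}})
	fmt.Println(sent, err) // 400 <nil>
}
```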

func (*NodeProcessor) Tail added in v0.9.5

func (n *NodeProcessor) Tail() string

Tail returns the tail of the processor's queue.

func (*NodeProcessor) WriteShard added in v0.9.5

func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error

WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate hinted-handoff queues, and be called concurrently, it takes a lock during queue access.

type Service

type Service struct {
	Logger *log.Logger

	MetaClient metaClient

	Monitor interface {
		RegisterDiagnosticsClient(name string, client diagnostics.Client)
		DeregisterDiagnosticsClient(name string)
	}
	// contains filtered or unexported fields
}

Service represents a hinted handoff service.

func NewService

func NewService(c Config, w shardWriter, m metaClient) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

Close closes the hinted handoff service.

func (*Service) Diagnostics added in v0.9.5

func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns diagnostic information.

func (*Service) Open

func (s *Service) Open() error

Open opens the hinted handoff service.

func (*Service) SetLogger

func (s *Service) SetLogger(l *log.Logger)

SetLogger sets the internal logger to the logger passed in.

func (*Service) WriteShard

func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error

WriteShard queues the write of points for shardID to node ownerID in the handoff queue.
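
One way to picture this routing is a map from owner node ID to a per-node queue, guarded by a lock and created on first use. The sketch below is an assumption about the general shape, not the service's actual code; service, nodeQueue, and writeShard are hypothetical names, and points are reduced to a count.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local stand-in for ErrHintedHandoffDisabled.
var errDisabled = errors.New("hinted handoff disabled")

// nodeQueue is a hypothetical per-node queue that counts points per shard.
type nodeQueue struct{ pointsByShard map[uint64]int }

// service routes writes to one queue per owner node.
type service struct {
	enabled bool
	mu      sync.Mutex
	queues  map[uint64]*nodeQueue
}

// writeShard queues n points for shardID on the queue belonging to ownerID,
// creating that queue on first use.
func (s *service) writeShard(shardID, ownerID uint64, n int) error {
	if !s.enabled {
		return errDisabled
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.queues == nil {
		s.queues = make(map[uint64]*nodeQueue)
	}
	q, ok := s.queues[ownerID]
	if !ok {
		q = &nodeQueue{pointsByShard: make(map[uint64]int)}
		s.queues[ownerID] = q
	}
	q.pointsByShard[shardID] += n
	return nil
}

func main() {
	s := &service{enabled: true}
	_ = s.writeShard(1, 10, 3)
	_ = s.writeShard(1, 10, 2)
	_ = s.writeShard(2, 11, 7)
	fmt.Println(len(s.queues), s.queues[10].pointsByShard[1]) // 2 5

	off := &service{}
	fmt.Println(off.writeShard(1, 10, 3)) // hinted handoff disabled
}
```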
