hh

package
v0.2.1
Published: Nov 16, 2015 License: MIT, MIT Imports: 17 Imported by: 0

Documentation

Overview

Package hh implements a hinted handoff for writes.

Constants

const (
	// DefaultMaxSize is the default maximum size of all hinted handoff queues in bytes.
	DefaultMaxSize = 1024 * 1024 * 1024

	// DefaultMaxAge is the default maximum amount of time that a hinted handoff write
	// can stay in the queue. After this time, the write will be purged.
	DefaultMaxAge = 7 * 24 * time.Hour

	// DefaultRetryRateLimit is the default rate at which hinted handoffs will be retried.
	// The rate is in bytes per second and applies across all nodes when retried. A
	// value of 0 disables the rate limit.
	DefaultRetryRateLimit = 0

	// DefaultRetryInterval is the default amount of time the system waits before
	// attempting to flush hinted handoff queues. With each failure of a hinted
	// handoff write, this retry interval increases exponentially until it reaches
	// the maximum (see DefaultRetryMaxInterval).
	DefaultRetryInterval = time.Second

	// DefaultRetryMaxInterval is the maximum the hinted handoff retry interval
	// will ever be.
	DefaultRetryMaxInterval = time.Minute

	// DefaultPurgeInterval is the amount of time the system waits before attempting
	// to purge hinted handoff data due to age or inactive nodes.
	DefaultPurgeInterval = time.Hour
)
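The interaction between DefaultRetryInterval and DefaultRetryMaxInterval can be illustrated with a minimal sketch. The helper nextRetryInterval is hypothetical and not part of this package, and the growth factor of 2 is an assumption: the comments above only say that the interval grows exponentially until it reaches the maximum.

```go
package main

import (
	"fmt"
	"time"
)

// These values mirror DefaultRetryInterval and DefaultRetryMaxInterval.
const (
	retryInterval    = time.Second
	retryMaxInterval = time.Minute
)

// nextRetryInterval is a hypothetical helper. It doubles the current
// interval after a failed write and caps the result at max.
// The factor of 2 is an assumption made for this sketch.
func nextRetryInterval(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		return max
	}
	return next
}

func main() {
	interval := retryInterval
	for failure := 1; failure <= 8; failure++ {
		fmt.Printf("failure %d: wait %v before the next attempt\n", failure, interval)
		interval = nextRetryInterval(interval, retryMaxInterval)
	}
}
```

Under these assumptions the wait grows from one second and then stays at one minute for as long as writes keep failing.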

Variables

var (
	ErrNotOpen     = fmt.Errorf("queue not open")
	ErrQueueFull   = fmt.Errorf("queue is full")
	ErrSegmentFull = fmt.Errorf("segment is full")
)
var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")

Functions

func NewRateLimiter

func NewRateLimiter(limit int64) *limiter

NewRateLimiter returns a new limiter that restricts a process to limit units per second, where limit is the maximum amount that can be used per second. The limit should be > 0; a limit <= 0 disables rate limiting.
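Because the returned limiter type is unexported, the following is only a sketch of the semantics described above, not the package's implementation. The function throttleDelay is hypothetical: it computes how long a sender would need to pause after sending n bytes to stay at or below limit bytes per second, and it treats limit <= 0 as "no limit".

```go
package main

import (
	"fmt"
	"time"
)

// throttleDelay is a hypothetical helper, not part of package hh.
// It returns the pause needed after sending n bytes so that throughput
// stays at or below limit bytes per second. A limit <= 0 disables throttling.
func throttleDelay(n int, limit int64) time.Duration {
	if limit <= 0 {
		return 0
	}
	return time.Duration(int64(n) * int64(time.Second) / limit)
}

func main() {
	fmt.Println(throttleDelay(512, 1024)) // half of the per-second budget
	fmt.Println(throttleDelay(512, 0))    // rate limit disabled
}
```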

Types

type Config

type Config struct {
	Enabled          bool          `toml:"enabled"`
	Dir              string        `toml:"dir"`
	MaxSize          int64         `toml:"max-size"`
	MaxAge           toml.Duration `toml:"max-age"`
	RetryRateLimit   int64         `toml:"retry-rate-limit"`
	RetryInterval    toml.Duration `toml:"retry-interval"`
	RetryMaxInterval toml.Duration `toml:"retry-max-interval"`
	PurgeInterval    toml.Duration `toml:"purge-interval"`
}

func NewConfig

func NewConfig() Config

type NodeProcessor added in v0.2.1

type NodeProcessor struct {
	PurgeInterval    time.Duration // Interval between periodic purge checks
	RetryInterval    time.Duration // Interval between periodic write-to-node attempts.
	RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
	MaxSize          int64         // Maximum size an underlying queue can get.
	MaxAge           time.Duration // Maximum age queue data can get before purging.
	RetryRateLimit   int64         // Limits the rate data is sent to node.

	Logger *log.Logger
	// contains filtered or unexported fields
}

NodeProcessor encapsulates a queue of hinted-handoff data for a node, and the transmission of the data to the node.

func NewNodeProcessor added in v0.2.1

func NewNodeProcessor(nodeID uint64, dir string, w shardWriter, m metaStore) *NodeProcessor

NewNodeProcessor returns a new NodeProcessor for the given node, using dir for the hinted-handoff data.

func (*NodeProcessor) Active added in v0.2.1

func (n *NodeProcessor) Active() (bool, error)

Active returns whether this node processor is for a currently active node.

func (*NodeProcessor) Close added in v0.2.1

func (n *NodeProcessor) Close() error

Close closes the NodeProcessor, terminating all data transmission to the node. Once closed, it will not accept hinted-handoff data.

func (*NodeProcessor) Head added in v0.2.1

func (n *NodeProcessor) Head() string

func (*NodeProcessor) LastModified added in v0.2.1

func (n *NodeProcessor) LastModified() (time.Time, error)

LastModified returns the time the NodeProcessor last received hinted-handoff data.

func (*NodeProcessor) Open added in v0.2.1

func (n *NodeProcessor) Open() error

Open opens the NodeProcessor. It will read and write data present in dir, and start transmitting data to the node. A NodeProcessor must be opened before it can accept hinted data.

func (*NodeProcessor) Purge added in v0.2.1

func (n *NodeProcessor) Purge() error

Purge deletes all hinted-handoff data under management by a NodeProcessor. The NodeProcessor should be in the closed state before calling this function.

func (*NodeProcessor) SendWrite added in v0.2.1

func (n *NodeProcessor) SendWrite() (int, error)

SendWrite attempts to send the current block of hinted data to the target node. If successful, it returns the number of bytes it sent and advances to the next block. Otherwise, it returns EOF when there is no more data or the node is inactive.
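A caller that wants to flush a queue would typically call SendWrite in a loop until it reports EOF. The sketch below assumes that "EOF" means io.EOF. The blockSender interface, the fakeProcessor type, and the drain function are all hypothetical stand-ins so that the example runs without the real package.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// blockSender is a hypothetical interface matching the SendWrite signature.
type blockSender interface {
	SendWrite() (int, error)
}

// fakeProcessor stands in for a NodeProcessor and "sends" preloaded blocks.
type fakeProcessor struct {
	blocks [][]byte
}

// SendWrite reports the size of the current block and advances to the next,
// or returns io.EOF when no blocks remain.
func (f *fakeProcessor) SendWrite() (int, error) {
	if len(f.blocks) == 0 {
		return 0, io.EOF
	}
	n := len(f.blocks[0])
	f.blocks = f.blocks[1:]
	return n, nil
}

// drain calls SendWrite until io.EOF and returns the total bytes sent.
// Any other error stops the loop and is returned to the caller.
func drain(s blockSender) (int, error) {
	total := 0
	for {
		n, err := s.SendWrite()
		if errors.Is(err, io.EOF) {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		total += n
	}
}

func main() {
	p := &fakeProcessor{blocks: [][]byte{[]byte("cpu value=1"), []byte("mem value=2")}}
	total, err := drain(p)
	fmt.Println(total, err)
}
```

Treating EOF as a normal stop condition rather than a failure keeps "queue is empty" and "node is inactive" separate from genuine write errors, which a real caller would likely handle by backing off before retrying.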

func (*NodeProcessor) Tail added in v0.2.1

func (n *NodeProcessor) Tail() string

func (*NodeProcessor) WriteShard added in v0.2.1

func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error

WriteShard writes hinted-handoff data for the given shard and node. Because it may manipulate hinted-handoff queues and may be called concurrently, it takes a lock during queue access.
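The locking behaviour described for WriteShard can be sketched as follows. This is not the package's code: nodeQueue, its methods, and writeConcurrently are hypothetical, and rejecting writes with an error equivalent to ErrNotOpen before Open is called is an assumption based on the statement that a NodeProcessor must be opened before it can accept hinted data.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNotOpen mirrors the package's ErrNotOpen for this standalone sketch.
var errNotOpen = errors.New("queue not open")

// nodeQueue is a hypothetical stand-in for a NodeProcessor's queue state.
type nodeQueue struct {
	mu      sync.Mutex
	open    bool
	pending [][]byte
}

// Open marks the queue as ready to accept hinted data.
func (q *nodeQueue) Open() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.open = true
}

// Write appends one encoded write while holding the lock, so concurrent
// callers never modify the slice at the same time.
func (q *nodeQueue) Write(b []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.open {
		return errNotOpen
	}
	q.pending = append(q.pending, b)
	return nil
}

// Len returns the number of queued writes.
func (q *nodeQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.pending)
}

// writeConcurrently issues n writes from n goroutines and returns how many
// writes are queued once all goroutines have finished.
func writeConcurrently(q *nodeQueue, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = q.Write([]byte("cpu value=1"))
		}()
	}
	wg.Wait()
	return q.Len()
}

func main() {
	q := &nodeQueue{}
	fmt.Println(q.Write([]byte("x"))) // rejected: the queue is not open yet
	q.Open()
	fmt.Println(writeConcurrently(q, 100))
}
```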

type Service

type Service struct {
	Logger *log.Logger

	Monitor interface {
		RegisterDiagnosticsClient(name string, client monitor.DiagsClient)
		DeregisterDiagnosticsClient(name string)
	}
	// contains filtered or unexported fields
}

func NewService

func NewService(c Config, w shardWriter, m metaStore) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

func (*Service) Diagnostics added in v0.2.1

func (s *Service) Diagnostics() (*monitor.Diagnostic, error)

Diagnostics returns diagnostic information.

func (*Service) Open

func (s *Service) Open() error

func (*Service) SetLogger

func (s *Service) SetLogger(l *log.Logger)

SetLogger sets the internal logger to the logger passed in.

func (*Service) WriteShard

func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error

WriteShard queues the write of points for shardID, destined for node ownerID, on the handoff queue.
