app

package
Version: v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2015 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const CompressedMessages = "compressed_messages"

CompressedMessages is the name of the config setting name for controlling if the queue is using compression or not

View Source
const ConfigurationBucket = "config"

ConfigurationBucket is the name of the riak bucket holding the config

View Source
const MaxPartitionAge = "max_partition_age"

MaxPartitionAge is the name of the config setting name for controlling how long an un-used partition should exist

View Source
const MaxPartitions = "max_partitions"

MaxPartitions is the name of the config setting name for controlling the maximum number of partitions per node

View Source
const MinPartitions = "min_partitions"

MinPartitions is the name of the config setting name for controlling the minimum number of partitions per queue

View Source
const NoPartitions string = "no available partitions"

NoPartitions represents the message that there were no available partitions

View Source
const PartitionCount = "partition_count"

PartitionCount is

View Source
const QueueConfigName = "queue_config"

QueueConfigName is the key in the riak bucket holding the config

View Source
const QueueDeletedStatsSuffix = "deleted.count"

QueueDeletedStatsSuffix is

View Source
const QueueDepthAprStatsSuffix = "approximate_depth.count"

QueueDepthAprStatsSuffix is

View Source
const QueueDepthStatsSuffix = "depth.count"

QueueDepthStatsSuffix is

View Source
const QueueFillDeltaStatsSuffix = "fill.count"

QueueFillDeltaStatsSuffix

View Source
const QueueReceivedStatsSuffix = "received.count"

QueueReceivedStatsSuffix is

View Source
const QueueSentStatsSuffix = "sent.count"

QueueSentStatsSuffix is

View Source
const QueueSetName = "queues"

QueueSetName is the crdt key holding the set of all queues

View Source
const VisibilityTimeout = "visibility_timeout"

VisibilityTimeout is the name of the config setting name for controlling how long a message is "inflight"

Variables

View Source
var DefaultSettings = map[string]string{VisibilityTimeout: "30", PartitionCount: "5", MinPartitions: "1", MaxPartitions: "10", MaxPartitionAge: "432000", CompressedMessages: "false"}

DefaultSettings is

View Source
var (
	// ErrConfigurationOptionNotFound represents the condition that occurs if an invalid
	// location is specified for the config file
	ErrConfigurationOptionNotFound = errors.New("Configuration Value Not Found")
)
View Source
var MaxIDSize = *big.NewInt(math.MaxInt64)

MaxIDSize is

Settings Arrays and maps cannot be made immutable in golang

Functions

func GetNodePartitionRange

func GetNodePartitionRange(cfg *Config, list *memberlist.Memberlist) (int, int)

GetNodePartitionRange returns the range of partitions active for this node

func InitMemberList

func InitMemberList(name string, port int, seedServers []string, seedPort int) (*memberlist.Memberlist, int, error)

InitMemberList created a memberlist, and joins it to the network TODO clean this up, since we only really need the 1 port

Types

type Config

type Config struct {
	Core       Core
	Stats      Stats
	Compressor compressor.Compressor
	Queues     *Queues
	RiakPool   *riak.Client
	Topics     *Topics
}

Config is

func GetCoreConfig

func GetCoreConfig(configFile *string) (*Config, error)

GetCoreConfig is

func (*Config) GetCompressedMessages

func (cfg *Config) GetCompressedMessages(queueName string) (bool, error)

GetCompressedMessages is

func (*Config) GetMaxPartitionAge

func (cfg *Config) GetMaxPartitionAge(queueName string) (float64, error)

GetMaxPartitionAge is

func (*Config) GetMaxPartitions

func (cfg *Config) GetMaxPartitions(queueName string) (int, error)

GetMaxPartitions is

func (*Config) GetMinPartitions

func (cfg *Config) GetMinPartitions(queueName string) (int, error)

GetMinPartitions is

func (*Config) GetVisibilityTimeout

func (cfg *Config) GetVisibilityTimeout(queueName string) (float64, error)

GetVisibilityTimeout is

func (*Config) InitializeQueue

func (cfg *Config) InitializeQueue(queueName string) error

InitializeQueue is

func (*Config) RiakConnection

func (cfg *Config) RiakConnection() *riak.Client

RiakConnection returns a pointer to the current pool of riak connections, which is abstracted inside of the riak.Client object

func (*Config) SetCompressedMessages

func (cfg *Config) SetCompressedMessages(queueName string, compressedMessages bool) error

SetCompressedMessages is

func (*Config) SetMaxPartitionAge

func (cfg *Config) SetMaxPartitionAge(queueName string, age float64) error

SetMaxPartitionAge is

func (*Config) SetMaxPartitions

func (cfg *Config) SetMaxPartitions(queueName string, timeout int) error

SetMaxPartitions is

func (*Config) SetMinPartitions

func (cfg *Config) SetMinPartitions(queueName string, timeout int) error

SetMinPartitions is

func (*Config) SetVisibilityTimeout

func (cfg *Config) SetVisibilityTimeout(queueName string, timeout float64) error

SetVisibilityTimeout is

type ConfigRequest

type ConfigRequest struct {
	VisibilityTimeout  *float64 `json:"visibility_timeout,omitempty"`
	MinPartitions      *int     `json:"min_partitions,omitempty"`
	MaxPartitions      *int     `json:"max_partitions,omitempty"`
	MaxPartitionAge    *float64 `json:"max_partition_age,omitempty"`
	CompressedMessages *bool    `json:"compressed_messages,omitempty"`
}

ConfigRequest is

type Core

type Core struct {
	Name                  string
	Port                  int
	SeedServer            string
	SeedPort              int
	SeedServers           []string
	HTTPPort              int
	RiakNodes             string
	BackendConnectionPool int
	SyncConfigInterval    time.Duration
	LogLevel              logrus.Level
	LogLevelString        string
}

Core is

type HTTPApiV1

type HTTPApiV1 struct {
}

HTTPApiV1 is

func (HTTPApiV1) InitWebserver

func (h HTTPApiV1) InitWebserver(list *memberlist.Memberlist, cfg *Config)

InitWebserver is

type Partition

type Partition struct {
	ID       int
	LastUsed time.Time
}

Partition represents the logical boundary around subsets of the overall keyspace

type Partitions

type Partitions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Partitions represents a collecton of Partition objects

func InitPartitions

func InitPartitions(cfg *Config, queueName string) *Partitions

InitPartitions creates a series of partitions based on the provided config and queue

func (*Partitions) GetPartition

func (part *Partitions) GetPartition(cfg *Config, queueName string, list *memberlist.Memberlist) (int, int, *Partition, error)

GetPartition pops a partition off of the queue for the specified queue

func (*Partitions) PartitionCount

func (part *Partitions) PartitionCount() int

PartitionCount returns the count of known partitions

func (*Partitions) PushPartition

func (part *Partitions) PushPartition(cfg *Config, queueName string, partition *Partition, lock bool)

PushPartition pushes a partition back onto the queue for the given queue

type Queue

type Queue struct {
	// the definition of a queue
	// name of the queue
	Name string
	// the partitions of the queue
	Parts *Partitions
	// Individual settings for the queue
	Config *riak.RDtMap
	// Mutex for protecting rw access to the Config object
	sync.RWMutex
}

Queue represents

func (*Queue) BatchDelete

func (queue *Queue) BatchDelete(cfg *Config, ids []string) (int, error)

BatchDelete deletes multiple messages at once

func (*Queue) Delete

func (queue *Queue) Delete(cfg *Config, id string) bool

Delete deletes a Message from the queue

func (*Queue) Get

func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int64) ([]riak.RObject, error)

Get gets a message from the queue

func (*Queue) Put

func (queue *Queue) Put(cfg *Config, message string) string

Put puts a Message onto the queue

func (*Queue) RetrieveMessages

func (queue *Queue) RetrieveMessages(ids []string, cfg *Config) []riak.RObject

RetrieveMessages takes a list of message ids and pulls the actual data from Riak

type Queues

type Queues struct {
	// a container for all queues
	QueueMap map[string]*Queue
	// Settings for Queues in general, ie queue list
	Config *riak.RDtMap
	// Mutex for protecting rw access to the Config object
	sync.RWMutex
	// contains filtered or unexported fields
}

Queues represents

func (*Queues) DeleteQueue

func (queues *Queues) DeleteQueue(name string, cfg *Config) bool

DeleteQueue deletes the given queue

func (*Queues) Exists

func (queues *Queues) Exists(cfg *Config, queueName string) bool

Exists checks is the given queue name is already created or not

type Stats

type Stats struct {
	Type          string
	FlushInterval int
	Address       string
	Prefix        string
	Client        stats.Client
}

Stats is

type Topic

type Topic struct {
	// store a CRDT in riak for the topic configuration including subscribers
	Name   string
	Config *riak.RDtMap

	// Mutex for protecting rw access to the Config object
	sync.RWMutex
	// contains filtered or unexported fields
}

Topic represents a topic

func (*Topic) AddQueue

func (topic *Topic) AddQueue(cfg *Config, name string)

AddQueue adds a new queue as a subscriber to the topic

func (*Topic) Broadcast

func (topic *Topic) Broadcast(cfg *Config, message string) map[string]string

Broadcast will send the message to all listening queues and return the acked writes

func (*Topic) Delete

func (topic *Topic) Delete(cfg *Config)

Delete will delete the given topic, which removes any queues from its subscription list

func (*Topic) DeleteQueue

func (topic *Topic) DeleteQueue(cfg *Config, name string)

DeleteQueue will remove a queue from the list of topic subscribers

func (*Topic) ListQueues

func (topic *Topic) ListQueues() []string

ListQueues will return a list of all known queues for a topic

type Topics

type Topics struct {
	// global topic configuration, should contain list of all active topics
	Config *riak.RDtMap
	// topic map
	TopicMap map[string]*Topic

	// Mutex for protecting rw access to the Config object
	sync.RWMutex
	// contains filtered or unexported fields
}

Topics represents a collection of topics

func InitTopics

func InitTopics(cfg *Config, queues *Queues) *Topics

InitTopics initializes the set of known topics in the system

func (*Topics) DeleteTopic

func (topics *Topics) DeleteTopic(cfg *Config, name string) bool

DeleteTopic will delete the topic from the collection of all topics, which removes any queues it's subscription list

func (*Topics) InitTopic

func (topics *Topics) InitTopic(name string)

InitTopic initializes an individual topic given a known name

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL