storage

package
v1.2.2
Warning

This package is not in the latest version of its module.

Published: Feb 28, 2019 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package storage - Data storage subsystem. The storage subsystem receives information from the cluster and consumer subsystems and serves that information out to other subsystems on request.

Modules

Currently, only one module is provided:

* inmemory - Store all information in a set of in-memory maps

Types

type Coordinator

type Coordinator struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}

Coordinator (storage) manages a single storage module (only one module is supported at this time), making sure it is configured, started, and stopped at the appropriate time. It is also responsible for listening to the StorageChannel that is provided in the application context and forwarding those requests to the storage module. If no storage module has been configured explicitly, the coordinator starts the inmemory module as a default.

func CoordinatorWithOffsets

func CoordinatorWithOffsets() *Coordinator

CoordinatorWithOffsets sets up a Coordinator with a single inmemory module defined. This module is loaded with offsets for a test cluster and group. This func should never be called in normal code. It is only provided to facilitate testing by other subsystems.

func (*Coordinator) Configure

func (sc *Coordinator) Configure()

Configure is called to create the configured storage module and call its Configure func to validate the configuration and set it up. The coordinator will panic if more than one module is configured; if no modules have been configured, it sets up a default inmemory storage module. If there are any problems, this func is expected to panic with a descriptive error message, as configuration failures are not recoverable errors.
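The module-count rule can be sketched in isolation. This is a minimal illustration, not Burrow's code: the chooseModule func and the plain string slice standing in for the configured module list are hypothetical names introduced here.

```go
package main

import "fmt"

// chooseModule mirrors the rule described for Configure: no configured
// modules falls back to "inmemory", exactly one is used as-is, and more
// than one is treated as an unrecoverable configuration error.
// The func name and the string-slice input are illustrative assumptions.
func chooseModule(configured []string) string {
	switch len(configured) {
	case 0:
		return "inmemory"
	case 1:
		return configured[0]
	default:
		panic(fmt.Sprintf("only one storage module may be configured, got %d", len(configured)))
	}
}

func main() {
	fmt.Println(chooseModule(nil))                  // inmemory
	fmt.Println(chooseModule([]string{"inmemory"})) // inmemory
}
```

Panicking rather than returning an error matches the stated intent that a bad configuration should stop the application at startup.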

func (*Coordinator) Start

func (sc *Coordinator) Start() error

Start calls the storage module's underlying Start func. If the module Start returns an error, this func stops immediately and returns that error to the caller.

Start also launches a request forwarder goroutine. It listens to the StorageChannel provided in the application context (which all modules receive) and forwards those requests to the storage module. At present the storage subsystem supports only one module, so this is a simple "accept and forward".

func (*Coordinator) Stop

func (sc *Coordinator) Stop() error

Stop calls the configured storage module's underlying Stop func. It is expected that the module Stop will not return until the module has been completely stopped. Although the signature allows an error to be returned, this func always returns nil, as a failure during stopping is not a critical failure.

type InMemoryStorage

type InMemoryStorage struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}

InMemoryStorage is a storage module that maintains the entire data set in memory in a series of maps. It has a configurable number of worker goroutines to service requests, and for requests that are group-specific, the group and cluster name are used to hash the request to a consistent worker. This ensures that requests for a group are processed in order.
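Hashing a cluster and group name to a consistent worker can be sketched as follows. The hash function the module actually uses is not stated here; FNV-1a and the workerFor func are assumptions chosen only to illustrate the idea.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor maps a (cluster, group) pair to a worker index in
// [0, workers). FNV-1a is an illustrative choice, not necessarily the
// hash InMemoryStorage uses.
func workerFor(cluster, group string, workers int) int {
	h := fnv.New32a()
	h.Write([]byte(cluster))
	h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") hash differently
	h.Write([]byte(group))
	return int(h.Sum32() % uint32(workers))
}

func main() {
	a := workerFor("testcluster", "testgroup", 20)
	b := workerFor("testcluster", "testgroup", 20)
	fmt.Println(a == b) // true: the same pair always lands on the same worker
}
```

Because each worker drains its own channel sequentially, routing every request for a given group to the same worker is what preserves per-group ordering.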

func (*InMemoryStorage) Configure

func (module *InMemoryStorage) Configure(name string, configRoot string)

Configure validates the configuration for the module, creates a channel to receive requests on, and sets up the storage map. If no expiration time for groups is set, a default value of 7 days is used. If no interval count is set, a default of 10 intervals is used. If no worker count is set, a default of 20 workers is used.

func (*InMemoryStorage) GetCommunicationChannel

func (module *InMemoryStorage) GetCommunicationChannel() chan *protocol.StorageRequest

GetCommunicationChannel returns the RequestChannel that has been set up for this module.

func (*InMemoryStorage) Start

func (module *InMemoryStorage) Start() error

Start sets up the rest of the storage map for each configured cluster. It then starts the configured number of worker routines to handle requests. Finally, it starts a main loop which will receive requests and hash them to the correct worker.

func (*InMemoryStorage) Stop

func (module *InMemoryStorage) Stop() error

Stop closes the incoming request channel, which causes the main loop to exit. It then closes each of the worker channels to stop the workers, and waits for all goroutines to exit before returning.

type Module

type Module interface {
	protocol.Module
	GetCommunicationChannel() chan *protocol.StorageRequest
}

Module (storage) is responsible for maintaining all the broker and consumer offsets for all clusters that Burrow watches. It must accept and respond to all protocol.StorageRequest types. This interface conforms to the overall protocol.Module interface, but it adds a func to fetch the channel that the module is listening on for requests, so that requests can be forwarded to it by the coordinator.
