server

package
v1.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2023 License: AGPL-3.0 Imports: 20 Imported by: 40

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	AlwaysMatch = func(value string) bool {
		return true
	}
)
View Source
var CommandSet = map[string]command{
	"END":    end,
	"PUSH":   push,
	"PUSHB":  pushBulk,
	"FETCH":  fetch,
	"ACK":    ack,
	"FAIL":   fail,
	"BEAT":   heartbeat,
	"INFO":   info,
	"FLUSH":  flush,
	"MUTATE": mutate,
	"BATCH":  batch,
	"TRACK":  track,
	"QUEUE":  queue,
}
View Source
var DefaultMaxPoolSize = 1000

Functions

This section is empty.

Types

type ClientBeat added in v1.5.0

type ClientBeat struct {
	CurrentState string `json:"current_state"`
	Wid          string `json:"wid"`
	RssKb        int64  `json:"rss_kb"`
}

type ClientData added in v1.5.0

type ClientData struct {
	Hostname     string   `json:"hostname"`
	Wid          string   `json:"wid"`
	Pid          int      `json:"pid"`
	RssKb        int64    `json:"rss_kb"`
	Labels       []string `json:"labels"`
	PasswordHash string   `json:"pwdhash"`
	Username     string   `json:"username"`
	Version      uint8    `json:"v"`
	StartedAt    time.Time
	// contains filtered or unexported fields
}

This represents a single client process. It may have many network connections open to Faktory.

A client can be a producer AND/OR consumer of jobs. Typically a process will either only produce jobs (like a webapp pushing jobs) or produce/consume jobs (like a faktory worker process where a job can create other jobs while executing).

Each Faktory worker process should send a BEAT command every 15 seconds. Only consumers should send a BEAT. If Faktory does not receive a BEAT from a worker process within 60 seconds, it expires and is removed from the Busy page.

From Faktory's POV, the worker can BEAT again and resume normal operations, e.g. due to a network partition. If a process dies, it will be removed after 1 minute and its jobs recovered after the job reservation timeout has passed (typically 30 minutes).

A worker process has a simple three-state lifecycle:

running -> quiet -> terminate

- Running means the worker is alive and processing jobs. - Quiet means the worker should stop FETCHing new jobs but continue working on existing jobs. It should not exit, even if no jobs are processing. - Terminate means the worker should exit within N seconds, where N is recommended to be 30 seconds. In practice, faktory_worker_ruby waits up to 25 seconds and any threads that are still busy are forcefully killed and their associated jobs reported as FAILed so they will be retried shortly.

A worker process should never stop sending BEAT. Even after "quiet" or "terminate", the BEAT should continue, only stopping due to process exit(). Workers should never move backward in state - you cannot "unquiet" a worker, it must be restarted.

Workers will typically also respond to standard Unix signals. faktory_worker_ruby uses TSTP ("Threads SToP") as the quiet signal and TERM as the terminate signal.

func (*ClientData) ConnectionCount added in v1.6.1

func (worker *ClientData) ConnectionCount() int

func (*ClientData) IsConsumer added in v1.5.0

func (worker *ClientData) IsConsumer() bool

func (*ClientData) IsQuiet added in v1.5.0

func (worker *ClientData) IsQuiet() bool

func (*ClientData) Signal added in v1.5.0

func (worker *ClientData) Signal(newstate WorkerState)

* Send "quiet" or "terminate" to the given client * worker process. Other signals are undefined.

type Connection

type Connection struct {
	context.Context
	// contains filtered or unexported fields
}

Represents a connection to a faktory client.

faktory reuses the same wire protocol as Redis: RESP. It's a nice trade-off between human-readable and efficient. Shout out to antirez for his nice design document on it. https://redis.io/topics/protocol

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) Error

func (c *Connection) Error(cmd string, err error) error

func (*Connection) Number

func (c *Connection) Number(val int) error

func (*Connection) Ok

func (c *Connection) Ok() error

func (*Connection) Result

func (c *Connection) Result(msg []byte) error

type RuntimeStats

type RuntimeStats struct {
	Connections uint64
	Commands    uint64
	StartedAt   time.Time
}

type Server

type Server struct {
	Options    *ServerOptions
	Stats      *RuntimeStats
	Subsystems []Subsystem
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts *ServerOptions) (*Server, error)

func (*Server) AddTask added in v1.5.0

func (s *Server) AddTask(everySec int64, task Taskable)

func (*Server) Boot added in v1.5.0

func (s *Server) Boot() error

func (*Server) CurrentState added in v1.5.0

func (s *Server) CurrentState() (map[string]interface{}, error)

func (*Server) Heartbeats

func (s *Server) Heartbeats() map[string]*ClientData

func (*Server) Manager added in v1.5.0

func (s *Server) Manager() manager.Manager

func (*Server) Register added in v1.5.0

func (s *Server) Register(x Subsystem)

register a global handler to be called when the Server instance has finished booting but before it starts listening.

func (*Server) Reload added in v1.5.0

func (s *Server) Reload()

func (*Server) Run added in v1.5.0

func (s *Server) Run() error

func (*Server) Stop

func (s *Server) Stop(f func())

func (*Server) Stopper added in v1.5.0

func (s *Server) Stopper() chan bool

func (*Server) Store

func (s *Server) Store() storage.Store

type ServerOptions

type ServerOptions struct {
	Binding          string
	StorageDirectory string
	RedisSock        string
	ConfigDirectory  string
	Environment      string
	Password         string
	PoolSize         int
	GlobalConfig     map[string]interface{}
}

func (*ServerOptions) Config added in v1.5.0

func (so *ServerOptions) Config(subsys string, key string, defval interface{}) interface{}

func (*ServerOptions) String added in v1.5.0

func (so *ServerOptions) String(subsys string, key string, defval string) string

type Subsystem added in v1.5.0

type Subsystem interface {
	Name() string

	// Called when the server is configured but before it starts accepting client connections.
	Start(*Server) error

	// Called every time Faktory reloads the global config for the Server.
	// Each subsystem is responsible for diffing its own config and making
	// necessary changes.
	Reload(*Server) error
}

type Taskable added in v1.5.0

type Taskable interface {
	Name() string
	Execute(context.Context) error
	Stats(context.Context) map[string]interface{}
}

type WorkerState added in v1.5.0

type WorkerState int
const (
	Running WorkerState = iota
	Quiet
	Terminate
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL