server

package
v0.0.0-...-e97d6fb
Published: Nov 10, 2017 License: GPL-3.0 Imports: 20 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	DefaultTimeout = 1800
)
var (
	EventHandlers = make([]func(*Server) error, 0)
)

Functions

func CurrentState

func CurrentState(s *Server) (map[string]interface{}, error)

func Fail

func Fail(store storage.Store, jid, msg, errtype string, backtrace []string) error

func OnStart

func OnStart(x func(*Server) error)

OnStart registers a global handler to be called when the Server instance has finished booting but before it starts listening.
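A minimal sketch of how such a registration hook can work. The `Server` type here is a stand-in with a made-up `Booted` field, and `runStartHandlers` is a hypothetical helper; the real package may invoke its handlers differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Server is a stand-in for the package's Server type (hypothetical field).
type Server struct {
	Booted bool
}

// EventHandlers mirrors the package-level variable documented above.
var EventHandlers = make([]func(*Server) error, 0)

// OnStart appends a handler to the global list.
func OnStart(x func(*Server) error) {
	EventHandlers = append(EventHandlers, x)
}

// runStartHandlers is a hypothetical helper: it calls each handler in
// registration order and stops at the first error.
func runStartHandlers(s *Server) error {
	for _, h := range EventHandlers {
		if err := h(s); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	OnStart(func(s *Server) error {
		if !s.Booted {
			return errors.New("server not booted")
		}
		fmt.Println("start handler ran")
		return nil
	})
	if err := runStartHandlers(&Server{Booted: true}); err != nil {
		fmt.Println("startup failed:", err)
	}
}
```

Because the handler list is global, registration would normally happen during program initialization, before the server is started.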

Types

type ClientData

type ClientData struct {
	Hostname     string   `json:"hostname"`
	Wid          string   `json:"wid"`
	Pid          int      `json:"pid"`
	Labels       []string `json:"labels"`
	PasswordHash string   `json:"pwdhash"`
	Version      uint8    `json:"v"`
	StartedAt    time.Time
	// contains filtered or unexported fields
}

This represents a single client process. It may have many network connections open to Faktory.

A client can be a producer of jobs, a consumer of jobs, or both. Typically a process either only produces jobs (like a webapp pushing jobs) or both produces and consumes them (like a Faktory worker process, where a running job can create other jobs).

Each Faktory worker process should send a BEAT command every 15 seconds. Only consumers should send a BEAT. If Faktory does not receive a BEAT from a worker process within 60 seconds, it expires and is removed from the Busy page.

From Faktory's point of view, an expired worker can BEAT again and resume normal operations, for example after a network partition heals. If a process dies, it is removed after 1 minute and its jobs are recovered once the job reservation timeout has passed (typically 30 minutes).
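The heartbeat rules above can be sketched as follows. This is an illustration only: the `worker` type and the `beat` and `reapExpired` helpers are hypothetical names, not the package's internals.

```go
package main

import (
	"fmt"
	"time"
)

// beatExpiry is the 60-second window described above.
const beatExpiry = 60 * time.Second

// worker is a hypothetical, trimmed-down record of one worker process.
type worker struct {
	Wid      string
	LastBeat time.Time
}

// reapExpired removes workers whose last BEAT is older than beatExpiry
// and returns the removed worker IDs.
func reapExpired(workers map[string]*worker, now time.Time) []string {
	var removed []string
	for wid, w := range workers {
		if now.Sub(w.LastBeat) > beatExpiry {
			delete(workers, wid)
			removed = append(removed, wid)
		}
	}
	return removed
}

// beat records a heartbeat; a worker that was reaped simply reappears.
func beat(workers map[string]*worker, wid string, now time.Time) {
	if w, ok := workers[wid]; ok {
		w.LastBeat = now
		return
	}
	workers[wid] = &worker{Wid: wid, LastBeat: now}
}

func main() {
	now := time.Now()
	workers := map[string]*worker{}
	beat(workers, "w1", now.Add(-90*time.Second)) // stale
	beat(workers, "w2", now.Add(-10*time.Second)) // fresh
	fmt.Println("reaped:", reapExpired(workers, now))
	beat(workers, "w1", now) // w1 resumes after a partition
	fmt.Println("tracked workers:", len(workers))
}
```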

A worker process has a simple three-state lifecycle:

running -> quiet -> terminate

- Running means the worker is alive and processing jobs.
- Quiet means the worker should stop FETCHing new jobs but continue working on existing jobs. It should not exit, even if no jobs are processing.
- Terminate means the worker should exit within N seconds, where N is recommended to be 30 seconds. In practice, faktory_worker_ruby waits up to 25 seconds and any threads that are still busy are forcefully killed and their associated jobs reported as FAILed so they will be retried shortly.

A worker process should never stop sending BEAT. Even after "quiet" or "terminate", the BEAT should continue, only stopping due to process exit(). Workers should never move backward in state: you cannot "unquiet" a worker; it must be restarted.

Workers will typically also respond to standard Unix signals. faktory_worker_ruby uses TSTP ("Threads SToP") as the quiet signal and TERM as the terminate signal.
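A small sketch of the forward-only lifecycle and the signal convention described above, as a worker written in Go might implement them. The `advance` and `stateForSignal` helpers are hypothetical, and the signal mapping simply copies the faktory_worker_ruby convention; it assumes a Unix platform where `syscall.SIGTSTP` exists.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// WorkerState mirrors the package's three-state lifecycle.
type WorkerState int

const (
	Running WorkerState = iota
	Quiet
	Terminate
)

// advance returns next only if it moves the worker forward; otherwise
// the current state is kept (a worker cannot be "unquieted").
func advance(current, next WorkerState) WorkerState {
	if next > current {
		return next
	}
	return current
}

// stateForSignal maps Unix signals to lifecycle states. ok is false
// for signals that do not request a state change.
func stateForSignal(sig os.Signal) (state WorkerState, ok bool) {
	switch sig {
	case syscall.SIGTSTP:
		return Quiet, true
	case syscall.SIGTERM:
		return Terminate, true
	}
	return Running, false
}

func main() {
	state := Running
	// The trailing TSTP arrives after TERM and must not move the
	// worker back to Quiet.
	for _, sig := range []os.Signal{syscall.SIGTSTP, syscall.SIGTERM, syscall.SIGTSTP} {
		if next, ok := stateForSignal(sig); ok {
			state = advance(state, next)
		}
		fmt.Println(sig, "->", state)
	}
}
```

In a real worker the signals would arrive through `signal.Notify` on a channel rather than a fixed slice.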

func (*ClientData) BusyCount

func (worker *ClientData) BusyCount() int

func (*ClientData) IsConsumer

func (worker *ClientData) IsConsumer() bool

func (*ClientData) IsQuiet

func (worker *ClientData) IsQuiet() bool

func (*ClientData) Signal

func (worker *ClientData) Signal(newstate WorkerState)

Signal sends "quiet" or "terminate" to the given client worker process. Other signals are undefined.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Represents a connection to a faktory client.

faktory reuses the same wire protocol as Redis: RESP. It's a nice trade-off between human-readable and efficient. Shout out to antirez for his nice design document on it. https://redis.io/topics/protocol
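For orientation, here is a sketch of the RESP reply types that the `Ok`, `Number`, `Result`, and `Error` methods below plausibly correspond to. The framing follows the RESP specification, but the function names and the choice of reply type for each method are assumptions, not the package's actual code.

```go
package main

import (
	"fmt"
	"strconv"
)

// respOk returns the RESP simple string "+OK".
func respOk() string {
	return "+OK\r\n"
}

// respNumber returns a RESP integer reply.
func respNumber(n int) string {
	return ":" + strconv.Itoa(n) + "\r\n"
}

// respBulk returns a RESP bulk string; nil becomes the null bulk string.
func respBulk(msg []byte) string {
	if msg == nil {
		return "$-1\r\n"
	}
	return "$" + strconv.Itoa(len(msg)) + "\r\n" + string(msg) + "\r\n"
}

// respError returns a RESP error reply with the conventional ERR prefix.
func respError(msg string) string {
	return "-ERR " + msg + "\r\n"
}

func main() {
	fmt.Printf("%q\n", respOk())
	fmt.Printf("%q\n", respNumber(42))
	fmt.Printf("%q\n", respBulk([]byte(`{"jid":"abc"}`)))
	fmt.Printf("%q\n", respBulk(nil))
	fmt.Printf("%q\n", respError("unknown command"))
}
```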

func (*Connection) Close

func (c *Connection) Close()

func (*Connection) Error

func (c *Connection) Error(cmd string, err error) error

func (*Connection) Number

func (c *Connection) Number(val int) error

func (*Connection) Ok

func (c *Connection) Ok() error

func (*Connection) Result

func (c *Connection) Result(msg []byte) error

type FailPayload

type FailPayload struct {
	Jid          string   `json:"jid"`
	ErrorMessage string   `json:"message"`
	ErrorType    string   `json:"errtype"`
	Backtrace    []string `json:"backtrace"`
}
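The struct tags above determine the JSON shape of a failure report. The sketch below copies the struct locally and marshals one value; the `encodeFail` helper and the sample field values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FailPayload is copied from the declaration above.
type FailPayload struct {
	Jid          string   `json:"jid"`
	ErrorMessage string   `json:"message"`
	ErrorType    string   `json:"errtype"`
	Backtrace    []string `json:"backtrace"`
}

// encodeFail is a hypothetical helper that serializes a payload to JSON.
func encodeFail(p FailPayload) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeFail(FailPayload{
		Jid:          "abc123", // sample job ID, not a real one
		ErrorMessage: "boom",
		ErrorType:    "RuntimeError",
		Backtrace:    []string{"worker.rb:10"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```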

type Reservation

type Reservation struct {
	Job    *faktory.Job `json:"job"`
	Since  string       `json:"reserved_at"`
	Expiry string       `json:"expires_at"`
	Wid    string       `json:"wid"`
	// contains filtered or unexported fields
}

type RuntimeStats

type RuntimeStats struct {
	Connections int64
	Commands    int64
	StartedAt   time.Time
}

type Server

type Server struct {
	Options  *ServerOptions
	Stats    *RuntimeStats
	Password string
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts *ServerOptions) (*Server, error)

func (*Server) Fetch

func (s *Server) Fetch(fn func(*faktory.Job) error, ctx context.Context, queues ...string) (*faktory.Job, error)

func (*Server) Heartbeats

func (s *Server) Heartbeats() map[string]*ClientData

func (*Server) Start

func (s *Server) Start() error

func (*Server) Stop

func (s *Server) Stop(f func())

func (*Server) Store

func (s *Server) Store() storage.Store

func (*Server) WaitUntilInitialized

func (s *Server) WaitUntilInitialized()

type ServerOptions

type ServerOptions struct {
	Binding          string
	StorageDirectory string
	ConfigDirectory  string
	Environment      string
}

type TimedSet

type TimedSet interface {
	AddElement(string, string, []byte) error
	RemoveElement(string, string) error
}

type WorkerState

type WorkerState int
const (
	Running WorkerState = iota
	Quiet
	Terminate
)
