client

package
v1.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2023 License: AGPL-3.0, MPL-2.0 Imports: 18 Imported by: 41

Documentation

Index

Constants

View Source
const (
	// This is the protocol version supported by this client.
	// The server might be running an older or newer version.
	ExpectedProtocolVersion = 2
)

Variables

View Source
var (
	ErrBatchAlreadyCommitted = fmt.Errorf("batch has already been committed, must reopen")
	ErrBatchNotOpen          = fmt.Errorf("batch must be opened before it can be used")
)
View Source
var (
	// Set this to a non-empty value in a consumer process
	// e.g. see how faktory_worker_go sets this.
	RandomProcessWid = ""
	Labels           = []string{"golang"}
)
View Source
var (
	Name    = "Faktory"
	Version = "1.8.0"
)
View Source
var (
	RetryPolicyDefault        = 25
	RetryPolicyEmphemeral     = 0
	RetryPolicyDirectToMorgue = -1
)
View Source
var (
	Everything = JobFilter{
		Regexp: "*",
	}
)

Functions

func RandomJid

func RandomJid() string

func RssKb

func RssKb() int64

Types

type Batch

type Batch struct {
	// Unique identifier for each batch.
	// NB: the caller should not set this, it is generated
	// by Faktory when the batch is persisted to Redis.
	Bid string `json:"bid"`

	ParentBid   string `json:"parent_bid,omitempty"`
	Description string `json:"description,omitempty"`
	Success     *Job   `json:"success,omitempty"`
	Complete    *Job   `json:"complete,omitempty"`
	// contains filtered or unexported fields
}

func NewBatch

func NewBatch(cl *Client) *Batch

Allocate a new Batch. Caller must set one or more callbacks and push one or more jobs in the batch.

b := faktory.NewBatch(cl)
b.Success = faktory.NewJob("MySuccessCallback", 12345)
b.Jobs(func() error {
  b.Push(...)
})

func (*Batch) Commit

func (b *Batch) Commit() error

Commit any pushed jobs in the batch to Redis so they can fire callbacks. A Batch object can only be committed once. You must use client.BatchOpen to get a new copy if you want to commit more jobs.

func (*Batch) Jobs

func (b *Batch) Jobs(fn func() error) error

Push one or more jobs within this function. Job processing will start **immediately** but callbacks will not fire until Commit() is called, allowing you to push jobs in slowly and avoid the obvious race condition.

func (*Batch) Push

func (b *Batch) Push(job *Job) error

type BatchStatus

type BatchStatus struct {
	Bid         string `json:"bid"`
	ParentBid   string `json:"parent_bid,omitempty"`
	Description string `json:"description,omitempty"`
	CreatedAt   string `json:"created_at"`
	Total       int64  `json:"total"`
	Pending     int64  `json:"pending"`
	Failed      int64  `json:"failed"`

	// "" if pending,
	// "1" if callback enqueued,
	// "2" if callback finished successfully
	CompleteState string `json:"complete_st"`
	SuccessState  string `json:"success_st"`
}

type Client

type Client struct {
	Location string
	Options  *ClientData
	// contains filtered or unexported fields
}

The Client structure represents a thread-unsafe connection to a Faktory server. It is recommended to use a connection pool of Clients in a multi-threaded process. See faktory_worker_go's internal connection pool for example.

func Dial

func Dial(srv *Server, password string) (*Client, error)

Dial connects to the remote faktory server with a Dialer reflecting the value of srv.Network; i.e., a *tls.Dialer if "tcp+tls" and a *net.Dialer if not.

client.Dial(client.Localhost, "topsecret")

func DialWithDialer

func DialWithDialer(srv *Server, password string, dialer Dialer) (*Client, error)

DialWithDialer connects to the faktory server

func Open

func Open() (*Client, error)

Open connects to a Faktory server based on environment variable conventions:

• Use FAKTORY_PROVIDER to point to a custom URL variable. • Use FAKTORY_URL as a catch-all default.

Use the URL to configure any necessary password:

tcp://:mypassword@localhost:7419

By default Open assumes localhost with no password which is appropriate for local development.

func OpenWithDialer

func OpenWithDialer(dialer Dialer) (*Client, error)

OpenWithDialer connects to a Faktory server following the same conventions as Open but instead uses dialer as the transport.

func (*Client) Ack

func (c *Client) Ack(jid string) error

func (*Client) BatchCommit

func (c *Client) BatchCommit(bid string) error

func (*Client) BatchNew

func (c *Client) BatchNew(def *Batch) (*Batch, error)

func (*Client) BatchOpen

func (c *Client) BatchOpen(bid string) (*Batch, error)

func (*Client) BatchStatus

func (c *Client) BatchStatus(bid string) (*BatchStatus, error)

func (*Client) Beat

func (c *Client) Beat(args ...string) (string, error)

* The first arg to Beat allows a worker process to report its current lifecycle state * to Faktory. All worker processes must follow the same basic lifecycle: * * (startup) -> "" -> "quiet" -> "terminate" * * Quiet allows the process to finish its current work without fetching any new work. * Terminate means the process should exit within X seconds, usually ~30 seconds.

func (*Client) Clear

func (c *Client) Clear(name Structure) error

func (*Client) Close

func (c *Client) Close() error

func (*Client) Discard

func (c *Client) Discard(name Structure, filter JobFilter) error

func (*Client) Fail

func (c *Client) Fail(jid string, err error, backtrace []byte) error

Fail notifies Faktory that a job failed with the given error. If backtrace is non-nil, it is assumed to be the output from runtime/debug.Stack().

func (*Client) Fetch

func (c *Client) Fetch(q ...string) (*Job, error)

func (*Client) Flush

func (c *Client) Flush() error

func (*Client) Generic

func (c *Client) Generic(cmdline string) (string, error)

func (*Client) Info

func (c *Client) Info() (map[string]interface{}, error)

func (*Client) Kill

func (c *Client) Kill(name Structure, filter JobFilter) error

func (*Client) PauseQueues

func (c *Client) PauseQueues(names ...string) error

List queues explicitly or use "*" to pause all known queues

func (*Client) Push

func (c *Client) Push(job *Job) error

func (*Client) PushBulk added in v1.6.0

func (c *Client) PushBulk(jobs []*Job) (map[string]string, error)

Result is map[JID]ErrorMessage

func (*Client) QueueSizes added in v1.5.2

func (c *Client) QueueSizes() (map[string]uint64, error)

func (*Client) RemoveQueues added in v1.6.2

func (c *Client) RemoveQueues(names ...string) error

List queues explicitly or use "*" to remove all known queues

func (*Client) Requeue

func (c *Client) Requeue(name Structure, filter JobFilter) error

func (*Client) ResumeQueues

func (c *Client) ResumeQueues(names ...string) error

List queues explicitly or use "*" to resume all known queues

func (*Client) TrackGet

func (c *Client) TrackGet(jid string) (*JobTrack, error)

func (*Client) TrackSet

func (c *Client) TrackSet(jid string, percent int, desc string, reserveUntil *time.Time) error

type ClientData

type ClientData struct {
	Hostname string   `json:"hostname"`
	Wid      string   `json:"wid"`
	Pid      int      `json:"pid"`
	Labels   []string `json:"labels"`

	// this can be used by proxies to route the connection.
	// it is ignored by Faktory.
	Username string `json:"username"`

	// Hash is hex(sha256(password + nonce))
	PasswordHash string `json:"pwdhash"`

	// The protocol version used by this client.
	// The server can reject this connection if the version will not work
	// The server advertises its protocol version in the HI.
	Version int `json:"v"`
}

ClientData is serialized to JSON and sent with the HELLO command. PasswordHash is required if the server is not listening on localhost. The WID (worker id) must be random and unique for each worker process. It can be a UUID, etc. Non-worker processes should leave WID empty.

The other elements can be useful for debugging and are displayed on the Busy tab.

type Dialer

type Dialer interface {
	Dial(network, addr string) (c net.Conn, err error)
}

Dialer is the interface for creating a specialized net.Conn.

type Failure

type Failure struct {
	RetryCount     int      `json:"retry_count"`
	RetryRemaining int      `json:"remaining"`
	FailedAt       string   `json:"failed_at"`
	NextAt         string   `json:"next_at,omitempty"`
	ErrorMessage   string   `json:"message,omitempty"`
	ErrorType      string   `json:"errtype,omitempty"`
	Backtrace      []string `json:"backtrace,omitempty"`
}

type Job

type Job struct {
	// required
	Jid   string        `json:"jid"`
	Queue string        `json:"queue"`
	Type  string        `json:"jobtype"`
	Args  []interface{} `json:"args"`

	// optional
	CreatedAt  string                 `json:"created_at,omitempty"`
	EnqueuedAt string                 `json:"enqueued_at,omitempty"`
	At         string                 `json:"at,omitempty"`
	ReserveFor int                    `json:"reserve_for,omitempty"`
	Retry      *int                   `json:"retry"`
	Backtrace  int                    `json:"backtrace,omitempty"`
	Failure    *Failure               `json:"failure,omitempty"`
	Custom     map[string]interface{} `json:"custom,omitempty"`
}

func NewJob

func NewJob(jobtype string, args ...interface{}) *Job

Clients should use this constructor to build a Job, not allocate a bare struct directly.

func (*Job) GetCustom

func (j *Job) GetCustom(name string) (interface{}, bool)

func (*Job) SetCustom

func (j *Job) SetCustom(name string, value interface{}) *Job

Set custom metadata for this job. Faktory reserves all element names starting with "_" for internal use, e.g. SetCustom("_txid", "12345")

func (*Job) SetExpiresAt

func (j *Job) SetExpiresAt(expiresAt time.Time) *Job

Configure the TTL for this job. After this point in time, the job will be discarded rather than executed.

func (*Job) SetExpiresIn added in v1.5.5

func (j *Job) SetExpiresIn(expiresIn time.Duration) *Job

func (*Job) SetUniqueFor

func (j *Job) SetUniqueFor(secs uint) *Job

Configure this job to be unique for +secs+ seconds or until the job has been successfully processed.

func (*Job) SetUniqueness

func (j *Job) SetUniqueness(until UniqueUntil) *Job

Configure the uniqueness deadline for this job, legal values are:

  • "success" - the job will be considered unique until it has successfully processed or the +unique_for+ TTL has passed, this is the default value.
  • "start" - the job will be considered unique until it starts processing. Retries may lead to multiple copies of the job running.

type JobFilter

type JobFilter struct {
	Jids    []string `json:"jids,omitempty"`
	Regexp  string   `json:"regexp,omitempty"`
	Jobtype string   `json:"jobtype,omitempty"`
}

func Matching

func Matching(pattern string) JobFilter

This is a generic pattern match across the entire job JSON payload. Be very careful that you don't accidentally match some unintended part of the payload.

NB: your pattern should have * on each side. The pattern is passed directly to Redis.

Example: discard any job retries whose payload contains the special word "uid:12345":

client.Discard(faktory.Retries, faktory.Matching("*uid:12345*"))

See the Redis SCAN documentation for pattern matching examples. https://redis.io/commands/scan

func OfType

func OfType(jobtype string) JobFilter

Matches jobs based on the exact Jobtype. This is pretty fast because it devolves to Matching(`"jobtype":"$ARG"`) and matches within Redis.

func WithJids

func WithJids(jids ...string) JobFilter

Match jobs with the given JIDs. Warning: O(m*n), very slow because it has to pull every job into Faktory and check the JID against the list.

If you pass in a single JID, it will devolve to matching within Redis and perform much faster. For that reason, it might be better to handle one JID at a time.

func (JobFilter) Matching

func (jf JobFilter) Matching(pattern string) JobFilter

func (JobFilter) OfType

func (jf JobFilter) OfType(jobtype string) JobFilter

func (JobFilter) WithJids

func (jf JobFilter) WithJids(jids ...string) JobFilter

type JobTrack

type JobTrack struct {
	Jid         string `json:"jid"`
	Percent     int    `json:"percent,omitempty"`
	Description string `json:"desc,omitempty"`
	State       string `json:"state"`
	UpdatedAt   string `json:"updated_at"`
}

type MutateClient

type MutateClient interface {

	// Move the given jobs from structure to the Dead set.
	// Faktory will not touch them anymore but you can still see them in the Web UI.
	//
	// Kill(Retries, OfType("DataSyncJob").WithJids("abc", "123"))
	Kill(name Structure, filter JobFilter) error

	// Move the given jobs to their associated queue so they can be immediately
	// picked up and processed.
	Requeue(name Structure, filter JobFilter) error

	// Throw away the given jobs, e.g. if you want to delete all jobs named "QuickbooksSyncJob"
	//
	//   Discard(Dead, OfType("QuickbooksSyncJob"))
	Discard(name Structure, filter JobFilter) error

	// Empty the entire given structure, e.g. if you want to clear all retries.
	// This is very fast as it is special cased by Faktory.
	Clear(name Structure) error
}

Commands which allow you to perform admin tasks on various Faktory structures. These are NOT designed to be used in business logic but rather for maintenance, data repair, migration, etc. They can have poor scalability or performance edge cases.

Generally these operations are O(n) or worse. They will get slower as your data gets bigger.

type Operation

type Operation struct {
	Cmd    string     `json:"cmd"`
	Target Structure  `json:"target"`
	Filter *JobFilter `json:"filter,omitempty"`
}

type Pool

type Pool struct {
	pool.Pool
}

func NewPool

func NewPool(capacity int) (*Pool, error)

NewPool creates a new Pool object through which multiple clients will be managed on your behalf.

Call Get() to retrieve a client instance and Put() to return it to the pool. If you do not call Put(), the connection will be leaked, and the pool will stop working once it hits capacity.

Do NOT call Close() on the client, as the lifecycle is managed internally.

The dialer clients in this pool use is determined by the URI scheme in FAKTORY_PROVIDER.

func NewPoolWithDialer

func NewPoolWithDialer(capacity int, dialer Dialer) (*Pool, error)

NewPoolWithDialer creates a new Pool object similar to NewPool but clients will use the provided dialer instead of default ones.

func (*Pool) Get

func (p *Pool) Get() (*Client, error)

Get retrieves a Client from the pool. This Client is created, internally, by calling the Open() function, and has all the same behaviors.

func (*Pool) Put

func (p *Pool) Put(client *Client)

Put returns a client to the pool.

func (*Pool) With

func (p *Pool) With(fn func(conn *Client) error) error

type ProtocolError

type ProtocolError struct {
	// contains filtered or unexported fields
}

func (*ProtocolError) Error

func (pe *ProtocolError) Error() string

type Server

type Server struct {
	Network  string
	Address  string
	Username string
	Password string
	Timeout  time.Duration
	TLS      *tls.Config
}

func DefaultServer

func DefaultServer() *Server

func (*Server) Open

func (s *Server) Open() (*Client, error)

func (*Server) OpenWithDialer

func (s *Server) OpenWithDialer(dialer Dialer) (*Client, error)

OpenWithDialer creates a *Client with the dialer.

func (*Server) ReadFromEnv

func (s *Server) ReadFromEnv() error

type Structure

type Structure string
const (
	Scheduled Structure = "scheduled"
	Retries   Structure = "retries"
	Dead      Structure = "dead"
)

type UniqueUntil

type UniqueUntil string
const (
	UntilSuccess UniqueUntil = "success" // default
	UntilStart   UniqueUntil = "start"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL