dkron

package
v3.2.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2023 License: LGPL-3.0 Imports: 70 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultBindPort      int           = 8946
	DefaultRPCPort       int           = 6868
	DefaultRetryInterval time.Duration = time.Second * 30
)

DefaultBindPort is the default port that dkron will use for Serf communication

View Source
const (
	// StatusNotSet is the initial job status.
	StatusNotSet = ""
	// StatusSuccess is status of a job whose last run was a success.
	StatusSuccess = "success"
	// StatusRunning is status of a job whose last run has not finished.
	StatusRunning = "running"
	// StatusFailed is status of a job whose last run was not successful on any nodes.
	StatusFailed = "failed"
	// StatusPartiallyFailed is status of a job whose last run was successful on only some nodes.
	StatusPartiallyFailed = "partially_failed"

	// ConcurrencyAllow allows a job to execute concurrency.
	ConcurrencyAllow = "allow"
	// ConcurrencyForbid forbids a job from executing concurrency.
	ConcurrencyForbid = "forbid"
)
View Source
const (
	// MaxExecutions to maintain in the storage
	MaxExecutions = 100
)
View Source
const (
	// StatusReap is used to update the status of a node if we
	// are handling a EventMemberReap
	StatusReap = serf.MemberStatus(-1)
)

Variables

View Source
var (

	// ErrLeaderNotFound is returned when obtained leader is not found in member list
	ErrLeaderNotFound = errors.New("no member leader found in member list")

	// ErrNoSuitableServer returns an error in case no suitable server to send the request is found.
	ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting")
)
View Source
var (
	// ErrExecutionDoneForDeletedJob is returned when an execution done
	// is received for a non existent job.
	ErrExecutionDoneForDeletedJob = errors.New("grpc: Received execution done for a deleted job")
	// ErrRPCDialing is returned on dialing fail.
	ErrRPCDialing = errors.New("grpc: Error dialing, verify the network connection to the server")
	// ErrNotLeader is the error returned when the operation need the node to be the leader,
	// but the current node is not the leader.
	ErrNotLeader = errors.New("grpc: Error, server is not leader, this operation should be run on the leader")
	// ErrBrokenStream is the error that indicates a sudden disconnection of the agent streaming an execution
	ErrBrokenStream = errors.New("grpc: Error on execution streaming, agent connection was abruptly terminated")
)
View Source
var (
	// ErrParentJobNotFound is returned when the parent job is not found.
	ErrParentJobNotFound = errors.New("specified parent job not found")
	// ErrNoAgent is returned when the job's agent is nil.
	ErrNoAgent = errors.New("no agent defined")
	// ErrSameParent is returned when the job's parent is itself.
	ErrSameParent = errors.New("the job can not have itself as parent")
	// ErrNoParent is returned when the job has no parent.
	ErrNoParent = errors.New("the job doesn't have a parent job set")
	// ErrNoCommand is returned when attempting to store a job that has no command.
	ErrNoCommand = errors.New("unspecified command for job")
	// ErrWrongConcurrency is returned when Concurrency is set to a non existing setting.
	ErrWrongConcurrency = errors.New("invalid concurrency policy value, use \"allow\" or \"forbid\"")
)
View Source
var Codename = "Devel"

Codename codename of this series

View Source
var (
	// ErrDependentJobs is returned when deleting a job that has dependent jobs
	ErrDependentJobs = errors.New("store: could not delete job with dependent jobs, delete childs first")
)
View Source
var ErrResolvingHost = errors.New("error resolving hostname")
View Source
var (

	// ErrScheduleParse is the error returned when the schedule parsing fails.
	ErrScheduleParse = errors.New("can't parse job schedule")
)
View Source
var Name = "Dkron"

Name store the name of this software

View Source
var Version = "devel"

Version is the current version that will get replaced on build.

Functions

func ConfigFlagSet

func ConfigFlagSet() *flag.FlagSet

ConfigFlagSet creates all of our configuration flags.

func CreateMyRender

func CreateMyRender(l *logrus.Entry) multitemplate.Render

CreateMyRender returns a new custom multitemplate renderer to use with Gin.

func Encode

func Encode(t MessageType, msg interface{}) ([]byte, error)

Encode is used to encode a Protoc object with type prefix

func InitLogger

func InitLogger(logLevel string, node string) *logrus.Entry

InitLogger creates the logger instance

func NewAgentServer

func NewAgentServer(agent *Agent, logger *logrus.Entry) types.AgentServer

NewServer creates and returns an instance of a DkronGRPCServer implementation

func ParseSingleIPTemplate

func ParseSingleIPTemplate(ipTmpl string) (string, error)

ParseSingleIPTemplate is used as a helper function to parse out a single IP address from a config parameter.

func SendPostNotifications added in v3.2.0

func SendPostNotifications(config *Config, execution *Execution, exGroup []*Execution, job *Job, logger *logrus.Entry) error

Send sends the notifications using any configured method

func SendPreNotifications added in v3.2.0

func SendPreNotifications(config *Config, execution *Execution, exGroup []*Execution, job *Job, logger *logrus.Entry) error

NewNotifier returns a new notifier

func UserAgent

func UserAgent() string

UserAgent returns the consistent user-agent string

Types

type Agent

type Agent struct {
	// ProcessorPlugins maps processor plugins
	ProcessorPlugins map[string]plugin.Processor

	//ExecutorPlugins maps executor plugins
	ExecutorPlugins map[string]plugin.Executor

	// HTTPTransport is a swappable interface for the HTTP server interface
	HTTPTransport Transport

	// Store interface to set the storage engine
	Store Storage

	// GRPCServer interface for setting the GRPC server
	GRPCServer DkronGRPCServer

	// GRPCClient interface for setting the GRPC client
	GRPCClient DkronGRPCClient

	// TLSConfig allows setting a TLS config for transport
	TLSConfig *tls.Config

	// Pro features
	GlobalLock         bool
	MemberEventHandler func(serf.Event)
	ProAppliers        LogAppliers
	// contains filtered or unexported fields
}

Agent is the main struct that represents a dkron agent

func NewAgent

func NewAgent(config *Config, options ...AgentOption) *Agent

NewAgent returns a new Agent instance capable of starting and running a Dkron instance.

func (*Agent) Config

func (a *Agent) Config() *Config

Config returns the agent's config.

func (*Agent) DashboardRoutes

func (a *Agent) DashboardRoutes(r *gin.RouterGroup)

DashboardRoutes registers dashboard specific routes on the gin RouterGroup.

func (*Agent) GetActiveExecutions

func (a *Agent) GetActiveExecutions() ([]*proto.Execution, error)

GetActiveExecutions returns running executions globally

func (*Agent) GetRunningJobs

func (a *Agent) GetRunningJobs() int

GetRunningJobs returns amount of active jobs of the local agent

func (*Agent) IsLeader

func (a *Agent) IsLeader() bool

IsLeader checks if this server is the cluster leader

func (*Agent) JoinLAN

func (a *Agent) JoinLAN(addrs []string) (int, error)

JoinLAN is used to have Dkron join the inner-DC pool The target address should be another node inside the DC listening on the Serf LAN address

func (*Agent) Leader

func (a *Agent) Leader() raft.ServerAddress

Leader is used to return the Raft leader

func (*Agent) LocalMember

func (a *Agent) LocalMember() serf.Member

LocalMember is used to return the local node

func (*Agent) LocalServers

func (a *Agent) LocalServers() (members []*ServerParts)

LocalServers returns a list of the local known server

func (*Agent) Members

func (a *Agent) Members() []serf.Member

Members is used to return the members of the serf cluster

func (*Agent) RaftApply

func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture

RaftApply applies a command to the Raft log

func (*Agent) RetryJoinCh

func (a *Agent) RetryJoinCh() <-chan error

RetryJoinCh is a channel that transports errors from the retry join process.

func (*Agent) Run

func (a *Agent) Run(jobName string, ex *Execution) (*Job, error)

Run call the agents to run a job. Returns a job with its new status and next schedule.

func (*Agent) Servers

func (a *Agent) Servers() (members []*ServerParts)

Servers returns a list of known server

func (*Agent) SetConfig

func (a *Agent) SetConfig(c *Config)

SetConfig sets the agent's config.

func (*Agent) Start

func (a *Agent) Start() error

Start the current agent by running all the necessary checks and server or client routines.

func (*Agent) StartServer

func (a *Agent) StartServer()

StartServer launch a new dkron server process

func (*Agent) Stop

func (a *Agent) Stop() error

Stop stops an agent, if the agent is a server and is running for election stop running for election, if this server was the leader this will force the cluster to elect a new leader and start a new scheduler. If this is a server and has the scheduler started stop it, ignoring if this server was participating in leader election or not (local storage). Then actually leave the cluster.

func (*Agent) UpdateTags added in v3.1.8

func (a *Agent) UpdateTags(tags map[string]string)

UpdateTags updates the tag configuration for this agent

type AgentOption

type AgentOption func(agent *Agent)

AgentOption type that defines agent options

func WithPlugins

func WithPlugins(plugins Plugins) AgentOption

WithPlugins option to set plugins to the agent

func WithRaftStore added in v3.2.1

func WithRaftStore(raftStore RaftStore) AgentOption

func WithStore

func WithStore(store Storage) AgentOption

WithStore set store in the agent

func WithTransportCredentials

func WithTransportCredentials(tls *tls.Config) AgentOption

WithTransportCredentials set tls config in the agent

type AgentServer

type AgentServer struct {
	types.AgentServer
	// contains filtered or unexported fields
}

GRPCAgentServer is the local implementation of the gRPC server interface.

func (*AgentServer) AgentRun

func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_AgentRunServer) error

AgentRun is called when an agent starts running a job and lasts all execution, the agent will stream execution progress to the server.

type Config

type Config struct {
	// NodeName is the name we register as. Defaults to hostname.
	NodeName string `mapstructure:"node-name"`

	// BindAddr is the address on which all of dkron's services will
	// be bound. If not specified, this defaults to the first private ip address.
	BindAddr string `mapstructure:"bind-addr"`

	// HTTPAddr is the address on the UI web server will
	// be bound. If not specified, this defaults to all interfaces.
	HTTPAddr string `mapstructure:"http-addr"`

	// Profile is used to select a timing profile for Serf. The supported choices
	// are "wan", "lan", and "local". The default is "lan"
	Profile string

	// AdvertiseAddr is the address that the Serf and gRPC layer will advertise to
	// other members of the cluster. Can be used for basic NAT traversal
	// where both the internal ip:port and external ip:port are known.
	AdvertiseAddr string `mapstructure:"advertise-addr"`

	// Tags are used to attach key/value metadata to a node.
	Tags map[string]string `mapstructure:"tags"`

	// Server enables this node to work as a dkron server.
	Server bool

	// EncryptKey is the secret key to use for encrypting communication
	// traffic for Serf. The secret key must be exactly 32-bytes, base64
	// encoded. The easiest way to do this on Unix machines is this command:
	// "head -c32 /dev/urandom | base64" or use "dkron keygen". If this is
	// not specified, the traffic will not be encrypted.
	EncryptKey string `mapstructure:"encrypt"`

	// StartJoin is a list of addresses to attempt to join when the
	// agent starts. If Serf is unable to communicate with any of these
	// addresses, then the agent will error and exit.
	StartJoin []string `mapstructure:"join"`

	// RetryJoinLAN is a list of addresses to attempt to join when the
	// agent starts. Serf will continue to retry the join until it
	// succeeds or RetryMaxAttempts is reached.
	RetryJoinLAN []string `mapstructure:"retry-join"`

	// RetryMaxAttemptsLAN is used to limit the maximum attempts made
	// by RetryJoin to reach other nodes. If this is 0, then no limit
	// is imposed, and Serf will continue to try forever. Defaults to 0.
	RetryJoinMaxAttemptsLAN int `mapstructure:"retry-max"`

	// RetryIntervalLAN is the string retry interval. This interval
	// controls how often we retry the join for RetryJoin. This defaults
	// to 30 seconds.
	RetryJoinIntervalLAN time.Duration `mapstructure:"retry-interval"`

	// RPCPort is the gRPC port used by Dkron. This should be reachable
	// by the other servers and clients.
	RPCPort int `mapstructure:"rpc-port"`

	// AdvertiseRPCPort is the gRPC port advertised to clients. This should be reachable
	// by the other servers and clients.
	AdvertiseRPCPort int `mapstructure:"advertise-rpc-port"`

	// LogLevel is the log verbosity level used.
	// It can be (debug|info|warn|error|fatal|panic).
	LogLevel string `mapstructure:"log-level"`

	// Datacenter is the datacenter this Dkron server belongs to.
	Datacenter string

	// Region is the region this Dkron server belongs to.
	Region string

	// Bootstrap mode is used to bring up the first Dkron server.  It is
	// required so that it can elect a leader without any other nodes
	// being present
	Bootstrap bool

	// BootstrapExpect tries to automatically bootstrap the Dkron cluster,
	// by withholding peers until enough servers join.
	BootstrapExpect int `mapstructure:"bootstrap-expect"`

	// DataDir is the directory to store our state in
	DataDir string `mapstructure:"data-dir"`

	// DevMode is used for development purposes only and limits the
	// use of persistence or state.
	DevMode bool

	// ReconcileInterval controls how often we reconcile the strongly
	// consistent store with the Serf info. This is used to handle nodes
	// that are force removed, as well as intermittent unavailability during
	// leader election.
	ReconcileInterval time.Duration

	// RaftMultiplier An integer multiplier used by Dkron servers to scale key
	// Raft timing parameters.
	RaftMultiplier int `mapstructure:"raft-multiplier"`

	// MailHost is the SMTP server host to use for email notifications.
	MailHost string `mapstructure:"mail-host"`

	// MailPort is the SMTP server port to use for email notifications.
	MailPort uint16 `mapstructure:"mail-port"`

	// MailUsername is the SMTP server username to use for email notifications.
	MailUsername string `mapstructure:"mail-username"`

	// MailPassword is the SMTP server password to use for email notifications.
	MailPassword string `mapstructure:"mail-password"`

	// MailFrom is the email sender to use for email notifications.
	MailFrom string `mapstructure:"mail-from"`

	// MailPayload is the email template body to use for email notifications.
	MailPayload string `mapstructure:"mail-payload"`

	// MailSubjectPrefix is the email subject prefix string to use for email notifications.
	MailSubjectPrefix string `mapstructure:"mail-subject-prefix"`

	// PreWebhookURL is the endpoint to call for notifications.
	PreWebhookEndpoint string `mapstructure:"pre-webhook-endpoint"`

	// PreWebhookPayload is the body template of the request for notifications.
	PreWebhookPayload string `mapstructure:"pre-webhook-payload"`

	// PreWebhookHeaders are the headers to use when calling the webhook for notifications.
	PreWebhookHeaders []string `mapstructure:"pre-webhook-headers"`

	// WebhookEndpoint is the URL to call for notifications.
	WebhookEndpoint string `mapstructure:"webhook-endpoint"`

	// WebhookPayload is the body template of the request for notifications.
	WebhookPayload string `mapstructure:"webhook-payload"`

	// WebhookHeaders are the headers to use when calling the webhook for notifications.
	WebhookHeaders []string `mapstructure:"webhook-headers"`

	// DogStatsdAddr is the address of a dogstatsd instance. If provided,
	// metrics will be sent to that instance.
	DogStatsdAddr string `mapstructure:"dog-statsd-addr"`

	// DogStatsdTags are the global tags that should be sent with each packet to dogstatsd
	// It is a list of strings, where each string looks like "my_tag_name:my_tag_value".
	DogStatsdTags []string `mapstructure:"dog-statsd-tags"`

	// StatsdAddr is the statsd standard server to be used for sending metrics.
	StatsdAddr string `mapstructure:"statsd-addr"`

	// SerfReconnectTimeout is the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone
	SerfReconnectTimeout string `mapstructure:"serf-reconnect-timeout"`

	// EnablePrometheus enables serving of prometheus metrics at /metrics
	EnablePrometheus bool `mapstructure:"enable-prometheus"`

	// UI enable the web UI on this node. The node must be server.
	UI bool

	// DisableUsageStats disable sending anonymous usage stats
	DisableUsageStats bool `mapstructure:"disable-usage-stats"`

	// CronitorEndpoint is the endpoint to call for cronitor notifications.
	CronitorEndpoint string `mapstructure:"cronitor-endpoint"`
}

Config stores all configuration options for the dkron package.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config struct pointer with sensible default settings.

func (*Config) AddrParts

func (c *Config) AddrParts(address string) (string, int, error)

AddrParts returns the parts of the BindAddr that should be used to configure Serf.

func (*Config) EncryptBytes

func (c *Config) EncryptBytes() ([]byte, error)

EncryptBytes returns the encryption key configured.

func (*Config) Hash added in v3.1.5

func (c *Config) Hash() (string, error)

Hash returns the sha 256 hash of the configuration in a standard base64 encoded string

type DkronGRPCClient

type DkronGRPCClient interface {
	Connect(string) (*grpc.ClientConn, error)
	ExecutionDone(string, *Execution) error
	GetJob(string, string) (*Job, error)
	SetJob(*Job) error
	DeleteJob(string) (*Job, error)
	Leave(string) error
	RunJob(string) (*Job, error)
	RaftGetConfiguration(string) (*proto.RaftGetConfigurationResponse, error)
	RaftRemovePeerByID(string, string) error
	GetActiveExecutions(string) ([]*proto.Execution, error)
	SetExecution(execution *proto.Execution) error
	AgentRun(addr string, job *proto.Job, execution *proto.Execution) error
}

DkronGRPCClient defines the interface that any gRPC client for dkron should implement.

func NewGRPCClient

func NewGRPCClient(dialOpt grpc.DialOption, agent *Agent, logger *logrus.Entry) DkronGRPCClient

NewGRPCClient returns a new instance of the gRPC client.

type DkronGRPCServer

type DkronGRPCServer interface {
	proto.DkronServer
	Serve(net.Listener) error
}

DkronGRPCServer defines the basics that a gRPC server should implement.

func NewGRPCServer

func NewGRPCServer(agent *Agent, logger *logrus.Entry) DkronGRPCServer

NewGRPCServer creates and returns an instance of a DkronGRPCServer implementation

type EntryJob added in v3.2.0

type EntryJob struct {
	// contains filtered or unexported fields
}

type Execution

type Execution struct {
	// Id is the Key for this execution
	Id string `json:"id,omitempty"`

	// Name of the job this executions refers to.
	JobName string `json:"job_name,omitempty"`

	// Start time of the execution.
	StartedAt time.Time `json:"started_at,omitempty"`

	// When the execution finished running.
	FinishedAt time.Time `json:"finished_at,omitempty"`

	// If this execution executed successfully.
	Success bool `json:"success"`

	// Partial output of the execution.
	Output string `json:"output,omitempty"`

	// Node name of the node that run this execution.
	NodeName string `json:"node_name,omitempty"`

	// Execution group to what this execution belongs to.
	Group int64 `json:"group,omitempty"`

	// Retry attempt of this execution.
	Attempt uint `json:"attempt,omitempty"`
}

Execution type holds all of the details of a specific Execution.

func NewExecution

func NewExecution(jobName string) *Execution

NewExecution creates a new execution.

func NewExecutionFromProto

func NewExecutionFromProto(e *proto.Execution) *Execution

NewExecutionFromProto maps a proto.ExecutionDoneRequest to an Execution object

func (*Execution) GetGroup

func (e *Execution) GetGroup() string

GetGroup is the getter for the execution group.

func (*Execution) Key

func (e *Execution) Key() string

Key wil generate the execution Id for an execution.

func (*Execution) ToProto

func (e *Execution) ToProto() *proto.Execution

ToProto returns the protobuf struct corresponding to the representation of the current execution.

type ExecutionOptions added in v3.1.2

type ExecutionOptions struct {
	Sort     string
	Order    string
	Timezone *time.Location
}

ExecutionOptions additional options like "Sort" will be ready for JSON marshall

type GRPCClient

type GRPCClient struct {
	// contains filtered or unexported fields
}

GRPCClient is the local implementation of the DkronGRPCClient interface.

func (*GRPCClient) AgentRun

func (grpcc *GRPCClient) AgentRun(addr string, job *proto.Job, execution *proto.Execution) error

AgentRun runs a job in the given agent

func (*GRPCClient) Connect

func (grpcc *GRPCClient) Connect(addr string) (*grpc.ClientConn, error)

Connect dialing to a gRPC server

func (*GRPCClient) DeleteJob

func (grpcc *GRPCClient) DeleteJob(jobName string) (*Job, error)

DeleteJob calls the leader passing the job name

func (*GRPCClient) ExecutionDone

func (grpcc *GRPCClient) ExecutionDone(addr string, execution *Execution) error

ExecutionDone calls the ExecutionDone gRPC method

func (*GRPCClient) GetActiveExecutions

func (grpcc *GRPCClient) GetActiveExecutions(addr string) ([]*proto.Execution, error)

GetActiveExecutions returns the active executions of a server node

func (*GRPCClient) GetJob

func (grpcc *GRPCClient) GetJob(addr, jobName string) (*Job, error)

GetJob calls GetJob gRPC method in the server

func (*GRPCClient) Leave

func (grpcc *GRPCClient) Leave(addr string) error

Leave calls Leave method on the gRPC server

func (*GRPCClient) RaftGetConfiguration

func (grpcc *GRPCClient) RaftGetConfiguration(addr string) (*proto.RaftGetConfigurationResponse, error)

RaftGetConfiguration get the current raft configuration of peers

func (*GRPCClient) RaftRemovePeerByID

func (grpcc *GRPCClient) RaftRemovePeerByID(addr, peerID string) error

RaftRemovePeerByID remove a raft peer

func (*GRPCClient) RunJob

func (grpcc *GRPCClient) RunJob(jobName string) (*Job, error)

RunJob calls the leader passing the job name

func (*GRPCClient) SetExecution

func (grpcc *GRPCClient) SetExecution(execution *proto.Execution) error

SetExecution calls the leader passing the execution

func (*GRPCClient) SetJob

func (grpcc *GRPCClient) SetJob(job *Job) error

SetJob calls the leader passing the job

type GRPCServer

type GRPCServer struct {
	proto.DkronServer
	// contains filtered or unexported fields
}

GRPCServer is the local implementation of the gRPC server interface.

func (*GRPCServer) DeleteJob

func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJobRequest) (*proto.DeleteJobResponse, error)

DeleteJob broadcast a state change to the cluster members that will delete the job. This only works on the leader

func (*GRPCServer) ExecutionDone

func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.ExecutionDoneRequest) (*proto.ExecutionDoneResponse, error)

ExecutionDone saves the execution to the store

func (*GRPCServer) GetActiveExecutions

func (grpcs *GRPCServer) GetActiveExecutions(ctx context.Context, in *empty.Empty) (*proto.GetActiveExecutionsResponse, error)

GetActiveExecutions returns the active executions on the server node

func (*GRPCServer) GetJob

func (grpcs *GRPCServer) GetJob(ctx context.Context, getJobReq *proto.GetJobRequest) (*proto.GetJobResponse, error)

GetJob loads the job from the datastore

func (*GRPCServer) Leave

func (grpcs *GRPCServer) Leave(ctx context.Context, in *empty.Empty) (*empty.Empty, error)

Leave calls the Stop method, stopping everything in the server

func (*GRPCServer) RaftGetConfiguration

func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Empty) (*proto.RaftGetConfigurationResponse, error)

RaftGetConfiguration get raft config

func (*GRPCServer) RaftRemovePeerByID

func (grpcs *GRPCServer) RaftRemovePeerByID(ctx context.Context, in *proto.RaftRemovePeerByIDRequest) (*empty.Empty, error)

RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft quorum but no longer known to Serf or the catalog) by address in the form of "IP:port". The reply argument is not used, but is required to fulfill the RPC interface.

func (*GRPCServer) RunJob

func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (*proto.RunJobResponse, error)

RunJob runs a job in the cluster

func (*GRPCServer) Serve

func (grpcs *GRPCServer) Serve(lis net.Listener) error

Serve creates and start a new gRPC dkron server

func (*GRPCServer) SetExecution

func (grpcs *GRPCServer) SetExecution(ctx context.Context, execution *proto.Execution) (*empty.Empty, error)

SetExecution broadcast a state change to the cluster members that will store the execution. This only works on the leader

func (*GRPCServer) SetJob

func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequest) (*proto.SetJobResponse, error)

SetJob broadcast a state change to the cluster members that will store the job. Then restart the scheduler This only works on the leader

func (*GRPCServer) ToggleJob

func (grpcs *GRPCServer) ToggleJob(ctx context.Context, getJobReq *proto.ToggleJobRequest) (*proto.ToggleJobResponse, error)

ToggleJob toggle the enablement of a job

type HCLogAdapter

type HCLogAdapter struct {
	Logger     logrus.FieldLogger
	LoggerName string
}

HCLogAdapter implements the hclog interface, and wraps it around a Logrus entry

func (*HCLogAdapter) CreateEntry

func (a *HCLogAdapter) CreateEntry(args []interface{}) *logrus.Entry

CreateEntry creates a new logrus entry

func (*HCLogAdapter) Debug

func (a *HCLogAdapter) Debug(msg string, args ...interface{})

Debug logging level message

func (*HCLogAdapter) Error

func (a *HCLogAdapter) Error(msg string, args ...interface{})

Error logging level message

func (*HCLogAdapter) GetLevel added in v3.2.2

func (a *HCLogAdapter) GetLevel() hclog.Level

GetLevel noop

func (*HCLogAdapter) ImpliedArgs

func (a *HCLogAdapter) ImpliedArgs() []interface{}

ImpliedArgs returns With key/value pairs

func (*HCLogAdapter) Info

func (a *HCLogAdapter) Info(msg string, args ...interface{})

Info logging level message

func (*HCLogAdapter) IsDebug

func (a *HCLogAdapter) IsDebug() bool

IsDebug check

func (*HCLogAdapter) IsError

func (a *HCLogAdapter) IsError() bool

IsError check

func (*HCLogAdapter) IsInfo

func (a *HCLogAdapter) IsInfo() bool

IsInfo check

func (*HCLogAdapter) IsTrace

func (a *HCLogAdapter) IsTrace() bool

IsTrace check

func (*HCLogAdapter) IsWarn

func (a *HCLogAdapter) IsWarn() bool

IsWarn check

func (*HCLogAdapter) Log

func (*HCLogAdapter) Log(level hclog.Level, msg string, args ...interface{})

Log Emit a message and key/value pairs at a provided log level

func (*HCLogAdapter) Name

func (a *HCLogAdapter) Name() string

Name returns the Name of the logger

func (*HCLogAdapter) Named

func (a *HCLogAdapter) Named(name string) hclog.Logger

Named returns a named logger

func (*HCLogAdapter) ResetNamed

func (a *HCLogAdapter) ResetNamed(name string) hclog.Logger

ResetNamed returns a new logger with the default name

func (*HCLogAdapter) SetLevel

func (a *HCLogAdapter) SetLevel(hclog.Level)

SetLevel noop

func (*HCLogAdapter) StandardLogger

func (a *HCLogAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *golog.Logger

StandardLogger is meant to return a stldib Logger type which wraps around hclog. It does this by providing an io.Writer and instantiating a new Logger. It then tries to interpret the log level by parsing the message.

Since we are not using `hclog` in a generic way, and I cannot find any calls to this method from go-plugin, we will poorly support this method. Rather than pull in all of hclog writer parsing logic, pass it a Logrus writer, and hardcode the level to INFO.

Apologies to those who find themselves here.

func (*HCLogAdapter) StandardWriter

func (a *HCLogAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer

StandardWriter return a value that conforms to io.Writer, which can be passed into log.SetOutput()

func (*HCLogAdapter) Trace

func (*HCLogAdapter) Trace(_ string, _ ...interface{})

Trace HCLog has one more level than we do. As such, we will never set trace level.

func (*HCLogAdapter) Warn

func (a *HCLogAdapter) Warn(msg string, args ...interface{})

Warn logging level message

func (*HCLogAdapter) With

func (a *HCLogAdapter) With(args ...interface{}) hclog.Logger

With returns a new instance with the specified options

type HTTPTransport

type HTTPTransport struct {
	Engine *gin.Engine
	// contains filtered or unexported fields
}

HTTPTransport stores pointers to an agent and a gin Engine.

func NewTransport

func NewTransport(a *Agent, log *logrus.Entry) *HTTPTransport

NewTransport creates an HTTPTransport with a bound agent.

func (*HTTPTransport) APIRoutes

func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup, middleware ...gin.HandlerFunc)

APIRoutes registers the api routes on the gin RouterGroup.

func (*HTTPTransport) MetaMiddleware

func (h *HTTPTransport) MetaMiddleware() gin.HandlerFunc

MetaMiddleware adds middleware to the gin Context.

func (*HTTPTransport) Options added in v3.1.0

func (h *HTTPTransport) Options(c *gin.Context)

func (*HTTPTransport) ServeHTTP

func (h *HTTPTransport) ServeHTTP()

func (*HTTPTransport) UI added in v3.1.0

func (h *HTTPTransport) UI(r *gin.RouterGroup)

UI registers UI specific routes on the gin RouterGroup.

type Job

type Job struct {
	// Job id. Must be unique, it's a copy of name.
	ID string `json:"id"`

	// Job name. Must be unique, acts as the id.
	Name string `json:"name"`

	// Display name of the job. If present, displayed instead of the name
	DisplayName string `json:"displayname"`

	// The timezone where the cron expression will be evaluated in.
	// Empty means local time.
	Timezone string `json:"timezone"`

	// Cron expression for the job. When to run the job.
	Schedule string `json:"schedule"`

	// Arbitrary string indicating the owner of the job.
	Owner string `json:"owner"`

	// Email address to use for notifications.
	OwnerEmail string `json:"owner_email"`

	// Number of successful executions of this job.
	SuccessCount int `json:"success_count"`

	// Number of errors running this job.
	ErrorCount int `json:"error_count"`

	// Last time this job executed successfully.
	LastSuccess ntime.NullableTime `json:"last_success"`

	// Last time this job failed.
	LastError ntime.NullableTime `json:"last_error"`

	// Is this job disabled?
	Disabled bool `json:"disabled"`

	// Tags of the target servers to run this job against.
	Tags map[string]string `json:"tags"`

	// Job metadata describes the job and allows filtering from the API.
	Metadata map[string]string `json:"metadata"`

	// Pointer to the calling agent.
	Agent *Agent `json:"-"`

	// Number of times to retry a job that failed an execution.
	Retries uint `json:"retries"`

	// Jobs that are dependent upon this one will be run after this job runs.
	DependentJobs []string `json:"dependent_jobs"`

	// Job pointer that are dependent upon this one
	ChildJobs []*Job `json:"-"`

	// Job id of job that this job is dependent upon.
	ParentJob string `json:"parent_job"`

	// Processors to use for this job.
	Processors map[string]plugin.Config `json:"processors"`

	// Concurrency policy for this job (allow, forbid).
	Concurrency string `json:"concurrency"`

	// Executor plugin to be used in this job.
	Executor string `json:"executor"`

	// Configuration arguments for the specific executor.
	ExecutorConfig plugin.ExecutorPluginConfig `json:"executor_config"`

	// Computed job status.
	Status string `json:"status"`

	// Computed next execution.
	Next time.Time `json:"next"`

	// Delete the job after the first successful execution.
	Ephemeral bool `json:"ephemeral"`

	// The job will not be executed after this time.
	ExpiresAt ntime.NullableTime `json:"expires_at"`
	// contains filtered or unexported fields
}

Job describes a scheduled Job.

func NewJobFromProto

func NewJobFromProto(in *proto.Job, logger *logrus.Entry) *Job

NewJobFromProto create a new Job from a PB Job struct

func (*Job) GetNext

func (j *Job) GetNext() (time.Time, error)

GetNext returns the job's next schedule from now

func (*Job) GetParent

func (j *Job) GetParent(store *Store) (*Job, error)

GetParent returns the parent job of a job

func (*Job) GetTimeLocation added in v3.0.8

func (j *Job) GetTimeLocation() *time.Location

GetTimeLocation returns the time.Location based on the job's Timezone, or the default (UTC) if none is configured, or nil if an error occurred while creating the timezone from the property

func (*Job) Run

func (j *Job) Run()

Run the job

func (*Job) String

func (j *Job) String() string

Friendly format a job

func (*Job) ToProto

func (j *Job) ToProto() *proto.Job

ToProto return the corresponding representation of this Job in proto struct

func (*Job) Validate

func (j *Job) Validate() error

Validate validates whether all values in the job are acceptable.

type JobOptions

type JobOptions struct {
	Metadata map[string]string `json:"tags"`
	Sort     string
	Order    string
	Query    string
	Status   string
	Disabled string
}

JobOptions additional options to apply when loading a Job.

type LogApplier

type LogApplier func(buf []byte, index uint64) interface{}

LogApplier is the definition of a function that can apply a Raft log

type LogAppliers

type LogAppliers map[MessageType]LogApplier

LogAppliers is a mapping of the Raft MessageType to the appropriate log applier

type MId added in v3.1.0

type MId struct {
	serf.Member

	Id         string `json:"id"`
	StatusText string `json:"statusText"`
}

type MessageType

type MessageType uint8

MessageType is the type to encode FSM commands.

const (
	// SetJobType is the command used to store a job in the store.
	SetJobType MessageType = iota
	// DeleteJobType is the command used to delete a Job from the store.
	DeleteJobType
	// SetExecutionType is the command used to store an Execution to the store.
	SetExecutionType
	// DeleteExecutionsType is the command used to delete executions from the store.
	DeleteExecutionsType
	// ExecutionDoneType is the command to perform the logic needed once an execution
	// is done.
	ExecutionDoneType
)

type Node added in v3.1.8

type Node = serf.Member

Node is a shorter, more descriptive name for serf.Member

type Plugins

type Plugins struct {
	Processors map[string]plugin.Processor
	Executors  map[string]plugin.Executor
}

Plugins struct to store loaded plugins of each type

type ProcessorFactory

type ProcessorFactory func() (plugin.Processor, error)

ProcessorFactory is a function type that creates a new instance of a processor.

type RaftLayer

type RaftLayer struct {
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

RaftLayer is the network layer for internode communications.

func NewRaftLayer

func NewRaftLayer(logger *logrus.Entry) *RaftLayer

NewRaftLayer returns an initialized unencrypted RaftLayer.

func NewTLSRaftLayer

func NewTLSRaftLayer(tlsConfig *tls.Config, logger *logrus.Entry) *RaftLayer

NewTLSRaftLayer returns an initialized TLS-encrypted RaftLayer.

func (*RaftLayer) Accept

func (t *RaftLayer) Accept() (net.Conn, error)

Accept waits for the next connection.

func (*RaftLayer) Addr

func (t *RaftLayer) Addr() net.Addr

Addr returns the binding address of the RaftLayer.

func (*RaftLayer) Close

func (t *RaftLayer) Close() error

Close closes the RaftLayer

func (*RaftLayer) Dial

func (t *RaftLayer) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error)

Dial opens a network connection.

func (*RaftLayer) Open

func (t *RaftLayer) Open(l net.Listener) error

Open opens the RaftLayer, binding to the supplied address.

type RaftStore added in v3.2.1

type RaftStore interface {
	raft.StableStore
	raft.LogStore
	Close() error
}

type Scheduler

type Scheduler struct {
	Cron *cron.Cron
	// contains filtered or unexported fields
}

Scheduler represents a dkron scheduler instance, it stores the cron engine and the related parameters.

func NewScheduler

func NewScheduler(logger *logrus.Entry) *Scheduler

NewScheduler creates a new Scheduler instance

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job *Job) error

AddJob Adds a job to the cron scheduler

func (*Scheduler) ClearCron added in v3.0.5

func (s *Scheduler) ClearCron()

ClearCron clears the cron scheduler

func (*Scheduler) GetEntryJob added in v3.2.0

func (s *Scheduler) GetEntryJob(jobName string) (EntryJob, bool)

GetEntryJob returns a EntryJob object from a snapshot in the current time, and whether or not the entry was found.

func (*Scheduler) RemoveJob

func (s *Scheduler) RemoveJob(jobName string)

RemoveJob removes a job from the cron scheduler if it exists.

func (*Scheduler) Restart

func (s *Scheduler) Restart(jobs []*Job, agent *Agent)

Restart the scheduler

func (*Scheduler) Start

func (s *Scheduler) Start(jobs []*Job, agent *Agent) error

Start the cron scheduler, adding its corresponding jobs and executing them on time.

func (*Scheduler) Started

func (s *Scheduler) Started() bool

Started will safely return if the scheduler is started or not

func (*Scheduler) Stop

func (s *Scheduler) Stop() context.Context

Stop stops the cron scheduler if it is running; otherwise it does nothing. A context is returned so the caller can wait for running jobs to complete.

type ServerParts

type ServerParts struct {
	Name         string
	ID           string
	Region       string
	Datacenter   string
	Port         int
	Bootstrap    bool
	Expect       int
	RaftVersion  int
	BuildVersion *version.Version
	Addr         net.Addr
	RPCAddr      net.Addr
	Status       serf.MemberStatus
}

ServerParts is used to return the parts of a server role

func (*ServerParts) Copy

func (s *ServerParts) Copy() *ServerParts

Copy returns a copy of this struct

func (*ServerParts) String

func (s *ServerParts) String() string

String returns a representation of this instance

type Storage

type Storage interface {
	SetJob(job *Job, copyDependentJobs bool) error
	DeleteJob(name string) (*Job, error)
	SetExecution(execution *Execution) (string, error)
	SetExecutionDone(execution *Execution) (bool, error)
	GetJobs(options *JobOptions) ([]*Job, error)
	GetJob(name string, options *JobOptions) (*Job, error)
	GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error)
	GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error)
	GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error)
	Shutdown() error
	Snapshot(w io.WriteCloser) error
	Restore(r io.ReadCloser) error
}

Storage is the interface that should be used by any storage engine implemented for dkron. It contains the minimum set of operations that are needed to have a working dkron store.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is the local implementation of the Storage interface. It gives dkron the ability to manipulate its embedded storage BuntDB.

func NewStore

func NewStore(logger *logrus.Entry) (*Store, error)

NewStore creates a new Storage instance.

func (*Store) DB

func (s *Store) DB() *buntdb.DB

DB is the getter for the BuntDB instance

func (*Store) DeleteJob

func (s *Store) DeleteJob(name string) (*Job, error)

DeleteJob deletes the given job from the store, along with all its executions and references to it.

func (*Store) GetExecutionGroup

func (s *Store) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error)

GetExecutionGroup returns all executions in the same group of a given execution

func (*Store) GetExecutions

func (s *Store) GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error)

GetExecutions returns the executions given a Job name.

func (*Store) GetGroupedExecutions

func (s *Store) GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error)

GetGroupedExecutions returns executions for a job grouped and with an ordered index to facilitate access.

func (*Store) GetJob

func (s *Store) GetJob(name string, options *JobOptions) (*Job, error)

GetJob finds and return a Job from the store

func (*Store) GetJobs

func (s *Store) GetJobs(options *JobOptions) ([]*Job, error)

GetJobs returns all jobs

func (*Store) Restore

func (s *Store) Restore(r io.ReadCloser) error

Restore load data created with backup in to Bunt

func (*Store) SetExecution

func (s *Store) SetExecution(execution *Execution) (string, error)

SetExecution Save a new execution and returns the key of the new saved item or an error.

func (*Store) SetExecutionDone

func (s *Store) SetExecutionDone(execution *Execution) (bool, error)

SetExecutionDone saves the execution and updates the job with the corresponding results

func (*Store) SetJob

func (s *Store) SetJob(job *Job, copyDependentJobs bool) error

SetJob stores a job in the storage

func (*Store) Shutdown

func (s *Store) Shutdown() error

Shutdown close the KV store

func (*Store) Snapshot

func (s *Store) Snapshot(w io.WriteCloser) error

Snapshot creates a backup of the data stored in BuntDB

type Transport

type Transport interface {
	ServeHTTP()
}

Transport is the interface that wraps the ServeHTTP method.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL