Documentation ¶
Index ¶
- Constants
- Variables
- func ConfigFlagSet() *flag.FlagSet
- func CreateMyRender(l *logrus.Entry) multitemplate.Render
- func Encode(t MessageType, msg interface{}) ([]byte, error)
- func InitLogger(logLevel string, node string) *logrus.Entry
- func NewAgentServer(agent *Agent, logger *logrus.Entry) types.AgentServer
- func ParseSingleIPTemplate(ipTmpl string) (string, error)
- func SendPostNotifications(config *Config, execution *Execution, exGroup []*Execution, job *Job, ...) error
- func SendPreNotifications(config *Config, execution *Execution, exGroup []*Execution, job *Job, ...) error
- func UserAgent() string
- type Agent
- func (a *Agent) Config() *Config
- func (a *Agent) DashboardRoutes(r *gin.RouterGroup)
- func (a *Agent) GetActiveExecutions() ([]*proto.Execution, error)
- func (a *Agent) GetRunningJobs() int
- func (a *Agent) IsLeader() bool
- func (a *Agent) JoinLAN(addrs []string) (int, error)
- func (a *Agent) Leader() raft.ServerAddress
- func (a *Agent) LocalMember() serf.Member
- func (a *Agent) LocalServers() (members []*ServerParts)
- func (a *Agent) Members() []serf.Member
- func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture
- func (a *Agent) RetryJoinCh() <-chan error
- func (a *Agent) Run(jobName string, ex *Execution) (*Job, error)
- func (a *Agent) Servers() (members []*ServerParts)
- func (a *Agent) SetConfig(c *Config)
- func (a *Agent) Start() error
- func (a *Agent) StartServer()
- func (a *Agent) Stop() error
- func (a *Agent) UpdateTags(tags map[string]string)
- type AgentOption
- type AgentServer
- type Config
- type DkronGRPCClient
- type DkronGRPCServer
- type EntryJob
- type Execution
- type ExecutionOptions
- type GRPCClient
- func (grpcc *GRPCClient) AgentRun(addr string, job *proto.Job, execution *proto.Execution) error
- func (grpcc *GRPCClient) Connect(addr string) (*grpc.ClientConn, error)
- func (grpcc *GRPCClient) DeleteJob(jobName string) (*Job, error)
- func (grpcc *GRPCClient) ExecutionDone(addr string, execution *Execution) error
- func (grpcc *GRPCClient) GetActiveExecutions(addr string) ([]*proto.Execution, error)
- func (grpcc *GRPCClient) GetJob(addr, jobName string) (*Job, error)
- func (grpcc *GRPCClient) Leave(addr string) error
- func (grpcc *GRPCClient) RaftGetConfiguration(addr string) (*proto.RaftGetConfigurationResponse, error)
- func (grpcc *GRPCClient) RaftRemovePeerByID(addr, peerID string) error
- func (grpcc *GRPCClient) RunJob(jobName string) (*Job, error)
- func (grpcc *GRPCClient) SetExecution(execution *proto.Execution) error
- func (grpcc *GRPCClient) SetJob(job *Job) error
- type GRPCServer
- func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJobRequest) (*proto.DeleteJobResponse, error)
- func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.ExecutionDoneRequest) (*proto.ExecutionDoneResponse, error)
- func (grpcs *GRPCServer) GetActiveExecutions(ctx context.Context, in *empty.Empty) (*proto.GetActiveExecutionsResponse, error)
- func (grpcs *GRPCServer) GetJob(ctx context.Context, getJobReq *proto.GetJobRequest) (*proto.GetJobResponse, error)
- func (grpcs *GRPCServer) Leave(ctx context.Context, in *empty.Empty) (*empty.Empty, error)
- func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Empty) (*proto.RaftGetConfigurationResponse, error)
- func (grpcs *GRPCServer) RaftRemovePeerByID(ctx context.Context, in *proto.RaftRemovePeerByIDRequest) (*empty.Empty, error)
- func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (*proto.RunJobResponse, error)
- func (grpcs *GRPCServer) Serve(lis net.Listener) error
- func (grpcs *GRPCServer) SetExecution(ctx context.Context, execution *proto.Execution) (*empty.Empty, error)
- func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequest) (*proto.SetJobResponse, error)
- func (grpcs *GRPCServer) ToggleJob(ctx context.Context, getJobReq *proto.ToggleJobRequest) (*proto.ToggleJobResponse, error)
- type HCLogAdapter
- func (a *HCLogAdapter) CreateEntry(args []interface{}) *logrus.Entry
- func (a *HCLogAdapter) Debug(msg string, args ...interface{})
- func (a *HCLogAdapter) Error(msg string, args ...interface{})
- func (a *HCLogAdapter) GetLevel() hclog.Level
- func (a *HCLogAdapter) ImpliedArgs() []interface{}
- func (a *HCLogAdapter) Info(msg string, args ...interface{})
- func (a *HCLogAdapter) IsDebug() bool
- func (a *HCLogAdapter) IsError() bool
- func (a *HCLogAdapter) IsInfo() bool
- func (a *HCLogAdapter) IsTrace() bool
- func (a *HCLogAdapter) IsWarn() bool
- func (*HCLogAdapter) Log(level hclog.Level, msg string, args ...interface{})
- func (a *HCLogAdapter) Name() string
- func (a *HCLogAdapter) Named(name string) hclog.Logger
- func (a *HCLogAdapter) ResetNamed(name string) hclog.Logger
- func (a *HCLogAdapter) SetLevel(hclog.Level)
- func (a *HCLogAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *golog.Logger
- func (a *HCLogAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer
- func (*HCLogAdapter) Trace(_ string, _ ...interface{})
- func (a *HCLogAdapter) Warn(msg string, args ...interface{})
- func (a *HCLogAdapter) With(args ...interface{}) hclog.Logger
- type HTTPTransport
- type Job
- type JobOptions
- type LogApplier
- type LogAppliers
- type MId
- type MessageType
- type Node
- type Plugins
- type ProcessorFactory
- type RaftLayer
- type RaftStore
- type Scheduler
- func (s *Scheduler) AddJob(job *Job) error
- func (s *Scheduler) ClearCron()
- func (s *Scheduler) GetEntryJob(jobName string) (EntryJob, bool)
- func (s *Scheduler) RemoveJob(jobName string)
- func (s *Scheduler) Restart(jobs []*Job, agent *Agent)
- func (s *Scheduler) Start(jobs []*Job, agent *Agent) error
- func (s *Scheduler) Started() bool
- func (s *Scheduler) Stop() context.Context
- type ServerParts
- type Storage
- type Store
- func (s *Store) DB() *buntdb.DB
- func (s *Store) DeleteJob(name string) (*Job, error)
- func (s *Store) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error)
- func (s *Store) GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error)
- func (s *Store) GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error)
- func (s *Store) GetJob(name string, options *JobOptions) (*Job, error)
- func (s *Store) GetJobs(options *JobOptions) ([]*Job, error)
- func (s *Store) Restore(r io.ReadCloser) error
- func (s *Store) SetExecution(execution *Execution) (string, error)
- func (s *Store) SetExecutionDone(execution *Execution) (bool, error)
- func (s *Store) SetJob(job *Job, copyDependentJobs bool) error
- func (s *Store) Shutdown() error
- func (s *Store) Snapshot(w io.WriteCloser) error
- type Transport
Constants ¶
const ( DefaultBindPort int = 8946 DefaultRPCPort int = 6868 DefaultRetryInterval time.Duration = time.Second * 30 )
DefaultBindPort is the default port that dkron will use for Serf communication
const ( // StatusNotSet is the initial job status. StatusNotSet = "" // StatusSuccess is status of a job whose last run was a success. StatusSuccess = "success" // StatusRunning is status of a job whose last run has not finished. StatusRunning = "running" // StatusFailed is status of a job whose last run was not successful on any nodes. StatusFailed = "failed" // StatusPartiallyFailed is status of a job whose last run was successful on only some nodes. StatusPartiallyFailed = "partially_failed" // ConcurrencyAllow allows a job to execute concurrency. ConcurrencyAllow = "allow" // ConcurrencyForbid forbids a job from executing concurrency. ConcurrencyForbid = "forbid" )
const (
// MaxExecutions to maintain in the storage
MaxExecutions = 100
)
const ( // StatusReap is used to update the status of a node if we // are handling a EventMemberReap StatusReap = serf.MemberStatus(-1) )
Variables ¶
var ( // ErrLeaderNotFound is returned when obtained leader is not found in member list ErrLeaderNotFound = errors.New("no member leader found in member list") // ErrNoSuitableServer returns an error in case no suitable server to send the request is found. ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting") )
var ( // ErrExecutionDoneForDeletedJob is returned when an execution done // is received for a non existent job. ErrExecutionDoneForDeletedJob = errors.New("grpc: Received execution done for a deleted job") // ErrRPCDialing is returned on dialing fail. ErrRPCDialing = errors.New("grpc: Error dialing, verify the network connection to the server") // ErrNotLeader is the error returned when the operation need the node to be the leader, // but the current node is not the leader. ErrNotLeader = errors.New("grpc: Error, server is not leader, this operation should be run on the leader") // ErrBrokenStream is the error that indicates a sudden disconnection of the agent streaming an execution ErrBrokenStream = errors.New("grpc: Error on execution streaming, agent connection was abruptly terminated") )
var ( // ErrParentJobNotFound is returned when the parent job is not found. ErrParentJobNotFound = errors.New("specified parent job not found") // ErrNoAgent is returned when the job's agent is nil. ErrNoAgent = errors.New("no agent defined") // ErrSameParent is returned when the job's parent is itself. ErrSameParent = errors.New("the job can not have itself as parent") // ErrNoParent is returned when the job has no parent. ErrNoParent = errors.New("the job doesn't have a parent job set") // ErrNoCommand is returned when attempting to store a job that has no command. ErrNoCommand = errors.New("unspecified command for job") // ErrWrongConcurrency is returned when Concurrency is set to a non existing setting. ErrWrongConcurrency = errors.New("invalid concurrency policy value, use \"allow\" or \"forbid\"") )
var Codename = "Devel"
Codename codename of this series
var ( // ErrDependentJobs is returned when deleting a job that has dependent jobs ErrDependentJobs = errors.New("store: could not delete job with dependent jobs, delete childs first") )
var ErrResolvingHost = errors.New("error resolving hostname")
var ( // ErrScheduleParse is the error returned when the schedule parsing fails. ErrScheduleParse = errors.New("can't parse job schedule") )
var Name = "Dkron"
Name store the name of this software
var Version = "devel"
Version is the current version that will get replaced on build.
Functions ¶
func ConfigFlagSet ¶
ConfigFlagSet creates all of our configuration flags.
func CreateMyRender ¶
func CreateMyRender(l *logrus.Entry) multitemplate.Render
CreateMyRender returns a new custom multitemplate renderer to use with Gin.
func Encode ¶
func Encode(t MessageType, msg interface{}) ([]byte, error)
Encode is used to encode a Protoc object with type prefix
func InitLogger ¶
InitLogger creates the logger instance
func NewAgentServer ¶
func NewAgentServer(agent *Agent, logger *logrus.Entry) types.AgentServer
NewServer creates and returns an instance of a DkronGRPCServer implementation
func ParseSingleIPTemplate ¶
ParseSingleIPTemplate is used as a helper function to parse out a single IP address from a config parameter.
func SendPostNotifications ¶ added in v3.2.0
func SendPostNotifications(config *Config, execution *Execution, exGroup []*Execution, job *Job, logger *logrus.Entry) error
Send sends the notifications using any configured method
Types ¶
type Agent ¶
type Agent struct { // ProcessorPlugins maps processor plugins ProcessorPlugins map[string]plugin.Processor //ExecutorPlugins maps executor plugins ExecutorPlugins map[string]plugin.Executor // HTTPTransport is a swappable interface for the HTTP server interface HTTPTransport Transport // Store interface to set the storage engine Store Storage // GRPCServer interface for setting the GRPC server GRPCServer DkronGRPCServer // GRPCClient interface for setting the GRPC client GRPCClient DkronGRPCClient // TLSConfig allows setting a TLS config for transport TLSConfig *tls.Config // Pro features GlobalLock bool MemberEventHandler func(serf.Event) ProAppliers LogAppliers // contains filtered or unexported fields }
Agent is the main struct that represents a dkron agent
func NewAgent ¶
func NewAgent(config *Config, options ...AgentOption) *Agent
NewAgent returns a new Agent instance capable of starting and running a Dkron instance.
func (*Agent) DashboardRoutes ¶
func (a *Agent) DashboardRoutes(r *gin.RouterGroup)
DashboardRoutes registers dashboard specific routes on the gin RouterGroup.
func (*Agent) GetActiveExecutions ¶
GetActiveExecutions returns running executions globally
func (*Agent) GetRunningJobs ¶
GetRunningJobs returns amount of active jobs of the local agent
func (*Agent) JoinLAN ¶
JoinLAN is used to have Dkron join the inner-DC pool The target address should be another node inside the DC listening on the Serf LAN address
func (*Agent) Leader ¶
func (a *Agent) Leader() raft.ServerAddress
Leader is used to return the Raft leader
func (*Agent) LocalMember ¶
LocalMember is used to return the local node
func (*Agent) LocalServers ¶
func (a *Agent) LocalServers() (members []*ServerParts)
LocalServers returns a list of the local known server
func (*Agent) RaftApply ¶
func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture
RaftApply applies a command to the Raft log
func (*Agent) RetryJoinCh ¶
RetryJoinCh is a channel that transports errors from the retry join process.
func (*Agent) Run ¶
Run call the agents to run a job. Returns a job with its new status and next schedule.
func (*Agent) Servers ¶
func (a *Agent) Servers() (members []*ServerParts)
Servers returns a list of known server
func (*Agent) Start ¶
Start the current agent by running all the necessary checks and server or client routines.
func (*Agent) StartServer ¶
func (a *Agent) StartServer()
StartServer launch a new dkron server process
func (*Agent) Stop ¶
Stop stops an agent, if the agent is a server and is running for election stop running for election, if this server was the leader this will force the cluster to elect a new leader and start a new scheduler. If this is a server and has the scheduler started stop it, ignoring if this server was participating in leader election or not (local storage). Then actually leave the cluster.
func (*Agent) UpdateTags ¶ added in v3.1.8
UpdateTags updates the tag configuration for this agent
type AgentOption ¶
type AgentOption func(agent *Agent)
AgentOption type that defines agent options
func WithPlugins ¶
func WithPlugins(plugins Plugins) AgentOption
WithPlugins option to set plugins to the agent
func WithRaftStore ¶ added in v3.2.1
func WithRaftStore(raftStore RaftStore) AgentOption
func WithTransportCredentials ¶
func WithTransportCredentials(tls *tls.Config) AgentOption
WithTransportCredentials set tls config in the agent
type AgentServer ¶
type AgentServer struct { types.AgentServer // contains filtered or unexported fields }
GRPCAgentServer is the local implementation of the gRPC server interface.
func (*AgentServer) AgentRun ¶
func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_AgentRunServer) error
AgentRun is called when an agent starts running a job and lasts all execution, the agent will stream execution progress to the server.
type Config ¶
type Config struct { // NodeName is the name we register as. Defaults to hostname. NodeName string `mapstructure:"node-name"` // BindAddr is the address on which all of dkron's services will // be bound. If not specified, this defaults to the first private ip address. BindAddr string `mapstructure:"bind-addr"` // HTTPAddr is the address on the UI web server will // be bound. If not specified, this defaults to all interfaces. HTTPAddr string `mapstructure:"http-addr"` // Profile is used to select a timing profile for Serf. The supported choices // are "wan", "lan", and "local". The default is "lan" Profile string // AdvertiseAddr is the address that the Serf and gRPC layer will advertise to // other members of the cluster. Can be used for basic NAT traversal // where both the internal ip:port and external ip:port are known. AdvertiseAddr string `mapstructure:"advertise-addr"` // Tags are used to attach key/value metadata to a node. Tags map[string]string `mapstructure:"tags"` // Server enables this node to work as a dkron server. Server bool // EncryptKey is the secret key to use for encrypting communication // traffic for Serf. The secret key must be exactly 32-bytes, base64 // encoded. The easiest way to do this on Unix machines is this command: // "head -c32 /dev/urandom | base64" or use "dkron keygen". If this is // not specified, the traffic will not be encrypted. EncryptKey string `mapstructure:"encrypt"` // StartJoin is a list of addresses to attempt to join when the // agent starts. If Serf is unable to communicate with any of these // addresses, then the agent will error and exit. StartJoin []string `mapstructure:"join"` // RetryJoinLAN is a list of addresses to attempt to join when the // agent starts. Serf will continue to retry the join until it // succeeds or RetryMaxAttempts is reached. RetryJoinLAN []string `mapstructure:"retry-join"` // RetryMaxAttemptsLAN is used to limit the maximum attempts made // by RetryJoin to reach other nodes. If this is 0, then no limit // is imposed, and Serf will continue to try forever. Defaults to 0. RetryJoinMaxAttemptsLAN int `mapstructure:"retry-max"` // RetryIntervalLAN is the string retry interval. This interval // controls how often we retry the join for RetryJoin. This defaults // to 30 seconds. RetryJoinIntervalLAN time.Duration `mapstructure:"retry-interval"` // RPCPort is the gRPC port used by Dkron. This should be reachable // by the other servers and clients. RPCPort int `mapstructure:"rpc-port"` // AdvertiseRPCPort is the gRPC port advertised to clients. This should be reachable // by the other servers and clients. AdvertiseRPCPort int `mapstructure:"advertise-rpc-port"` // LogLevel is the log verbosity level used. // It can be (debug|info|warn|error|fatal|panic). LogLevel string `mapstructure:"log-level"` // Datacenter is the datacenter this Dkron server belongs to. Datacenter string // Region is the region this Dkron server belongs to. Region string // Bootstrap mode is used to bring up the first Dkron server. It is // required so that it can elect a leader without any other nodes // being present Bootstrap bool // BootstrapExpect tries to automatically bootstrap the Dkron cluster, // by withholding peers until enough servers join. BootstrapExpect int `mapstructure:"bootstrap-expect"` // DataDir is the directory to store our state in DataDir string `mapstructure:"data-dir"` // DevMode is used for development purposes only and limits the // use of persistence or state. DevMode bool // ReconcileInterval controls how often we reconcile the strongly // consistent store with the Serf info. This is used to handle nodes // that are force removed, as well as intermittent unavailability during // leader election. ReconcileInterval time.Duration // RaftMultiplier An integer multiplier used by Dkron servers to scale key // Raft timing parameters. RaftMultiplier int `mapstructure:"raft-multiplier"` // MailHost is the SMTP server host to use for email notifications. MailHost string `mapstructure:"mail-host"` // MailPort is the SMTP server port to use for email notifications. MailPort uint16 `mapstructure:"mail-port"` // MailUsername is the SMTP server username to use for email notifications. MailUsername string `mapstructure:"mail-username"` // MailPassword is the SMTP server password to use for email notifications. MailPassword string `mapstructure:"mail-password"` // MailFrom is the email sender to use for email notifications. MailFrom string `mapstructure:"mail-from"` // MailPayload is the email template body to use for email notifications. MailPayload string `mapstructure:"mail-payload"` // MailSubjectPrefix is the email subject prefix string to use for email notifications. MailSubjectPrefix string `mapstructure:"mail-subject-prefix"` // PreWebhookURL is the endpoint to call for notifications. PreWebhookEndpoint string `mapstructure:"pre-webhook-endpoint"` // PreWebhookPayload is the body template of the request for notifications. PreWebhookPayload string `mapstructure:"pre-webhook-payload"` // PreWebhookHeaders are the headers to use when calling the webhook for notifications. PreWebhookHeaders []string `mapstructure:"pre-webhook-headers"` // WebhookEndpoint is the URL to call for notifications. WebhookEndpoint string `mapstructure:"webhook-endpoint"` // WebhookPayload is the body template of the request for notifications. WebhookPayload string `mapstructure:"webhook-payload"` // WebhookHeaders are the headers to use when calling the webhook for notifications. WebhookHeaders []string `mapstructure:"webhook-headers"` // DogStatsdAddr is the address of a dogstatsd instance. If provided, // metrics will be sent to that instance. DogStatsdAddr string `mapstructure:"dog-statsd-addr"` // DogStatsdTags are the global tags that should be sent with each packet to dogstatsd // It is a list of strings, where each string looks like "my_tag_name:my_tag_value". DogStatsdTags []string `mapstructure:"dog-statsd-tags"` // StatsdAddr is the statsd standard server to be used for sending metrics. StatsdAddr string `mapstructure:"statsd-addr"` // SerfReconnectTimeout is the amount of time to attempt to reconnect to a failed node before giving up and considering it completely gone SerfReconnectTimeout string `mapstructure:"serf-reconnect-timeout"` // EnablePrometheus enables serving of prometheus metrics at /metrics EnablePrometheus bool `mapstructure:"enable-prometheus"` // UI enable the web UI on this node. The node must be server. UI bool // DisableUsageStats disable sending anonymous usage stats DisableUsageStats bool `mapstructure:"disable-usage-stats"` // CronitorEndpoint is the endpoint to call for cronitor notifications. CronitorEndpoint string `mapstructure:"cronitor-endpoint"` }
Config stores all configuration options for the dkron package.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config struct pointer with sensible default settings.
func (*Config) AddrParts ¶
AddrParts returns the parts of the BindAddr that should be used to configure Serf.
func (*Config) EncryptBytes ¶
EncryptBytes returns the encryption key configured.
type DkronGRPCClient ¶
type DkronGRPCClient interface { Connect(string) (*grpc.ClientConn, error) ExecutionDone(string, *Execution) error GetJob(string, string) (*Job, error) SetJob(*Job) error DeleteJob(string) (*Job, error) Leave(string) error RunJob(string) (*Job, error) RaftGetConfiguration(string) (*proto.RaftGetConfigurationResponse, error) RaftRemovePeerByID(string, string) error GetActiveExecutions(string) ([]*proto.Execution, error) SetExecution(execution *proto.Execution) error AgentRun(addr string, job *proto.Job, execution *proto.Execution) error }
DkronGRPCClient defines the interface that any gRPC client for dkron should implement.
func NewGRPCClient ¶
func NewGRPCClient(dialOpt grpc.DialOption, agent *Agent, logger *logrus.Entry) DkronGRPCClient
NewGRPCClient returns a new instance of the gRPC client.
type DkronGRPCServer ¶
type DkronGRPCServer interface { proto.DkronServer Serve(net.Listener) error }
DkronGRPCServer defines the basics that a gRPC server should implement.
func NewGRPCServer ¶
func NewGRPCServer(agent *Agent, logger *logrus.Entry) DkronGRPCServer
NewGRPCServer creates and returns an instance of a DkronGRPCServer implementation
type Execution ¶
type Execution struct { // Id is the Key for this execution Id string `json:"id,omitempty"` // Name of the job this executions refers to. JobName string `json:"job_name,omitempty"` // Start time of the execution. StartedAt time.Time `json:"started_at,omitempty"` // When the execution finished running. FinishedAt time.Time `json:"finished_at,omitempty"` // If this execution executed successfully. Success bool `json:"success"` // Partial output of the execution. Output string `json:"output,omitempty"` // Node name of the node that run this execution. NodeName string `json:"node_name,omitempty"` // Execution group to what this execution belongs to. Group int64 `json:"group,omitempty"` // Retry attempt of this execution. Attempt uint `json:"attempt,omitempty"` }
Execution type holds all of the details of a specific Execution.
func NewExecution ¶
NewExecution creates a new execution.
func NewExecutionFromProto ¶
NewExecutionFromProto maps a proto.ExecutionDoneRequest to an Execution object
type ExecutionOptions ¶ added in v3.1.2
ExecutionOptions additional options like "Sort" will be ready for JSON marshall
type GRPCClient ¶
type GRPCClient struct {
// contains filtered or unexported fields
}
GRPCClient is the local implementation of the DkronGRPCClient interface.
func (*GRPCClient) Connect ¶
func (grpcc *GRPCClient) Connect(addr string) (*grpc.ClientConn, error)
Connect dialing to a gRPC server
func (*GRPCClient) DeleteJob ¶
func (grpcc *GRPCClient) DeleteJob(jobName string) (*Job, error)
DeleteJob calls the leader passing the job name
func (*GRPCClient) ExecutionDone ¶
func (grpcc *GRPCClient) ExecutionDone(addr string, execution *Execution) error
ExecutionDone calls the ExecutionDone gRPC method
func (*GRPCClient) GetActiveExecutions ¶
func (grpcc *GRPCClient) GetActiveExecutions(addr string) ([]*proto.Execution, error)
GetActiveExecutions returns the active executions of a server node
func (*GRPCClient) GetJob ¶
func (grpcc *GRPCClient) GetJob(addr, jobName string) (*Job, error)
GetJob calls GetJob gRPC method in the server
func (*GRPCClient) Leave ¶
func (grpcc *GRPCClient) Leave(addr string) error
Leave calls Leave method on the gRPC server
func (*GRPCClient) RaftGetConfiguration ¶
func (grpcc *GRPCClient) RaftGetConfiguration(addr string) (*proto.RaftGetConfigurationResponse, error)
RaftGetConfiguration get the current raft configuration of peers
func (*GRPCClient) RaftRemovePeerByID ¶
func (grpcc *GRPCClient) RaftRemovePeerByID(addr, peerID string) error
RaftRemovePeerByID remove a raft peer
func (*GRPCClient) RunJob ¶
func (grpcc *GRPCClient) RunJob(jobName string) (*Job, error)
RunJob calls the leader passing the job name
func (*GRPCClient) SetExecution ¶
func (grpcc *GRPCClient) SetExecution(execution *proto.Execution) error
SetExecution calls the leader passing the execution
func (*GRPCClient) SetJob ¶
func (grpcc *GRPCClient) SetJob(job *Job) error
SetJob calls the leader passing the job
type GRPCServer ¶
type GRPCServer struct { proto.DkronServer // contains filtered or unexported fields }
GRPCServer is the local implementation of the gRPC server interface.
func (*GRPCServer) DeleteJob ¶
func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJobRequest) (*proto.DeleteJobResponse, error)
DeleteJob broadcast a state change to the cluster members that will delete the job. This only works on the leader
func (*GRPCServer) ExecutionDone ¶
func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.ExecutionDoneRequest) (*proto.ExecutionDoneResponse, error)
ExecutionDone saves the execution to the store
func (*GRPCServer) GetActiveExecutions ¶
func (grpcs *GRPCServer) GetActiveExecutions(ctx context.Context, in *empty.Empty) (*proto.GetActiveExecutionsResponse, error)
GetActiveExecutions returns the active executions on the server node
func (*GRPCServer) GetJob ¶
func (grpcs *GRPCServer) GetJob(ctx context.Context, getJobReq *proto.GetJobRequest) (*proto.GetJobResponse, error)
GetJob loads the job from the datastore
func (*GRPCServer) RaftGetConfiguration ¶
func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Empty) (*proto.RaftGetConfigurationResponse, error)
RaftGetConfiguration get raft config
func (*GRPCServer) RaftRemovePeerByID ¶
func (grpcs *GRPCServer) RaftRemovePeerByID(ctx context.Context, in *proto.RaftRemovePeerByIDRequest) (*empty.Empty, error)
RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft quorum but no longer known to Serf or the catalog) by address in the form of "IP:port". The reply argument is not used, but is required to fulfill the RPC interface.
func (*GRPCServer) RunJob ¶
func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (*proto.RunJobResponse, error)
RunJob runs a job in the cluster
func (*GRPCServer) Serve ¶
func (grpcs *GRPCServer) Serve(lis net.Listener) error
Serve creates and start a new gRPC dkron server
func (*GRPCServer) SetExecution ¶
func (grpcs *GRPCServer) SetExecution(ctx context.Context, execution *proto.Execution) (*empty.Empty, error)
SetExecution broadcast a state change to the cluster members that will store the execution. This only works on the leader
func (*GRPCServer) SetJob ¶
func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequest) (*proto.SetJobResponse, error)
SetJob broadcast a state change to the cluster members that will store the job. Then restart the scheduler This only works on the leader
func (*GRPCServer) ToggleJob ¶
func (grpcs *GRPCServer) ToggleJob(ctx context.Context, getJobReq *proto.ToggleJobRequest) (*proto.ToggleJobResponse, error)
ToggleJob toggle the enablement of a job
type HCLogAdapter ¶
type HCLogAdapter struct { Logger logrus.FieldLogger LoggerName string }
HCLogAdapter implements the hclog interface, and wraps it around a Logrus entry
func (*HCLogAdapter) CreateEntry ¶
func (a *HCLogAdapter) CreateEntry(args []interface{}) *logrus.Entry
CreateEntry creates a new logrus entry
func (*HCLogAdapter) Debug ¶
func (a *HCLogAdapter) Debug(msg string, args ...interface{})
Debug logging level message
func (*HCLogAdapter) Error ¶
func (a *HCLogAdapter) Error(msg string, args ...interface{})
Error logging level message
func (*HCLogAdapter) GetLevel ¶ added in v3.2.2
func (a *HCLogAdapter) GetLevel() hclog.Level
GetLevel noop
func (*HCLogAdapter) ImpliedArgs ¶
func (a *HCLogAdapter) ImpliedArgs() []interface{}
ImpliedArgs returns With key/value pairs
func (*HCLogAdapter) Info ¶
func (a *HCLogAdapter) Info(msg string, args ...interface{})
Info logging level message
func (*HCLogAdapter) Log ¶
func (*HCLogAdapter) Log(level hclog.Level, msg string, args ...interface{})
Log Emit a message and key/value pairs at a provided log level
func (*HCLogAdapter) Name ¶
func (a *HCLogAdapter) Name() string
Name returns the Name of the logger
func (*HCLogAdapter) Named ¶
func (a *HCLogAdapter) Named(name string) hclog.Logger
Named returns a named logger
func (*HCLogAdapter) ResetNamed ¶
func (a *HCLogAdapter) ResetNamed(name string) hclog.Logger
ResetNamed returns a new logger with the default name
func (*HCLogAdapter) StandardLogger ¶
func (a *HCLogAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *golog.Logger
StandardLogger is meant to return a stldib Logger type which wraps around hclog. It does this by providing an io.Writer and instantiating a new Logger. It then tries to interpret the log level by parsing the message.
Since we are not using `hclog` in a generic way, and I cannot find any calls to this method from go-plugin, we will poorly support this method. Rather than pull in all of hclog writer parsing logic, pass it a Logrus writer, and hardcode the level to INFO.
Apologies to those who find themselves here.
func (*HCLogAdapter) StandardWriter ¶
func (a *HCLogAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer
StandardWriter return a value that conforms to io.Writer, which can be passed into log.SetOutput()
func (*HCLogAdapter) Trace ¶
func (*HCLogAdapter) Trace(_ string, _ ...interface{})
Trace HCLog has one more level than we do. As such, we will never set trace level.
func (*HCLogAdapter) Warn ¶
func (a *HCLogAdapter) Warn(msg string, args ...interface{})
Warn logging level message
func (*HCLogAdapter) With ¶
func (a *HCLogAdapter) With(args ...interface{}) hclog.Logger
With returns a new instance with the specified options
type HTTPTransport ¶
HTTPTransport stores pointers to an agent and a gin Engine.
func NewTransport ¶
func NewTransport(a *Agent, log *logrus.Entry) *HTTPTransport
NewTransport creates an HTTPTransport with a bound agent.
func (*HTTPTransport) APIRoutes ¶
func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup, middleware ...gin.HandlerFunc)
APIRoutes registers the api routes on the gin RouterGroup.
func (*HTTPTransport) MetaMiddleware ¶
func (h *HTTPTransport) MetaMiddleware() gin.HandlerFunc
MetaMiddleware adds middleware to the gin Context.
func (*HTTPTransport) Options ¶ added in v3.1.0
func (h *HTTPTransport) Options(c *gin.Context)
func (*HTTPTransport) ServeHTTP ¶
func (h *HTTPTransport) ServeHTTP()
func (*HTTPTransport) UI ¶ added in v3.1.0
func (h *HTTPTransport) UI(r *gin.RouterGroup)
UI registers UI specific routes on the gin RouterGroup.
type Job ¶
type Job struct { // Job id. Must be unique, it's a copy of name. ID string `json:"id"` // Job name. Must be unique, acts as the id. Name string `json:"name"` // Display name of the job. If present, displayed instead of the name DisplayName string `json:"displayname"` // The timezone where the cron expression will be evaluated in. // Empty means local time. Timezone string `json:"timezone"` // Cron expression for the job. When to run the job. Schedule string `json:"schedule"` // Arbitrary string indicating the owner of the job. Owner string `json:"owner"` // Email address to use for notifications. OwnerEmail string `json:"owner_email"` // Number of successful executions of this job. SuccessCount int `json:"success_count"` // Number of errors running this job. ErrorCount int `json:"error_count"` // Last time this job executed successfully. LastSuccess ntime.NullableTime `json:"last_success"` // Last time this job failed. LastError ntime.NullableTime `json:"last_error"` // Is this job disabled? Disabled bool `json:"disabled"` // Tags of the target servers to run this job against. Tags map[string]string `json:"tags"` // Job metadata describes the job and allows filtering from the API. Metadata map[string]string `json:"metadata"` // Pointer to the calling agent. Agent *Agent `json:"-"` // Number of times to retry a job that failed an execution. Retries uint `json:"retries"` // Jobs that are dependent upon this one will be run after this job runs. DependentJobs []string `json:"dependent_jobs"` // Job pointer that are dependent upon this one ChildJobs []*Job `json:"-"` // Job id of job that this job is dependent upon. ParentJob string `json:"parent_job"` // Processors to use for this job. Processors map[string]plugin.Config `json:"processors"` // Concurrency policy for this job (allow, forbid). Concurrency string `json:"concurrency"` // Executor plugin to be used in this job. Executor string `json:"executor"` // Configuration arguments for the specific executor. ExecutorConfig plugin.ExecutorPluginConfig `json:"executor_config"` // Computed job status. Status string `json:"status"` // Computed next execution. Next time.Time `json:"next"` // Delete the job after the first successful execution. Ephemeral bool `json:"ephemeral"` // The job will not be executed after this time. ExpiresAt ntime.NullableTime `json:"expires_at"` // contains filtered or unexported fields }
Job describes a scheduled Job.
func NewJobFromProto ¶
NewJobFromProto create a new Job from a PB Job struct
func (*Job) GetTimeLocation ¶ added in v3.0.8
GetTimeLocation returns the time.Location based on the job's Timezone, or the default (UTC) if none is configured, or nil if an error occurred while creating the timezone from the property
type JobOptions ¶
type JobOptions struct { Metadata map[string]string `json:"tags"` Sort string Order string Query string Status string Disabled string }
JobOptions additional options to apply when loading a Job.
type LogApplier ¶
LogApplier is the definition of a function that can apply a Raft log
type LogAppliers ¶
type LogAppliers map[MessageType]LogApplier
LogAppliers is a mapping of the Raft MessageType to the appropriate log applier
type MessageType ¶
type MessageType uint8
MessageType is the type to encode FSM commands.
const ( // SetJobType is the command used to store a job in the store. SetJobType MessageType = iota // DeleteJobType is the command used to delete a Job from the store. DeleteJobType // SetExecutionType is the command used to store an Execution to the store. SetExecutionType // DeleteExecutionsType is the command used to delete executions from the store. DeleteExecutionsType // ExecutionDoneType is the command to perform the logic needed once an execution // is done. ExecutionDoneType )
type ProcessorFactory ¶
ProcessorFactory is a function type that creates a new instance of a processor.
type RaftLayer ¶
RaftLayer is the network layer for internode communications.
func NewRaftLayer ¶
NewRaftLayer returns an initialized unencrypted RaftLayer.
func NewTLSRaftLayer ¶
NewTLSRaftLayer returns an initialized TLS-encrypted RaftLayer.
type RaftStore ¶ added in v3.2.1
type RaftStore interface { raft.StableStore raft.LogStore Close() error }
type Scheduler ¶
type Scheduler struct { Cron *cron.Cron // contains filtered or unexported fields }
Scheduler represents a dkron scheduler instance, it stores the cron engine and the related parameters.
func NewScheduler ¶
NewScheduler creates a new Scheduler instance
func (*Scheduler) ClearCron ¶ added in v3.0.5
func (s *Scheduler) ClearCron()
ClearCron clears the cron scheduler
func (*Scheduler) GetEntryJob ¶ added in v3.2.0
GetEntryJob returns a EntryJob object from a snapshot in the current time, and whether or not the entry was found.
func (*Scheduler) Start ¶
Start the cron scheduler, adding its corresponding jobs and executing them on time.
type ServerParts ¶
type ServerParts struct { Name string ID string Region string Datacenter string Port int Bootstrap bool Expect int RaftVersion int BuildVersion *version.Version Addr net.Addr RPCAddr net.Addr Status serf.MemberStatus }
ServerParts is used to return the parts of a server role
func (*ServerParts) Copy ¶
func (s *ServerParts) Copy() *ServerParts
Copy returns a copy of this struct
func (*ServerParts) String ¶
func (s *ServerParts) String() string
String returns a representation of this instance
type Storage ¶
type Storage interface { SetJob(job *Job, copyDependentJobs bool) error DeleteJob(name string) (*Job, error) SetExecution(execution *Execution) (string, error) SetExecutionDone(execution *Execution) (bool, error) GetJobs(options *JobOptions) ([]*Job, error) GetJob(name string, options *JobOptions) (*Job, error) GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error) GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error) Shutdown() error Snapshot(w io.WriteCloser) error Restore(r io.ReadCloser) error }
Storage is the interface that should be used by any storage engine implemented for dkron. It contains the minimum set of operations that are needed to have a working dkron store.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is the local implementation of the Storage interface. It gives dkron the ability to manipulate its embedded storage BuntDB.
func (*Store) DeleteJob ¶
DeleteJob deletes the given job from the store, along with all its executions and references to it.
func (*Store) GetExecutionGroup ¶
func (s *Store) GetExecutionGroup(execution *Execution, opts *ExecutionOptions) ([]*Execution, error)
GetExecutionGroup returns all executions in the same group of a given execution
func (*Store) GetExecutions ¶
func (s *Store) GetExecutions(jobName string, opts *ExecutionOptions) ([]*Execution, error)
GetExecutions returns the executions given a Job name.
func (*Store) GetGroupedExecutions ¶
func (s *Store) GetGroupedExecutions(jobName string, opts *ExecutionOptions) (map[int64][]*Execution, []int64, error)
GetGroupedExecutions returns executions for a job grouped and with an ordered index to facilitate access.
func (*Store) GetJob ¶
func (s *Store) GetJob(name string, options *JobOptions) (*Job, error)
GetJob finds and return a Job from the store
func (*Store) GetJobs ¶
func (s *Store) GetJobs(options *JobOptions) ([]*Job, error)
GetJobs returns all jobs
func (*Store) Restore ¶
func (s *Store) Restore(r io.ReadCloser) error
Restore load data created with backup in to Bunt
func (*Store) SetExecution ¶
SetExecution Save a new execution and returns the key of the new saved item or an error.
func (*Store) SetExecutionDone ¶
SetExecutionDone saves the execution and updates the job with the corresponding results