Version: v0.22.0
Published: Apr 22, 2021 License: Apache-2.0 Imports: 25 Imported by: 2






func InGroups

func InGroups(set []string, size int, f func([]string) error) error

InGroups calls f for sub-slices of set, where every sub-slice holds at most `size` elements
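The grouping behavior described above can be sketched as follows. This is an illustrative stand-in, not the package source: the lowercase name `inGroups`, the rejection of a non-positive size, and stopping at the first error from f are all assumptions made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// inGroups is a hypothetical stand-in for InGroups. It calls f with
// consecutive sub-slices of set, each holding at most size elements.
func inGroups(set []string, size int, f func([]string) error) error {
	if size <= 0 {
		// Assumption: this sketch rejects a non-positive size.
		return errors.New("size must be positive")
	}

	for start := 0; start < len(set); start += size {
		end := start + size
		if end > len(set) {
			end = len(set)
		}

		// Assumption: the first error from f stops the iteration.
		if err := f(set[start:end]); err != nil {
			return err
		}
	}

	return nil
}

func main() {
	nodes := []string{"n1", "n2", "n3", "n4", "n5"}

	err := inGroups(nodes, 2, func(group []string) error {
		fmt.Println(len(group), group)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

With five nodes and a size of 2, f is called three times, and the final group holds the single remaining node.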

func InterruptableSleep

func InterruptableSleep(ctx context.Context, d time.Duration) error

InterruptableSleep sleeps for the duration d in a way that can be interrupted by the context. An error is returned if the context cancels the sleep


type ChoriaClient

type ChoriaClient interface {
	Request(ctx context.Context, msg *choria.Message, handler cclient.Handler) (err error)
}

ChoriaClient implements the connection to the Choria network

type ChoriaFramework

type ChoriaFramework interface {
	Logger(string) *logrus.Entry
	Configuration() *config.Config
	NewMessage(payload string, agent string, collective string, msgType string, request *choria.Message) (msg *choria.Message, err error)
	NewReplyFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Reply, err error)
	NewTransportFromJSON(data string) (message protocol.TransportMessage, err error)
	MiddlewareServers() (servers srvcache.Servers, err error)
	NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, logger *logrus.Entry) (conn choria.Connector, err error)
	NewRequestID() (string, error)
	Certname() string
	PQLQueryCertNames(query string) ([]string, error)
}

type Connector

type Connector interface {
	QueueSubscribe(ctx context.Context, name string, subject string, group string, output chan *choria.ConnectorMessage) error
	Publish(msg *choria.Message) error
}

Connector is a connection to the choria network

type DiscoveryEndFunc

type DiscoveryEndFunc func(discovered int, limited int) error

DiscoveryEndFunc is called after discovery ends. It receives the number of discovered nodes and the number of nodes that will be targeted after limits are applied. If it returns an error, the RPC call terminates
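Because a returned error terminates the RPC call, a callback of this shape can act as a safety check before any request is published. The sketch below declares a local `discoveryEndFunc` type mirroring the documented signature; `maxTargets` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// discoveryEndFunc mirrors the documented DiscoveryEndFunc signature.
type discoveryEndFunc func(discovered int, limited int) error

// maxTargets is a hypothetical helper. It returns a callback that aborts
// the call when more than limit nodes would be targeted.
func maxTargets(limit int) discoveryEndFunc {
	return func(discovered int, limited int) error {
		if limited > limit {
			return fmt.Errorf("refusing to target %d of %d discovered nodes, maximum is %d",
				limited, discovered, limit)
		}

		return nil
	}
}

func main() {
	cb := maxTargets(10)

	fmt.Println(cb(100, 5))  // within the limit, so the error is nil
	fmt.Println(cb(100, 50)) // over the limit, so an error is returned
}
```

A function with this signature would be supplied to a request through the DiscoveryEndCB option.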

type DiscoveryStartFunc

type DiscoveryStartFunc func()

DiscoveryStartFunc gets called before discovery starts

type Handler

type Handler func(protocol.Reply, *RPCReply)

Handler is a function that should handle each reply synchronously

type NodeList

type NodeList struct {
	// contains filtered or unexported fields
}

NodeList is a list of nodes the client is interacting with, used to keep track of things such as which nodes have responded and which are still to respond

func NewNodeList

func NewNodeList() *NodeList

NewNodeList creates a new initialized NodeList

func (*NodeList) AddHosts

func (n *NodeList) AddHosts(hosts ...string)

AddHosts appends the given nodes to the list of known nodes

func (*NodeList) Clear

func (n *NodeList) Clear()

Clear removes all nodes from the NodeList

func (*NodeList) Count

func (n *NodeList) Count() int

Count returns the number of nodes on the list

func (*NodeList) DeleteIfKnown

func (n *NodeList) DeleteIfKnown(host string) bool

DeleteIfKnown removes a node from the list if it's known, boolean result indicates if it was known

func (*NodeList) Have

func (n *NodeList) Have(host string) bool

Have determines if a node is known

func (*NodeList) HaveAny

func (n *NodeList) HaveAny(hosts ...string) bool

HaveAny determines if any of the given nodes are known in a boolean OR fashion

func (*NodeList) Hosts

func (n *NodeList) Hosts() []string

Hosts returns the individual nodes on the list
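The NodeList fields are unexported, so the following is only a sketch of how such a list might track outstanding replies. The mutex-guarded map, the sorted output of `Hosts`, and the lowercase type name `nodeList` are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// nodeList is a hypothetical stand-in for NodeList.
type nodeList struct {
	mu    sync.Mutex
	hosts map[string]struct{}
}

func newNodeList() *nodeList {
	return &nodeList{hosts: make(map[string]struct{})}
}

// AddHosts records the given nodes as known.
func (n *nodeList) AddHosts(hosts ...string) {
	n.mu.Lock()
	defer n.mu.Unlock()

	for _, h := range hosts {
		n.hosts[h] = struct{}{}
	}
}

// DeleteIfKnown removes host and reports whether it was known.
func (n *nodeList) DeleteIfKnown(host string) bool {
	n.mu.Lock()
	defer n.mu.Unlock()

	_, known := n.hosts[host]
	delete(n.hosts, host)

	return known
}

// Count returns the number of known nodes.
func (n *nodeList) Count() int {
	n.mu.Lock()
	defer n.mu.Unlock()

	return len(n.hosts)
}

// Hosts returns the known nodes, sorted for stable output in this sketch.
func (n *nodeList) Hosts() []string {
	n.mu.Lock()
	defer n.mu.Unlock()

	out := make([]string, 0, len(n.hosts))
	for h := range n.hosts {
		out = append(out, h)
	}
	sort.Strings(out)

	return out
}

func main() {
	waiting := newNodeList()
	waiting.AddHosts("web1", "web2", "db1")

	// Remove each sender as its reply arrives and flag unknown senders.
	for _, sender := range []string{"web1", "db1", "rogue1"} {
		if !waiting.DeleteIfKnown(sender) {
			fmt.Println("unexpected reply from", sender)
		}
	}

	fmt.Println("still waiting for", waiting.Hosts())
}
```

The boolean result of `DeleteIfKnown` is what lets a caller tell an expected reply from an unexpected one in a single step.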

type Option

type Option func(r *RPC)

Option configures the RPC client

func DDL

func DDL(d *addl.DDL) Option

DDL supplies a DDL when creating the client thus avoiding a disk search

func DiscoveryMethod

func DiscoveryMethod(dm string) Option

DiscoveryMethod sets a specific discovery method

type RPC

type RPC struct {
	// contains filtered or unexported fields
}

RPC is an MCollective-compatible RPC client

func New

func New(fw ChoriaFramework, agent string, opts ...Option) (rpc *RPC, err error)

New creates a new RPC request

A DDL is required. When one is not supplied using the DDL() option, attempts will be made to find it on the file system; if that fails, an error is returned

func (*RPC) Do

func (r *RPC) Do(ctx context.Context, action string, payload interface{}, opts ...RequestOption) (RequestResult, error)

Do performs an RPC request and optionally processes replies

If a filter is supplied using the Filter() option and Targets() is not, discovery will be done for you using the broadcast method. If no nodes are discovered, an error is returned

func (*RPC) Reset

func (r *RPC) Reset()

Reset removes the cached options; any further Do() calls need to specify full options

type RPCReply

type RPCReply struct {
	Statuscode mcorpc.StatusCode `json:"statuscode"`
	Statusmsg  string            `json:"statusmsg"`
	Data       json.RawMessage   `json:"data"`
	Sender     string            `json:"-"`
	Time       time.Time         `json:"-"`
}

RPCReply is a basic RPC reply

func ParseReplyData

func ParseReplyData(source []byte) (*RPCReply, error)

ParseReplyData parses reply data and populates a Reply and custom Data

func (*RPCReply) MatchExpr

func (r *RPCReply) MatchExpr(q string, prog *vm.Program) (bool, *vm.Program, error)

MatchExpr determines if the Reply matches expression q using the expr format. The query q is expected to return a boolean; otherwise an error is raised

type RPCRequest

type RPCRequest struct {
	Agent  string          `json:"agent"`
	Action string          `json:"action"`
	Data   json.RawMessage `json:"data"`
}

RPCRequest is a basic RPC request

type RequestOption

type RequestOption func(*RequestOptions)

RequestOption is a function capable of setting an option

func BroadcastRequest

func BroadcastRequest() RequestOption

BroadcastRequest forces the request to use broadcast mode

**NOTE:** You must ensure that filters and related settings are already in place

func Collective

func Collective(c string) RequestOption

Collective sets the collective to target a message at

func ConnectionName

func ConnectionName(n string) RequestOption

ConnectionName sets the prefix used for various connection names

Setting this when making many clients minimizes the number of Prometheus metrics created: each client creates 2 or 3, which with randomly generated names will snowball over time

func DirectRequest

func DirectRequest() RequestOption

DirectRequest forces the request to be a direct request

func DiscoveryEndCB

func DiscoveryEndCB(h DiscoveryEndFunc) RequestOption

DiscoveryEndCB sets the function to be called after discovery and node limiting

func DiscoveryStartCB

func DiscoveryStartCB(h DiscoveryStartFunc) RequestOption

DiscoveryStartCB sets the function to be called before discovery starts

func DiscoveryTimeout

func DiscoveryTimeout(t time.Duration) RequestOption

DiscoveryTimeout configures the request discovery timeout, defaults to configured discovery timeout

func Filter

func Filter(f *protocol.Filter) RequestOption

Filter sets the filter; if it is set, discovery will be done prior to performing requests

func InBatches

func InBatches(size int, sleep int) RequestOption

InBatches performs requests in batches

func LimitMethod

func LimitMethod(m string) RequestOption

LimitMethod configures the method to use when limiting targets - "random" or "first"

func LimitSeed

func LimitSeed(s int64) RequestOption

LimitSeed sets the random seed used to select targets when limiting and limit method is "random"

func LimitSize

func LimitSize(s string) RequestOption

LimitSize sets limits on the targets, either a number or a percentage like "10%"
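To make the two accepted forms concrete, the sketch below turns a limit string into a node count. `limitCount` is a hypothetical helper, and its details are assumptions: integer division for percentages, rounding up to at least one node, and capping at the discovered count. The package's own rounding rules may differ.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// limitCount is a hypothetical helper that resolves a limit such as "5"
// or "10%" into a number of nodes out of the discovered total.
func limitCount(size string, discovered int) (int, error) {
	if strings.HasSuffix(size, "%") {
		pct, err := strconv.Atoi(strings.TrimSuffix(size, "%"))
		if err != nil || pct <= 0 || pct > 100 {
			return 0, fmt.Errorf("invalid percentage %q", size)
		}

		count := discovered * pct / 100
		if count == 0 && discovered > 0 {
			// Assumption: a percentage always selects at least one node.
			count = 1
		}

		return count, nil
	}

	count, err := strconv.Atoi(size)
	if err != nil || count <= 0 {
		return 0, fmt.Errorf("invalid limit %q", size)
	}

	// Assumption: a limit never exceeds the discovered node count.
	if count > discovered {
		count = discovered
	}

	return count, nil
}

func main() {
	for _, size := range []string{"10%", "3", "200", "abc"} {
		count, err := limitCount(size, 50)
		fmt.Println(size, count, err)
	}
}
```

With 50 discovered nodes, "10%" resolves to 5 nodes and "200" is capped at 50 in this sketch.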

func Protocol

func Protocol(v string) RequestOption

Protocol sets the protocol version to use

func Replies

func Replies(r chan *choria.ConnectorMessage) RequestOption

Replies creates a custom channel for replies and will avoid processing them

func ReplyExprFilter

func ReplyExprFilter(f string) RequestOption

ReplyExprFilter filters replies by filter f. Replies that match f will not be recorded and will not be passed to any handlers, though they still count towards received replies as usual.

When this filter matches a reply and a handler is set, the handler will be called with a nil 'rpcreply', allowing the handler to process progress bars and more

func ReplyHandler

func ReplyHandler(f Handler) RequestOption

ReplyHandler configures a callback to be called for each message received

func ReplyTo

func ReplyTo(r string) RequestOption

ReplyTo sets a custom reply-to target; otherwise the connector will determine it

func ServiceRequest

func ServiceRequest() RequestOption

ServiceRequest forces the request to be directed at a specific service agent

**Note**: does not support filters or targets

func Targets

func Targets(t []string) RequestOption

Targets configures targets for an RPC request

func Timeout

func Timeout(t time.Duration) RequestOption

Timeout configures the request timeout

func Workers

func Workers(w int) RequestOption

Workers configures the number of workers used to process responses. This is ignored in batched mode, which always uses a single worker

type RequestOptions

type RequestOptions struct {
	BatchSize        int
	BatchSleep       time.Duration
	Collective       string
	ConnectionName   string
	DiscoveryTimeout time.Duration
	Filter           *protocol.Filter
	Handler          Handler
	ProcessReplies   bool
	ProtocolVersion  string
	Replies          chan *choria.ConnectorMessage
	ReplyTo          string
	RequestID        string
	RequestType      string
	Targets          []string
	Timeout          time.Duration
	Workers          int
	LimitSeed        int64
	LimitMethod      string
	LimitSize        string
	ReplyExprFilter  string
	DiscoveryStartCB DiscoveryStartFunc
	DiscoveryEndCB   DiscoveryEndFunc
	// contains filtered or unexported fields
}

RequestOptions are options for an RPC request

func NewRequestOptions

func NewRequestOptions(fw ChoriaFramework, ddl *agent.DDL) (*RequestOptions, error)

NewRequestOptions creates an initialized request options instance

func (*RequestOptions) ConfigureMessage

func (o *RequestOptions) ConfigureMessage(msg *choria.Message) (err error)

ConfigureMessage configures a pre-made message object based on the settings contained

func (*RequestOptions) Stats

func (o *RequestOptions) Stats() *Stats

Stats retrieves the stats for the completed request

type RequestResult

type RequestResult interface {
	Stats() *Stats
}

RequestResult is the result of a request

type Stats

type Stats struct {
	RequestID string
	// contains filtered or unexported fields
}

Stats represent stats for a request

func NewStats

func NewStats() *Stats

NewStats initializes a new stats instance

func (*Stats) Action

func (s *Stats) Action() string

Action returns the action the stat is for if it was set

func (*Stats) Agent

func (s *Stats) Agent() string

Agent returns the agent the stat is for if it was set

func (*Stats) All

func (s *Stats) All() bool

All determines if all expected nodes replied already

func (*Stats) DiscoveredCount

func (s *Stats) DiscoveredCount() int

DiscoveredCount is how many nodes were discovered

func (*Stats) DiscoveredNodes

func (s *Stats) DiscoveredNodes() *[]string

DiscoveredNodes are the nodes that were discovered for this request

func (*Stats) DiscoveryDuration

func (s *Stats) DiscoveryDuration() (time.Duration, error)

DiscoveryDuration determines how long discovery took, 0 and error when discovery was not done

func (*Stats) End

func (s *Stats) End()

End records the end time of a request

func (*Stats) EndDiscover

func (s *Stats) EndDiscover()

EndDiscover records the end time of the discovery process

func (*Stats) EndPublish

func (s *Stats) EndPublish()

EndPublish records the publish process ended

func (*Stats) FailCount

func (s *Stats) FailCount() int

FailCount is the number of responses that were failures

func (*Stats) FailedRequestInc

func (s *Stats) FailedRequestInc()

FailedRequestInc increments the failed request counter by one

func (*Stats) Merge

func (s *Stats) Merge(other *Stats) error

Merge merges the stats from a specific batch into this

func (*Stats) NoResponseFrom

func (s *Stats) NoResponseFrom() []string

NoResponseFrom calculates which discovered hosts did not respond
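Conceptually this is a set difference: discovered hosts minus hosts that replied. The reverse difference gives the hosts reported by UnexpectedResponseFrom. The sketch below shows both with a hypothetical `difference` helper; how Stats stores these sets internally is not documented, so the slices here are assumptions.

```go
package main

import "fmt"

// difference is a hypothetical helper that returns the entries of a that
// do not appear in b, preserving the order of a.
func difference(a, b []string) []string {
	seen := make(map[string]struct{}, len(b))
	for _, s := range b {
		seen[s] = struct{}{}
	}

	out := []string{}
	for _, s := range a {
		if _, ok := seen[s]; !ok {
			out = append(out, s)
		}
	}

	return out
}

func main() {
	discovered := []string{"web1", "web2", "db1"}
	responded := []string{"web1", "db1", "rogue1"}

	fmt.Println("no response from:", difference(discovered, responded))
	fmt.Println("unexpected response from:", difference(responded, discovered))
}
```

In this example web2 never replied and rogue1 replied without being discovered.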

func (*Stats) OKCount

func (s *Stats) OKCount() int

OKCount is the number of responses that were ok

func (*Stats) OverrideDiscoveryTime

func (s *Stats) OverrideDiscoveryTime(start time.Time, end time.Time)

OverrideDiscoveryTime sets specific discovery time

func (*Stats) PassedRequestInc

func (s *Stats) PassedRequestInc()

PassedRequestInc increments the passed request counter by one

func (*Stats) PublishDuration

func (s *Stats) PublishDuration() (time.Duration, error)

PublishDuration calculates how long publishing took

func (*Stats) RecordReceived

func (s *Stats) RecordReceived(sender string)

RecordReceived records the fact that one message was received

func (*Stats) RequestDuration

func (s *Stats) RequestDuration() (time.Duration, error)

RequestDuration calculates the total duration

func (*Stats) ResponsesCount

func (s *Stats) ResponsesCount() int

ResponsesCount is the total number of nodes that have responded so far

func (*Stats) SetAction

func (s *Stats) SetAction(a string)

SetAction stores the action the stats is for

func (*Stats) SetAgent

func (s *Stats) SetAgent(a string)

SetAgent stores the agent the stats is for

func (*Stats) SetDiscoveredNodes

func (s *Stats) SetDiscoveredNodes(nodes []string)

SetDiscoveredNodes records the node names we expect to communicate with

func (*Stats) Start

func (s *Stats) Start()

Start records the start time of a request

func (*Stats) StartDiscover

func (s *Stats) StartDiscover()

StartDiscover records the start time of the discovery process

func (*Stats) StartPublish

func (s *Stats) StartPublish()

StartPublish records the publish process started

func (*Stats) Started

func (s *Stats) Started() time.Time

Started is the time the request was started, zero time when not started

func (*Stats) UnexpectedResponseFrom

func (s *Stats) UnexpectedResponseFrom() []string

UnexpectedResponseFrom calculates which hosts responded that we did not expect responses from

func (*Stats) WaitingFor

func (s *Stats) WaitingFor(nodes []string) bool

WaitingFor checks if any of the given nodes are still outstanding
