Documentation ¶
Index ¶
- type AMQPFeeder
- func (f *AMQPFeeder) NewConsumer(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, ...) (*Consumer, error)
- func (f *AMQPFeeder) NewConsumerWithReconnector(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, ...) (*Consumer, error)
- func (f *AMQPFeeder) Run(out chan observation.InputObservation) error
- func (f *AMQPFeeder) SetInputDecoder(fn format.MakeObservationFunc)
- func (f *AMQPFeeder) Stop(stopChan chan bool)
- type Consumer
- type Feeder
- type HTTPFeeder
- type Setup
- type SocketFeeder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPFeeder ¶
type AMQPFeeder struct {
    StopChan            chan bool
    StoppedChan         chan bool
    IsRunning           bool
    Consumer            *Consumer
    URL                 string
    Exchanges           []string
    Queue               string
    MakeObservationFunc format.MakeObservationFunc
}
AMQPFeeder is a Feeder that accepts input via AMQP queues.
func MakeAMQPFeeder ¶
func MakeAMQPFeeder(url string, exchanges []string, queue string) *AMQPFeeder
MakeAMQPFeeder returns a new AMQPFeeder, connecting to the AMQP server at the given URL, creating a new queue with the given name bound to the provided exchanges.
func (*AMQPFeeder) NewConsumer ¶
func (f *AMQPFeeder) NewConsumer(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, out chan observation.InputObservation) (*Consumer, error)
NewConsumer returns a new Consumer.
func (*AMQPFeeder) NewConsumerWithReconnector ¶
func (f *AMQPFeeder) NewConsumerWithReconnector(amqpURI string, exchanges []string, exchangeType, queueName, key, ctag string, out chan observation.InputObservation, reconnector func(string) (wabbit.Conn, string, error)) (*Consumer, error)
NewConsumerWithReconnector creates a new Consumer with the given properties, using the reconnector function to obtain the connection. The Consumer's Callback function is called for each delivery accepted from a consumer channel.
func (*AMQPFeeder) Run ¶
func (f *AMQPFeeder) Run(out chan observation.InputObservation) error
Run starts the feeder.
func (*AMQPFeeder) SetInputDecoder ¶
func (f *AMQPFeeder) SetInputDecoder(fn format.MakeObservationFunc)
SetInputDecoder sets the MakeObservationFunc used to parse and decode data delivered to this feeder.
func (*AMQPFeeder) Stop ¶
func (f *AMQPFeeder) Stop(stopChan chan bool)
Stop causes the feeder to stop accepting deliveries and close all associated channels, including the passed notification channel.
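The stop handshake described above can be sketched with plain channels. This is a minimal illustration and not the package's implementation: toyFeeder and its methods are hypothetical stand-ins, and only the idea of closing the caller's notification channel once the feeder has shut down is taken from the description.

```go
package main

import "fmt"

// toyFeeder is a hypothetical stand-in for a feeder; it is not the
// package's AMQPFeeder.
type toyFeeder struct {
	stopChan    chan bool // closed to ask the worker to exit
	stoppedChan chan bool // closed by the worker once it has exited
}

func newToyFeeder() *toyFeeder {
	return &toyFeeder{
		stopChan:    make(chan bool),
		stoppedChan: make(chan bool),
	}
}

// run starts a worker goroutine that idles until it is told to stop.
func (f *toyFeeder) run() {
	go func() {
		<-f.stopChan
		close(f.stoppedChan)
	}()
}

// stop asks the worker to exit, waits for it, and then closes the
// caller's notification channel.
func (f *toyFeeder) stop(notify chan bool) {
	close(f.stopChan)
	<-f.stoppedChan
	close(notify)
}

// stopAndWait runs a toy feeder, stops it, and reports whether the
// notification channel was closed.
func stopAndWait() bool {
	f := newToyFeeder()
	f.run()
	notify := make(chan bool)
	go f.stop(notify)
	_, open := <-notify // receiving from a closed channel yields open == false
	return !open
}

func main() {
	fmt.Println("stopped:", stopAndWait())
}
```

A caller that waits on the notification channel in this way does not need to know whether the stop call itself blocks.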
type Consumer ¶
type Consumer struct {
    URL                 string
    Callback            func(wabbit.Delivery)
    StopReconnection    chan bool
    ChanMutex           sync.Mutex
    ConnMutex           sync.Mutex
    OutChan             chan observation.InputObservation
    MakeObservationFunc format.MakeObservationFunc
    ErrorChan           chan wabbit.Error
    Reconnector         func(string) (wabbit.Conn, string, error)
    Connector           func(*Consumer) error
    // contains filtered or unexported fields
}
Consumer reads and processes messages from a fake RabbitMQ server.
type Feeder ¶
type Feeder interface {
    Run(chan observation.InputObservation) error
    SetInputDecoder(format.MakeObservationFunc)
    Stop(chan bool)
}
Feeder is the interface implemented by components that accept observations in a specific input format and feed them into a channel of InputObservations. An input decoder, given as a MakeObservationFunc, describes the operations needed to transform the input format into an InputObservation.
type HTTPFeeder ¶
type HTTPFeeder struct {
    StopChan            chan bool
    StoppedChan         chan bool
    IsRunning           bool
    Port                int
    Host                string
    MakeObservationFunc format.MakeObservationFunc
    Server              *http.Server
    OutChan             chan observation.InputObservation
}
HTTPFeeder is a Feeder implementation that accepts HTTP requests to obtain observations.
func MakeHTTPFeeder ¶
func MakeHTTPFeeder(host string, port int) *HTTPFeeder
MakeHTTPFeeder creates a new HTTPFeeder that listens on the given host and port.
func (*HTTPFeeder) Run ¶
func (f *HTTPFeeder) Run(out chan observation.InputObservation) error
Run starts the feeder.
func (*HTTPFeeder) ServeHTTP ¶
func (f *HTTPFeeder) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*HTTPFeeder) SetInputDecoder ¶
func (f *HTTPFeeder) SetInputDecoder(fn format.MakeObservationFunc)
SetInputDecoder sets the MakeObservationFunc used to parse and decode data delivered to this feeder.
func (*HTTPFeeder) Stop ¶
func (f *HTTPFeeder) Stop(stopChan chan bool)
Stop causes the feeder to stop accepting requests and close all associated channels, including the passed notification channel.
type Setup ¶
type Setup struct {
    Feeder []struct {
        Name        string `yaml:"name"`
        Type        string `yaml:"type"`
        InputFormat string `yaml:"input_format"`
        // for AMQP
        URL      string   `yaml:"url"`
        Exchange []string `yaml:"exchange"`
        // for HTTP etc.
        ListenHost string `yaml:"listen_host"`
        ListenPort int    `yaml:"listen_port"`
        // for socket input
        Path string `yaml:"path"`
    } `yaml:"feeder"`
    Feeders map[string]Feeder
}
Setup describes a collection of feeders that should be active, including their configuration settings.
func (*Setup) Run ¶
func (fs *Setup) Run(in chan observation.InputObservation) error
Run starts all feeders according to the description in the setup, in the background. Use Stop() to stop the feeders.
type SocketFeeder ¶
type SocketFeeder struct {
    ObsChan             chan observation.InputObservation
    Verbose             bool
    Running             bool
    InputListener       net.Listener
    MakeObservationFunc format.MakeObservationFunc
    StopChan            chan bool
    StoppedChan         chan bool
}
SocketFeeder is a Feeder implementation that reads data from a Unix socket.
func MakeSocketFeeder ¶
func MakeSocketFeeder(inputSocket string) (*SocketFeeder, error)
MakeSocketFeeder returns a new SocketFeeder reading from the Unix socket inputSocket. Parsed events are written to the channel passed to Run. If the socket cannot be created for listening, an error is returned.
func (*SocketFeeder) Run ¶
func (sf *SocketFeeder) Run(out chan observation.InputObservation) error
Run starts the feeder.
func (*SocketFeeder) SetInputDecoder ¶
func (sf *SocketFeeder) SetInputDecoder(fn format.MakeObservationFunc)
SetInputDecoder sets the MakeObservationFunc used to parse and decode data delivered to this feeder.
func (*SocketFeeder) Stop ¶
func (sf *SocketFeeder) Stop(stoppedChan chan bool)
Stop causes the SocketFeeder to stop reading from the socket and close all associated channels, including the passed notification channel.