target

package
v0.0.0-...-d8f8204 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2020 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AmqpQueueDir   = "queue_dir"
	AmqpQueueLimit = "queue_limit"

	AmqpURL               = "url"
	AmqpExchange          = "exchange"
	AmqpRoutingKey        = "routing_key"
	AmqpExchangeType      = "exchange_type"
	AmqpDeliveryMode      = "delivery_mode"
	AmqpMandatory         = "mandatory"
	AmqpImmediate         = "immediate"
	AmqpDurable           = "durable"
	AmqpInternal          = "internal"
	AmqpNoWait            = "no_wait"
	AmqpAutoDeleted       = "auto_deleted"
	AmqpArguments         = "arguments"
	AmqpPublishingHeaders = "publishing_headers"

	EnvAMQPEnable            = "MINIO_NOTIFY_AMQP_ENABLE"
	EnvAMQPURL               = "MINIO_NOTIFY_AMQP_URL"
	EnvAMQPExchange          = "MINIO_NOTIFY_AMQP_EXCHANGE"
	EnvAMQPRoutingKey        = "MINIO_NOTIFY_AMQP_ROUTING_KEY"
	EnvAMQPExchangeType      = "MINIO_NOTIFY_AMQP_EXCHANGE_TYPE"
	EnvAMQPDeliveryMode      = "MINIO_NOTIFY_AMQP_DELIVERY_MODE"
	EnvAMQPMandatory         = "MINIO_NOTIFY_AMQP_MANDATORY"
	EnvAMQPImmediate         = "MINIO_NOTIFY_AMQP_IMMEDIATE"
	EnvAMQPDurable           = "MINIO_NOTIFY_AMQP_DURABLE"
	EnvAMQPInternal          = "MINIO_NOTIFY_AMQP_INTERNAL"
	EnvAMQPNoWait            = "MINIO_NOTIFY_AMQP_NO_WAIT"
	EnvAMQPAutoDeleted       = "MINIO_NOTIFY_AMQP_AUTO_DELETED"
	EnvAMQPArguments         = "MINIO_NOTIFY_AMQP_ARGUMENTS"
	EnvAMQPPublishingHeaders = "MINIO_NOTIFY_AMQP_PUBLISHING_HEADERS"
	EnvAMQPQueueDir          = "MINIO_NOTIFY_AMQP_QUEUE_DIR"
	EnvAMQPQueueLimit        = "MINIO_NOTIFY_AMQP_QUEUE_LIMIT"
)

AMQP input constants.

View Source
const (
	ElasticFormat     = "format"
	ElasticURL        = "url"
	ElasticIndex      = "index"
	ElasticQueueDir   = "queue_dir"
	ElasticQueueLimit = "queue_limit"

	EnvElasticEnable     = "MINIO_NOTIFY_ELASTICSEARCH_ENABLE"
	EnvElasticFormat     = "MINIO_NOTIFY_ELASTICSEARCH_FORMAT"
	EnvElasticURL        = "MINIO_NOTIFY_ELASTICSEARCH_URL"
	EnvElasticIndex      = "MINIO_NOTIFY_ELASTICSEARCH_INDEX"
	EnvElasticQueueDir   = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_DIR"
	EnvElasticQueueLimit = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_LIMIT"
)

Elastic constants

View Source
const (
	KafkaBrokers       = "brokers"
	KafkaTopic         = "topic"
	KafkaQueueDir      = "queue_dir"
	KafkaQueueLimit    = "queue_limit"
	KafkaTLS           = "tls"
	KafkaTLSSkipVerify = "tls_skip_verify"
	KafkaTLSClientAuth = "tls_client_auth"
	KafkaSASL          = "sasl"
	KafkaSASLUsername  = "sasl_username"
	KafkaSASLPassword  = "sasl_password"
	KafkaClientTLSCert = "client_tls_cert"
	KafkaClientTLSKey  = "client_tls_key"

	EnvKafkaEnable        = "MINIO_NOTIFY_KAFKA_ENABLE"
	EnvKafkaBrokers       = "MINIO_NOTIFY_KAFKA_BROKERS"
	EnvKafkaTopic         = "MINIO_NOTIFY_KAFKA_TOPIC"
	EnvKafkaQueueDir      = "MINIO_NOTIFY_KAFKA_QUEUE_DIR"
	EnvKafkaQueueLimit    = "MINIO_NOTIFY_KAFKA_QUEUE_LIMIT"
	EnvKafkaTLS           = "MINIO_NOTIFY_KAFKA_TLS"
	EnvKafkaTLSSkipVerify = "MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY"
	EnvKafkaTLSClientAuth = "MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH"
	EnvKafkaSASLEnable    = "MINIO_NOTIFY_KAFKA_SASL"
	EnvKafkaSASLUsername  = "MINIO_NOTIFY_KAFKA_SASL_USERNAME"
	EnvKafkaSASLPassword  = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD"
	EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT"
	EnvKafkaClientTLSKey  = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
)

Kafka input constants

View Source
const (
	MqttBroker            = "broker"
	MqttTopic             = "topic"
	MqttQoS               = "qos"
	MqttUsername          = "username"
	MqttPassword          = "password"
	MqttReconnectInterval = "reconnect_interval"
	MqttKeepAliveInterval = "keep_alive_interval"
	MqttQueueDir          = "queue_dir"
	MqttQueueLimit        = "queue_limit"

	EnvMQTTEnable            = "MINIO_NOTIFY_MQTT_ENABLE"
	EnvMQTTBroker            = "MINIO_NOTIFY_MQTT_BROKER"
	EnvMQTTTopic             = "MINIO_NOTIFY_MQTT_TOPIC"
	EnvMQTTQoS               = "MINIO_NOTIFY_MQTT_QOS"
	EnvMQTTUsername          = "MINIO_NOTIFY_MQTT_USERNAME"
	EnvMQTTPassword          = "MINIO_NOTIFY_MQTT_PASSWORD"
	EnvMQTTReconnectInterval = "MINIO_NOTIFY_MQTT_RECONNECT_INTERVAL"
	EnvMQTTKeepAliveInterval = "MINIO_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL"
	EnvMQTTQueueDir          = "MINIO_NOTIFY_MQTT_QUEUE_DIR"
	EnvMQTTQueueLimit        = "MINIO_NOTIFY_MQTT_QUEUE_LIMIT"
)

MQTT input constants

View Source
const (
	MySQLFormat     = "format"
	MySQLDSNString  = "dsn_string"
	MySQLTable      = "table"
	MySQLHost       = "host"
	MySQLPort       = "port"
	MySQLUsername   = "username"
	MySQLPassword   = "password"
	MySQLDatabase   = "database"
	MySQLQueueLimit = "queue_limit"
	MySQLQueueDir   = "queue_dir"

	EnvMySQLEnable     = "MINIO_NOTIFY_MYSQL_ENABLE"
	EnvMySQLFormat     = "MINIO_NOTIFY_MYSQL_FORMAT"
	EnvMySQLDSNString  = "MINIO_NOTIFY_MYSQL_DSN_STRING"
	EnvMySQLTable      = "MINIO_NOTIFY_MYSQL_TABLE"
	EnvMySQLHost       = "MINIO_NOTIFY_MYSQL_HOST"
	EnvMySQLPort       = "MINIO_NOTIFY_MYSQL_PORT"
	EnvMySQLUsername   = "MINIO_NOTIFY_MYSQL_USERNAME"
	EnvMySQLPassword   = "MINIO_NOTIFY_MYSQL_PASSWORD"
	EnvMySQLDatabase   = "MINIO_NOTIFY_MYSQL_DATABASE"
	EnvMySQLQueueLimit = "MINIO_NOTIFY_MYSQL_QUEUE_LIMIT"
	EnvMySQLQueueDir   = "MINIO_NOTIFY_MYSQL_QUEUE_DIR"
)

MySQL related constants

View Source
const (
	NATSAddress       = "address"
	NATSSubject       = "subject"
	NATSUsername      = "username"
	NATSPassword      = "password"
	NATSToken         = "token"
	NATSTLS           = "tls"
	NATSTLSSkipVerify = "tls_skip_verify"
	NATSPingInterval  = "ping_interval"
	NATSQueueDir      = "queue_dir"
	NATSQueueLimit    = "queue_limit"
	NATSCertAuthority = "cert_authority"
	NATSClientCert    = "client_cert"
	NATSClientKey     = "client_key"

	// Streaming constants
	NATSStreaming                   = "streaming"
	NATSStreamingClusterID          = "streaming_cluster_id"
	NATSStreamingAsync              = "streaming_async"
	NATSStreamingMaxPubAcksInFlight = "streaming_max_pub_acks_in_flight"

	EnvNATSEnable        = "MINIO_NOTIFY_NATS_ENABLE"
	EnvNATSAddress       = "MINIO_NOTIFY_NATS_ADDRESS"
	EnvNATSSubject       = "MINIO_NOTIFY_NATS_SUBJECT"
	EnvNATSUsername      = "MINIO_NOTIFY_NATS_USERNAME"
	EnvNATSPassword      = "MINIO_NOTIFY_NATS_PASSWORD"
	EnvNATSToken         = "MINIO_NOTIFY_NATS_TOKEN"
	EnvNATSTLS           = "MINIO_NOTIFY_NATS_TLS"
	EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY"
	EnvNATSPingInterval  = "MINIO_NOTIFY_NATS_PING_INTERVAL"
	EnvNATSQueueDir      = "MINIO_NOTIFY_NATS_QUEUE_DIR"
	EnvNATSQueueLimit    = "MINIO_NOTIFY_NATS_QUEUE_LIMIT"
	EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY"
	EnvNATSClientCert    = "MINIO_NOTIFY_NATS_CLIENT_CERT"
	EnvNATSClientKey     = "MINIO_NOTIFY_NATS_CLIENT_KEY"

	// Streaming constants
	EnvNATSStreaming                   = "MINIO_NOTIFY_NATS_STREAMING"
	EnvNATSStreamingClusterID          = "MINIO_NOTIFY_NATS_STREAMING_CLUSTER_ID"
	EnvNATSStreamingAsync              = "MINIO_NOTIFY_NATS_STREAMING_ASYNC"
	EnvNATSStreamingMaxPubAcksInFlight = "MINIO_NOTIFY_NATS_STREAMING_MAX_PUB_ACKS_IN_FLIGHT"
)

NATS related constants

View Source
const (
	NSQAddress       = "nsqd_address"
	NSQTopic         = "topic"
	NSQTLS           = "tls"
	NSQTLSSkipVerify = "tls_skip_verify"
	NSQQueueDir      = "queue_dir"
	NSQQueueLimit    = "queue_limit"

	EnvNSQEnable        = "MINIO_NOTIFY_NSQ"
	EnvNSQAddress       = "MINIO_NOTIFY_NSQ_NSQD_ADDRESS"
	EnvNSQTopic         = "MINIO_NOTIFY_NSQ_TOPIC"
	EnvNSQTLS           = "MINIO_NOTIFY_NSQ_TLS"
	EnvNSQTLSSkipVerify = "MINIO_NOTIFY_NSQ_TLS_SKIP_VERIFY"
	EnvNSQQueueDir      = "MINIO_NOTIFY_NSQ_QUEUE_DIR"
	EnvNSQQueueLimit    = "MINIO_NOTIFY_NSQ_QUEUE_LIMIT"
)

NSQ constants

View Source
const (
	PostgresFormat           = "format"
	PostgresConnectionString = "connection_string"
	PostgresTable            = "table"
	PostgresHost             = "host"
	PostgresPort             = "port"
	PostgresUsername         = "username"
	PostgresPassword         = "password"
	PostgresDatabase         = "database"
	PostgresQueueDir         = "queue_dir"
	PostgresQueueLimit       = "queue_limit"

	EnvPostgresEnable           = "MINIO_NOTIFY_POSTGRES_ENABLE"
	EnvPostgresFormat           = "MINIO_NOTIFY_POSTGRES_FORMAT"
	EnvPostgresConnectionString = "MINIO_NOTIFY_POSTGRES_CONNECTION_STRING"
	EnvPostgresTable            = "MINIO_NOTIFY_POSTGRES_TABLE"
	EnvPostgresHost             = "MINIO_NOTIFY_POSTGRES_HOST"
	EnvPostgresPort             = "MINIO_NOTIFY_POSTGRES_PORT"
	EnvPostgresUsername         = "MINIO_NOTIFY_POSTGRES_USERNAME"
	EnvPostgresPassword         = "MINIO_NOTIFY_POSTGRES_PASSWORD"
	EnvPostgresDatabase         = "MINIO_NOTIFY_POSTGRES_DATABASE"
	EnvPostgresQueueDir         = "MINIO_NOTIFY_POSTGRES_QUEUE_DIR"
	EnvPostgresQueueLimit       = "MINIO_NOTIFY_POSTGRES_QUEUE_LIMIT"
)

Postgres constants

View Source
const (
	RedisFormat     = "format"
	RedisAddress    = "address"
	RedisPassword   = "password"
	RedisKey        = "key"
	RedisQueueDir   = "queue_dir"
	RedisQueueLimit = "queue_limit"

	EnvRedisEnable     = "MINIO_NOTIFY_REDIS_ENABLE"
	EnvRedisFormat     = "MINIO_NOTIFY_REDIS_FORMAT"
	EnvRedisAddress    = "MINIO_NOTIFY_REDIS_ADDRESS"
	EnvRedisPassword   = "MINIO_NOTIFY_REDIS_PASSWORD"
	EnvRedisKey        = "MINIO_NOTIFY_REDIS_KEY"
	EnvRedisQueueDir   = "MINIO_NOTIFY_REDIS_QUEUE_DIR"
	EnvRedisQueueLimit = "MINIO_NOTIFY_REDIS_QUEUE_LIMIT"
)

Redis constants

View Source
const (
	WebhookEndpoint   = "endpoint"
	WebhookAuthToken  = "auth_token"
	WebhookQueueDir   = "queue_dir"
	WebhookQueueLimit = "queue_limit"

	EnvWebhookEnable     = "MINIO_NOTIFY_WEBHOOK_ENABLE"
	EnvWebhookEndpoint   = "MINIO_NOTIFY_WEBHOOK_ENDPOINT"
	EnvWebhookAuthToken  = "MINIO_NOTIFY_WEBHOOK_AUTH_TOKEN"
	EnvWebhookQueueDir   = "MINIO_NOTIFY_WEBHOOK_QUEUE_DIR"
	EnvWebhookQueueLimit = "MINIO_NOTIFY_WEBHOOK_QUEUE_LIMIT"
)

Webhook constants

Variables

This section is empty.

Functions

func IsConnErr

func IsConnErr(err error) bool

IsConnErr - To detect a connection error.

func IsConnRefusedErr

func IsConnRefusedErr(err error) bool

IsConnRefusedErr - To check for "connection refused" error.

func IsConnResetErr

func IsConnResetErr(err error) bool

IsConnResetErr - Checks for connection reset errors.

Types

type AMQPArgs

type AMQPArgs struct {
	Enable       bool     `json:"enable"`
	URL          xnet.URL `json:"url"`
	Exchange     string   `json:"exchange"`
	RoutingKey   string   `json:"routingKey"`
	ExchangeType string   `json:"exchangeType"`
	DeliveryMode uint8    `json:"deliveryMode"`
	Mandatory    bool     `json:"mandatory"`
	Immediate    bool     `json:"immediate"`
	Durable      bool     `json:"durable"`
	Internal     bool     `json:"internal"`
	NoWait       bool     `json:"noWait"`
	AutoDeleted  bool     `json:"autoDeleted"`
	QueueDir     string   `json:"queueDir"`
	QueueLimit   uint64   `json:"queueLimit"`
}

AMQPArgs - AMQP target arguments.

func (*AMQPArgs) Validate

func (a *AMQPArgs) Validate() error

Validate AMQP arguments

type AMQPTarget

type AMQPTarget struct {
	// contains filtered or unexported fields
}

AMQPTarget - AMQP target

func NewAMQPTarget

func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error)

NewAMQPTarget - creates new AMQP target.

func (*AMQPTarget) Close

func (target *AMQPTarget) Close() error

Close - does nothing and is available for interface compatibility.

func (*AMQPTarget) ID

func (target *AMQPTarget) ID() event.TargetID

ID - returns TargetID.

func (*AMQPTarget) IsActive

func (target *AMQPTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*AMQPTarget) Save

func (target *AMQPTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the amqp connection is active.

func (*AMQPTarget) Send

func (target *AMQPTarget) Send(eventKey string) error

Send - sends event to AMQP.

type ElasticsearchArgs

type ElasticsearchArgs struct {
	Enable     bool     `json:"enable"`
	Format     string   `json:"format"`
	URL        xnet.URL `json:"url"`
	Index      string   `json:"index"`
	QueueDir   string   `json:"queueDir"`
	QueueLimit uint64   `json:"queueLimit"`
}

ElasticsearchArgs - Elasticsearch target arguments.

func (ElasticsearchArgs) Validate

func (a ElasticsearchArgs) Validate() error

Validate ElasticsearchArgs fields

type ElasticsearchTarget

type ElasticsearchTarget struct {
	// contains filtered or unexported fields
}

ElasticsearchTarget - Elasticsearch target.

func NewElasticsearchTarget

func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error)

NewElasticsearchTarget - creates new Elasticsearch target.

func (*ElasticsearchTarget) Close

func (target *ElasticsearchTarget) Close() error

Close - does nothing and is available for interface compatibility.

func (*ElasticsearchTarget) ID

func (target *ElasticsearchTarget) ID() event.TargetID

ID - returns target ID.

func (*ElasticsearchTarget) IsActive

func (target *ElasticsearchTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*ElasticsearchTarget) Save

func (target *ElasticsearchTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.

func (*ElasticsearchTarget) Send

func (target *ElasticsearchTarget) Send(eventKey string) error

Send - reads an event from store and sends it to Elasticsearch.

type HTTPClientTarget

type HTTPClientTarget struct {
	DoneCh chan struct{}
	// contains filtered or unexported fields
}

HTTPClientTarget - HTTP client target.

func NewHTTPClientTarget

func NewHTTPClientTarget(host xnet.Host, w http.ResponseWriter) (*HTTPClientTarget, error)

NewHTTPClientTarget - creates new HTTP client target.

func (*HTTPClientTarget) Close

func (target *HTTPClientTarget) Close() error

Close - closes underneath goroutine.

func (HTTPClientTarget) ID

func (target HTTPClientTarget) ID() event.TargetID

ID - returns target ID.

func (*HTTPClientTarget) IsActive

func (target *HTTPClientTarget) IsActive() (bool, error)

IsActive - does nothing and is available for interface compatibility.

func (*HTTPClientTarget) Save

func (target *HTTPClientTarget) Save(eventData event.Event) error

Save - sends event to HTTP client.

func (*HTTPClientTarget) Send

func (target *HTTPClientTarget) Send(eventKey string) error

Send - no-op method provided for interface compatibility.

type KafkaArgs

type KafkaArgs struct {
	Enable     bool        `json:"enable"`
	Brokers    []xnet.Host `json:"brokers"`
	Topic      string      `json:"topic"`
	QueueDir   string      `json:"queueDir"`
	QueueLimit uint64      `json:"queueLimit"`
	TLS        struct {
		Enable        bool               `json:"enable"`
		RootCAs       *x509.CertPool     `json:"-"`
		SkipVerify    bool               `json:"skipVerify"`
		ClientAuth    tls.ClientAuthType `json:"clientAuth"`
		ClientTLSCert string             `json:"clientTLSCert"`
		ClientTLSKey  string             `json:"clientTLSKey"`
	} `json:"tls"`
	SASL struct {
		Enable   bool   `json:"enable"`
		User     string `json:"username"`
		Password string `json:"password"`
	} `json:"sasl"`
}

KafkaArgs - Kafka target arguments.

func (KafkaArgs) Validate

func (k KafkaArgs) Validate() error

Validate KafkaArgs fields

type KafkaTarget

type KafkaTarget struct {
	// contains filtered or unexported fields
}

KafkaTarget - Kafka target.

func NewKafkaTarget

func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error)

NewKafkaTarget - creates new Kafka target with auth credentials.

func (*KafkaTarget) Close

func (target *KafkaTarget) Close() error

Close - closes underneath kafka connection.

func (*KafkaTarget) ID

func (target *KafkaTarget) ID() event.TargetID

ID - returns target ID.

func (*KafkaTarget) IsActive

func (target *KafkaTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*KafkaTarget) Save

func (target *KafkaTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the Kafka connection is active.

func (*KafkaTarget) Send

func (target *KafkaTarget) Send(eventKey string) error

Send - reads an event from store and sends it to Kafka.

type MQTTArgs

type MQTTArgs struct {
	Enable               bool           `json:"enable"`
	Broker               xnet.URL       `json:"broker"`
	Topic                string         `json:"topic"`
	QoS                  byte           `json:"qos"`
	User                 string         `json:"username"`
	Password             string         `json:"password"`
	MaxReconnectInterval time.Duration  `json:"reconnectInterval"`
	KeepAlive            time.Duration  `json:"keepAliveInterval"`
	RootCAs              *x509.CertPool `json:"-"`
	QueueDir             string         `json:"queueDir"`
	QueueLimit           uint64         `json:"queueLimit"`
}

MQTTArgs - MQTT target arguments.

func (MQTTArgs) Validate

func (m MQTTArgs) Validate() error

Validate MQTTArgs fields

type MQTTTarget

type MQTTTarget struct {
	// contains filtered or unexported fields
}

MQTTTarget - MQTT target.

func NewMQTTTarget

func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error)

NewMQTTTarget - creates new MQTT target.

func (*MQTTTarget) Close

func (target *MQTTTarget) Close() error

Close - does nothing and is available for interface compatibility.

func (*MQTTTarget) ID

func (target *MQTTTarget) ID() event.TargetID

ID - returns target ID.

func (*MQTTTarget) IsActive

func (target *MQTTTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*MQTTTarget) Save

func (target *MQTTTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the mqtt connection is active.

func (*MQTTTarget) Send

func (target *MQTTTarget) Send(eventKey string) error

Send - reads an event from store and sends it to MQTT.

type MySQLArgs

type MySQLArgs struct {
	Enable     bool     `json:"enable"`
	Format     string   `json:"format"`
	DSN        string   `json:"dsnString"`
	Table      string   `json:"table"`
	Host       xnet.URL `json:"host"`
	Port       string   `json:"port"`
	User       string   `json:"user"`
	Password   string   `json:"password"`
	Database   string   `json:"database"`
	QueueDir   string   `json:"queueDir"`
	QueueLimit uint64   `json:"queueLimit"`
}

MySQLArgs - MySQL target arguments.

func (MySQLArgs) Validate

func (m MySQLArgs) Validate() error

Validate MySQLArgs fields

type MySQLTarget

type MySQLTarget struct {
	// contains filtered or unexported fields
}

MySQLTarget - MySQL target.

func NewMySQLTarget

func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error)

NewMySQLTarget - creates new MySQL target.

func (*MySQLTarget) Close

func (target *MySQLTarget) Close() error

Close - closes underneath connections to MySQL database.

func (*MySQLTarget) ID

func (target *MySQLTarget) ID() event.TargetID

ID - returns target ID.

func (*MySQLTarget) IsActive

func (target *MySQLTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*MySQLTarget) Save

func (target *MySQLTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the SQL connection is active.

func (*MySQLTarget) Send

func (target *MySQLTarget) Send(eventKey string) error

Send - reads an event from store and sends it to MySQL.

type NATSArgs

type NATSArgs struct {
	Enable        bool      `json:"enable"`
	Address       xnet.Host `json:"address"`
	Subject       string    `json:"subject"`
	Username      string    `json:"username"`
	Password      string    `json:"password"`
	Token         string    `json:"token"`
	TLS           bool      `json:"tls"`
	TLSSkipVerify bool      `json:"tlsSkipVerify"`
	Secure        bool      `json:"secure"`
	CertAuthority string    `json:"certAuthority"`
	ClientCert    string    `json:"clientCert"`
	ClientKey     string    `json:"clientKey"`
	PingInterval  int64     `json:"pingInterval"`
	QueueDir      string    `json:"queueDir"`
	QueueLimit    uint64    `json:"queueLimit"`
	Streaming     struct {
		Enable             bool   `json:"enable"`
		ClusterID          string `json:"clusterID"`
		Async              bool   `json:"async"`
		MaxPubAcksInflight int    `json:"maxPubAcksInflight"`
	} `json:"streaming"`

	RootCAs *x509.CertPool `json:"-"`
}

NATSArgs - NATS target arguments.

func (NATSArgs) Validate

func (n NATSArgs) Validate() error

Validate NATSArgs fields

type NATSTarget

type NATSTarget struct {
	// contains filtered or unexported fields
}

NATSTarget - NATS target.

func NewNATSTarget

func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error)

NewNATSTarget - creates new NATS target.

func (*NATSTarget) Close

func (target *NATSTarget) Close() (err error)

Close - closes underneath connections to NATS server.

func (*NATSTarget) ID

func (target *NATSTarget) ID() event.TargetID

ID - returns target ID.

func (*NATSTarget) IsActive

func (target *NATSTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*NATSTarget) Save

func (target *NATSTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the NATS connection is active.

func (*NATSTarget) Send

func (target *NATSTarget) Send(eventKey string) error

Send - sends event to NATS.

type NSQArgs

type NSQArgs struct {
	Enable      bool      `json:"enable"`
	NSQDAddress xnet.Host `json:"nsqdAddress"`
	Topic       string    `json:"topic"`
	TLS         struct {
		Enable     bool `json:"enable"`
		SkipVerify bool `json:"skipVerify"`
	} `json:"tls"`
	QueueDir   string `json:"queueDir"`
	QueueLimit uint64 `json:"queueLimit"`
}

NSQArgs - NSQ target arguments.

func (NSQArgs) Validate

func (n NSQArgs) Validate() error

Validate NSQArgs fields

type NSQTarget

type NSQTarget struct {
	// contains filtered or unexported fields
}

NSQTarget - NSQ target.

func NewNSQTarget

func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error)

NewNSQTarget - creates new NSQ target.

func (*NSQTarget) Close

func (target *NSQTarget) Close() (err error)

Close - closes underneath connections to NSQD server.

func (*NSQTarget) ID

func (target *NSQTarget) ID() event.TargetID

ID - returns target ID.

func (*NSQTarget) IsActive

func (target *NSQTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*NSQTarget) Save

func (target *NSQTarget) Save(eventData event.Event) error

Save - saves the events to the store which will be replayed when the nsq connection is active.

func (*NSQTarget) Send

func (target *NSQTarget) Send(eventKey string) error

Send - reads an event from store and sends it to NSQ.

type PostgreSQLArgs

type PostgreSQLArgs struct {
	Enable           bool      `json:"enable"`
	Format           string    `json:"format"`
	ConnectionString string    `json:"connectionString"`
	Table            string    `json:"table"`
	Host             xnet.Host `json:"host"`     // default: localhost
	Port             string    `json:"port"`     // default: 5432
	User             string    `json:"user"`     // default: user running minio
	Password         string    `json:"password"` // default: no password
	Database         string    `json:"database"` // default: same as user
	QueueDir         string    `json:"queueDir"`
	QueueLimit       uint64    `json:"queueLimit"`
}

PostgreSQLArgs - PostgreSQL target arguments.

func (PostgreSQLArgs) Validate

func (p PostgreSQLArgs) Validate() error

Validate PostgreSQLArgs fields

type PostgreSQLTarget

type PostgreSQLTarget struct {
	// contains filtered or unexported fields
}

PostgreSQLTarget - PostgreSQL target.

func NewPostgreSQLTarget

func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error)

NewPostgreSQLTarget - creates new PostgreSQL target.

func (*PostgreSQLTarget) Close

func (target *PostgreSQLTarget) Close() error

Close - closes underneath connections to PostgreSQL database.

func (*PostgreSQLTarget) ID

func (target *PostgreSQLTarget) ID() event.TargetID

ID - returns target ID.

func (*PostgreSQLTarget) IsActive

func (target *PostgreSQLTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*PostgreSQLTarget) Save

func (target *PostgreSQLTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the PostgreSQL connection is active.

func (*PostgreSQLTarget) Send

func (target *PostgreSQLTarget) Send(eventKey string) error

Send - reads an event from store and sends it to PostgreSQL.

type QueueStore

type QueueStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

QueueStore - Filestore for persisting events.

func (*QueueStore) Del

func (store *QueueStore) Del(key string) error

Del - Deletes an entry from the store.

func (*QueueStore) Get

func (store *QueueStore) Get(key string) (event event.Event, err error)

Get - gets an event from the store.

func (*QueueStore) List

func (store *QueueStore) List() ([]string, error)

List - lists all files from the directory.

func (*QueueStore) Open

func (store *QueueStore) Open() error

Open - Creates the directory if not present.

func (*QueueStore) Put

func (store *QueueStore) Put(e event.Event) error

Put - puts an event to the store.

type RedisArgs

type RedisArgs struct {
	Enable     bool      `json:"enable"`
	Format     string    `json:"format"`
	Addr       xnet.Host `json:"address"`
	Password   string    `json:"password"`
	Key        string    `json:"key"`
	QueueDir   string    `json:"queueDir"`
	QueueLimit uint64    `json:"queueLimit"`
}

RedisArgs - Redis target arguments.

func (RedisArgs) Validate

func (r RedisArgs) Validate() error

Validate RedisArgs fields

type RedisTarget

type RedisTarget struct {
	// contains filtered or unexported fields
}

RedisTarget - Redis target.

func NewRedisTarget

func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error)

NewRedisTarget - creates new Redis target.

func (*RedisTarget) Close

func (target *RedisTarget) Close() error

Close - releases the resources used by the pool.

func (*RedisTarget) ID

func (target *RedisTarget) ID() event.TargetID

ID - returns target ID.

func (*RedisTarget) IsActive

func (target *RedisTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*RedisTarget) Save

func (target *RedisTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the redis connection is active.

func (*RedisTarget) Send

func (target *RedisTarget) Send(eventKey string) error

Send - reads an event from store and sends it to redis.

type Store

type Store interface {
	Put(event event.Event) error
	Get(key string) (event.Event, error)
	List() ([]string, error)
	Del(key string) error
	Open() error
}

Store - To persist the events.

func NewQueueStore

func NewQueueStore(directory string, limit uint64) Store

NewQueueStore - Creates an instance for QueueStore.

type WebhookArgs

type WebhookArgs struct {
	Enable     bool            `json:"enable"`
	Endpoint   xnet.URL        `json:"endpoint"`
	AuthToken  string          `json:"authToken"`
	Transport  *http.Transport `json:"-"`
	QueueDir   string          `json:"queueDir"`
	QueueLimit uint64          `json:"queueLimit"`
}

WebhookArgs - Webhook target arguments.

func (WebhookArgs) Validate

func (w WebhookArgs) Validate() error

Validate WebhookArgs fields

type WebhookTarget

type WebhookTarget struct {
	// contains filtered or unexported fields
}

WebhookTarget - Webhook target.

func NewWebhookTarget

func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error)

NewWebhookTarget - creates new Webhook target.

func (*WebhookTarget) Close

func (target *WebhookTarget) Close() error

Close - does nothing and is available for interface compatibility.

func (WebhookTarget) ID

func (target WebhookTarget) ID() event.TargetID

ID - returns target ID.

func (*WebhookTarget) IsActive

func (target *WebhookTarget) IsActive() (bool, error)

IsActive - Return true if target is up and active

func (*WebhookTarget) Save

func (target *WebhookTarget) Save(eventData event.Event) error

Save - saves the events to the store if queuestore is configured, which will be replayed when the webhook connection is active.

func (*WebhookTarget) Send

func (target *WebhookTarget) Send(eventKey string) error

Send - reads an event from store and sends it to webhook.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL