Documentation ¶
Index ¶
- Variables
- func BaseHTTPMonitoringHandler(next http.Handler, serverName string) http.HandlerFunc
- func CobraBindEnvironmentVariables(prefix string) func(cmd *cobra.Command, _ []string)
- func CreateStdLogger(zapLogger *zap.Logger, logLevel string) (*log.Logger, error)
- func LoadLocation(name string) (*time.Location, error)
- func TraceOutbound(r *http.Request, span opentracing.Span)
- type HTTPMetricsRecorder
- type HTTPServerConfig
- func (c *HTTPServerConfig) RegisterFlags(flags *pflag.FlagSet, defaultPort int, defaultName string)
- func (c *HTTPServerConfig) RunHTTPServer(preStart func(ctx context.Context, mux *http.ServeMux, server *http.Server), ...)
- func (c *HTTPServerConfig) RunWebServer(ctx context.Context, wg *sync.WaitGroup, ...)
- type KafkaClient
- type KafkaConfig
- type KafkaConsumer
- func (kc KafkaConsumer) Close()
- func (kc KafkaConsumer) ConsumeTopic(ctx context.Context, handler KafkaMessageHandler, topic string, ...) error
- func (kc KafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler KafkaMessageHandler, topic string, ...) error
- func (kc KafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler KafkaMessageHandler, topic string, ...) error
- type KafkaConsumerIface
- type KafkaMessageHandler
- type KafkaMessageUnmarshaler
- type KafkaProducer
- type LoggingConfig
- type MockKafkaConsumer
- func (m *MockKafkaConsumer) Close()
- func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler KafkaMessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler KafkaMessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler KafkaMessageHandler, topic string, ...) error
- func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)
- type PartitionOffsets
- type SchemaRegistryConfig
- type SentryConfig
- type SentryCore
- type TracingConfig
Constants ¶
This section is empty.
Variables ¶
var Logger = zap.NewNop()
Logger is a zap logger. Use this logger (rather than SugaredLogger) when performance is a concern.
var SugaredLogger = Logger.Sugar()
SugaredLogger wraps Logger and abstracts away zap's strongly typed fields, letting the zap library figure them out so that callers don't have to import zap into their packages. It is slightly slower than Logger and creates more garbage.
Functions ¶
func BaseHTTPMonitoringHandler ¶
func BaseHTTPMonitoringHandler(next http.Handler, serverName string) http.HandlerFunc
BaseHTTPMonitoringHandler is meant to be used as middleware for every request. It will:
- Start an opentracing span, place it in the http.Request context, and close the span when the request completes
- Capture any unhandled errors and send them to Sentry
- Record the duration of the HTTP request in Prometheus metrics
func CobraBindEnvironmentVariables ¶
func CobraBindEnvironmentVariables(prefix string) func(cmd *cobra.Command, _ []string)
CobraBindEnvironmentVariables can be used at the root command level of a cobra CLI hierarchy to allow all command-line variables to be set by environment variables as well. Note that skewered-variable-names will automatically be translated to skewered_variable_names for compatibility with environment variables.
In addition, you can pass in an application name prefix such that all environment variables will need to start with PREFIX_ to be picked up as valid environment variables. For example, if you specified the prefix as "availability", then the program would only detect environment variables like "AVAILABILITY_KAFKA_BROKER" and not "KAFKA_BROKER". There is no need to capitalize the prefix name.
Note: CLI arguments (e.g. --address=localhost) will always take precedence over environment variables
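The flag-name-to-environment-variable translation described above can be sketched as a small pure function. This is an illustrative sketch of the naming rule only (the actual binding goes through cobra); the function name `envKey` is invented for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// envKey applies the translation rule the docs describe: dashes in the
// flag name become underscores, an optional application prefix is
// prepended, and the whole key is upper-cased.
func envKey(prefix, flagName string) string {
	key := strings.ReplaceAll(flagName, "-", "_")
	if prefix != "" {
		key = prefix + "_" + key
	}
	return strings.ToUpper(key)
}

func main() {
	fmt.Println(envKey("availability", "kafka-broker")) // AVAILABILITY_KAFKA_BROKER
	fmt.Println(envKey("", "log-level"))                // LOG_LEVEL
}
```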
func CreateStdLogger ¶
func CreateStdLogger(zapLogger *zap.Logger, logLevel string) (*log.Logger, error)
CreateStdLogger returns a standard-library-compatible logger
func LoadLocation ¶
func LoadLocation(name string) (*time.Location, error)
LoadLocation is a drop-in replacement for time.LoadLocation that caches all loaded locations so that subsequent loads do not require additional filesystem lookups.
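The caching behaviour described above can be sketched with a mutex-guarded map in front of time.LoadLocation. This is a minimal sketch of the technique, not the package's actual implementation; `cachedLoadLocation` and its package-level cache are invented names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	locMu    sync.Mutex
	locCache = map[string]*time.Location{}
)

// cachedLoadLocation consults the cache first; only a cache miss falls
// through to time.LoadLocation and its filesystem lookup.
func cachedLoadLocation(name string) (*time.Location, error) {
	locMu.Lock()
	defer locMu.Unlock()
	if loc, ok := locCache[name]; ok {
		return loc, nil
	}
	loc, err := time.LoadLocation(name)
	if err != nil {
		return nil, err
	}
	locCache[name] = loc
	return loc, nil
}

func main() {
	loc, err := cachedLoadLocation("UTC")
	loc2, _ := cachedLoadLocation("UTC") // served from the cache
	fmt.Println(err == nil, loc == loc2)
}
```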
func TraceOutbound ¶
func TraceOutbound(r *http.Request, span opentracing.Span)
TraceOutbound injects outbound HTTP requests with OpenTracing headers
Types ¶
type HTTPMetricsRecorder ¶
type HTTPMetricsRecorder interface {
RecordHttpMetrics(w http.ResponseWriter, r *http.Request) *prometheus.Timer
}
HTTPMetricsRecorder defines an interface for recording prometheus metrics on HTTP requests
type HTTPServerConfig ¶
HTTPServerConfig contains the basic configuration necessary for running an HTTP Server
func (*HTTPServerConfig) RegisterFlags ¶
func (c *HTTPServerConfig) RegisterFlags(flags *pflag.FlagSet, defaultPort int, defaultName string)
RegisterFlags registers HTTP flags with pflags
func (*HTTPServerConfig) RunHTTPServer ¶
func (c *HTTPServerConfig) RunHTTPServer( preStart func(ctx context.Context, mux *http.ServeMux, server *http.Server), postShutdown func(ctx context.Context), registerMuxes func(*http.ServeMux), )
RunHTTPServer starts and runs a web server, waiting for a cancellation signal to exit
func (*HTTPServerConfig) RunWebServer ¶
func (c *HTTPServerConfig) RunWebServer( ctx context.Context, wg *sync.WaitGroup, preStart func(ctx context.Context, mux *http.ServeMux, server *http.Server), postShutdown func(ctx context.Context), registerMuxes func(*http.ServeMux), )
RunWebServer starts and runs a new web server
type KafkaClient ¶
type KafkaClient struct {
KafkaConfig
// contains filtered or unexported fields
}
KafkaClient wraps a sarama client and Kafka configuration and can be used to create producers and consumers
func (KafkaClient) NewKafkaConsumer ¶
func (kc KafkaClient) NewKafkaConsumer() (KafkaConsumer, error)
NewKafkaConsumer sets up a Kafka consumer
func (KafkaClient) NewKafkaProducer ¶
func (kc KafkaClient) NewKafkaProducer() (KafkaProducer, error)
NewKafkaProducer creates a sarama producer from a client
type KafkaConfig ¶
type KafkaConfig struct {
Broker string
ClientID string
TLSCaCrtPath string
TLSCrtPath string
TLSKeyPath string
Handlers map[string]KafkaMessageHandler
JSONEnabled bool
Verbose bool
KafkaVersion string
ProducerCompressionCodec string
ProducerCompressionLevel int
SchemaRegistry *SchemaRegistryConfig
// contains filtered or unexported fields
}
KafkaConfig contains connection settings and configuration for communicating with a Kafka cluster
func (KafkaConfig) NewKafkaClient ¶
func (kc KafkaConfig) NewKafkaClient(ctx context.Context) (KafkaClient, error)
NewKafkaClient creates a Kafka client with metrics exporting and optional TLS that can be used to create consumers or producers
func (*KafkaConfig) RegisterFlags ¶
func (kc *KafkaConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers Kafka flags with pflags
type KafkaConsumer ¶
type KafkaConsumer struct {
KafkaClient
// contains filtered or unexported fields
}
KafkaConsumer contains a sarama client, consumer, and implementation of the KafkaMessageUnmarshaler interface
func (KafkaConsumer) ConsumeTopic ¶
func (kc KafkaConsumer) ConsumeTopic( ctx context.Context, handler KafkaMessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool, ) error
ConsumeTopic consumes a particular Kafka topic from startOffset to endOffset or from startOffset to forever
This function will create consumers for all partitions in a topic and read each partition from the given offset up to the latest offset observed when the consumer started, then notify the caller via catchupWg. If exitAfterCaughtUp is true, the consumer will exit after reading the message at that latest offset. When all partition consumers are closed, the last offset read on each partition is sent through the readResult channel.
func (KafkaConsumer) ConsumeTopicFromBeginning ¶
func (kc KafkaConsumer) ConsumeTopicFromBeginning( ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool, ) error
ConsumeTopicFromBeginning starts Kafka consumers on all partitions in a given topic from the message with the oldest offset.
func (KafkaConsumer) ConsumeTopicFromLatest ¶
func (kc KafkaConsumer) ConsumeTopicFromLatest( ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets, ) error
ConsumeTopicFromLatest starts Kafka consumers on all partitions in a given topic from the message with the latest offset.
type KafkaConsumerIface ¶
type KafkaConsumerIface interface {
ConsumeTopic(ctx context.Context, handler KafkaMessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopicFromBeginning(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopicFromLatest(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets) error
Close()
}
KafkaConsumerIface is an interface for consuming messages from a Kafka topic
type KafkaMessageHandler ¶
type KafkaMessageHandler interface {
HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler KafkaMessageUnmarshaler) error
}
KafkaMessageHandler defines an interface for handling new messages received by the Kafka consumer
type KafkaMessageUnmarshaler ¶
type KafkaMessageUnmarshaler interface {
UnmarshalMessage(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) error
}
KafkaMessageUnmarshaler defines an interface for unmarshaling messages received from Kafka to Go types
type KafkaProducer ¶
type KafkaProducer struct {
KafkaClient
// contains filtered or unexported fields
}
KafkaProducer contains a sarama client and async producer
func (KafkaProducer) RunProducer ¶
func (kp KafkaProducer) RunProducer(messages <-chan *sarama.ProducerMessage, done chan bool)
RunProducer wraps the sarama AsyncProducer and adds metrics, logging, and a shutdown procedure to the producer. To stop the producer, close the messages channel; when the producer is shutdown a signal will be emitted on the done channel. If the messages channel is unbuffered, each message sent to the producer is guaranteed to at least have been attempted to be produced to Kafka.
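The shutdown contract described above (close the messages channel, then wait on done) can be sketched without sarama. This is an illustrative sketch only: the message type is simplified to string, whereas the real producer takes *sarama.ProducerMessage.

```go
package main

import "fmt"

// runProducer sketches RunProducer's shutdown behaviour: it drains the
// messages channel until the caller closes it, then signals done.
func runProducer(messages <-chan string, done chan bool) {
	go func() {
		for msg := range messages { // loop exits when messages is closed
			_ = msg // the real producer hands msg to the sarama AsyncProducer here
		}
		done <- true // shutdown complete
	}()
}

func main() {
	messages := make(chan string) // unbuffered: each send is a direct hand-off
	done := make(chan bool)
	runProducer(messages, done)
	messages <- "event-1"
	messages <- "event-2"
	close(messages) // ask the producer to stop
	<-done          // wait for the shutdown signal
	fmt.Println("producer stopped")
}
```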
type LoggingConfig ¶
type LoggingConfig struct {
SentryLoggingEnabled bool
UseDevelopmentLogger bool
OutputPaths []string
ErrorOutputPaths []string
Level string
SamplingInitial int
SamplingThereafter int
AppVersion string
GitSha string
}
LoggingConfig defines the necessary configuration for instantiating a Logger
func (*LoggingConfig) InitializeLogger ¶
func (lc *LoggingConfig) InitializeLogger() error
InitializeLogger sets up the logger. This function should be called as soon as possible. Any use of the logger provided by this package will be a nop until this function is called
func (*LoggingConfig) RegisterFlags ¶
func (lc *LoggingConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers Logging flags with pflags
type MockKafkaConsumer ¶
MockKafkaConsumer implements KafkaConsumerIface for testing purposes
func (*MockKafkaConsumer) Close ¶
func (m *MockKafkaConsumer) Close()
Close mocks the Kafka consumer Close method
func (*MockKafkaConsumer) ConsumeTopic ¶
func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler KafkaMessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopic mocks the Kafka consumer ConsumeTopic method
func (*MockKafkaConsumer) ConsumeTopicFromBeginning ¶
func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
ConsumeTopicFromBeginning mocks the Kafka consumer ConsumeTopicFromBeginning method
func (*MockKafkaConsumer) ConsumeTopicFromLatest ¶
func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler KafkaMessageHandler, topic string, readResult chan PartitionOffsets) error
ConsumeTopicFromLatest mocks the Kafka consumer ConsumeTopicFromLatest method
func (*MockKafkaConsumer) EmitReadResult ¶
func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)
EmitReadResult allows tests to send values through the readResult channel passed into the mock consumer.
type PartitionOffsets ¶
PartitionOffsets is a mapping from partition ID to the last offset a consumer read on that partition
type SchemaRegistryConfig ¶
type SchemaRegistryConfig struct {
SchemaRegistryURL string
// contains filtered or unexported fields
}
SchemaRegistryConfig defines the necessary configuration for interacting with Schema Registry
func (*SchemaRegistryConfig) RegisterFlags ¶
func (src *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers Schema Registry flags with pflags
func (*SchemaRegistryConfig) UnmarshalMessage ¶
func (src *SchemaRegistryConfig) UnmarshalMessage( ctx context.Context, msg *sarama.ConsumerMessage, target interface{}, ) error
UnmarshalMessage implements the KafkaMessageUnmarshaler interface. It decodes an Avro message from Kafka into a Go struct, fetching Avro schemas from the Kafka schema registry. To use this function, tag each field of the target struct with a `kafka` tag whose value names the key on the Avro message to set as that field's value.
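The `kafka` tag convention described above can be illustrated with reflection. This is a hypothetical stand-in for the Avro path: it copies values from an already-decoded record (a plain map here) into tagged struct fields, and the `Event` field names are invented for the example.

```go
package main

import (
	"fmt"
	"reflect"
)

// Event shows the tagging convention: each `kafka` tag names the Avro
// record field whose value should populate the struct field.
type Event struct {
	ID        string `kafka:"id"`
	CreatedAt int64  `kafka:"created_at"`
}

// decode copies record values into the fields of target whose `kafka`
// tag matches a key in the record. target must be a struct pointer.
func decode(record map[string]interface{}, target interface{}) {
	v := reflect.ValueOf(target).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		if key := t.Field(i).Tag.Get("kafka"); key != "" {
			if val, ok := record[key]; ok {
				v.Field(i).Set(reflect.ValueOf(val))
			}
		}
	}
}

func main() {
	var e Event
	decode(map[string]interface{}{"id": "abc", "created_at": int64(1700000000)}, &e)
	fmt.Printf("%+v\n", e) // {ID:abc CreatedAt:1700000000}
}
```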
type SentryConfig ¶
SentryConfig defines the necessary configuration for instantiating a Sentry Reporter
func (*SentryConfig) InitializeRaven ¶
func (sc *SentryConfig) InitializeRaven()
InitializeRaven initializes the Raven client. This function should be called as soon as possible after the application configuration is loaded so that Raven is set up.
func (*SentryConfig) RegisterFlags ¶
func (sc *SentryConfig) RegisterFlags(flags *pflag.FlagSet)
RegisterFlags registers Sentry flags with pflags
type SentryCore ¶
type SentryCore struct {
zapcore.LevelEnabler
// contains filtered or unexported fields
}
SentryCore implements a zapcore.Core that sends logged errors to Sentry
func (*SentryCore) Check ¶
func (c *SentryCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry
Check must be called before calling Write. This determines whether or not logs are sent to Sentry
type TracingConfig ¶
type TracingConfig struct {
Enabled bool
SamplerType string
SamplerParam float64
ReporterLogSpans bool
ReporterMaxQueueSize int
ReporterFlushInterval time.Duration
AgentHost string
AgentPort int
ServiceName string
}
TracingConfig defines the necessary configuration for instantiating a Tracer
func (*TracingConfig) ConfigureTracer ¶
func (tc *TracingConfig) ConfigureTracer() io.Closer
ConfigureTracer instantiates and configures the OpenTracer and returns the tracer closer
func (*TracingConfig) RegisterFlags ¶
func (tc *TracingConfig) RegisterFlags(flags *pflag.FlagSet, defaultTracerName string)
RegisterFlags registers Tracer flags with pflags