klogstream

package
v0.0.0-...-bcbb9e7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors defined by this package. Because LogStreamError exposes
// Unwrap, callers should match these with errors.Is rather than comparing
// message text.
var (
	// ErrNoKubeConfig is returned when no kubernetes config is provided.
	ErrNoKubeConfig = errors.New("no kubernetes configuration provided")
	// ErrNoFilter is returned when no log filter is provided.
	ErrNoFilter = errors.New("no log filter provided")
	// ErrNoHandler is returned when no log handler is provided.
	ErrNoHandler = errors.New("no log handler provided")
	// ErrNoKubeContext is returned when the specified kubernetes context is not found.
	ErrNoKubeContext = errors.New("kubernetes context not found")
	// ErrStreamClosed is returned when attempting to use a closed stream.
	ErrStreamClosed = errors.New("log stream has been closed")
	// ErrMultilineTimeout is returned when a multiline log times out.
	ErrMultilineTimeout = errors.New("timed out waiting for multiline log")
	// ErrTooManyLines is returned when a multiline log exceeds the maximum lines.
	ErrTooManyLines = errors.New("multiline log exceeds maximum number of lines")
)

Error definitions

View Source
// DefaultRetryPolicy provides reasonable default values for retries.
var DefaultRetryPolicy = RetryPolicy{
	// Maximum number of retry attempts.
	MaxRetries:      5,
	// Initial delay between retries.
	InitialInterval: 1 * time.Second,
	// Maximum delay between retries.
	MaxInterval:     30 * time.Second,
	// Factor by which the delay increases between retries.
	Multiplier:      2,
}

DefaultRetryPolicy provides reasonable default values for retries

View Source
// NewStreamer creates a new Streamer with the given options.
//
// The options are applied, in order, to a fresh StreamConfig obtained from
// NewStreamConfig; nil options are skipped. The resulting public configuration
// is then translated into the internal stream package's StreamerConfig:
// the filter is converted with convertFilter, a kube client provider is built
// from config.KubeOptions, and the retry policy is copied field by field.
//
// It returns an error if the filter cannot be converted or if the internal
// streamer cannot be constructed; in both cases the returned Streamer is nil.
//
// NOTE(review): NewStreamer is a package-level variable rather than a plain
// function, presumably so it can be substituted in tests — confirm before
// changing it to a func declaration.
var NewStreamer = func(options ...StreamOption) (Streamer, error) {
	config := NewStreamConfig()

	for _, option := range options {
		// Calling a nil func value panics; skip nil entries so a
		// conditionally assembled option list cannot crash the caller.
		if option == nil {
			continue
		}
		option(config)
	}

	internalFilter, err := convertFilter(config.Filter)
	if err != nil {
		return nil, err
	}

	clientProvider := kube.NewClientProviderWithOptions(config.KubeOptions...)

	internalConfig := &stream.StreamerConfig{
		KubeClientProvider: clientProvider,
		Filter:             internalFilter,
		RetryPolicy: stream.RetryPolicy{
			MaxRetries:      config.RetryPolicy.MaxRetries,
			InitialInterval: config.RetryPolicy.InitialInterval,
			MaxInterval:     config.RetryPolicy.MaxInterval,
			Multiplier:      config.RetryPolicy.Multiplier,
		},
	}

	// Handler, Formatter and Matcher are wrapped in adapters only when the
	// caller supplied them, so the corresponding internal field stays nil
	// (instead of holding an adapter around a nil value) when unset.
	if config.Handler != nil {
		internalConfig.Handler = stream.NewHandlerAdapter(adaptHandler(config.Handler))
	}

	if config.Formatter != nil {
		internalConfig.Formatter = stream.NewFormatterAdapter(adaptFormatter(config.Formatter))
	}

	if config.Matcher != nil {
		internalConfig.Matcher = stream.NewMatcherAdapter(adaptMatcher(config.Matcher))
	}

	internalStreamer, err := stream.NewStreamer(internalConfig)
	if err != nil {
		return nil, err
	}

	return &streamerImpl{
		internal: internalStreamer,
	}, nil
}

NewStreamer creates a new Streamer with the given options

Functions

func Run

func Run(ctx context.Context, options ...StreamOption) error

Run is a convenience function that creates a streamer with the given options, starts it, and waits for context completion

Types

type Config

// Config holds all configuration for the log streamer.
//
// NOTE(review): NewStreamer in this package is driven by StreamConfig and
// StreamOption, not by Config; confirm which entry points consume Config.
type Config struct {
	// KubeConfig is the kubernetes configuration for connecting to the cluster.
	KubeConfig *rest.Config
	// Filter defines the criteria for filtering logs.
	Filter *LogFilter
	// Formatter defines how logs are formatted.
	Formatter LogFormatter
	// Handler processes the log messages.
	Handler LogHandler
	// Matcher determines if log lines should be treated as multiline.
	Matcher MultilineMatcher
	// RetryPolicy configures retry behavior for transient errors.
	RetryPolicy RetryPolicy
}

Config holds all configuration for the log streamer

func NewConfig

func NewConfig() *Config

NewConfig creates a new configuration with default values

type ConfigBuilder

type ConfigBuilder struct {
	// contains filtered or unexported fields
}

ConfigBuilder provides a fluent API for building Config

func NewConfigBuilder

func NewConfigBuilder() *ConfigBuilder

NewConfigBuilder creates a new ConfigBuilder

func (*ConfigBuilder) Build

func (b *ConfigBuilder) Build() (*Config, error)

Build creates and validates the Config

func (*ConfigBuilder) WithFilter

func (b *ConfigBuilder) WithFilter(filter *LogFilter) *ConfigBuilder

WithFilter sets the log filter

func (*ConfigBuilder) WithFormatter

func (b *ConfigBuilder) WithFormatter(formatter LogFormatter) *ConfigBuilder

WithFormatter sets the log formatter

func (*ConfigBuilder) WithHandler

func (b *ConfigBuilder) WithHandler(handler LogHandler) *ConfigBuilder

WithHandler sets the log handler

func (*ConfigBuilder) WithKubeConfig

func (b *ConfigBuilder) WithKubeConfig(config *rest.Config) *ConfigBuilder

WithKubeConfig sets the kubernetes configuration

func (*ConfigBuilder) WithMatcher

func (b *ConfigBuilder) WithMatcher(matcher MultilineMatcher) *ConfigBuilder

WithMatcher sets the multiline matcher

func (*ConfigBuilder) WithRetryPolicy

func (b *ConfigBuilder) WithRetryPolicy(policy RetryPolicy) *ConfigBuilder

WithRetryPolicy sets the retry policy

type ConsoleHandler

type ConsoleHandler struct {
	// contains filtered or unexported fields
}

ConsoleHandler outputs logs to the console

func NewConsoleHandler

func NewConsoleHandler() *ConsoleHandler

NewConsoleHandler creates a new ConsoleHandler with stdout and stderr as default outputs

func NewConsoleHandlerWithWriters

func NewConsoleHandlerWithWriters(out, errOut io.Writer) *ConsoleHandler

NewConsoleHandlerWithWriters creates a new ConsoleHandler with custom writers

func (*ConsoleHandler) OnEnd

func (h *ConsoleHandler) OnEnd()

OnEnd is called when the stream ends

func (*ConsoleHandler) OnError

func (h *ConsoleHandler) OnError(err error)

OnError writes error messages to the error output writer

func (*ConsoleHandler) OnLog

func (h *ConsoleHandler) OnLog(msg LogMessage)

OnLog writes formatted log messages to the configured output writer

type JSONFormatter

// JSONFormatter formats log messages as JSON.
// The Include* flags select which LogMessage metadata is emitted.
type JSONFormatter struct {
	// IncludeTimestamp controls whether to include the timestamp in the JSON.
	IncludeTimestamp bool
	// IncludeNamespace controls whether to include the namespace in the JSON.
	IncludeNamespace bool
	// IncludePodName controls whether to include the pod name in the JSON.
	IncludePodName bool
	// IncludeContainerName controls whether to include the container name in the JSON.
	IncludeContainerName bool
	// contains filtered or unexported fields
}

JSONFormatter formats log messages as JSON

func NewJSONFormatter

func NewJSONFormatter() *JSONFormatter

NewJSONFormatter creates a new JSONFormatter with default settings

func (*JSONFormatter) Format

func (f *JSONFormatter) Format(msg LogMessage) string

Format converts a LogMessage to a JSON string

type JSONMatcher

type JSONMatcher struct {
	// contains filtered or unexported fields
}

JSONMatcher detects JSON formatted logs for multiline log merging

func NewJSONMatcher

func NewJSONMatcher() *JSONMatcher

NewJSONMatcher creates a new JSONMatcher

func (*JSONMatcher) ShouldMerge

func (m *JSONMatcher) ShouldMerge(previous, next string) bool

ShouldMerge determines if the next line should be merged with the previous line

type JavaStackMatcher

type JavaStackMatcher struct {
	// contains filtered or unexported fields
}

JavaStackMatcher detects Java stack traces for multiline log merging

func NewJavaStackMatcher

func NewJavaStackMatcher() *JavaStackMatcher

NewJavaStackMatcher creates a new JavaStackMatcher

func (*JavaStackMatcher) ShouldMerge

func (m *JavaStackMatcher) ShouldMerge(previous, next string) bool

ShouldMerge determines if the next line should be merged with the previous line

type LogFilter

// LogFilter defines filtering criteria for kubernetes logs.
//
// NOTE(review): how the individual criteria combine (e.g. whether a nil
// regex or empty Namespaces means "match everything") is not visible in this
// declaration — confirm against the filter conversion code.
type LogFilter struct {
	// PodNameRegex filters pods by name regex.
	PodNameRegex *regexp.Regexp
	// ContainerRegex filters containers by name regex.
	ContainerRegex *regexp.Regexp
	// LabelSelector filters pods by their labels.
	LabelSelector labels.Selector
	// IncludeRegex only includes log lines matching this regex.
	IncludeRegex *regexp.Regexp
	// Since only includes logs newer than this time.
	Since *time.Time
	// ContainerState filters by container state ("all", "running", "terminated", ...).
	ContainerState string
	// Namespaces is a list of namespaces to filter logs from.
	Namespaces []string
}

LogFilter defines filtering criteria for kubernetes logs

type LogFilterBuilder

type LogFilterBuilder struct {
	// contains filtered or unexported fields
}

LogFilterBuilder provides a fluent API for building LogFilter

func NewLogFilterBuilder

func NewLogFilterBuilder() *LogFilterBuilder

NewLogFilterBuilder creates a new LogFilterBuilder

func (*LogFilterBuilder) Build

func (b *LogFilterBuilder) Build() (*LogFilter, error)

Build creates and validates the LogFilter

func (*LogFilterBuilder) ContainerRegex

func (b *LogFilterBuilder) ContainerRegex(pattern string) *LogFilterBuilder

ContainerRegex sets the container name regex pattern

func (*LogFilterBuilder) ContainerState

func (b *LogFilterBuilder) ContainerState(state string) *LogFilterBuilder

ContainerState sets the container state filter

func (*LogFilterBuilder) Include

func (b *LogFilterBuilder) Include(pattern string) *LogFilterBuilder

Include sets the regex for log lines to include

func (*LogFilterBuilder) Label

func (b *LogFilterBuilder) Label(key, value string) *LogFilterBuilder

Label adds a label selector

func (*LogFilterBuilder) Namespace

func (b *LogFilterBuilder) Namespace(namespace string) *LogFilterBuilder

Namespace adds a namespace to filter

func (*LogFilterBuilder) PodRegex

func (b *LogFilterBuilder) PodRegex(pattern string) *LogFilterBuilder

PodRegex sets the pod name regex pattern

func (*LogFilterBuilder) Since

func (b *LogFilterBuilder) Since(duration time.Duration) *LogFilterBuilder

Since sets the time to stream logs from

type LogFormatter

// LogFormatter formats log messages as strings.
type LogFormatter interface {
	// Format converts a log message to a formatted string.
	Format(LogMessage) string
}

LogFormatter formats log messages as strings

type LogHandler

// LogHandler handles log messages and errors.
//
// NOTE(review): whether the callbacks may be invoked concurrently from
// several goroutines is not stated here — confirm before relying on it.
type LogHandler interface {
	// OnLog is called for each log message.
	OnLog(LogMessage)
	// OnError is called when an error occurs.
	OnError(error)
	// OnEnd is called when log streaming ends.
	OnEnd()
}

LogHandler handles log messages and errors

type LogMessage

// LogMessage represents a single log entry from a kubernetes pod/container.
type LogMessage struct {
	// Namespace is the kubernetes namespace of the pod.
	Namespace string
	// PodName is the name of the pod.
	PodName string
	// ContainerName is the name of the container within the pod.
	ContainerName string
	// Timestamp is the time when the log message was created.
	Timestamp time.Time
	// Message is the log content.
	Message string
	// Raw contains the original bytes of the log message.
	Raw []byte
}

LogMessage represents a single log entry from a kubernetes pod/container

type LogStreamError

// LogStreamError represents an error that occurred during log streaming.
// It implements error and exposes the wrapped error through Unwrap, so
// errors.Is and errors.As can see through it to Err.
type LogStreamError struct {
	// Err is the underlying error.
	Err error
	// Permanent indicates if this error is permanent and cannot be recovered from.
	Permanent bool
	// Reason is a human-readable description of why the error occurred.
	Reason string
}

LogStreamError represents an error that occurred during log streaming

func (*LogStreamError) Error

func (e *LogStreamError) Error() string

Error implements the error interface

func (*LogStreamError) Unwrap

func (e *LogStreamError) Unwrap() error

Unwrap returns the underlying error

type MultilineMatcher

// MultilineMatcher determines if log lines should be merged.
type MultilineMatcher interface {
	// ShouldMerge returns true if the next line should be merged with the previous.
	ShouldMerge(previous, next string) bool
}

MultilineMatcher determines if log lines should be merged

type RetryPolicy

// RetryPolicy configures the retry behavior for transient errors.
// DefaultRetryPolicy supplies a ready-made set of values.
//
// NOTE(review): the fields suggest a backoff that starts at InitialInterval,
// grows by Multiplier, and is capped at MaxInterval — confirm the exact
// schedule against the code that consumes the policy.
type RetryPolicy struct {
	// MaxRetries is the maximum number of retry attempts.
	MaxRetries int
	// InitialInterval is the initial delay between retries.
	InitialInterval time.Duration
	// MaxInterval is the maximum delay between retries.
	MaxInterval time.Duration
	// Multiplier is the factor by which the delay increases between retries.
	Multiplier float64
}

RetryPolicy configures the retry behavior for transient errors

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

StreamBuilder provides a fluent API for building and running a streamer

func NewBuilder

func NewBuilder() *StreamBuilder

NewBuilder creates a new StreamBuilder

func (*StreamBuilder) Build

func (b *StreamBuilder) Build() (Streamer, error)

Build creates a Streamer from the accumulated options

func (*StreamBuilder) Run

func (b *StreamBuilder) Run(ctx context.Context) error

Run creates a Streamer from the accumulated options, starts it, and waits for context completion

func (*StreamBuilder) WithClientset

func (b *StreamBuilder) WithClientset(clientset *kubernetes.Clientset) *StreamBuilder

WithClientset adds a direct kubernetes clientset option to the builder. This is especially useful for testing with fake.Clientset.

func (*StreamBuilder) WithContainerRegex

func (b *StreamBuilder) WithContainerRegex(pattern string) *StreamBuilder

WithContainerRegex adds a container name regex to the log filter

func (*StreamBuilder) WithFormatter

func (b *StreamBuilder) WithFormatter(formatter LogFormatter) *StreamBuilder

WithFormatter sets the log formatter

func (*StreamBuilder) WithHandler

func (b *StreamBuilder) WithHandler(handler LogHandler) *StreamBuilder

WithHandler sets the log handler

func (*StreamBuilder) WithIncludeRegex

func (b *StreamBuilder) WithIncludeRegex(pattern string) *StreamBuilder

WithIncludeRegex adds an include regex to the log filter

func (*StreamBuilder) WithKubeContext

func (b *StreamBuilder) WithKubeContext(name string) *StreamBuilder

WithKubeContext adds a kubernetes context option to the builder

func (*StreamBuilder) WithKubeconfigPath

func (b *StreamBuilder) WithKubeconfigPath(path string) *StreamBuilder

WithKubeconfigPath adds a kubeconfig path option to the builder

func (*StreamBuilder) WithLabel

func (b *StreamBuilder) WithLabel(key, value string) *StreamBuilder

WithLabel adds a label selector to the log filter

func (*StreamBuilder) WithMatcher

func (b *StreamBuilder) WithMatcher(matcher MultilineMatcher) *StreamBuilder

WithMatcher sets the multiline matcher

func (*StreamBuilder) WithNamespace

func (b *StreamBuilder) WithNamespace(namespace string) *StreamBuilder

WithNamespace adds a namespace to the log filter

func (*StreamBuilder) WithPodLabelSelector

func (b *StreamBuilder) WithPodLabelSelector(selector string) *StreamBuilder

WithPodLabelSelector adds a label selector string to the log filter. The format is the same as kubectl's label selector (e.g., "app=myapp,env=prod").

func (*StreamBuilder) WithPodRegex

func (b *StreamBuilder) WithPodRegex(pattern string) *StreamBuilder

WithPodRegex adds a pod name regex to the log filter

func (*StreamBuilder) WithRestConfig

func (b *StreamBuilder) WithRestConfig(config *rest.Config) *StreamBuilder

WithRestConfig adds a rest.Config option to the builder

type StreamConfig

// StreamConfig holds all the configuration for a streamer.
// It is the value that StreamOption functions receive and that NewStreamer
// reads when building the internal streamer.
type StreamConfig struct {
	// KubeOptions are the options for the kubernetes client.
	KubeOptions []kube.Option
	// Filter is the log filter.
	Filter *LogFilter
	// Formatter is the log formatter.
	Formatter LogFormatter
	// Handler is the log handler.
	Handler LogHandler
	// Matcher is the multiline matcher.
	Matcher MultilineMatcher
	// RetryPolicy configures retry behavior.
	RetryPolicy RetryPolicy
}

StreamConfig holds all the configuration for a streamer

func NewStreamConfig

func NewStreamConfig() *StreamConfig

NewStreamConfig creates a new StreamConfig with default values

type StreamOption

// StreamOption is a function that configures a streamer by modifying the
// StreamConfig it is given.
type StreamOption func(*StreamConfig)

StreamOption is a function that configures a streamer

func WithClientset

func WithClientset(clientset *kubernetes.Clientset) StreamOption

WithClientset sets a direct kubernetes clientset to use. This is especially useful for testing with fake.Clientset.

func WithContainerRegex

func WithContainerRegex(pattern string) StreamOption

WithContainerRegex adds a container name regex to the log filter

func WithContainerState

func WithContainerState(state string) StreamOption

WithContainerState sets the container state filter

func WithFilter

func WithFilter(filter *LogFilter) StreamOption

WithFilter sets the log filter

func WithFormatter

func WithFormatter(formatter LogFormatter) StreamOption

WithFormatter sets the log formatter

func WithHandler

func WithHandler(handler LogHandler) StreamOption

WithHandler sets the log handler

func WithIncludeRegex

func WithIncludeRegex(pattern string) StreamOption

WithIncludeRegex adds an include regex to the log filter

func WithKubeContext

func WithKubeContext(name string) StreamOption

WithKubeContext sets the kubernetes context to use

func WithKubeconfigPath

func WithKubeconfigPath(path string) StreamOption

WithKubeconfigPath sets the path to the kubeconfig file

func WithLabel

func WithLabel(key, value string) StreamOption

WithLabel adds a label selector to the log filter

func WithLabelSelector

func WithLabelSelector(selector string) StreamOption

WithLabelSelector adds a label selector string to the log filter. The format is the same as kubectl's label selector (e.g., "app=myapp,env=prod").

func WithMatcher

func WithMatcher(matcher MultilineMatcher) StreamOption

WithMatcher sets the multiline matcher

func WithNamespace

func WithNamespace(namespace string) StreamOption

WithNamespace adds a namespace to the log filter

func WithPodRegex

func WithPodRegex(pattern string) StreamOption

WithPodRegex adds a pod name regex to the log filter

func WithRestConfig

func WithRestConfig(config *rest.Config) StreamOption

WithRestConfig sets the kubernetes client configuration

func WithRetryPolicy

func WithRetryPolicy(policy RetryPolicy) StreamOption

WithRetryPolicy sets the retry policy

func WithSince

func WithSince(duration time.Duration) StreamOption

WithSince sets the time to stream logs from

type Streamer

// Streamer is the main interface for streaming logs.
type Streamer interface {
	// Start begins streaming logs for matching pods.
	// NOTE(review): whether Start blocks until ctx is done or returns once
	// streaming has been set up is not visible here — confirm against the
	// implementation.
	Start(ctx context.Context) error
	// Stop stops all log streaming activity.
	Stop()
}

Streamer is the main interface for streaming logs

type TemplateFormatter

// TemplateFormatter formats log messages using Go templates.
type TemplateFormatter struct {
	// TemplateString is the template string to use.
	// NOTE(review): the constructors return an error, which suggests the
	// template is parsed at construction time; changing this field afterwards
	// may therefore have no effect — confirm.
	TemplateString string
	// contains filtered or unexported fields
}

TemplateFormatter formats log messages using Go templates

func NewTemplateFormatter

func NewTemplateFormatter() (*TemplateFormatter, error)

NewTemplateFormatter creates a new TemplateFormatter with the default template

func NewTemplateFormatterWithTemplate

func NewTemplateFormatterWithTemplate(templateStr string) (*TemplateFormatter, error)

NewTemplateFormatterWithTemplate creates a new TemplateFormatter with a custom template

func (*TemplateFormatter) Format

func (f *TemplateFormatter) Format(msg LogMessage) string

Format converts a LogMessage to a formatted string using the template

type TextFormatter

// TextFormatter formats log messages as text.
// The Show* flags select which LogMessage metadata is displayed.
type TextFormatter struct {
	// ShowTimestamp controls whether to display the timestamp.
	ShowTimestamp bool
	// ShowNamespace controls whether to display the namespace.
	ShowNamespace bool
	// ShowPodName controls whether to display the pod name.
	ShowPodName bool
	// ShowContainerName controls whether to display the container name.
	ShowContainerName bool
	// TimestampFormat defines the format for timestamps.
	// NOTE(review): presumably a time.Time.Format layout string — confirm.
	TimestampFormat string
	// ColorOutput enables colorized output.
	ColorOutput bool
	// contains filtered or unexported fields
}

TextFormatter formats log messages as text

func NewTextFormatter

func NewTextFormatter() *TextFormatter

NewTextFormatter creates a new TextFormatter with default settings

func (*TextFormatter) Format

func (f *TextFormatter) Format(msg LogMessage) string

Format converts a LogMessage to a formatted string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL