kinesis

package
v3.35.0
Published: Mar 30, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

const (
	RecoverableErrorType = ErrorType("Error")
	PanicErrorType       = ErrorType("Panic") // Runtime errors.
)

These are the currently supported ErrorType values that can be emitted to an ErrorStore.

const (
	ShardStatusActive  = "ACTIVE"
	ShardStatusPending = "PENDING"
	ShardStatusClosed  = "CLOSED"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrorStore

type ErrorStore interface {
	// Available checks if the backing resource can receive error
	// messages via Push.
	Available() bool

	// Push emits an error message of type ErrorType to the backing resource.
	//
	// The caller should NOT assume that Available is called implicitly
	// to check for ErrorStore availability.
	Push(ErrorType, string, logger.Logger) error
}

ErrorStore is an abstraction over a resource like external storage, a database, a queue, etc. that can receive and store error messages.
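
As an illustration of the contract above, here is a minimal sketch of an in-memory ErrorStore. It is not part of this package: Logger is a trimmed, hypothetical stand-in for logger.Logger, and memoryStore and pushIfAvailable are made-up names. The helper calls Available explicitly because Push is not documented to do so for the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// Logger is a hypothetical, trimmed stand-in for logger.Logger.
type Logger interface {
	Warnf(format string, v ...interface{})
}

// ErrorType and its values mirror the documented declarations.
type ErrorType string

const (
	RecoverableErrorType = ErrorType("Error")
	PanicErrorType       = ErrorType("Panic")
)

// ErrorStore mirrors the documented interface, using the local Logger.
type ErrorStore interface {
	Available() bool
	Push(ErrorType, string, Logger) error
}

// memoryStore is a hypothetical ErrorStore that keeps messages in a slice.
type memoryStore struct {
	open     bool
	messages []string
}

func (m *memoryStore) Available() bool { return m.open }

func (m *memoryStore) Push(t ErrorType, msg string, _ Logger) error {
	if !m.open {
		return errors.New("memoryStore: not available")
	}
	m.messages = append(m.messages, fmt.Sprintf("%s: %s", t, msg))
	return nil
}

// pushIfAvailable checks Available first and reports whether a Push was
// attempted at all.
func pushIfAvailable(s ErrorStore, t ErrorType, msg string) (bool, error) {
	if !s.Available() {
		return false, nil
	}
	return true, s.Push(t, msg, nil)
}

func main() {
	store := &memoryStore{open: true}
	pushed, err := pushIfAvailable(store, RecoverableErrorType, "bad record")
	fmt.Println(pushed, err, store.messages)
}
```

Whether an implementation returns an error or silently drops the message when unavailable is its own choice; this sketch returns an error from Push to make the missing availability check visible.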

type ErrorStreamLogger

type ErrorStreamLogger struct {
	// contains filtered or unexported fields
}

ErrorStreamLogger is a logger.Logger implementation that decorates a base logger.Logger instance and emits error and panic messages to an ErrorStore.

All other log levels delegate to the base logger.Logger implementation.

func NewErrorStreamLogger

func NewErrorStreamLogger(base logger.Logger, store ErrorStore) *ErrorStreamLogger

NewErrorStreamLogger constructs an ErrorStreamLogger from a logger.Logger and an ErrorStore.

func (*ErrorStreamLogger) Debugf

func (esl *ErrorStreamLogger) Debugf(format string, v ...interface{})

Debugf just delegates to the wrapped/base logger.Logger's Debugf implementation.

func (*ErrorStreamLogger) Errorf

func (esl *ErrorStreamLogger) Errorf(format string, v ...interface{})

Errorf delegates to the wrapped/base logger.Logger's Errorf implementation and additionally pushes an error message with RecoverableErrorType ErrorType to the ErrorStore.

If an error occurs during a Push to the ErrorStore, the error is logged to the wrapped/base logger.Logger using Errorf.

func (*ErrorStreamLogger) Infof

func (esl *ErrorStreamLogger) Infof(format string, v ...interface{})

Infof just delegates to the wrapped/base logger.Logger's Infof implementation.

func (*ErrorStreamLogger) Panicf

func (esl *ErrorStreamLogger) Panicf(format string, v ...interface{})

Panicf delegates to the wrapped/base logger.Logger's Panicf implementation and additionally pushes an error message with PanicErrorType ErrorType to the ErrorStore.

If an error occurs during a Push to the ErrorStore, the error is logged to the wrapped/base logger.Logger using Errorf, NOT Panicf.
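
The Errorf and Panicf behavior described above is a decorator pattern, which can be sketched roughly as follows. This is not the package's implementation: Logger, pusher, recordingLogger, failingStore, and streamLogger are hypothetical stand-ins, and the order of pushing versus delegating inside Panicf is an assumption (pushing first, since a real Panicf would not return).

```go
package main

import (
	"errors"
	"fmt"
)

// Logger is a hypothetical, trimmed stand-in for logger.Logger.
type Logger interface {
	Errorf(format string, v ...interface{})
	Panicf(format string, v ...interface{})
}

// pusher is a hypothetical, simplified stand-in for ErrorStore.
type pusher interface {
	Push(errorType, msg string) error
}

// recordingLogger records lines instead of printing them. Unlike a real
// logger, its Panicf does not panic, which keeps the sketch runnable.
type recordingLogger struct{ lines []string }

func (l *recordingLogger) Errorf(format string, v ...interface{}) {
	l.lines = append(l.lines, "ERROR "+fmt.Sprintf(format, v...))
}

func (l *recordingLogger) Panicf(format string, v ...interface{}) {
	l.lines = append(l.lines, "PANIC "+fmt.Sprintf(format, v...))
}

// failingStore is a hypothetical store whose Push always fails.
type failingStore struct{}

func (failingStore) Push(errorType, msg string) error {
	return errors.New("queue unreachable")
}

// streamLogger decorates a base Logger and forwards messages to a store.
type streamLogger struct {
	base  Logger
	store pusher
}

// push reports Push failures with Errorf on the base logger, never Panicf.
func (s *streamLogger) push(errorType, msg string) {
	if err := s.store.Push(errorType, msg); err != nil {
		s.base.Errorf("pushing %s message: %v", errorType, err)
	}
}

func (s *streamLogger) Errorf(format string, v ...interface{}) {
	s.base.Errorf(format, v...)
	s.push("Error", fmt.Sprintf(format, v...))
}

func (s *streamLogger) Panicf(format string, v ...interface{}) {
	// Assumed order: push before delegating, because a real Panicf panics.
	s.push("Panic", fmt.Sprintf(format, v...))
	s.base.Panicf(format, v...)
}

func main() {
	base := &recordingLogger{}
	log := &streamLogger{base: base, store: failingStore{}}
	log.Panicf("shard %d failed", 3)
	for _, line := range base.lines {
		fmt.Println(line)
	}
}
```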

func (*ErrorStreamLogger) Printf

func (esl *ErrorStreamLogger) Printf(format string, v ...interface{})

Printf just delegates to the wrapped/base logger.Logger's Printf implementation.

func (*ErrorStreamLogger) Warnf

func (esl *ErrorStreamLogger) Warnf(format string, v ...interface{})

Warnf just delegates to the wrapped/base logger.Logger's Warnf implementation.

func (*ErrorStreamLogger) WithPrefix added in v3.32.0

func (esl *ErrorStreamLogger) WithPrefix(prefix string) logger.Logger

type ErrorType

type ErrorType string

ErrorType signifies the type of error encountered.

type Main

type Main struct {
	idk.Main           `flag:"!embed"`
	Timeout            time.Duration `help:"Time to wait for more records from Kinesis before flushing a batch. 0 to disable."`
	Header             string        `help:"Path to the static schema, in JSON header format. May be a path on the local filesystem, or an S3 URI."`
	AWSRegion          string        `help:"AWS Region. Alternatively, use environment variable AWS_REGION."`
	AllowMissingFields bool          `help:"Will proceed with ingest even if a field is missing from a record but specified in the JSON config file. Default false"`
	StreamName         string        `help:"Name of AWS Kinesis stream to consume records from."`
	OffsetsPath        string        `help:"Path where the offsets file will be written. May be a path on the local filesystem, or an S3 URI."`
	AWSProfile         string        `help:"Name of AWS profile to use. Alternatively, use environment variable AWS_PROFILE."`
	ErrorQueueName     string        `help:"SQS queue name to send error and panic/runtime errors to."`
}

Main is the holder of all configurations for a Kinesis stream consumer.

Along with the additional configuration fields, kinesis.Main also gains all fields and methods from idk.Main via composition.

func NewMain

func NewMain() *Main

NewMain returns a new instance of a Kinesis stream consumer configuration object.

It specifies a callback NewSource that can be invoked to create a kinesis.Source object. This callback implicitly initializes an AWS session and uses that session to initialize clients to the following AWS resources: S3, Kinesis, and SQS. Client creation happens regardless of configuration. (For example, an S3 client is created even when OffsetsPath and Header are both local paths.)

The default BatchSize is 20000 and Concurrency is 1. Any Concurrency value > 1 is NOT supported. These values are set on the returned kinesis.Main instance.

The Logger instance on the kinesis.Source is always decorated when NewSource is invoked. Assuming no errors occur during AWS client initialization, the decorated Logger instance is propagated back to the kinesis.Main so that callers that configured it can also emit errors and panics to the SQS queue specified by ErrorQueueName. Error emission is activated only if all of the following hold: ErrorQueueName is non-empty, an SQS queue with that name exists in AWSRegion, the StreamName field has the format 'PREFIX'-VALID_UUID, and a valid SQS queue URL can be resolved at the time of Logger initialization. If any of these conditions is false, error emission to the SQS queue is not activated: the Logger instance behaves identically to its wrapped Logger and emits a warning to the caller that errors are not propagated to SQS.
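
The activation conditions for SQS error emission can be sketched as a single predicate. Everything here is illustrative: errorQueueEnabled and sinkIDPattern are hypothetical names, the regular expression is only one plausible reading of 'PREFIX'-VALID_UUID (any non-empty prefix, a hyphen, then a canonical 8-4-4-4-12 hexadecimal UUID), and the two SQS-side conditions are collapsed into a queueResolved flag.

```go
package main

import (
	"fmt"
	"regexp"
)

// sinkIDPattern is an assumed interpretation of 'PREFIX'-VALID_UUID.
// The package's actual validation rules may differ.
var sinkIDPattern = regexp.MustCompile(
	`^.+-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)

// errorQueueEnabled reports whether all documented conditions hold.
// queueResolved stands in for "the queue exists in AWSRegion and its URL
// could be resolved", which would require a real SQS call.
func errorQueueEnabled(errorQueueName, streamName string, queueResolved bool) bool {
	return errorQueueName != "" &&
		sinkIDPattern.MatchString(streamName) &&
		queueResolved
}

func main() {
	stream := "sink-123e4567-e89b-12d3-a456-426614174000"
	fmt.Println(errorQueueEnabled("errors", stream, true))
	fmt.Println(errorQueueEnabled("", stream, true))
	fmt.Println(errorQueueEnabled("errors", "my-stream", true))
}
```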

type Record

type Record struct {
	// contains filtered or unexported fields
}

func (*Record) Commit

func (r *Record) Commit(ctx context.Context) error

func (*Record) Data

func (r *Record) Data() []interface{}

func (*Record) Schema added in v3.34.0

func (r *Record) Schema() interface{}

func (*Record) StreamOffset

func (r *Record) StreamOffset() (string, uint64)

type ShardOffset

type ShardOffset struct {
	ArrivalTime    string `json:"arrival_time"`
	CommittedTime  string `json:"committed_time"`
	Index          uint64 `json:"index"`
	SequenceNumber string `json:"sequence_number"`
}

type ShardRecord

type ShardRecord struct {
	ShardID string
	Index   uint64
	*kinesis.Record
}

ShardRecord wraps a Kinesis record, including metadata about its shard and an IDK-local index.

type SinkErrorPayload

type SinkErrorPayload struct {
	SinkId       string    `json:"sink_id"`
	ErrorType    ErrorType `json:"error_type"`
	ErrorMessage string    `json:"error_msg"`
	Timestamp    string    `json:"time"`
}

SinkErrorPayload contains all data about a single IDK error message that will be emitted to an ErrorStore; includes a timestamp and a valid sink ID.

This payload is not meant to be used outside the context of error propagation to an ErrorStore.
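
To show how the struct tags shape the serialized message, the following sketch marshals a locally redeclared copy of SinkErrorPayload with encoding/json. The field values are invented for illustration; in particular, the timestamp layout and the sink ID are assumptions, and encodePayload is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorType and SinkErrorPayload are local copies of the documented types.
type ErrorType string

type SinkErrorPayload struct {
	SinkId       string    `json:"sink_id"`
	ErrorType    ErrorType `json:"error_type"`
	ErrorMessage string    `json:"error_msg"`
	Timestamp    string    `json:"time"`
}

// encodePayload is a hypothetical helper that renders a payload as JSON.
func encodePayload(p SinkErrorPayload) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	out, err := encodePayload(SinkErrorPayload{
		SinkId:       "sink-123e4567-e89b-12d3-a456-426614174000", // made-up ID
		ErrorType:    ErrorType("Panic"),
		ErrorMessage: "nil pointer dereference",
		Timestamp:    "2023-03-30T00:00:00Z", // assumed layout
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

The JSON keys come from the tags (sink_id, error_type, error_msg, time), not from the Go field names.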

type SinkErrorQueue

type SinkErrorQueue struct {
	// contains filtered or unexported fields
}

SinkErrorQueue is an ErrorStore implementation that uses an SQS queue as its backing resource to emit error and panic messages to.

It also maps 1-to-1 to a Kinesis stream via a unique sink ID.

func NewSinkErrorQueue

func NewSinkErrorQueue(queue sqsiface.SQSAPI, queueName, sinkId string) (*SinkErrorQueue, error)

NewSinkErrorQueue attempts to construct a SinkErrorQueue instance from an AWS SQS client, a queue name, and a sink ID.

On success, callers can assume that a backing SQS queue resource exists and is fully initialized.

Returns nil and an SQS error if an SQS queue URL cannot be resolved from the queue name and/or the AWS SQS client.

This method assumes the sink ID argument is valid.

func SinkErrorQueueFrom

func SinkErrorQueueFrom(queue sqsiface.SQSAPI, source *Source) *SinkErrorQueue

SinkErrorQueueFrom always constructs a SinkErrorQueue instance from an AWS SQS client and a kinesis.Source.

Unlike NewSinkErrorQueue, this does NOT return an error if a queue URL cannot be resolved from the queue name and/or the AWS SQS client. Instead it will collapse to a SinkErrorQueue instance with a backing SQS resource that is ALWAYS unavailable. Attempting to invoke Push on this instance will not result in an error; instead it will just emit a warning that no backing SQS resource could be written to.

This does check if the sink ID has a valid form: 'PREFIX'-VALID_UUID. If not, this collapses to a SinkErrorQueue instance that is ALWAYS unavailable.

func (*SinkErrorQueue) Available

func (seq *SinkErrorQueue) Available() bool

Available checks that a valid SQS queue resource exists.

func (*SinkErrorQueue) Push

func (seq *SinkErrorQueue) Push(errorType ErrorType, message string, log logger.Logger) error

Push attempts to emit a single error message of type ErrorType to an SQS queue resource.

If the backing SQS queue resource is not available, this function is a no-op and does NOT return an error. Instead it emits a warning to the logger.Logger instance specified by the log argument.

The warning can be ignored entirely by passing a nil log argument.
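
The no-op behavior of Push on an unavailable queue, including the nil log case, might look like the sketch below. It does not talk to SQS: unavailableQueue, Logger, and countingLogger are hypothetical stand-ins, and the warning text is invented.

```go
package main

import "fmt"

// Logger is a hypothetical, trimmed stand-in for logger.Logger.
type Logger interface {
	Warnf(format string, v ...interface{})
}

// countingLogger is a hypothetical logger that counts warnings.
type countingLogger struct{ warnings int }

func (c *countingLogger) Warnf(format string, v ...interface{}) { c.warnings++ }

// unavailableQueue models a queue whose backing resource never resolved.
type unavailableQueue struct{}

func (unavailableQueue) Available() bool { return false }

// Push drops the message when the queue is unavailable. It warns only if
// a logger was supplied and never returns an error in that case.
func (q unavailableQueue) Push(errorType, message string, log Logger) error {
	if !q.Available() {
		if log != nil {
			log.Warnf("no SQS queue available; dropping %s message", errorType)
		}
		return nil
	}
	// A real implementation would send the message to SQS here.
	return nil
}

func main() {
	q := unavailableQueue{}
	log := &countingLogger{}
	fmt.Println(q.Push("Error", "bad record", log), log.warnings)
	fmt.Println(q.Push("Error", "bad record", nil), log.warnings)
}
```

Note that the nil check only works for an untyped nil passed as the interface value; a nil *countingLogger wrapped in the Logger interface would not compare equal to nil.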

type Source

type Source struct {
	Log        logger.Logger
	Timeout    time.Duration
	Header     string
	AWSRegion  string
	AWSProfile string

	AllowMissingFields bool

	StreamName  string
	OffsetsPath string

	ErrorQueueName string
	// contains filtered or unexported fields
}

Source implements the idk.Source interface using Kinesis as a data source. It is not thread-safe, so a single Source should not be shared across goroutines.

func NewSource

func NewSource() *Source

NewSource returns a new Source.

func (*Source) Close

func (s *Source) Close() error

Close closes the underlying Kinesis consumer.

func (*Source) Open

func (s *Source) Open() error

Open initializes the Kinesis source.

func (*Source) Record

func (s *Source) Record() (idk.Record, error)

Record returns the value of the next Kinesis message. The same Record object may be used by successive calls to Record, so it should not be retained.
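
The warning about retaining a Record can be illustrated with a toy source that reuses one record value. This is a sketch under assumptions, not the Kinesis source: reusingSource, record, and snapshot are hypothetical, and the real Source may or may not overwrite data in place. The safe pattern is to copy what you need before calling Record again.

```go
package main

import "fmt"

// record is a hypothetical record type exposing Data like the documented
// Record does.
type record struct{ data []interface{} }

func (r *record) Data() []interface{} { return r.data }

// reusingSource is a hypothetical source that hands out the same record
// on every call and overwrites its contents in place.
type reusingSource struct {
	next int
	rec  record
}

func (s *reusingSource) Record() *record {
	s.next++
	if s.rec.data == nil {
		s.rec.data = make([]interface{}, 1)
	}
	s.rec.data[0] = s.next
	return &s.rec
}

// snapshot copies the values out of a record so that they remain stable
// after the source is advanced.
func snapshot(r *record) []interface{} {
	out := make([]interface{}, len(r.Data()))
	copy(out, r.Data())
	return out
}

func main() {
	src := &reusingSource{}
	first := src.Record()
	kept := snapshot(first)
	src.Record() // advances the source and overwrites the shared record
	fmt.Println("retained:", first.Data()[0], "copied:", kept[0])
}
```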

func (*Source) Schema

func (s *Source) Schema() []idk.Field

type StreamOffsets

type StreamOffsets struct {
	// synchronize updates/reads to the Shards map
	sync.RWMutex
	StreamName string                  `json:"stream_name"`
	Shards     map[string]*ShardOffset `json:"shards"`
}

func ReadOffsets

func ReadOffsets(cfg StreamReaderConfig) (*StreamOffsets, error)

func (*StreamOffsets) Load

func (o *StreamOffsets) Load(shardID string) (*ShardOffset, bool)

Load returns the ShardOffset for the shard in a thread-safe manner.
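
A thread-safe Load over the embedded sync.RWMutex could be written along these lines. The types are local copies of the documented structs; the Load body and the store helper are assumptions about how such access would typically be synchronized, not the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// ShardOffset and StreamOffsets are local copies of the documented types.
type ShardOffset struct {
	ArrivalTime    string `json:"arrival_time"`
	CommittedTime  string `json:"committed_time"`
	Index          uint64 `json:"index"`
	SequenceNumber string `json:"sequence_number"`
}

type StreamOffsets struct {
	sync.RWMutex
	StreamName string                  `json:"stream_name"`
	Shards     map[string]*ShardOffset `json:"shards"`
}

// Load takes the read lock so concurrent readers do not block each other.
func (o *StreamOffsets) Load(shardID string) (*ShardOffset, bool) {
	o.RLock()
	defer o.RUnlock()
	off, ok := o.Shards[shardID]
	return off, ok
}

// store is a hypothetical writer counterpart that takes the write lock.
func (o *StreamOffsets) store(shardID string, off *ShardOffset) {
	o.Lock()
	defer o.Unlock()
	if o.Shards == nil {
		o.Shards = make(map[string]*ShardOffset)
	}
	o.Shards[shardID] = off
}

func main() {
	offsets := &StreamOffsets{StreamName: "example-stream"}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("shardId-%012d", i)
			offsets.store(id, &ShardOffset{Index: uint64(i)})
			offsets.Load(id)
		}(i)
	}
	wg.Wait()
	off, ok := offsets.Load("shardId-000000000002")
	fmt.Println(ok, off.Index)
}
```

Load returns the stored pointer, so the lock protects only the map lookup; callers that mutate the returned ShardOffset would need their own synchronization.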

type StreamReader

type StreamReader struct {
	StreamReaderConfig
	// contains filtered or unexported fields
}

func NewStreamReader

func NewStreamReader(cfg StreamReaderConfig) (*StreamReader, error)

func (*StreamReader) Close

func (r *StreamReader) Close()

func (*StreamReader) CommitMessages

func (r *StreamReader) CommitMessages(ctx context.Context, msgs ...ShardRecord) error

func (*StreamReader) FetchMessage

func (r *StreamReader) FetchMessage(ctx context.Context) (ShardRecord, error)

func (*StreamReader) ProcessShards

func (r *StreamReader) ProcessShards() error

func (*StreamReader) Start

func (r *StreamReader) Start() error

type StreamReaderConfig

type StreamReaderConfig struct {
	// contains filtered or unexported fields
}
