consumers

package
v0.0.0-...-de41a1e
Published: Jun 20, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Constants

const (
	ApplicationJson      = "application/json"
	ProjectNotFound      = "project doesn't exist"
	SubscriptionNotFound = "Subscription doesn't exist"
)
const (
	AmsHttpConsumerType consumerType = "ams-http-consumer"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AmsHttpConsumer

type AmsHttpConsumer struct {
	// contains filtered or unexported fields
}

AmsHttpConsumer is a consumer that interfaces with AMS through its REST API.

func NewAmsHttpConsumer

func NewAmsHttpConsumer(fullSub string, amsClient *ams.Client) *AmsHttpConsumer

NewAmsHttpConsumer initialises and returns a new AMS HTTP consumer.

func (*AmsHttpConsumer) Ack

func (ahc *AmsHttpConsumer) Ack(ctx context.Context, ackId string) error

Ack acknowledges that an AMS message has been consumed and processed.

func (*AmsHttpConsumer) Consume

func (ahc *AmsHttpConsumer) Consume(ctx context.Context, numberOfMessages int64) (ams.ReceivedMessagesList, error)

Consume consumes messages from a subscription.

func (*AmsHttpConsumer) ResourceInfo

func (ahc *AmsHttpConsumer) ResourceInfo() string

ResourceInfo returns the AMS subscription and the AMS host it is on.

func (*AmsHttpConsumer) ToCancelableError

func (ahc *AmsHttpConsumer) ToCancelableError(error error) (CancelableError, bool)

type AmsHttpError

type AmsHttpError struct {
	Error amsErr `json:"error"`
}

AmsHttpError represents the layout of an AMS HTTP API error.
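As a rough illustration of how such an error body might be decoded and compared against the ProjectNotFound and SubscriptionNotFound constants, consider the sketch below. The fields of amsErr are unexported and not shown on this page, so the Code, Message and Status fields, the sample payload, and the isNotFound helper are all assumptions made for the example, not the package's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Constants copied from the package documentation above.
const (
	ProjectNotFound      = "project doesn't exist"
	SubscriptionNotFound = "Subscription doesn't exist"
)

// amsErr is an ASSUMED layout; the real type is unexported and undocumented here.
type amsErr struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Status  string `json:"status"`
}

// AmsHttpError mirrors the documented wrapper struct.
type AmsHttpError struct {
	Error amsErr `json:"error"`
}

// isNotFound is a hypothetical helper: it reports whether the response body
// decodes to an error whose message matches one of the two constants.
func isNotFound(body []byte) bool {
	var e AmsHttpError
	if err := json.Unmarshal(body, &e); err != nil {
		return false // not a JSON error body at all
	}
	return e.Error.Message == ProjectNotFound || e.Error.Message == SubscriptionNotFound
}

func main() {
	// Sample payload invented for this sketch.
	body := []byte(`{"error":{"code":404,"message":"Subscription doesn't exist","status":"NOT_FOUND"}}`)
	fmt.Println(isNotFound(body))
}
```

Matching on the message string is only one possible approach; a real client might equally check the status code.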

type CancelableError

type CancelableError struct {
	// string representation of the occurred error
	ErrMsg string
	// the resource that the error relates to
	Resource string
}

Certain errors indicate that consumption should stop. CancelableError is a special error form that signals the push worker to stop when such an error occurs.

func NewCancelableError

func NewCancelableError(errMSg string, resource string) CancelableError

type Consumer

type Consumer interface {
	// Consume pulls data from the source
	Consume(ctx context.Context, numberOfMessages int64) (ams.ReceivedMessagesList, error)
	// Ack acknowledges that data has been successfully pulled and sent
	Ack(ctx context.Context, ackId string) error
	// ResourceInfo returns a string representation of the data source
	ResourceInfo() string
	// ToCancelableError checks whether or not an error represents a cancelable error
	// for the respective consumer, if it does, it formats it to a cancelable error
	ToCancelableError(error error) (CancelableError, bool)
}

Consumer is used to consume data from a source.

func New

func New(cType consumerType, fullSub string, amsClient *ams.Client) (Consumer, error)

New acts as a consumer factory: it creates and returns a new consumer based on the provided type.
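The factory idea can be sketched as a switch over the consumer type that returns an error for anything it does not recognise. The code below is an illustration only: the real New takes an *ams.Client, whereas this sketch uses a plain host string, and newConsumer, amsHttpConsumer, the error text and the ResourceInfo format are assumptions rather than the package's actual behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerType and AmsHttpConsumerType mirror the documented constant.
type consumerType string

const AmsHttpConsumerType consumerType = "ams-http-consumer"

// Consumer is reduced to a single method for this sketch.
type Consumer interface {
	ResourceInfo() string
}

// amsHttpConsumer is a hypothetical stand-in for AmsHttpConsumer.
type amsHttpConsumer struct {
	fullSub string
	host    string
}

func (c *amsHttpConsumer) ResourceInfo() string {
	// The output format is an assumption.
	return fmt.Sprintf("subscription %s on %s", c.fullSub, c.host)
}

// newConsumer picks an implementation by type and rejects unknown types.
func newConsumer(cType consumerType, fullSub, host string) (Consumer, error) {
	switch cType {
	case AmsHttpConsumerType:
		return &amsHttpConsumer{fullSub: fullSub, host: host}, nil
	default:
		return nil, errors.New("consumer type " + string(cType) + " is not supported")
	}
}

func main() {
	c, err := newConsumer(AmsHttpConsumerType, "example-subscription", "ams.example.org")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.ResourceInfo())

	_, err = newConsumer("kafka", "example-subscription", "ams.example.org")
	fmt.Println("error:", err)
}
```

Returning the interface rather than the concrete type lets callers swap in a test double such as MockConsumer without changing the code that drives the consumer.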

type MockConsumer

type MockConsumer struct {
	GeneratedMessages     []ams.ReceivedMessage
	AckMessages           []string
	SubStatus             string
	AckStatus             string
	UpdStatus             string
	UpdatedStatusMessages []string
}

func (*MockConsumer) Ack

func (m *MockConsumer) Ack(ctx context.Context, ackId string) error

func (*MockConsumer) Consume

func (m *MockConsumer) Consume(ctx context.Context, numberOfMessages int64) (ams.ReceivedMessagesList, error)

func (*MockConsumer) ResourceInfo

func (m *MockConsumer) ResourceInfo() string

func (*MockConsumer) ToCancelableError

func (m *MockConsumer) ToCancelableError(error error) (CancelableError, bool)

func (*MockConsumer) UpdateResourceStatus

func (m *MockConsumer) UpdateResourceStatus(ctx context.Context, status string) error
