kafka

package
v0.0.0-...-4263410 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2024 License: GPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// NotificationSuccessEventType is the event type string "launch-success".
// NOTE(review): presumably the value placed in NotificationMessage.EventType
// for a successful launch — confirm against the callers.
const NotificationSuccessEventType = "launch-success"

// NotificationFailureEventType is the event type string "launch-failed".
// NOTE(review): presumably the counterpart used for a failed launch — confirm
// against the callers.
const NotificationFailureEventType = "launch-failed"

Variables

View Source
// ErrDifferentTopic is the sentinel error whose message states that the
// messages in a batch have different topics.
var ErrDifferentTopic = errors.New("messages in batch have different topics")

// ErrUnknownSASLMechanism is the sentinel error whose message states that a
// SASL mechanism is unknown.
var ErrUnknownSASLMechanism = errors.New("unknown SASL mechanism")
View Source
// AvailabilityStatusRequestTopic holds the availability status request topic
// after clowder mapping.
// NOTE(review): presumably populated by InitializeTopicRequests — confirm.
var AvailabilityStatusRequestTopic string

// SourcesStatusTopic holds the sources status topic after clowder mapping.
var SourcesStatusTopic string

// NotificationTopic holds the notification topic after clowder mapping.
var NotificationTopic string

Topics after clowder mapping.

Functions

func Consume

func Consume(ctx context.Context, topic string, since time.Time, handler func(ctx context.Context, message *GenericMessage))

func InitializeKafkaBroker

func InitializeKafkaBroker(ctx context.Context) error

func InitializeStubBroker

func InitializeStubBroker(bufferSize int) error

func InitializeTopicRequests

func InitializeTopicRequests(ctx context.Context)

InitializeTopicRequests performs clowder mapping of topics.

func Send

func Send(ctx context.Context, messages ...*GenericMessage) error

Types

type AvailabilityStatusMessage

// AvailabilityStatusMessage carries a single source identifier.
type AvailabilityStatusMessage struct {
	// SourceID is serialized under the JSON key "source_id".
	SourceID string `json:"source_id"`
}

func NewAvailabilityStatusMessage

func NewAvailabilityStatusMessage(msg *GenericMessage) (*AvailabilityStatusMessage, error)

func (AvailabilityStatusMessage) GenericMessage

type Broker

// Broker sends messages and consumes them from a topic.
type Broker interface {
	// Send sends one or more messages to Kafka.
	Send(ctx context.Context, messages ...*GenericMessage) error

	// Consume consumes messages of a single topic in a loop, passing each one
	// to handler. It is a blocking call; use context cancellation to stop it.
	// NOTE(review): since presumably bounds how far back consumption starts —
	// confirm against the implementations.
	Consume(ctx context.Context, topic string, since time.Time, handler func(ctx context.Context, message *GenericMessage))
}

func NewKafkaBroker

func NewKafkaBroker(ctx context.Context) (Broker, error)

func NewStubBroker

func NewStubBroker(bufferSize int) Broker

type GenericHeader

// GenericHeader is a single key-value pair, the element type of
// GenericMessage.Headers.
type GenericHeader struct {
	// Key is the header name.
	Key   string
	// Value is the header value.
	Value string
}

func GenericHeaders

func GenericHeaders(args ...string) []GenericHeader

GenericHeaders returns a slice of headers.

type GenericMessage

// GenericMessage is a platform-independent message made of a topic, a key, a
// payload and a list of headers.
type GenericMessage struct {
	// Topic is the topic of the message. Some producers already have an
	// associated topic; in that case the Topic from the message is ignored.
	Topic string

	// Key is used for topic partitioning. Can be nil.
	Key []byte

	// Value is the payload. Typically JSON-marshaled data.
	Value []byte

	// Headers is the list of key-value pairs attached to the message.
	Headers []GenericHeader
}

GenericMessage is a platform-independent message.

func NewMessageFromKafka

func NewMessageFromKafka(km *kafka.Message) *GenericMessage

NewMessageFromKafka converts a native Kafka message to a generic message.

func (GenericMessage) Header

func (m GenericMessage) Header(name string) string

func (GenericMessage) KafkaMessage

func (m GenericMessage) KafkaMessage() kafka.Message

KafkaMessage converts from generic to native message.

type NativeMessage

// NativeMessage represents a native (Kafka) message that can be converted to
// a GenericMessage.
type NativeMessage interface {
	// GenericMessage returns a generic message that is platform independent.
	// NOTE(review): the error result is presumably non-nil when the
	// conversion fails — confirm against the implementations.
	GenericMessage(ctx context.Context) (GenericMessage, error)
}

NativeMessage represents a native (kafka) message. It can be converted to GenericMessage.

type NotificationContext

// NotificationContext holds a launch identifier and a provider name,
// serialized under the JSON keys "launch_id" and "provider".
// NOTE(review): presumably the value stored in NotificationMessage.Context —
// confirm against the callers.
type NotificationContext struct {
	LaunchID int64  `json:"launch_id"`
	Provider string `json:"provider"`
}

type NotificationError

// NotificationError wraps an error text for JSON serialization.
type NotificationError struct {
	// Error is the error text, serialized under the JSON key "error".
	Error string `json:"error"`
}

type NotificationEvent

// NotificationEvent is one entry of NotificationMessage.Events.
type NotificationEvent struct {
	// Payload is kept as raw JSON and serialized under the key "payload";
	// encoding/json copies a json.RawMessage through without re-encoding it.
	Payload json.RawMessage `json:"payload"`
}

type NotificationMessage

// NotificationMessage is the JSON envelope of a notification. Every field is
// serialized under the key given in its struct tag.
//
// Context is untyped (interface{}), so any JSON-encodable value can be stored
// in it. Events holds the NotificationEvent entries of the message.
//
// NOTE(review): Recipients uses the unexported type notificationRecipients,
// which code outside this package cannot name.
// NOTE(review): OrgId does not follow the Go initialism convention (OrgID);
// renaming it would break the exported API, so it is left as is.
// NOTE(review): Timestamp is a plain string; its expected format is not
// visible from this declaration — confirm against the producer.
type NotificationMessage struct {
	Version     string                   `json:"version"`
	Bundle      string                   `json:"bundle"`
	Application string                   `json:"application"`
	EventType   string                   `json:"event_type"`
	Timestamp   string                   `json:"timestamp"`
	AccountID   string                   `json:"account_id"`
	OrgId       string                   `json:"org_id"`
	Context     interface{}              `json:"context"`
	Events      []NotificationEvent      `json:"events"`
	Recipients  []notificationRecipients `json:"recipients"`
	ID          string                   `json:"id"`
}

func (NotificationMessage) GenericMessage

func (m NotificationMessage) GenericMessage(ctx context.Context) (GenericMessage, error)

type SourceResult

// SourceResult describes the status of a resource. ResourceID, ResourceType,
// Status and UserError are serialized to JSON (UserError under the key
// "error"); MessageContext, Err and MissingPermissions are tagged `json:"-"`
// and are therefore omitted from the encoded form.
//
// NOTE(review): storing a context.Context in a struct is generally
// discouraged in Go; the field is part of the exported type, so it is kept.
type SourceResult struct {
	MessageContext     context.Context `json:"-"` // Carries logger and identity
	ResourceID         string          `json:"resource_id"`
	ResourceType       string          `json:"resource_type"`
	Status             StatusType      `json:"status"`
	UserError          string          `json:"error"`
	Err                error           `json:"-"` // Sources do not support error field
	MissingPermissions []string        `json:"-"` // Sources do not support reason field
}

func (SourceResult) GenericMessage

func (sr SourceResult) GenericMessage(ctx context.Context) (GenericMessage, error)

type StatusType

// StatusType is a string-based status value.
type StatusType string

// Known StatusType values.
const (
	// StatusUnavailable is the status string "unavailable".
	StatusUnavailable = StatusType("unavailable")
	// StatusAvailable is the status string "available".
	StatusAvailable = StatusType("available")
)

func (StatusType) String

func (st StatusType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL