events

package
v0.0.0-...-9cf9cea Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
// Topic names. Every topic in this group shares the "events." prefix.
const (
	ReceiverWalletNewInvitationTopic = "events.receiver-wallets.new_invitation"
	PaymentCompletedTopic            = "events.payment.payment_completed"
	PaymentReadyToPayTopic           = "events.payment.ready_to_pay"
)

Topic Names

Note: when adding a new topic here, please also add it to the `kafka-init` service command in dev/docker-compose-sdp-anchor.yml.

`kafka-topics.sh --create --if-not-exists --topic events.new-topic ...`
View Source
// Type names.
// NOTE(review): presumably these are the values carried in Message.Type — confirm against the producers/handlers.
const (
	RetryReceiverWalletSMSInvitationType           = "retry-receiver-wallet-sms-invitation"
	BatchReceiverWalletSMSInvitationType           = "batch-receiver-wallet-sms-invitation"
	PaymentCompletedSuccessType                    = "payment-completed-success"
	PaymentCompletedErrorType                      = "payment-completed-error"
	PaymentReadyToPayDisbursementStarted           = "payment-ready-to-pay-disbursement-started"
	PaymentReadyToPayReceiverVerificationCompleted = "payment-ready-to-pay-receiver-verification-completed"
	PaymentReadyToPayRetryFailedPayment            = "payment-ready-to-pay-retry-failed-payment"
)

Type Names

View Source
// MaxBackoffExponent is an exported limit with value 8.
// NOTE(review): presumably the largest exponent ConsumerBackoffManager uses when
// computing GetBackoffDuration — the implementation is not visible here; confirm.
const MaxBackoffExponent = 8

Variables

View Source
// Sentinel errors, one per Message field (Topic, Key, TenantID, Type, Data),
// each reporting that the corresponding value is required.
// NOTE(review): presumably returned by Message.Validate — confirm against its body.
var (
	ErrTopicRequired    = errors.New("message topic is required")
	ErrKeyRequired      = errors.New("message key is required")
	ErrTenantIDRequired = errors.New("message tenant ID is required")
	ErrTypeRequired     = errors.New("message type is required")
	ErrDataRequired     = errors.New("message data is required")
)

Functions

func Consume

func Consume(ctx context.Context, consumer Consumer, crashTracker crashtracker.CrashTrackerClient)

Types

type Consumer

// Consumer is the interface accepted by Consume. KafkaConsumer and MockConsumer
// both declare methods with these signatures.
type Consumer interface {
	// ReadMessage takes a context and reports failure through its error result.
	ReadMessage(ctx context.Context) error
	// Topic returns a topic name as a string.
	Topic() string
	// Close reports failure through its error result.
	Close() error
}

type ConsumerBackoffManager

type ConsumerBackoffManager struct {
	// contains filtered or unexported fields
}

func NewBackoffManager

func NewBackoffManager(backoffChan chan<- struct{}) *ConsumerBackoffManager

func (*ConsumerBackoffManager) GetBackoffDuration

func (bm *ConsumerBackoffManager) GetBackoffDuration() time.Duration

func (*ConsumerBackoffManager) ResetBackoff

func (bm *ConsumerBackoffManager) ResetBackoff()

func (*ConsumerBackoffManager) TriggerBackoff

func (bm *ConsumerBackoffManager) TriggerBackoff()

type EventBrokerType

// EventBrokerType is a string-based type identifying the chosen event broker.
// ParseEventBrokerType converts a plain string into an EventBrokerType.
type EventBrokerType string
const (
	// KafkaEventBrokerType has the value "KAFKA".
	KafkaEventBrokerType EventBrokerType = "KAFKA"
	// NoneEventBrokerType means that no event broker was chosen.
	NoneEventBrokerType EventBrokerType = "NONE"
)

func ParseEventBrokerType

func ParseEventBrokerType(ebType string) (EventBrokerType, error)

type EventHandler

// EventHandler is the handler interface that NewKafkaConsumer accepts as a
// variadic argument.
type EventHandler interface {
	// Name returns the handler's name as a string.
	Name() string
	// CanHandleMessage reports, as a bool, whether the handler can handle message.
	CanHandleMessage(ctx context.Context, message *Message) bool
	// Handle receives the message. It has no return value, so a failure cannot
	// be reported to the caller through this method.
	Handle(ctx context.Context, message *Message)
}

type EventPatchAnchorPlatformTransactionCompletionData

// EventPatchAnchorPlatformTransactionCompletionData carries a single payment ID.
// NOTE(review): presumably used as the Data payload of a Message — confirm against callers.
type EventPatchAnchorPlatformTransactionCompletionData struct {
	// PaymentID is serialized under the JSON key "payment_id".
	PaymentID string `json:"payment_id"`
}

type EventReceiverWalletSMSInvitationData

// EventReceiverWalletSMSInvitationData carries a single receiver wallet ID.
// NOTE(review): presumably used as the Data payload of a Message — confirm against callers.
type EventReceiverWalletSMSInvitationData struct {
	// ReceiverWalletID is serialized under the JSON key "id" (not "receiver_wallet_id").
	ReceiverWalletID string `json:"id"`
}

type KafkaConfig

// KafkaConfig groups the Kafka settings accepted by NewKafkaConsumer and
// NewKafkaProducer. It has a Validate method that returns an error.
type KafkaConfig struct {
	// Brokers is a list of strings. NOTE(review): presumably broker addresses — confirm expected format.
	Brokers []string
	// SecurityProtocol is one of the KafkaSecurityProtocol values.
	SecurityProtocol KafkaSecurityProtocol
	// NOTE(review): SASLUsername/SASLPassword are presumably only needed for the
	// SASL_* protocols — confirm in Validate.
	SASLUsername string
	SASLPassword string
	// NOTE(review): unclear from here whether SSLAccessKey/SSLAccessCertificate
	// hold file paths or the key/certificate contents — confirm.
	SSLAccessKey         string
	SSLAccessCertificate string
}

func (*KafkaConfig) Validate

func (kc *KafkaConfig) Validate() error

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(config KafkaConfig, topic string, consumerGroupID string, handlers ...EventHandler) (*KafkaConsumer, error)

func (*KafkaConsumer) Close

func (k *KafkaConsumer) Close() error

func (*KafkaConsumer) ReadMessage

func (k *KafkaConsumer) ReadMessage(ctx context.Context) error

func (*KafkaConsumer) Topic

func (k *KafkaConsumer) Topic() string

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(config KafkaConfig) (*KafkaProducer, error)

func (*KafkaProducer) Close

func (k *KafkaProducer) Close() error

func (*KafkaProducer) WriteMessages

func (k *KafkaProducer) WriteMessages(ctx context.Context, messages ...Message) error

type KafkaSecurityProtocol

// KafkaSecurityProtocol is the string-based type of KafkaConfig.SecurityProtocol.
// ParseKafkaSecurityProtocol converts a plain string into a KafkaSecurityProtocol.
type KafkaSecurityProtocol string
const (
	KafkaProtocolPlaintext     KafkaSecurityProtocol = "PLAINTEXT"
	KafkaProtocolSASLPlaintext KafkaSecurityProtocol = "SASL_PLAINTEXT"
	KafkaProtocolSASLSSL       KafkaSecurityProtocol = "SASL_SSL"
	KafkaProtocolSSL           KafkaSecurityProtocol = "SSL"
)

func ParseKafkaSecurityProtocol

func ParseKafkaSecurityProtocol(protocol string) (KafkaSecurityProtocol, error)

type Message

// Message is the value passed to Producer.WriteMessages and handed to
// EventHandler methods. All fields carry JSON tags, and each has a matching
// Err*Required sentinel error declared in this package.
type Message struct {
	Topic string `json:"topic"`
	Key   string `json:"key"`
	// TenantID is populated by NewMessage from the tenant found in the context.
	TenantID string `json:"tenant_id"`
	Type     string `json:"type"`
	// Data is typed any, so the payload shape is not constrained by this struct.
	Data any `json:"data"`
}

func NewMessage

func NewMessage(ctx context.Context, topic, key, messageType string, data any) (*Message, error)

NewMessage returns a new message with the values passed as parameters. It also parses the `TenantID` from the context and injects it into the message. Returns an error if the tenant is not found in the context.

func (Message) String

func (m Message) String() string

func (Message) Validate

func (m Message) Validate() error

type MockConsumer

// MockConsumer embeds mock.Mock and declares the Consumer methods (ReadMessage,
// Topic, Close) plus RegisterEventHandler.
// NOTE(review): mock.Mock is presumably testify's — confirm against the file's imports.
type MockConsumer struct {
	mock.Mock
}

func (*MockConsumer) Close

func (c *MockConsumer) Close() error

func (*MockConsumer) ReadMessage

func (c *MockConsumer) ReadMessage(ctx context.Context) error

func (*MockConsumer) RegisterEventHandler

func (c *MockConsumer) RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error

func (*MockConsumer) Topic

func (c *MockConsumer) Topic() string

type MockProducer

// MockProducer embeds mock.Mock and declares the Producer methods
// (WriteMessages, Close).
// NOTE(review): mock.Mock is presumably testify's — confirm against the file's imports.
type MockProducer struct {
	mock.Mock
}

func (*MockProducer) Close

func (c *MockProducer) Close() error

func (*MockProducer) WriteMessages

func (c *MockProducer) WriteMessages(ctx context.Context, messages ...Message) error

type NoopProducer

// NoopProducer is a producer used to log messages instead of sending them to a
// real producer. It is an empty struct and declares both Producer methods.
type NoopProducer struct{}

NoopProducer is a producer used to log messages instead of sending them to a real producer.

func (NoopProducer) Close

func (p NoopProducer) Close() error

func (NoopProducer) WriteMessages

func (p NoopProducer) WriteMessages(ctx context.Context, messages ...Message) error

type Producer

// Producer is the message-writing interface. KafkaProducer, MockProducer and
// NoopProducer each declare both of its methods.
type Producer interface {
	// WriteMessages accepts zero or more Message values and reports failure
	// through its error result.
	WriteMessages(ctx context.Context, messages ...Message) error
	// Close reports failure through its error result.
	Close() error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL