Documentation ¶
Index ¶
- Constants
- Variables
- func Consume(ctx context.Context, consumer Consumer, ...)
- type Consumer
- type ConsumerBackoffManager
- type EventBrokerType
- type EventHandler
- type EventPatchAnchorPlatformTransactionCompletionData
- type EventReceiverWalletSMSInvitationData
- type KafkaConfig
- type KafkaConsumer
- type KafkaProducer
- type KafkaSecurityProtocol
- type Message
- type MockConsumer
- type MockProducer
- type NoopProducer
- type Producer
Constants ¶
View Source
const ( ReceiverWalletNewInvitationTopic = "events.receiver-wallets.new_invitation" PaymentCompletedTopic = "events.payment.payment_completed" PaymentReadyToPayTopic = "events.payment.ready_to_pay" )
Topic Names
Note: when adding a new topic here, please, add the new topic to `kafka-init` service command on dev/docker-compose-sdp-anchor.yml.
`kafka-topics.sh --create --if-not-exists --topic events.new-topic ...`
View Source
const ( RetryReceiverWalletSMSInvitationType = "retry-receiver-wallet-sms-invitation" BatchReceiverWalletSMSInvitationType = "batch-receiver-wallet-sms-invitation" PaymentCompletedSuccessType = "payment-completed-success" PaymentCompletedErrorType = "payment-completed-error" PaymentReadyToPayDisbursementStarted = "payment-ready-to-pay-disbursement-started" PaymentReadyToPayReceiverVerificationCompleted = "payment-ready-to-pay-receiver-verification-completed" PaymentReadyToPayRetryFailedPayment = "payment-ready-to-pay-retry-failed-payment" )
Type Names
View Source
const MaxBackoffExponent = 8
Variables ¶
View Source
var ( ErrTopicRequired = errors.New("message topic is required") ErrKeyRequired = errors.New("message key is required") ErrTenantIDRequired = errors.New("message tenant ID is required") ErrTypeRequired = errors.New("message type is required") ErrDataRequired = errors.New("message data is required") )
View Source
var ( SASLProtocols = []KafkaSecurityProtocol{KafkaProtocolSASLPlaintext, KafkaProtocolSASLSSL} SSLProtocols = []KafkaSecurityProtocol{KafkaProtocolSASLSSL, KafkaProtocolSSL} )
Functions ¶
func Consume ¶
func Consume(ctx context.Context, consumer Consumer, crashTracker crashtracker.CrashTrackerClient)
Types ¶
type ConsumerBackoffManager ¶
type ConsumerBackoffManager struct {
// contains filtered or unexported fields
}
func NewBackoffManager ¶
func NewBackoffManager(backoffChan chan<- struct{}) *ConsumerBackoffManager
func (*ConsumerBackoffManager) GetBackoffDuration ¶
func (bm *ConsumerBackoffManager) GetBackoffDuration() time.Duration
func (*ConsumerBackoffManager) ResetBackoff ¶
func (bm *ConsumerBackoffManager) ResetBackoff()
func (*ConsumerBackoffManager) TriggerBackoff ¶
func (bm *ConsumerBackoffManager) TriggerBackoff()
type EventBrokerType ¶
type EventBrokerType string
const ( KafkaEventBrokerType EventBrokerType = "KAFKA" // NoneEventBrokerType means that no event broker was chosen. NoneEventBrokerType EventBrokerType = "NONE" )
func ParseEventBrokerType ¶
func ParseEventBrokerType(ebType string) (EventBrokerType, error)
type EventHandler ¶
type EventPatchAnchorPlatformTransactionCompletionData ¶
type EventPatchAnchorPlatformTransactionCompletionData struct {
PaymentID string `json:"payment_id"`
}
type EventReceiverWalletSMSInvitationData ¶
type EventReceiverWalletSMSInvitationData struct {
ReceiverWalletID string `json:"id"`
}
type KafkaConfig ¶
type KafkaConfig struct { Brokers []string SecurityProtocol KafkaSecurityProtocol SASLUsername string SASLPassword string SSLAccessKey string SSLAccessCertificate string }
func (*KafkaConfig) Validate ¶
func (kc *KafkaConfig) Validate() error
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(config KafkaConfig, topic string, consumerGroupID string, handlers ...EventHandler) (*KafkaConsumer, error)
func (*KafkaConsumer) Close ¶
func (k *KafkaConsumer) Close() error
func (*KafkaConsumer) ReadMessage ¶
func (k *KafkaConsumer) ReadMessage(ctx context.Context) error
func (*KafkaConsumer) Topic ¶
func (k *KafkaConsumer) Topic() string
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(config KafkaConfig) (*KafkaProducer, error)
func (*KafkaProducer) Close ¶
func (k *KafkaProducer) Close() error
func (*KafkaProducer) WriteMessages ¶
func (k *KafkaProducer) WriteMessages(ctx context.Context, messages ...Message) error
type KafkaSecurityProtocol ¶
type KafkaSecurityProtocol string
const ( KafkaProtocolPlaintext KafkaSecurityProtocol = "PLAINTEXT" KafkaProtocolSASLPlaintext KafkaSecurityProtocol = "SASL_PLAINTEXT" KafkaProtocolSASLSSL KafkaSecurityProtocol = "SASL_SSL" KafkaProtocolSSL KafkaSecurityProtocol = "SSL" )
func ParseKafkaSecurityProtocol ¶
func ParseKafkaSecurityProtocol(protocol string) (KafkaSecurityProtocol, error)
type Message ¶
type Message struct { Topic string `json:"topic"` Key string `json:"key"` TenantID string `json:"tenant_id"` Type string `json:"type"` Data any `json:"data"` }
func NewMessage ¶
NewMessage returns a new message with values passed by parameters. It also parses the `TenantID` from the context and inject it into the message. Returns error if the tenant is not found in the context.
type MockConsumer ¶
func (*MockConsumer) Close ¶
func (c *MockConsumer) Close() error
func (*MockConsumer) ReadMessage ¶
func (c *MockConsumer) ReadMessage(ctx context.Context) error
func (*MockConsumer) RegisterEventHandler ¶
func (c *MockConsumer) RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error
func (*MockConsumer) Topic ¶
func (c *MockConsumer) Topic() string
type MockProducer ¶
func (*MockProducer) Close ¶
func (c *MockProducer) Close() error
func (*MockProducer) WriteMessages ¶
func (c *MockProducer) WriteMessages(ctx context.Context, messages ...Message) error
type NoopProducer ¶
type NoopProducer struct{}
NoopProducer is a producer used to log messages instead of sending them to a real producer.
func (NoopProducer) Close ¶
func (p NoopProducer) Close() error
func (NoopProducer) WriteMessages ¶
func (p NoopProducer) WriteMessages(ctx context.Context, messages ...Message) error
Source Files ¶
Click to show internal directories.
Click to hide internal directories.