kafka

package
v0.0.0-...-423bd50 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 9, 2024 License: BSD-3-Clause Imports: 19 Imported by: 0

Documentation

Overview

Package kafka manages interaction with Kafka.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Emit

func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error

Emit sends a message over the Kafka interface.

func SignedBlindedTokenIssuerHandler

func SignedBlindedTokenIssuerHandler(
	msg kafka.Message,
	producer *kafka.Writer,
	server *cbpServer.Server,
	log *zerolog.Logger,
) error

SignedBlindedTokenIssuerHandler emits signed, blinded tokens based on provided blinded tokens.

Unrecoverable (permanent) errors that prevent progress cause the handler to return nil,
because future attempts to process the same message will not succeed. Temporary errors
are handled differently. Inside the data processing loop, a permanent failure in the
request data is simply added to the results, while a temporary error breaks the loop and
returns non-nil, just like errors that occur outside the loop.

TODO: It would be better for the Server implementation and the Kafka implementation of
this behavior to share utility functions rather than passing an instance of the server
as an argument here. That will require a bit of refactoring.
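
The permanent-versus-temporary error policy described for SignedBlindedTokenIssuerHandler can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the package's implementation: errTemporary, result, processBatch, and demoWork are hypothetical names that do not appear in the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errTemporary is a hypothetical sentinel marking failures worth retrying.
var errTemporary = errors.New("temporary failure")

// result is a hypothetical per-item outcome; Err is empty on success.
type result struct {
	Item string
	Err  string
}

// processBatch applies work to each item. A permanent error is recorded in
// the results and the loop continues. A temporary error aborts the loop and
// is returned as non-nil so the caller can retry the whole message.
func processBatch(items []string, work func(string) error) ([]result, error) {
	results := make([]result, 0, len(items))
	for _, item := range items {
		err := work(item)
		switch {
		case err == nil:
			results = append(results, result{Item: item})
		case errors.Is(err, errTemporary):
			return nil, fmt.Errorf("processing %q: %w", item, err)
		default:
			results = append(results, result{Item: item, Err: err.Error()})
		}
	}
	return results, nil
}

// demoWork is a hypothetical worker: "bad" fails permanently, "flaky" fails
// temporarily, and anything else succeeds.
func demoWork(item string) error {
	switch item {
	case "bad":
		return errors.New("malformed token")
	case "flaky":
		return errTemporary
	}
	return nil
}

func main() {
	results, err := processBatch([]string{"ok", "bad"}, demoWork)
	fmt.Println(len(results), err)

	_, err = processBatch([]string{"ok", "flaky"}, demoWork)
	fmt.Println(err)
}
```

In this sketch the first call keeps both items, with the permanent failure recorded on the second, while the second call stops at the temporary failure and surfaces it to the caller.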

func SignedTokenRedeemHandler

func SignedTokenRedeemHandler(
	msg kafka.Message,
	producer *kafka.Writer,
	server *cbpServer.Server,
	log *zerolog.Logger,
) error

func StartConsumers

func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error

StartConsumers reads configuration variables and starts the associated Kafka consumers.

Types

type MessageContext

type MessageContext struct {
	// contains filtered or unexported fields
}

MessageContext is used for channel coordination when processing batches of messages.

type ProcessingResult

type ProcessingResult struct {
	ResultProducer *kafka.Writer
	Message        []byte
	RequestID      string
}

ProcessingResult contains a message, the producer for the topic to which the message should be emitted, and the request ID.

type Processor

type Processor func(
	kafka.Message,
	*kafka.Writer,
	*server.Server,
	*zerolog.Logger,
) error

Processor is a function that is used to process Kafka messages.

type SignedIssuerToken

type SignedIssuerToken struct {
	// contains filtered or unexported fields
}

type TopicMapping

type TopicMapping struct {
	Topic          string
	ResultProducer *kafka.Writer
	Processor      Processor
	Group          string
}

TopicMapping represents a kafka topic, how to process it, and where to emit the result.
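
One plausible way a TopicMapping ties a topic to its Processor and result producer is sketched below. The page does not show how the package uses the mapping internally, so this is an assumption: message, writer, processor, topicMapping, dispatch, exampleMappings, and the topic names are hypothetical stand-ins, with the server and logger arguments omitted to keep the example self-contained.

```go
package main

import "fmt"

// message and writer are hypothetical stand-ins for kafka.Message and
// *kafka.Writer so that the sketch needs no external dependencies.
type message struct {
	Topic string
	Value []byte
}

type writer struct{ Topic string }

// processor mirrors the shape of Processor without the server and logger.
type processor func(message, *writer) error

// topicMapping mirrors the exported fields of TopicMapping.
type topicMapping struct {
	Topic          string
	ResultProducer *writer
	Processor      processor
	Group          string
}

// dispatch finds the mapping whose Topic matches the message and runs its
// processor with the mapping's result producer.
func dispatch(mappings []topicMapping, msg message) error {
	for _, m := range mappings {
		if m.Topic == msg.Topic {
			return m.Processor(msg, m.ResultProducer)
		}
	}
	return fmt.Errorf("no mapping for topic %q", msg.Topic)
}

// exampleMappings builds one mapping whose processor records the result
// topic it would emit to. All names here are invented for the example.
func exampleMappings(handled *[]string) []topicMapping {
	return []topicMapping{{
		Topic:          "requests.sign",
		ResultProducer: &writer{Topic: "results.sign"},
		Processor: func(msg message, w *writer) error {
			*handled = append(*handled, w.Topic)
			return nil
		},
		Group: "example-group",
	}}
}

func main() {
	var handled []string
	mappings := exampleMappings(&handled)
	fmt.Println(dispatch(mappings, message{Topic: "requests.sign"}), handled)
	fmt.Println(dispatch(mappings, message{Topic: "unknown"}))
}
```

Keeping the processor and its result producer together in one record means that, in a design like this, adding a new topic only requires a new mapping entry rather than changes to the dispatch logic.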
