subscriber

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelConsumer

type ChannelConsumer struct {
	// contains filtered or unexported fields
}

consumer, err := subscriber.NewChannelConsumer(

reader,
dummy.NewService(
	store.NewStore(
		store.StoreTableConfig{
			Logs: fmt.Sprint(tablePrefix, viper.GetString("dynamodb.tables.audit.logs")),
		},
		dynamoClient,
	),
	monitor,
	readerCfg.GetGroupID(),
),
monitor,

)

if err != nil {
	panic(err)
}

consumer.Start()

c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

// Block until we receive our signal. <-c

consumer.Stop() log.Info("Shutting down") os.Exit(0)

func NewChannelConsumer

func NewChannelConsumer(refreshable core.RefreshableInterface, svc ServiceInterface, monitor monitoring.MonitorInterface) (*ChannelConsumer, error)

NewChannelConsumer creates an object implementing ConsumerInterface

func (*ChannelConsumer) Start

func (c *ChannelConsumer) Start()

Start creates all the goroutines for read and consume

func (*ChannelConsumer) Stats

func (c *ChannelConsumer) Stats() kafka.ReaderStats

Stats returns kafka.ReaderStats

func (*ChannelConsumer) Stop

func (c *ChannelConsumer) Stop()

Stop makes sure goroutines for read and consume are being gracefully stopped

type ServiceInterface

type ServiceInterface interface {
	TaskName() string
	Flow(MessageKey, MessageValue []byte) error
}

type StandardConsumer

type StandardConsumer struct {
	// contains filtered or unexported fields
}

consumer, err := subscriber.NewStandardConsumer(

reader,
dummy.NewService(
	store.NewStore(
		store.StoreTableConfig{
			Logs: fmt.Sprint(tablePrefix, viper.GetString("dynamodb.tables.audit.logs")),
		},
		dynamoClient,
	),
	monitor,
	readerCfg.GetGroupID(),
),
monitor,

)

if err != nil {
	panic(err)
}

consumer.Start()

c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

// Block until we receive our signal. <-c

consumer.Stop() log.Info("Shutting down") os.Exit(0)

func NewStandardConsumer

func NewStandardConsumer(refreshable core.RefreshableInterface, svc ServiceInterface, monitor monitoring.MonitorInterface) (*StandardConsumer, error)

NewStandardConsumer creates an object implementing ConsumerInterface

func (*StandardConsumer) Start

func (c *StandardConsumer) Start()

Start creates all the goroutine for consuming and reading

func (*StandardConsumer) Stats

func (c *StandardConsumer) Stats() kafka.ReaderStats

Stats returns kafka.ReaderStats

func (*StandardConsumer) Stop

func (c *StandardConsumer) Stop()

Stop makes sure goroutines for read and consume are being gracefully stopped

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL