Version: v0.1.1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2021 License: Apache-2.0 Imports: 14 Imported by: 1




This section is empty.


View Source
var DefaultCLIHome = os.ExpandEnv("$HOME/.kafka")

DefaultCLIHome : is the default CLI home path, "$HOME/.kafka" with the environment variable expanded

View Source
var ModuleCdc *codec.Codec

ModuleCdc : is the module codec

View Source
var SleepRoutine = time.Duration(2500000000)

SleepRoutine : the interval (2.5 seconds) at which the kafka messages are to be taken in

View Source
var SleepTimer = time.Duration(1000000000)

SleepTimer : the interval (1 second) at which the kafka messages are to be taken in

View Source
var TicketIDAtomicCounter int64

TicketIDAtomicCounter is a counter that is incremented each time a function is called

View Source
var Topics = []string{

Topics : is the list of topics


func AddResponseToDB

func AddResponseToDB(TicketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)

AddResponseToDB : Updates response to DB

func CliCtxFromKafkaMsg

func CliCtxFromKafkaMsg(kafkaMsg KafkaMsg, cliContext context.CLIContext) context.CLIContext

CliCtxFromKafkaMsg : sets the transaction and cli contexts again to consume

func GetResponseFromDB

func GetResponseFromDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte

GetResponseFromDB : gives the response from DB

func KafkaAdmin

func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin

KafkaAdmin : is admin to create topics

func KafkaProducerDeliverMessage

func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error

KafkaProducerDeliverMessage : delivers messages to kafka

func NewConsumer

func NewConsumer(kafkaPorts []string) sarama.Consumer

NewConsumer : is a consumer which is needed to create child consumers to consume topics

func NewProducer

func NewProducer(kafkaPorts []string) sarama.SyncProducer

NewProducer is a producer to send messages to kafka

func PartitionConsumers

func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer

PartitionConsumers : is a child consumer

func QueryDB

func QueryDB(cdc *codec.Codec, kafkaDB *dbm.GoLevelDB) http.HandlerFunc

QueryDB : REST outputs info from DB

func RegisterCodec

func RegisterCodec(cdc *codec.Codec)

RegisterCodec : registers concrete types on the codec

func SendToKafka

func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte

SendToKafka : handles sending message to kafka

func SetTicketIDtoDB

func SetTicketIDtoDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)

SetTicketIDtoDB : initiates ticketID in Database

func TopicsInit

func TopicsInit(admin sarama.ClusterAdmin, topic string)

TopicsInit : is needed to initialise topics


type KafkaCliCtx

type KafkaCliCtx struct {
	OutputFormat  string
	ChainID       string
	Height        int64
	HomeDir       string
	NodeURI       string
	From          string
	TrustNode     bool
	UseLedger     bool
	BroadcastMode string
	Simulate      bool
	GenerateOnly  bool
	FromAddress   sdk.AccAddress
	FromName      string
	Offline       bool
	Indent        bool
	SkipConfirm   bool

KafkaCliCtx : client context without codec

type KafkaMsg

type KafkaMsg struct {
	Msg         sdk.Msg      `json:"msg"`
	TicketID    Ticket       `json:"ticketID"`
	BaseRequest rest.BaseReq `json:"base_req"`
	KafkaCli    KafkaCliCtx  `json:"kafkaCliCtx"`

KafkaMsg : is a message structure that can be stored in kafka queues

func KafkaTopicConsumer

func KafkaTopicConsumer(topic string, consumers map[string]sarama.PartitionConsumer, cdc *codec.Codec) KafkaMsg

KafkaTopicConsumer : Takes a consumer and makes it consume one message from a topic at a time

func NewKafkaMsgFromRest

func NewKafkaMsgFromRest(msg sdk.Msg, ticketID Ticket, baseRequest rest.BaseReq, cliCtx context.CLIContext) KafkaMsg

NewKafkaMsgFromRest : makes a msg to send to kafka queue

type KafkaState

type KafkaState struct {
	KafkaDB   *dbm.GoLevelDB
	Admin     sarama.ClusterAdmin
	Consumer  sarama.Consumer
	Consumers map[string]sarama.PartitionConsumer
	Producer  sarama.SyncProducer
	Topics    []string

KafkaState : is a struct showing the state of kafka

func NewKafkaState

func NewKafkaState(kafkaPorts []string) KafkaState

NewKafkaState : returns a kafka state

type Ticket

type Ticket string

Ticket : is a type whose underlying type is string

func TicketIDGenerator

func TicketIDGenerator(prefix string) Ticket

TicketIDGenerator is a random unique ticket ID generator; the output is a Ticket (a string type)

type TicketIDResponse

type TicketIDResponse struct {
	TicketID Ticket `json:"ticketID" valid:"required~ticketID is mandatory,length(20)~ticketID length should be 20" `

TicketIDResponse : is a json structure to send TicketID to user


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL