Documentation ¶
Index ¶
- Variables
- func AddResponseToDB(TicketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)
- func CliCtxFromKafkaMsg(kafkaMsg KafkaMsg, cliContext context.CLIContext) context.CLIContext
- func GetResponseFromDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte
- func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
- func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
- func NewConsumer(kafkaPorts []string) sarama.Consumer
- func NewProducer(kafkaPorts []string) sarama.SyncProducer
- func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
- func QueryDB(cdc *codec.Codec, kafkaDB *dbm.GoLevelDB) http.HandlerFunc
- func RegisterCodec(cdc *codec.Codec)
- func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
- func SetTicketIDtoDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)
- func TopicsInit(admin sarama.ClusterAdmin, topic string)
- type KafkaCliCtx
- type KafkaMsg
- type KafkaState
- type Ticket
- type TicketIDResponse
Constants ¶
This section is empty.
Variables ¶
var DefaultCLIHome = os.ExpandEnv("$HOME/.kafka")
DefaultCLIHome : is the home path
var ModuleCdc *codec.Codec
module codec
var SleepRoutine = time.Duration(2500000000)
SleepRoutine : the time the kafka messages are to be taken in
var SleepTimer = time.Duration(1000000000)
SleepTimer : the time the kafka messages are to be taken in
var TicketIDAtomicCounter int64
TicketIDAtomicCounter is a counter that adds when each time a function is called
var Topics = []string{
"Topic",
}
Topics : is list of topics
Functions ¶
func AddResponseToDB ¶
AddResponseToDB : Updates response to DB
func CliCtxFromKafkaMsg ¶
func CliCtxFromKafkaMsg(kafkaMsg KafkaMsg, cliContext context.CLIContext) context.CLIContext
CliCtxFromKafkaMsg : sets the transaction and cli contexts again to consume
func GetResponseFromDB ¶
GetResponseFromDB : gives the response from DB
func KafkaAdmin ¶
func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
KafkaAdmin : is admin to create topics
func KafkaProducerDeliverMessage ¶
func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
KafkaProducerDeliverMessage : delivers messages to kafka
func NewConsumer ¶
NewConsumer : is a consumer which is needed to create child consumers to consume topics
func NewProducer ¶
func NewProducer(kafkaPorts []string) sarama.SyncProducer
NewProducer is a producer to send messages to kafka
func PartitionConsumers ¶
func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
PartitionConsumers : is a child consumer
func SendToKafka ¶
func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
SendToKafka : handles sending message to kafka
func SetTicketIDtoDB ¶
SetTicketIDtoDB : initiates ticketID in Database
func TopicsInit ¶
func TopicsInit(admin sarama.ClusterAdmin, topic string)
TopicsInit : is needed to initialise topics
Types ¶
type KafkaCliCtx ¶
type KafkaCliCtx struct { OutputFormat string ChainID string Height int64 HomeDir string NodeURI string From string TrustNode bool UseLedger bool BroadcastMode string Simulate bool GenerateOnly bool FromAddress sdk.AccAddress FromName string Offline bool Indent bool SkipConfirm bool }
KafkaCliCtx : client tx without codec
type KafkaMsg ¶
type KafkaMsg struct { Msg sdk.Msg `json:"msg"` TicketID Ticket `json:"ticketID"` BaseRequest rest.BaseReq `json:"base_req"` KafkaCli KafkaCliCtx `json:"kafkaCliCtx"` }
KafkaMsg : is a store that can be stored in kafka queues
func KafkaTopicConsumer ¶
func KafkaTopicConsumer(topic string, consumers map[string]sarama.PartitionConsumer, cdc *codec.Codec) KafkaMsg
KafkaTopicConsumer : Takes a consumer and makes it consume a topic message at a time
func NewKafkaMsgFromRest ¶
func NewKafkaMsgFromRest(msg sdk.Msg, ticketID Ticket, baseRequest rest.BaseReq, cliCtx context.CLIContext) KafkaMsg
NewKafkaMsgFromRest : makes a msg to send to kafka queue
type KafkaState ¶
type KafkaState struct { KafkaDB *dbm.GoLevelDB Admin sarama.ClusterAdmin Consumer sarama.Consumer Consumers map[string]sarama.PartitionConsumer Producer sarama.SyncProducer Topics []string }
KafkaState : is a struct showing the state of kafka
func NewKafkaState ¶
func NewKafkaState(kafkaPorts []string) KafkaState
NewKafkaState : returns a kafka state
type Ticket ¶
type Ticket string
Ticket : is a type that implements string
func TicketIDGenerator ¶
TicketIDGenerator is a random unique ticket ID generator, output is a string
type TicketIDResponse ¶
type TicketIDResponse struct {
TicketID Ticket `json:"ticketID" valid:"required~ticketID is mandatory,length(20)~ticketID length should be 20" `
}
TicketIDResponse : is a json structure to send TicketID to user