README

package main

import (
    "context"
    "fmt"

    "xxx.com/infrastructure/kkkk/broker"
    "xxx.com/infrastructure/kkkk/configor"
)

// Config is the root configuration of this example. It holds one
// broker.Config for the Kafka section and one for the Rocket section;
// init fills it from ./configs/conf.toml.
type Config struct {
    Kafka, Rocket broker.Config
}

// conf is the package-level configuration; init populates it before main runs.
var conf Config

// init loads ./configs/conf.toml into conf via configor.Load and panics
// with the returned error if loading fails.
func init() {
    err := configor.Load("./configs/conf.toml", &conf)
    if err == nil {
        return
    }
    panic(err)
}

// main demonstrates the broker package end to end: it sends one message to
// the topic named "A" with a synchronous producer, then registers h1 and h2
// as callbacks for the topic names "A" and "B" and starts the consumer.
// Any error returned by Send, Recv or Start aborts the example with a panic,
// matching how init reports a configuration failure.
func main() {
    // The example needs a context for Send; a root context is enough here.
    ctx := context.Background()

    // Produce. NewSyncProducer takes a *broker.Config, so pass the address
    // of the loaded section rather than a copy.
    producer := broker.NewSyncProducer(broker.KAFKA, &conf.Kafka)
    //producer := broker.NewSyncProducer(broker.ROCKET, &conf.Rocket)
    // Best-effort cleanup on exit; the Close error is intentionally ignored.
    defer producer.Close()

    if err := producer.Send(ctx, "A", &broker.Message{Tag: "", Key: "", Value: []byte("what'up test")}); err != nil {
        panic(err)
    }

    // Consume. NewConsumer also takes a *broker.Config.
    consumer := broker.NewConsumer(broker.KAFKA, &conf.Kafka)
    //consumer := broker.NewConsumer(broker.ROCKET, &conf.Rocket)
    // Best-effort cleanup on exit; the Close error is intentionally ignored.
    defer consumer.Close()

    // Register one callback per topic name before starting the consumer.
    if err := consumer.Recv("A", h1); err != nil {
        panic(err)
    }
    if err := consumer.Recv("B", h2); err != nil {
        panic(err)
    }
    if err := consumer.Start(); err != nil {
        panic(err)
    }
}

// h1 is the callback main registers for the topic name "A". It prints the
// trace ID obtained from ctx, followed by the event's topic, the message tag
// and the message value converted to a string, and always returns nil.
func h1(ctx context.Context, event broker.Event) error {
    traceID := tracing.GetTraceID(ctx)
    topic := event.GetTopic()
    tag := event.GetMessage().Tag
    payload := string(event.GetMessage().Value)

    fmt.Println(traceID, topic, tag, payload)
    return nil
}

// h2 is the callback main registers for the topic name "B". Like h1, it
// prints the trace ID from ctx, the event's topic, the message tag and the
// message value as a string, then reports success by returning nil.
func h2(ctx context.Context, event broker.Event) error {
    id := tracing.GetTraceID(ctx)
    topicName := event.GetTopic()
    msgTag := event.GetMessage().Tag
    body := string(event.GetMessage().Value)

    fmt.Println(id, topicName, msgTag, body)
    return nil
}
#conf.toml
[kafka]
    endpoints = ["localhost:9092"]
    group     = "feed"
    [[kafka.topics]]
        name     = "A"
        topic    = "test"
    [[kafka.topics]]
        name     = "B"
        topic    = "hello"

#[rocket]
#   endpoints  = ["http://MQ_xxx.cn-beijing.internal.aliyuncs.com:8080"]
#   group      = "GID-xx-waitdeposit-test-local2"
#   access_key = "accessxxxx"
#   secret_key = "secretyyyy"
#   instance   = "Minstance"
#   [[rocket.topics]]
#       name     = "A"
#       topic    = "xx-testing-local"
Expand ▾ Collapse ▴

Documentation

Index

Constants

View Source
// Broker kind identifiers. One of these is passed as the `broker int`
// argument of NewSyncProducer, NewAsyncProducer and NewConsumer to choose
// which message-queue backend the returned value talks to.
const (
	KAFKA  = 1
	ROCKET = 2
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncProducer

// AsyncProducer is the asynchronous producer interface.
type AsyncProducer interface {
	// Send publishes msg under topicName and returns an error on failure.
	// NOTE(review): topicName presumably refers to TopicConfig.Name rather
	// than the raw broker topic — confirm against the implementation.
	Send(ctx context.Context, topicName string, msg *Message) error
	// Close shuts the producer down.
	Close() error
}

AsyncProducer 异步生产接口

func NewAsyncProducer

func NewAsyncProducer(broker int, conf Config) AsyncProducer

NewAsyncProducer 创建AsyncProducer

type CallbackHandler

// CallbackHandler is the consumer callback type: it receives a context and
// the delivered Event and returns an error.
// NOTE(review): how a non-nil error is handled (retry, skip, stop) is not
// visible here — confirm against the consumer implementation.
type CallbackHandler func(ctx context.Context, event Event) error

CallbackHandler 消费回调函数

type Config

// Config holds the message-queue settings; each field is read from the TOML
// key named in its struct tag.
type Config struct {
	// Broker is read from the "broker" key.
	// NOTE(review): its accepted values are not shown here — confirm.
	Broker    string        `toml:"broker"`
	// Endpoints lists the broker addresses (e.g. "localhost:9092").
	Endpoints []string      `toml:"endpoints"`
	// AccessKey, SecretKey and Instance appear only in the sample rocket
	// section; presumably they are RocketMQ credentials/instance ID — confirm.
	AccessKey string        `toml:"access_key"`
	SecretKey string        `toml:"secret_key"`
	Instance  string        `toml:"instance"`
	// Group is read from the "group" key; presumably the consumer group — confirm.
	Group     string        `toml:"group"`
	// Topics is the list of configured topic entries.
	Topics    []TopicConfig `toml:"topics"`
}

Config 消息队列配置项

type Consumer

// Consumer is the consumer interface.
type Consumer interface {
	// Recv registers h as the callback for messages of topicName.
	Recv(topicName string, h CallbackHandler) error
	// Start begins consuming messages.
	// NOTE(review): whether Start blocks until Close is not visible here — confirm.
	Start() error
	// Close stops consuming.
	Close() error
}

Consumer 消费接口

func NewConsumer

func NewConsumer(broker int, conf *Config) Consumer

NewConsumer 创建Consumer

type Event

// Event is the argument passed to a CallbackHandler.
type Event interface {
	// GetTopic returns the topic associated with the event.
	GetTopic() string
	// GetMessage returns the Message associated with the event.
	GetMessage() Message
}

Event 回调参数接口

type GroupHandler

// GroupHandler wraps a callback in the shape sarama expects; it provides
// Setup, Cleanup and ConsumeClaim methods, the last of which runs the callback.
type GroupHandler struct {
	// TopicName is the topic name this handler is associated with.
	TopicName       string
	// CallbackHandler is the user callback to run for consumed messages.
	CallbackHandler CallbackHandler
	// ConsumerGroup is the sarama consumer group this handler belongs to.
	ConsumerGroup   sarama.ConsumerGroup
}

GroupHandler 回调封装 sarama约定

func (*GroupHandler) Cleanup

Cleanup ...

func (*GroupHandler) ConsumeClaim

ConsumeClaim 回调函数执行

func (*GroupHandler) Setup

Setup ...

type KafkaAsyncProducer

type KafkaAsyncProducer struct {

	// contains filtered or unexported fields

}

KafkaAsyncProducer kafka异步生产者结构

func NewKafkaAsyncProducer

func NewKafkaAsyncProducer(conf Config) (*KafkaAsyncProducer, error)

NewKafkaAsyncProducer 创建KafkaAsyncProducer

func (*KafkaAsyncProducer) Close

func (p *KafkaAsyncProducer) Close() error

Close 关闭异步生产者

func (*KafkaAsyncProducer) Send

func (p *KafkaAsyncProducer) Send(ctx context.Context, name string, msg *Message) error

Send 异步发送消息

type KafkaConsumer

type KafkaConsumer struct {

	// contains filtered or unexported fields

}

KafkaConsumer kafka消费者结构

func NewKafkaConsumer

func NewKafkaConsumer(conf *Config) (*KafkaConsumer, error)

NewKafkaConsumer 创建KafkaConsumer

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close() error

Close 关闭消费

func (*KafkaConsumer) Recv

func (c *KafkaConsumer) Recv(name string, callback CallbackHandler) error

Recv 消费消息 设置回调函数

func (*KafkaConsumer) Start

func (c *KafkaConsumer) Start() error

Start 启动消费消息

type KafkaEvent

// KafkaEvent is the Kafka message event; its GetTopic and GetMessage
// methods provide the two accessors declared by Event.
type KafkaEvent struct {
	// Topic is the topic of the event.
	Topic   string
	// Message is the message carried by the event.
	Message Message
}

KafkaEvent kafka消息事件

func (*KafkaEvent) GetMessage

func (k *KafkaEvent) GetMessage() Message

GetMessage 获取事件对应的Message

func (*KafkaEvent) GetTopic

func (k *KafkaEvent) GetTopic() string

GetTopic 获取事件对应的Topic

type KafkaSyncProducer

type KafkaSyncProducer struct {

	// contains filtered or unexported fields

}

KafkaSyncProducer kafka同步生产者结构

func NewKafkaSyncProducer

func NewKafkaSyncProducer(conf *Config) (*KafkaSyncProducer, error)

NewKafkaSyncProducer 创建KafkaSyncProducer

func (*KafkaSyncProducer) Close

func (p *KafkaSyncProducer) Close() error

Close 关闭同步生产者

func (*KafkaSyncProducer) Send

func (p *KafkaSyncProducer) Send(ctx context.Context, name string, msg *Message) error

Send 同步发送消息

type Message

// Message is the message structure sent by producers and delivered to
// consumer callbacks.
type Message struct {
	// Tag is a message tag.
	// NOTE(review): presumably used for RocketMQ-style tag filtering — confirm.
	Tag   string
	// Key is a message key.
	// NOTE(review): presumably the Kafka partition key / RocketMQ message key — confirm.
	Key   string
	// Value is the raw message payload.
	Value []byte
}

Message 消息结构

type SyncProducer

// SyncProducer is the synchronous producer interface.
type SyncProducer interface {
	// Send publishes msg under topicName and returns an error on failure.
	// NOTE(review): topicName presumably refers to TopicConfig.Name rather
	// than the raw broker topic — confirm against the implementation.
	Send(ctx context.Context, topicName string, msg *Message) error
	// Close shuts the producer down.
	Close() error
}

SyncProducer 同步生产接口

func NewSyncProducer

func NewSyncProducer(broker int, conf *Config) SyncProducer

NewSyncProducer 创建SyncProducer

type TopicConfig

// TopicConfig is the configuration of a single topic entry.
type TopicConfig struct {
	// Name is read from the "name" key.
	// NOTE(review): presumably the logical name passed to Send/Recv (the
	// sample uses "A" and "B") — confirm.
	Name  string `toml:"name"`
	// Topic is read from the "topic" key.
	// NOTE(review): presumably the actual topic on the broker — confirm.
	Topic string `toml:"topic"`
}

TopicConfig topic配置

Source Files