pulse

module
v0.0.0-...-f80b70b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2022 License: MIT

README

pulse

Go Report Card visitor badges TODOs

Follow on Twitter Discord Banner

an eventbus made on portable MQ.

Usage

Check the example/pulse and find an example for supported MQ. Note: You need run your MQ first and get its address.

Use MQ as broker

example for natsstreaming.

meta := protocol.NewMetadata()
meta.SetDriver(nats.DriverName)
meta.Properties[nats.NatsURL] = "nats://localhost:4222"
meta.Properties[nats.NatsStreamingClusterID] = "test-cluster"
meta.Properties[nats.SubscriptionType] = "topic"
meta.Properties[nats.ConsumerID] = "app-test-a"

Publisher

Publisher is asynchronously and could get result about the success or failure to send the event.

t, err := topic.NewTopic(meta, topic.WithMiddlewares(visitor.WithRetry(3)))
if err != nil {
    log.Error(err)
    return
}

res := t.Publish(context.Background(), protocol.NewMessage("test", "", []byte("hello")))
go func() {
    if _, err := res.Get(context.Background()); err != nil {
    	log.Error(err)
    }
}()
Subscribe

Receive is a synchronous function and blocks until have an err set by like ctx.Done() or other error.

s, err := subscription.NewSubscription("hello", meta, subscription.WithCount())
if err != nil {
    log.Error(err)
    return
}

err = s.Receive(context.Background(), protocol.NewSubscribeRequest("test", meta), func(ctx context.Context, m *protocol.Message) {
    log.Debug("receive message ", m)
})

feature

  • Idempotence: Simply record in the map of each Subscription to avoid repeated processing by a single consumer. Nats can provide queueSubscribe

  • Orderliness: Messages use OrderingKey to ensure orderly delivery. If an error occurs, the sending of a certain OrderingKey will be suspended, and the empty key will not be suspended

  • Concurrent processing: Both topic and Subscription use concurrent processing. Pay attention to whether the middleware has critical resources

  • Reliability: ack is implemented independently to ensure that the message is delivered at least once. In future: support QoS 0,1,2 three level.

  • Asynchronous: Message send asynchronously and could be buffered and delay send.

  • Batch Handle: Scheduler could buffer message and batch handle them if underlying MQ supports.

Architecture

Concepts

Driver

Driver is the realization of various protocol like Nats, http etc.

Message Format

Message is the object transport from the pulse endpoint. It's format as CloudEvents.

            {
              "specversion": "1.x-wip",
              "type": "coolevent",
              "id": "xxxx-xxxx-xxxx",
              "source": "bigco.com",
              "data": { ... }
            }

Reference

Hall of fame
Repos
- nuid: now use nats's package nuid to generated uuid

- CloudEvent: a CNCF project

- Cloud State: sidecar project to move state out of application

- GoogleCloud Pub sub : use to get a new driver

- Dapr: sidecar Synthesizer

- Saga: pulse usage.

- Kong: siprit on extension.

- Kafka: log

- axon: event source DDD CQRS

- webhook: to establish a webhook to receive the response asynchronously

Directories

Path Synopsis
pkg
logger
logger use dapr logger https://github.com/dapr/dapr
logger use dapr logger https://github.com/dapr/dapr
protocol/adapter/proto
Package proto defines the protobuf codec.
Package proto defines the protobuf codec.
protocol/ratelimit
todo: add ratelimit from go-kit
todo: add ratelimit from go-kit
topic
Publish logic select the code form Google Code with MIT.
Publish logic select the code form Google Code with MIT.
sample
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL