gpubsub

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

README

gpubsub - A generic PubSub messaging library

Generate Topic

topic := gpubsub.NewTopic[int](topicName, concurrency, interval, ttl)

Generate Subscription

subscription := topic.NewSubscription("DummyConsumer")

Consume messages with callback

subscription.Subscribe(ctx, func (m *gpubsub.Message[int]) {
	// get the content of message which has type T
	message := m.Body()
	
	// some consumer process 
	
	if err == nil {
		// Ack if succeed
		m.Ack()
	} else {
		// Nack if failed, retry later
		m.Nack()
	}
})

Publish a message

topic.Publish(1)

Brief Example

ctx, cancel := context.WithCancel(context.Background())

topicName := "DummyData"
concurrency := int64(2)
interval := 30 * time.Second
ttl := 1 * time.Hour

topic := gpubsub.NewTopic[int](topicName, concurrency, interval, ttl)
subscription := topic.NewSubscription("DummyConsumer")

var wg sync.WaitGroup

wg.Add(1)
go func() {
  defer wg.Done()
  subscription.Subscribe(ctx, func(m *gpubsub.Message[int]) {
    log.Printf("data: %d\n", m.Body())
    m.Ack()
  })
}()

for i := 0; i < 10; i++ {
  topic.Publish(i)
}
cancel()

wg.Wait()

It will print output like the following (message order may vary because messages are consumed concurrently):


2022/04/10 13:59:26 closing subscription: DummyConsumer
2022/04/10 13:59:27 data: 0
2022/04/10 13:59:27 data: 1
2022/04/10 13:59:28 data: 4
2022/04/10 13:59:28 data: 3
2022/04/10 13:59:29 data: 5
2022/04/10 13:59:29 data: 6
2022/04/10 13:59:30 data: 7
2022/04/10 13:59:30 data: 8
2022/04/10 13:59:31 data: 9
2022/04/10 13:59:31 data: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message[T any] struct {
	// contains filtered or unexported fields
}

func (*Message[T]) Ack

func (m *Message[T]) Ack()

func (*Message[T]) Body

func (m *Message[T]) Body() T

func (*Message[T]) CreatedAt

func (m *Message[T]) CreatedAt() time.Time

func (*Message[T]) ID

func (m *Message[T]) ID() string

func (*Message[T]) LastViewedAt

func (m *Message[T]) LastViewedAt() time.Time

func (*Message[T]) Nack

func (m *Message[T]) Nack()

type Subscription

type Subscription[T any] struct {
	// contains filtered or unexported fields
}

func (*Subscription[T]) Name

func (s *Subscription[T]) Name() string

func (*Subscription[T]) Subscribe

func (s *Subscription[T]) Subscribe(ctx context.Context, consumer func(*Message[T]))

func (*Subscription[T]) Topic

func (s *Subscription[T]) Topic() *Topic[T]

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic[T any](name string, concurrency int64, interval time.Duration, ttl time.Duration) *Topic[T]

func (*Topic[T]) Name

func (t *Topic[T]) Name() string

func (*Topic[T]) NewSubscription

func (t *Topic[T]) NewSubscription(name string) *Subscription[T]

func (*Topic[T]) Publish

func (t *Topic[T]) Publish(body T)

func (*Topic[T]) Subscriptions

func (t *Topic[T]) Subscriptions() map[string]*Subscription[T]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL