topic

package module
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 5 Imported by: 4

README

Topic

Actions Status PkgGoDev

Package topic is an in-process pubsub system where new values can be pulled instead of being pushed.

The idea of pulling updates is inspired by Kafka or Redis-Streams. A subscriber does not have to register or unsubscribe to a topic and can take as long as it needs to process the messages.

How to use topic

A topic can be created with: top := topic.New[string]().

To publish one or more values, use: top.Publish("info1", "info2").

To receive values for the first time use: id, values, err := top.Receive(ctx, 0). The first value is a numeric id, it is needed for the next call of top.Receive(). The second argument is a list of all values that were published by this topic.

To receive newer values, use id, values, err = top.Receive(ctx, id). It returns all values that were published after the given id.

A topic is safe for concurrent use.

Run tests

Contributions are welcome. Please make sure that the tests are running with:

go test

Who is using topic

Topic was built for OpenSlides and is used in production for the Autoupdate-Service for many years without any problems.

Documentation

Overview

Package topic is an in-memory pubsub system where new values are pulled instead of being pushed.

It solves the problem, that you want to publish data to many goroutines. The standard way in go uses channels to push values to the readers. But channels have the problems, that either the goroutine sending the data has to wait for the reader or has to discard messages, if the reader is too slow. A buffered channel can help to delay the problem, but eventually the buffer could be full.

The idea of pulling updates is inspired by Kafka or Redis-Streams. A subscriber does not have to register or unsubscribe to a topic and can take as much time as it needs to process the messages. Therefore, the system is less error prone.

In a pulling messaging system, the publisher does not push the values, but the receivers have to pull them. The publisher can save values without waiting on slow receivers. A receiver has all the time it needs to process messages and can pull again as soon as the work is done.

Another benefit of this pattern is, that a receiver does not have to register on the pubsub system. Since the publisher does not send the messages, it does not have to know how many receivers there are. Therefore there are no register or unregister methods in this package.

Create new topic

To create a new topic use the topic.New() constructor:

top := topic.New[string]()

Publish messages

Messages can be published with the Publish()-method:

top.Publish("some value")

More than one message can be published at once:

top.Publish("some value", "other value")

Internally, the topic creates a new id that can be used to receive newer values. The Publish()-method returns this id. In most cases, the returned id can be ignored.

Receive messages

Messages can be received with the ReceiveAll()- or ReceiveSince()-method:

id, values := topic.ReceiveAll()
id, values, err := top.ReceiveSince(context.Background(), 42)

The returned id is the number of values in the topic. It can only increase.

The returned values are a slice of the published messages.

To receive newer values, ReceiveSince() can be called again with the id from the last call:

id, values, err := top.ReceiveAll()
...
id, values, err = top.ReceiveSince(context.Background(), id)

Only messages, that were published after the given id are returned.

When there are no new values in the topic, then the ReceiveSince()-call blocks until there are new values. To add a timeout to the call, the context can be used:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
id, values, err = top.ReceiveSince(ctx, id)

If there are no new values before the context is canceled, the topic returns the error of the context. For example `context.DeadlineExceeded` or `context.Canceled`.

The usual pattern to subscribe to a topic is:

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

var id uint64
var values []string
var err error
for {
    id, values, err = top.ReceiveSince(ctx, id)
    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
            // Timeout
            break
        }
        // Handle other errors
    }
    // Process values
}

The loop will process all values published to the topic for one minute.

Get Last ID

The example above will process all messages in the topic. If only messages should be processed that were published after the loop starts, the method LastID() can be used:

id := top.LastID()
id, values, err = top.ReceiveSince(context.Background(), id)

The return value of LastID() is the highest id in the topic. So a ReceiveSince() call on top.LastID() will only return data that was published after the call.

A pattern to receive only new data is:

id := top.LastID()
var values []string
var err error
for {
    id, values, err = top.ReceiveSince(context.Background(), id)
    if err != nil {
        // Handle error
    }
    // Process values
}

Prune old values

For this pattern to work, the topic has to save all values that were ever published. To free some memory, old values can be deleted from time to time. This can be accomplished with the Prune() method:

top.Prune(time.Now().Add(-10*time.Minute))

This call will remove all values in the topic that are older than ten minutes.

Make sure that all receivers have read the values before they are pruned.

If a Receive()-call tries to receive pruned values, it will return with the error `topic.UnknownIDError`.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

Topic is a datastructure that holds a list of values. To add a value to a topic is called publishing that value. Each time, a list of values is published, a new id is created. It is possible to receive all values from a topic at once or the values that were published after a specific id.

A Topic has to be created with `topic.New()`. For example topic.New[string]().

A Topic is safe for concurrent use.

Example
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/ostcar/topic"
)

func main() {
	ctx, shutdown := context.WithCancel(context.Background())
	top := topic.New[string]()

	// Write the messages v1, v2 and v3 in a different goroutine.
	go func() {
		top.Publish("v1")
		top.Publish("v2", "v3")
		shutdown()
	}()

	// Receive the two messages and print all values.
	var id uint64
	var values []string
	var err error
	for {
		id, values, err = top.ReceiveSince(ctx, id)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				// shutdown was called.
				return
			}

			// Handle Error:
			fmt.Printf("Receive() returned an unexpected error: %v", err)
			return
		}

		// Process values:
		for _, v := range values {
			fmt.Println(v)
		}
	}

}
Output:
v1
v2
v3

func New

func New[T any]() *Topic[T]

New creates a new topic.

func (*Topic[T]) LastID

func (t *Topic[T]) LastID() uint64

LastID returns the last id of the topic. Returns 0 for an empty topic.

func (*Topic[T]) Prune

func (t *Topic[T]) Prune(until time.Time)

Prune removes entries from the topic that are older than the given time.

func (*Topic[T]) Publish added in v0.2.0

func (t *Topic[T]) Publish(value ...T) uint64

Publish adds one or many values to a topic. It returns the new id. All waiting Receive()-calls are awakened.

func (*Topic[T]) ReceiveAll added in v0.6.0

func (t *Topic[T]) ReceiveAll() (uint64, []T)

ReceiveAll returns all values from the topic, that is not pruned.

For performance reasons, this function returns the internal slice of the topic. It is not allowed to manipulate the values.

func (*Topic[T]) ReceiveSince added in v0.6.0

func (t *Topic[T]) ReceiveSince(ctx context.Context, id uint64) (uint64, []T, error)

ReceiveSince returns all values from the topic after the given id.

If the id is lower than the lowest id in the topic, an error of type UnknownIDError is returned.

If there is no new data, Receive() blocks until there is new data or the given channel is done.

For performance reasons, this function returns the internal slice of the topic. It is not allowed to manipulate the values.

type UnknownIDError

type UnknownIDError struct {
	// FirstID is the lowest id in the topic.
	FirstID uint64

	// ID is the id that was requested.
	ID uint64
}

UnknownIDError is returned by Receive() when a topic id is requested, that is lower then the lowest id in the topic.

func (UnknownIDError) Error

func (e UnknownIDError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL