streaming

package
v1.1.0
Published: May 12, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Acker

type Acker interface {
	XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
}

Acker is the interface used by events to acknowledge themselves.

type Event

type Event struct {
	// ID is the unique event ID.
	ID string
	// StreamName is the name of the stream the event belongs to.
	StreamName string
	// SinkName is the name of the sink the event belongs to.
	SinkName string
	// EventName is the producer-defined event name.
	EventName string
	// Topic is the producer-defined event topic if any, empty string if none.
	Topic string
	// Payload is the event payload.
	Payload []byte
	// Acker is the redis client used to acknowledge events.
	Acker Acker
	// contains filtered or unexported fields
}

Event is a stream event.

func (*Event) CreatedAt

func (e *Event) CreatedAt() time.Time

CreatedAt returns the event creation time (millisecond precision).

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader represents a stream reader.

func (*Reader) AddStream

func (r *Reader) AddStream(ctx context.Context, stream *Stream, opts ...options.AddStream) error

AddStream adds the stream to the reader. By default the stream cursor starts at the same timestamp as the reader's main stream cursor. This can be overridden with opts. AddStream does nothing if the stream is already part of the reader.

func (*Reader) Close

func (r *Reader) Close()

Close stops event polling and closes the reader channel. It is safe to call Close multiple times.

func (*Reader) IsClosed

func (r *Reader) IsClosed() bool

IsClosed returns true if the reader is stopped.

func (*Reader) RemoveStream

func (r *Reader) RemoveStream(ctx context.Context, stream *Stream) error

RemoveStream removes the stream from the reader. It is idempotent.

func (*Reader) Subscribe

func (r *Reader) Subscribe() <-chan *Event

Subscribe returns a channel that receives events from the stream. The channel is closed when the reader is closed.
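
Because the channel is closed when the reader is closed, a consumer can range over it and exit cleanly. The sketch below shows that loop. `Event` and `fakeReader` are stand-ins that only mirror the documented method shapes so the example runs without Redis; they are assumptions, not the package's types.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a stand-in with only the fields this sketch needs.
type Event struct {
	ID        string
	EventName string
}

// fakeReader is a hypothetical stand-in that mirrors the documented
// Subscribe and Close method shapes.
type fakeReader struct {
	once sync.Once
	ch   chan *Event
}

func (r *fakeReader) Subscribe() <-chan *Event { return r.ch }

// Close closes the channel once, so repeated calls are safe.
func (r *fakeReader) Close() { r.once.Do(func() { close(r.ch) }) }

// consume reads events until the channel is closed and returns their names.
func consume(events <-chan *Event) []string {
	var names []string
	for ev := range events {
		names = append(names, ev.EventName)
	}
	return names
}

func main() {
	r := &fakeReader{ch: make(chan *Event, 2)}
	r.ch <- &Event{ID: "1-0", EventName: "created"}
	r.ch <- &Event{ID: "2-0", EventName: "updated"}
	r.Close()
	r.Close() // second call is a no-op in this stand-in
	fmt.Println(consume(r.Subscribe()))
}
```

With a real reader, the same `for ev := range` loop would typically run in its own goroutine while another goroutine decides when to call Close.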

func (*Reader) Unsubscribe

func (r *Reader) Unsubscribe(c <-chan *Event)

Unsubscribe removes the channel from the reader subscribers and closes it.

type Sink

type Sink struct {
	// Name is the sink name.
	Name string
	// contains filtered or unexported fields
}

Sink represents a stream sink.

func (*Sink) Ack

func (s *Sink) Ack(ctx context.Context, e *Event) error

Ack acknowledges the event.

func (*Sink) AddStream

func (s *Sink) AddStream(ctx context.Context, stream *Stream, opts ...options.AddStream) error

AddStream adds the stream to the sink. By default the stream cursor starts at the same timestamp as the sink's main stream cursor. This can be overridden with opts. AddStream does nothing if the stream is already part of the sink.

func (*Sink) Close

func (s *Sink) Close(ctx context.Context)

Close stops event polling, waits for all events to be processed, and closes the sink channel. It is safe to call Close multiple times.

func (*Sink) IsClosed

func (s *Sink) IsClosed() bool

IsClosed returns true if the sink was closed.

func (*Sink) RemoveStream

func (s *Sink) RemoveStream(ctx context.Context, stream *Stream) error

RemoveStream removes the stream from the sink. It is idempotent.

func (*Sink) Subscribe

func (s *Sink) Subscribe() <-chan *Event

Subscribe returns a channel that receives events from the sink.

func (*Sink) Unsubscribe

func (s *Sink) Unsubscribe(c <-chan *Event)

Unsubscribe removes the channel from the sink and closes it.

type Stream

type Stream struct {
	// Name of the stream.
	Name string
	// MaxLen is the maximum number of events in the stream.
	MaxLen int
	// contains filtered or unexported fields
}

Stream encapsulates a stream of events. Events published to a stream can optionally be associated with a topic. Stream consumers can subscribe to a stream and optionally provide topic-matching criteria. Consumers can be created within a group. Each consumer group receives its own copy of the stream events.

func NewStream

func NewStream(name string, rdb *redis.Client, opts ...options.Stream) (*Stream, error)

NewStream returns the stream with the given name. All stream instances with the same name share the same events.

func (*Stream) Add

func (s *Stream) Add(ctx context.Context, name string, payload []byte, opts ...options.AddEvent) (string, error)

Add appends an event to the stream and returns its ID. If the WithOnlyIfStreamExists option is used and the stream does not exist, no event is added and Add returns the empty string. The stream is created if the option is omitted or when NewSink is called.

func (*Stream) Destroy

func (s *Stream) Destroy(ctx context.Context) error

Destroy deletes the entire stream and all its messages.

func (*Stream) NewReader

func (s *Stream) NewReader(ctx context.Context, opts ...options.Reader) (*Reader, error)

NewReader creates a new stream reader. All reader instances get all the events in the stream. Events are read starting:

  • from the last event by default
  • from the oldest event stored in the stream if the WithReaderStartAtOldest option is used
  • after the event with the ID provided via WithReaderLastEventID if that event is still in the stream, from the oldest event otherwise
  • from the first event added on or after the timestamp provided via WithReaderStartAt if that event is still in the stream, from the oldest event otherwise
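
The last two rules in the list above can be modeled on a plain slice of stored events. This is only an illustration of the documented fallback behavior, not the package's implementation. The `entry` type and both functions are hypothetical, and the slice is assumed to be sorted from oldest to newest.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical model of an event still stored in the stream.
type entry struct {
	ID     string
	Millis int64 // creation time in milliseconds
}

// startAfterID returns the index of the first event to deliver when resuming
// after lastID: the event following lastID if it is still stored, otherwise
// the oldest stored event (index 0).
func startAfterID(stored []entry, lastID string) int {
	for i, e := range stored {
		if e.ID == lastID {
			return i + 1
		}
	}
	return 0
}

// startAt returns the index of the first stored event added on or after ts.
// A timestamp older than everything stored yields 0, the oldest event.
func startAt(stored []entry, ts int64) int {
	return sort.Search(len(stored), func(i int) bool { return stored[i].Millis >= ts })
}

func main() {
	stored := []entry{{"100-0", 100}, {"200-0", 200}, {"300-0", 300}}
	fmt.Println(startAfterID(stored, "200-0")) // prints 2
	fmt.Println(startAfterID(stored, "50-0"))  // prints 0 (ID no longer stored)
	fmt.Println(startAt(stored, 150))          // prints 1
	fmt.Println(startAt(stored, 10))           // prints 0 (falls back to oldest)
}
```

An index equal to the slice length means there is nothing to replay, so only events added later would be delivered.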

func (*Stream) NewSink

func (s *Stream) NewSink(ctx context.Context, name string, opts ...options.Sink) (*Sink, error)

NewSink creates a new stream sink with the given name. All sink instances with the same name share the same stream cursor. Events read through a sink are not removed from the stream until they are acked by the client unless the WithNoAck option is used. Events are read starting:

  • from the last event by default
  • from the oldest event stored in the stream if the WithSinkStartAtOldest option is used
  • after the event with the ID provided via WithSinkLastEventID if that event is still in the stream, from the oldest event otherwise
  • from the first event added on or after the timestamp provided via WithSinkStartAt if that event is still in the stream, from the oldest event otherwise

func (*Stream) Remove

func (s *Stream) Remove(ctx context.Context, ids ...string) error

Remove removes the events with the given IDs from the stream. Note: clients should not need to call this method in normal operation. They should use the Ack method to acknowledge events instead.
