changestream

package
v0.11.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors; compare against them with errors.Is. The code that
// returns them is not shown here (presumably WebsocketClient — TODO confirm).
//
//   - ErrWebsocketClosed: the websocket was closed.
//   - ErrAlreadyStarted: something that can only be started once was already
//     started (which component is not visible here).
var (
	ErrWebsocketClosed = errors.New("websocket closed")
	ErrAlreadyStarted  = errors.New("already started")
)
View Source
// Sentinel errors whose messages refer to client limits, Watch, and running
// state. They are presumably returned by MongoDBServer — TODO confirm against
// the implementation, which is not shown here.
//
//   - ErrNoMoreClients: the maximum number of clients has been reached.
//   - ErrDuplicateClient: Watch was called with a clientId that is a duplicate.
//   - ErrAlreadyRunning: already running (presumably from Run — TODO confirm).
var (
	ErrNoMoreClients   = errors.New("max clients reached, no more clients allowed")
	ErrDuplicateClient = errors.New("Watch called with duplicate clientId")
	ErrAlreadyRunning  = errors.New("already running")
)
View Source
// Package-level tunables. They are variables, not constants, so they can be
// reassigned; what reads them, and when, is not shown here.
//
//   - ClientBufferSize: defaults to 10 (presumably a per-client event buffer
//     capacity — TODO confirm).
//   - ServerBufferSize: defaults to 1000. The ErrBufferTooSmall message names
//     it as the limit the current-events buffer can reach while backlog
//     streaming is still in progress.
//   - BacklogWait: defaults to 100 milliseconds (purpose not visible here —
//     TODO confirm).
var (
	ClientBufferSize = 10
	ServerBufferSize = 1000
	BacklogWait      = 100 * time.Millisecond
)
View Source
// Sentinel errors related to streaming; presumably the values reported by
// Streamer.Error — TODO confirm against the implementation.
//
//   - ErrStopped: Streamer.Stop was called.
//   - ErrServerClosedStream: the server closed the change stream.
//   - ErrBufferTooSmall: the current-events buffer reached ServerBufferSize
//     before backlog streaming finished.
var (
	ErrStopped            = errors.New("Streamer.Stop called")
	ErrServerClosedStream = errors.New("server closed the change stream")
	ErrBufferTooSmall     = errors.New("current events buffer reached ServerBufferSize before backlog streaming done")
)

Functions

This section is empty.

Types

type MongoDBServer

// MongoDBServer declares the same Run, Stop, Watch, and Close methods that
// the Server interface requires. Its state lives in unexported fields, so
// values are obtained from NewMongoDBServer.
type MongoDBServer struct {
	// Embedded, so Lock and Unlock are promoted onto MongoDBServer itself.
	// What the mutex guards is not visible here.
	*sync.Mutex
	// contains filtered or unexported fields
}

func NewMongoDBServer

func NewMongoDBServer(cfg ServerConfig) *MongoDBServer

func (*MongoDBServer) Close

func (s *MongoDBServer) Close(clientId string)

func (*MongoDBServer) Run

func (s *MongoDBServer) Run() error

func (*MongoDBServer) Stop

func (s *MongoDBServer) Stop()

func (*MongoDBServer) Watch

func (s *MongoDBServer) Watch(clientId string) (<-chan etre.CDCEvent, error)

type Server

// Server is the interface a ServerStream is constructed with (see
// NewServerStream). MongoDBServer declares all four methods with matching
// signatures.
type Server interface {
	// Run returns an error; whether it blocks until Stop is called is not
	// visible here — TODO confirm.
	Run() error
	// Stop takes no arguments and returns nothing (presumably stops Run —
	// TODO confirm).
	Stop()
	// Watch returns a receive-only channel of CDC events for the given
	// clientId, or an error.
	Watch(clientId string) (<-chan etre.CDCEvent, error)
	// Close takes the clientId (presumably ends that client's Watch — TODO
	// confirm).
	Close(clientId string)
}

type ServerConfig

// ServerConfig is the argument to NewMongoDBServer.
type ServerConfig struct {
	// MongoDB collection handle (presumably the collection CDC events are
	// read from — TODO confirm).
	CDCCollection *mongo.Collection
	// Presumably the client limit behind ErrNoMoreClients — TODO confirm.
	MaxClients    uint
	// Presumably a buffer capacity; its relation to ServerBufferSize is not
	// visible here — TODO confirm.
	BufferSize    uint
}

type ServerStream

// ServerStream is created by NewServerStream from a client id, a Server, and
// a cdc.Store. Its pointer type declares Start, InSync, Status, Stop, and
// Error with the same signatures as the Streamer interface.
type ServerStream struct {
	// contains filtered or unexported fields
}

func NewServerStream

func NewServerStream(clientId string, server Server, store cdc.Store) *ServerStream

NewServerStream creates a streamer for the client identified by clientId that uses the provided Server and Store to stream events.

func (*ServerStream) Error

func (s *ServerStream) Error() error

func (*ServerStream) InSync

func (s *ServerStream) InSync() chan struct{}

func (*ServerStream) Start

func (s *ServerStream) Start(sinceTs int64) <-chan etre.CDCEvent

func (*ServerStream) Status

func (s *ServerStream) Status() Status

func (*ServerStream) Stop

func (s *ServerStream) Stop()

type ServerStreamFactory

// ServerStreamFactory declares Make(clientId string) Streamer, the method
// required by StreamerFactory. Its fields have the same types as the server
// and store arguments of NewServerStream (presumably Make passes them
// through — TODO confirm).
type ServerStreamFactory struct {
	Server Server
	Store  cdc.Store
}

func (ServerStreamFactory) Make

func (f ServerStreamFactory) Make(clientId string) Streamer

type Status

// Status is the value returned by Streamer.Status. Field meanings below are
// inferred from names where the declaration alone does not establish them.
type Status struct {
	ClientId           string
	Running            bool
	// Presumably mirrors the condition signaled by Streamer.InSync — TODO
	// confirm.
	InSync             bool
	// Valid values are not visible here.
	SyncMethod         string
	Since              time.Time
	// Valid values are not visible here.
	BacklogState       string
	BufferUsage        []int // [max, in, out]
	// Presumably set when the stream ended with ErrServerClosedStream — TODO
	// confirm.
	ServerClosedStream bool
}

type Streamer

type Streamer interface {
	// Start starts streaming events from a given timestamp. It returns a
	// channel on which the caller can receive all of the events it streams
	// for as long as the streamer is running. A streamer runs until Stop
	// is called or until it encounters an error.
	Start(sinceTs int64) <-chan etre.CDCEvent

	// InSync returns a signal-only channel (presumably closed or sent on
	// once the streamer has caught up to real-time events — TODO confirm).
	InSync() chan struct{}

	// Status returns a Status value describing the streamer.
	Status() Status

	// Stop stops the streamer and closes the event channel returned by Start.
	Stop()

	// Error returns the error that caused the streamer to stop. Start resets
	// the error.
	Error() error
}

A Streamer produces a stream of CDC events on a channel. It can start streaming events from any point in the past, continually making its way closer and closer to present events. Once it catches up to new events that are being created in real-time, it seamlessly transitions to streaming those.

Streamers are used by feeds to produce the events that a feed sends to its client. A feed can only have one streamer.

type StreamerFactory

// StreamerFactory makes Streamers. ServerStreamFactory declares a matching
// Make method.
type StreamerFactory interface {
	// Make returns a Streamer for the given clientId.
	Make(clientId string) Streamer
}

type WebsocketClient

// WebsocketClient is created by NewWebsocketClient from a client id, a
// *websocket.Conn, and a Streamer. It declares Run, Stop, and Ping
// (presumably it forwards the streamer's events over the websocket — TODO
// confirm).
type WebsocketClient struct {

	// --
	// Embedded, so Lock and Unlock are promoted onto WebsocketClient itself.
	*sync.Mutex // guards function calls
	// contains filtered or unexported fields
}

func NewWebsocketClient

func NewWebsocketClient(clientId string, wsConn *websocket.Conn, stream Streamer) *WebsocketClient

func (*WebsocketClient) Ping

func (f *WebsocketClient) Ping(timeout time.Duration) etre.Latency

func (*WebsocketClient) Run

func (f *WebsocketClient) Run() error

func (*WebsocketClient) Stop

func (f *WebsocketClient) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL