streams

package
v0.0.0-...-49f0b68 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8

The maximum number of tasks that can be queued in total before backpressure will build up and the rests will start to block.

View Source
const PDU_STREAM_WORKERS = 256

The max number of per-room goroutines to have running. Too high and this will consume lots of CPU, too low and complete sync responses will take longer to process.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccountDataStreamProvider

type AccountDataStreamProvider struct {
	StreamProvider
	// contains filtered or unexported fields
}

func (*AccountDataStreamProvider) CompleteSync

func (*AccountDataStreamProvider) IncrementalSync

func (*AccountDataStreamProvider) Setup

func (p *AccountDataStreamProvider) Setup()

type DeviceListStreamProvider

type DeviceListStreamProvider struct {
	StreamProvider
	// contains filtered or unexported fields
}

func (*DeviceListStreamProvider) CompleteSync

func (*DeviceListStreamProvider) IncrementalSync

type InviteStreamProvider

type InviteStreamProvider struct {
	StreamProvider
}

func (*InviteStreamProvider) CompleteSync

func (*InviteStreamProvider) IncrementalSync

func (p *InviteStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) types.StreamPosition

func (*InviteStreamProvider) Setup

func (p *InviteStreamProvider) Setup()

type NotificationDataStreamProvider

type NotificationDataStreamProvider struct {
	StreamProvider
}

func (*NotificationDataStreamProvider) CompleteSync

func (*NotificationDataStreamProvider) IncrementalSync

func (*NotificationDataStreamProvider) Setup

func (p *NotificationDataStreamProvider) Setup()

type PDUStreamProvider

type PDUStreamProvider struct {
	StreamProvider
	// contains filtered or unexported fields
}

func (*PDUStreamProvider) CompleteSync

func (p *PDUStreamProvider) CompleteSync(
	ctx context.Context,
	req *types.SyncRequest,
) types.StreamPosition

func (*PDUStreamProvider) IncrementalSync

func (p *PDUStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) (newPos types.StreamPosition)

func (*PDUStreamProvider) Setup

func (p *PDUStreamProvider) Setup()

type PresenceStreamProvider

type PresenceStreamProvider struct {
	StreamProvider
	// contains filtered or unexported fields
}

func (*PresenceStreamProvider) CompleteSync

func (*PresenceStreamProvider) IncrementalSync

func (p *PresenceStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) types.StreamPosition

func (*PresenceStreamProvider) Setup

func (p *PresenceStreamProvider) Setup()

type ReceiptMRead

type ReceiptMRead struct {
	User map[string]ReceiptTS `json:"m.read"`
}

type ReceiptStreamProvider

type ReceiptStreamProvider struct {
	StreamProvider
}

func (*ReceiptStreamProvider) CompleteSync

func (*ReceiptStreamProvider) IncrementalSync

func (p *ReceiptStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) types.StreamPosition

func (*ReceiptStreamProvider) Setup

func (p *ReceiptStreamProvider) Setup()

type ReceiptTS

type ReceiptTS struct {
	TS gomatrixserverlib.Timestamp `json:"ts"`
}

type SendToDeviceStreamProvider

type SendToDeviceStreamProvider struct {
	StreamProvider
}

func (*SendToDeviceStreamProvider) CompleteSync

func (*SendToDeviceStreamProvider) IncrementalSync

func (*SendToDeviceStreamProvider) Setup

func (p *SendToDeviceStreamProvider) Setup()

type StreamProvider

type StreamProvider struct {
	DB storage.Database
	// contains filtered or unexported fields
}

func (*StreamProvider) Advance

func (p *StreamProvider) Advance(
	latest types.StreamPosition,
)

func (*StreamProvider) LatestPosition

func (p *StreamProvider) LatestPosition(
	ctx context.Context,
) types.StreamPosition

func (*StreamProvider) Setup

func (p *StreamProvider) Setup()

type Streams

type Streams struct {
	PDUStreamProvider              types.StreamProvider
	TypingStreamProvider           types.StreamProvider
	ReceiptStreamProvider          types.StreamProvider
	InviteStreamProvider           types.StreamProvider
	SendToDeviceStreamProvider     types.StreamProvider
	AccountDataStreamProvider      types.StreamProvider
	DeviceListStreamProvider       types.StreamProvider
	NotificationDataStreamProvider types.StreamProvider
	PresenceStreamProvider         types.StreamProvider
}

func NewSyncStreamProviders

func NewSyncStreamProviders(
	d storage.Database, userAPI userapi.SyncUserAPI,
	rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI,
	eduCache *caching.EDUCache, lazyLoadCache caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams

func (*Streams) Latest

func (s *Streams) Latest(ctx context.Context) types.StreamingToken

type TypingStreamProvider

type TypingStreamProvider struct {
	StreamProvider
	EDUCache *caching.EDUCache
}

func (*TypingStreamProvider) CompleteSync

func (*TypingStreamProvider) IncrementalSync

func (p *TypingStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) types.StreamPosition

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL