type Notifier

type Notifier struct {
	// contains filtered or unexported fields

Notifier will wake up sleeping requests when there is some new data. It does not tell requests what that data is, only the sync position which they can use to get at it. This is done to prevent races whereby we tell the caller the event, but the token has already advanced by the time they fetch it, resulting in missed events.

func NewNotifier

func NewNotifier(currPos types.StreamingToken) *Notifier

NewNotifier creates a new notifier set to the given sync position. In order for this to be of any use, the Notifier needs to be told all rooms and the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).

func (*Notifier) CurrentPosition

func (n *Notifier) CurrentPosition() types.StreamingToken

CurrentPosition returns the current sync position

func (*Notifier) GetListener

func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener

GetListener returns a UserStreamListener that can be used to wait for updates for a user. Must be closed. notify for anything before sincePos

func (*Notifier) Load

func (n *Notifier) Load(ctx context.Context, db storage.Database) error

Load the membership states required to notify users correctly.

func (*Notifier) OnNewAccountData

func (n *Notifier) OnNewAccountData(
	userID string, posUpdate types.StreamingToken,

func (*Notifier) OnNewEvent

func (n *Notifier) OnNewEvent(
	ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
	posUpdate types.StreamingToken,

OnNewEvent is called when a new event is received from the room server. Must only be called from a single goroutine, to avoid races between updates which could set the current sync position incorrectly. Chooses which user sync streams to update by a provided *gomatrixserverlib.Event (based on the users in the event's room), a roomID directly, or a list of user IDs, prioritised by parameter ordering. posUpdate contains the latest position(s) for one or more types of events. If a position in posUpdate is 0, it means no updates are available of that type. Typically a consumer supplies a posUpdate with the latest sync position for the event type it handles, leaving other fields as 0.

func (*Notifier) OnNewInvite

func (n *Notifier) OnNewInvite(
	posUpdate types.StreamingToken, wakeUserID string,

func (*Notifier) OnNewKeyChange

func (n *Notifier) OnNewKeyChange(
	posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,

func (*Notifier) OnNewPeek

func (n *Notifier) OnNewPeek(
	roomID, userID, deviceID string,
	posUpdate types.StreamingToken,

func (*Notifier) OnNewReceipt

func (n *Notifier) OnNewReceipt(
	roomID string,
	posUpdate types.StreamingToken,

OnNewReceipt updates the current position

func (*Notifier) OnNewSendToDevice

func (n *Notifier) OnNewSendToDevice(
	userID string, deviceIDs []string,
	posUpdate types.StreamingToken,

func (*Notifier) OnNewTyping

func (n *Notifier) OnNewTyping(
	roomID string,
	posUpdate types.StreamingToken,

OnNewReceipt updates the current position

func (*Notifier) OnRetirePeek

func (n *Notifier) OnRetirePeek(
	roomID, userID, deviceID string,
	posUpdate types.StreamingToken,

func (*Notifier) PeekingDevices

func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice)

Not thread-safe: must be called on the OnNewEvent goroutine only

type UserDeviceStream

type UserDeviceStream struct {
	UserID   string
	DeviceID string
	// contains filtered or unexported fields

UserDeviceStream represents a communication mechanism between the /sync request goroutine and the underlying sync server goroutines. Goroutines can get a UserStreamListener to wait for updates, and can Broadcast() updates.

func NewUserDeviceStream

func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream

NewUserDeviceStream creates a new user stream

func (*UserDeviceStream) Broadcast

func (s *UserDeviceStream) Broadcast(pos types.StreamingToken)

Broadcast a new sync position for this user.

func (*UserDeviceStream) GetListener

GetListener returns UserStreamListener that a sync request can use to wait for new updates with. UserStreamListener must be closed

func (*UserDeviceStream) NumWaiting

func (s *UserDeviceStream) NumWaiting() uint

NumWaiting returns the number of goroutines waiting for waiting for updates. Used for metrics and testing.

func (*UserDeviceStream) TimeOfLastNonEmpty

func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time

TimeOfLastNonEmpty returns the last time that the number of waiting listeners was non-empty, may be time.Now() if number of waiting listeners is currently non-empty.

type UserDeviceStreamListener

type UserDeviceStreamListener struct {
	// contains filtered or unexported fields

UserDeviceStreamListener allows a sync request to wait for updates for a user.

func (*UserDeviceStreamListener) Close

func (s *UserDeviceStreamListener) Close()

Close cleans up resources used

func (*UserDeviceStreamListener) GetNotifyChannel

func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{}

GetNotifyChannel returns a channel that is closed when there may be an update for the user. sincePos specifies from which point we want to be notified about. If there has already been an update after sincePos we'll return a closed channel immediately.

func (*UserDeviceStreamListener) GetSyncPosition

func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken

GetSyncPosition returns last sync position which the UserStream was notified about

