Documentation ¶
Index ¶
- Constants
- Variables
- func ConsumeRepoStreamLite2(ctx context.Context, con *websocket.Conn, cb LiteStreamHandleFunc) error
- func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler) error
- type DbPersistence
- func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error
- func (p *DbPersistence) Flush(ctx context.Context) error
- func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error)
- func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error)
- func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error)
- func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error)
- func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent))
- func (p *DbPersistence) Shutdown(context.Context) error
- func (p *DbPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error
- type DiskPersistOptions
- type DiskPersistence
- func (dp *DiskPersistence) Flush(ctx context.Context) error
- func (dp *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, ...) (*int64, error)
- func (dp *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent))
- func (dp *DiskPersistence) Shutdown(ctx context.Context) error
- func (dp *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error
- type ErrorFrame
- type EventHeader
- type EventManager
- func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error
- func (em *EventManager) Shutdown(ctx context.Context) error
- func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, ...) (<-chan *XRPCStreamEvent, func(), error)
- func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error
- type EventPersistence
- type InstrumentedRepoStreamCallbacks
- type LiteStreamHandleFunc
- type LogFileRef
- type MemPersister
- func (mp *MemPersister) Flush(ctx context.Context) error
- func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
- func (mp *MemPersister) Shutdown(context.Context) error
- func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error
- type Operation
- type Options
- type PersistenceBatchItem
- type RepoEventRecord
- type RepoStreamCallbacks
- type Scheduler
- type Subscriber
- type UserAction
- type XRPCStreamEvent
- type YoloPersister
- func (yp *YoloPersister) Flush(ctx context.Context) error
- func (yp *YoloPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
- func (yp *YoloPersister) Shutdown(ctx context.Context) error
- func (yp *YoloPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error
Constants ¶
View Source
const ( EvtFlagTakedown = 1 << iota EvtFlagRebased )
View Source
const ( EvtKindErrorFrame = -1 EvtKindMessage = 1 )
Variables ¶
View Source
var ( ErrPlaybackShutdown = fmt.Errorf("playback shutting down") ErrCaughtUp = fmt.Errorf("caught up") )
Functions ¶
func ConsumeRepoStreamLite2 ¶
Types ¶
type DbPersistence ¶
type DbPersistence struct {
// contains filtered or unexported fields
}
func NewDbPersistence ¶
func (*DbPersistence) AddItemToBatch ¶
func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error
func (*DbPersistence) Persist ¶
func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*DbPersistence) Playback ¶
func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*DbPersistence) RecordFromHandleChange ¶
func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error)
func (*DbPersistence) RecordFromRepoCommit ¶
func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error)
func (*DbPersistence) RecordFromRepoIdentity ¶
func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error)
func (*DbPersistence) RecordFromTombstone ¶
func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error)
func (*DbPersistence) SetEventBroadcaster ¶
func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent))
func (*DbPersistence) TakeDownRepo ¶
type DiskPersistOptions ¶
type DiskPersistOptions struct { UIDCacheSize int DIDCacheSize int EventsPerFile int64 WriteBufferSize int Retention time.Duration }
func DefaultDiskPersistOptions ¶
func DefaultDiskPersistOptions() *DiskPersistOptions
type DiskPersistence ¶
type DiskPersistence struct {
// contains filtered or unexported fields
}
func NewDiskPersistence ¶
func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPersistOptions) (*DiskPersistence, error)
func (*DiskPersistence) Persist ¶
func (dp *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*DiskPersistence) Playback ¶
func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*DiskPersistence) PlaybackLogfiles ¶
func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error)
func (*DiskPersistence) SetEventBroadcaster ¶
func (dp *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent))
func (*DiskPersistence) TakeDownRepo ¶
type ErrorFrame ¶
func (*ErrorFrame) MarshalCBOR ¶
func (t *ErrorFrame) MarshalCBOR(w io.Writer) error
func (*ErrorFrame) UnmarshalCBOR ¶
func (t *ErrorFrame) UnmarshalCBOR(r io.Reader) (err error)
type EventHeader ¶
func (*EventHeader) MarshalCBOR ¶
func (t *EventHeader) MarshalCBOR(w io.Writer) error
func (*EventHeader) UnmarshalCBOR ¶
func (t *EventHeader) UnmarshalCBOR(r io.Reader) (err error)
type EventManager ¶
type EventManager struct {
// contains filtered or unexported fields
}
func NewEventManager ¶
func NewEventManager(persister EventPersistence) *EventManager
func (*EventManager) AddEvent ¶
func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error
func (*EventManager) Subscribe ¶
func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error)
func (*EventManager) TakeDownRepo ¶
type EventPersistence ¶
type EventPersistence interface { Persist(ctx context.Context, e *XRPCStreamEvent) error Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error TakeDownRepo(ctx context.Context, usr models.Uid) error Flush(context.Context) error Shutdown(context.Context) error SetEventBroadcaster(func(*XRPCStreamEvent)) }
Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels
type InstrumentedRepoStreamCallbacks ¶
type InstrumentedRepoStreamCallbacks struct { Next func(ctx context.Context, xev *XRPCStreamEvent) error // contains filtered or unexported fields }
func NewInstrumentedRepoStreamCallbacks ¶
func NewInstrumentedRepoStreamCallbacks(limiters []*slidingwindow.Limiter, next func(ctx context.Context, xev *XRPCStreamEvent) error) *InstrumentedRepoStreamCallbacks
func (*InstrumentedRepoStreamCallbacks) EventHandler ¶
func (rsc *InstrumentedRepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error
type LiteStreamHandleFunc ¶
type MemPersister ¶
type MemPersister struct {
// contains filtered or unexported fields
}
MemPersister is the most naive implementation of event persistence This EventPersistence option works fine with all event types ill do better later
func NewMemPersister ¶
func NewMemPersister() *MemPersister
func (*MemPersister) Persist ¶
func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*MemPersister) Playback ¶
func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*MemPersister) SetEventBroadcaster ¶
func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
func (*MemPersister) TakeDownRepo ¶
type Options ¶
type Options struct { MaxBatchSize int MinBatchSize int MaxTimeBetweenFlush time.Duration CheckBatchInterval time.Duration UIDCacheSize int DIDCacheSize int PlaybackBatchSize int HydrationConcurrency int }
func DefaultOptions ¶
func DefaultOptions() *Options
type PersistenceBatchItem ¶
type PersistenceBatchItem struct { Record *RepoEventRecord Event *XRPCStreamEvent }
type RepoEventRecord ¶
type RepoStreamCallbacks ¶
type RepoStreamCallbacks struct { RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error Error func(evt *ErrorFrame) error }
func (*RepoStreamCallbacks) EventHandler ¶
func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error
type Scheduler ¶
type Scheduler interface { AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error Shutdown() }
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
type XRPCStreamEvent ¶
type XRPCStreamEvent struct { Error *ErrorFrame RepoCommit *comatproto.SyncSubscribeRepos_Commit RepoHandle *comatproto.SyncSubscribeRepos_Handle RepoIdentity *comatproto.SyncSubscribeRepos_Identity RepoInfo *comatproto.SyncSubscribeRepos_Info RepoMigrate *comatproto.SyncSubscribeRepos_Migrate RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone LabelLabels *comatproto.LabelSubscribeLabels_Labels LabelInfo *comatproto.LabelSubscribeLabels_Info // some private fields for internal routing perf PrivUid models.Uid `json:"-" cborgen:"-"` PrivPdsId uint `json:"-" cborgen:"-"` PrivRelevantPds []uint `json:"-" cborgen:"-"` }
type YoloPersister ¶
type YoloPersister struct {
// contains filtered or unexported fields
}
YoloPersister is used for benchmarking, it has no persistence, it just emits events and forgets them
func NewYoloPersister ¶
func NewYoloPersister() *YoloPersister
func (*YoloPersister) Persist ¶
func (yp *YoloPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*YoloPersister) Playback ¶
func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*YoloPersister) SetEventBroadcaster ¶
func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
func (*YoloPersister) TakeDownRepo ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.