stream

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2024 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MetricsMiddleware

func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc

MetricsMiddleware defines the handler function for the metrics middleware.

Types

type Cursor

// Cursor is a GORM model that stores a single sequence value, LastSeq.
//
// It embeds gorm.Model, which contributes the ID, CreatedAt, UpdatedAt and
// DeletedAt columns.
//
// NOTE(review): LastSeq is presumably the most recent firehose sequence number
// handled by Stream (compare GetSeq/SetSeq) — confirm against the
// implementation.
type Cursor struct {
	gorm.Model
	LastSeq int64
}

type Event

// Event is the GORM model for a stored event, keyed by FirehoseSeq.
//
// Indexes declared by the struct tags:
//   - CreatedAt, Repo and EventType each carry a single-column index.
//   - idx_events_repo_seq is a composite index on (Repo, FirehoseSeq DESC);
//     Repo has priority 1 and FirehoseSeq priority 2.
//
// The FirehoseSeq tag must spell the composite index as
// "index:<name>,priority:N,sort:desc". Without the "index:" key GORM does not
// attach the column to the index, and the ordering option GORM reads is
// "sort", not "order".
//
// NOTE(review): a database migrated with the earlier tag may already contain
// an idx_events_repo_seq that covers Repo only; verify whether it has to be
// dropped and recreated for this definition to take effect.
//
// NOTE(review): the unit of Time is not visible here (likely a Unix
// timestamp) — confirm. Since is a pointer, so it may be nil.
type Event struct {
	CreatedAt time.Time `gorm:"index"`
	UpdatedAt time.Time
	DeletedAt gorm.DeletedAt

	FirehoseSeq int64  `gorm:"primarykey;index:idx_events_repo_seq,priority:2,sort:desc"`
	Repo        string `gorm:"index;index:idx_events_repo_seq,priority:1"`
	EventType   string `gorm:"index"`
	Error       string
	Time        int64
	Since       *string
}

type EventsQuery

// EventsQuery groups the parameters of an events lookup.
//
// DID, EventType and Seq are pointers and may therefore be nil.
//
// NOTE(review): a nil pointer presumably means "do not filter on this field",
// and Limit presumably caps the number of returned events — confirm against
// HandleGetEvents.
type EventsQuery struct {
	DID       *syntax.DID
	EventType *string
	Seq       *int64
	Limit     int
}

type EventsResponse

// EventsResponse is the JSON envelope for a list of events.
//
// Events is serialized under "events". Error is serialized under "error" and
// is omitted from the output when it is the empty string.
type EventsResponse struct {
	Events []JSONEvent `json:"events"`
	Error  string      `json:"error,omitempty"`
}

type IdentitiesQuery

// IdentitiesQuery groups the parameters of an identities lookup.
//
// DID, Handle and PDS are pointers and may therefore be nil.
//
// NOTE(review): a nil pointer presumably means "do not filter on this field",
// and Limit presumably caps the number of returned identities — confirm
// against HandleGetIdentities.
type IdentitiesQuery struct {
	DID    *syntax.DID
	Handle *syntax.Handle
	PDS    *string
	Limit  int
}

type IdentitiesResponse

// IdentitiesResponse is the JSON envelope for a list of identities.
//
// Identities is serialized under "identities". Error is serialized under
// "error" and is omitted from the output when it is the empty string.
type IdentitiesResponse struct {
	Identities []JSONIdentity `json:"identities"`
	Error      string         `json:"error,omitempty"`
}

type Identity

// Identity is the GORM model for a stored identity, keyed by DID.
//
// CreatedAt, Handle and PDS each carry a single-column index. DeletedAt is a
// gorm.DeletedAt, the field type GORM uses for soft deletion.
type Identity struct {
	CreatedAt time.Time `gorm:"index"`
	UpdatedAt time.Time
	DeletedAt gorm.DeletedAt

	DID    string `gorm:"primarykey"`
	Handle string `gorm:"index"`
	PDS    string `gorm:"index"`
}

type JSONEvent

// JSONEvent is the JSON representation of an event.
//
// Its fields mirror the data columns of Event (FirehoseSeq, Repo, EventType,
// Error, Time, Since). FirehoseSeq is serialized as "seq". Error is omitted
// when empty. Since has no omitempty option, so a nil value is encoded as
// JSON null.
type JSONEvent struct {
	FirehoseSeq int64   `json:"seq"`
	Repo        string  `json:"repo"`
	EventType   string  `json:"event_type"`
	Error       string  `json:"error,omitempty"`
	Time        int64   `json:"time"`
	Since       *string `json:"since"`
}

type JSONIdentity

// JSONIdentity is the JSON representation of an identity.
//
// It carries the DID, Handle, PDS and UpdatedAt values under the keys "did",
// "handle", "pds" and "updated_at".
type JSONIdentity struct {
	DID       string    `json:"did"`
	Handle    string    `json:"handle"`
	PDS       string    `json:"pds"`
	UpdatedAt time.Time `json:"updated_at"`
}

type JSONRecord

// JSONRecord is the JSON representation of a record.
//
// FirehoseSeq is serialized as "seq"; the remaining scalar fields use their
// lower-cased names as keys. Raw holds a decoded JSON object and is left out
// of the output when it is empty.
type JSONRecord struct {
	FirehoseSeq int64          `json:"seq"`
	Repo        string         `json:"repo"`
	Handle      string         `json:"handle"`
	PDS         string         `json:"pds"`
	Collection  string         `json:"collection"`
	RKey        string         `json:"rkey"`
	Action      string         `json:"action"`
	Raw         map[string]any `json:"raw,omitempty"`
}

type Record

// Record is the GORM model for a stored record, keyed by the numeric ID.
//
// Indexes declared by the struct tags:
//   - CreatedAt and FirehoseSeq each carry a single-column index.
//   - idx_path is a composite index over Repo, Collection and RKey.
//   - idx_records_repo_id is a composite index on (Repo, ID DESC); Repo has
//     priority 1 and ID priority 2.
//
// The descending order on ID is requested with the "sort" option, which is the
// key GORM reads for index ordering; an "order" key is not recognised.
//
// NOTE(review): a database migrated with the earlier tag may already contain
// an ascending idx_records_repo_id; verify whether it has to be dropped and
// recreated for this definition to take effect.
type Record struct {
	ID        uint      `gorm:"primarykey;index:idx_records_repo_id,priority:2,sort:desc"`
	CreatedAt time.Time `gorm:"index"`
	UpdatedAt time.Time
	DeletedAt gorm.DeletedAt

	FirehoseSeq int64  `gorm:"index"`
	Repo        string `gorm:"index:idx_path;index:idx_records_repo_id,priority:1"`
	Collection  string `gorm:"index:idx_path"`
	RKey        string `gorm:"index:idx_path"`
	Action      string
	Raw         []byte // Raw JSON data
}

type RecordsQuery

// RecordsQuery groups the parameters of a records lookup.
//
// DID, Collection, Rkey and Seq are pointers and may therefore be nil.
//
// NOTE(review): a nil pointer presumably means "do not filter on this field",
// and Limit presumably caps the number of returned records — confirm against
// HandleGetRecords.
type RecordsQuery struct {
	DID        *syntax.DID
	Collection *syntax.NSID
	Rkey       *syntax.RecordKey
	Seq        *int64
	Limit      int
}

type RecordsResponse

// RecordsResponse is the JSON envelope for a list of records.
//
// Records is serialized under "records". Error is serialized under "error"
// and is omitted from the output when it is the empty string.
type RecordsResponse struct {
	Records []JSONRecord `json:"records"`
	Error   string       `json:"error,omitempty"`
}

type Stream

// Stream is the package's central type; values are created with NewStream.
//
// Its method set includes callbacks that take atproto subscription messages
// (RepoCommit, RepoHandle, RepoIdentity, RepoInfo, RepoMigrate, RepoTombstone,
// LabelInfo, LabelLabels, Error), echo handlers (HandleGetEvents,
// HandleGetIdentities, HandleGetRecords), the GetSeq/SetSeq accessors and
// Start. All fields are unexported.
type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(
	logger *slog.Logger,
	socketURL string,
	sqlitePath string,
	migrate bool,
	ttl time.Duration,
	bq *bq.BQ,
) (*Stream, error)

func (*Stream) Error

func (s *Stream) Error(err *events.ErrorFrame) error

func (*Stream) GetSeq

func (s *Stream) GetSeq() int64

func (*Stream) HandleGetEvents

func (s *Stream) HandleGetEvents(c echo.Context) error

HandleGetEvents handles the GET /events endpoint

func (*Stream) HandleGetIdentities

func (s *Stream) HandleGetIdentities(c echo.Context) error

func (*Stream) HandleGetRecords

func (s *Stream) HandleGetRecords(c echo.Context) error

HandleGetRecords handles the GET /records endpoint

func (*Stream) LabelInfo

func (s *Stream) LabelInfo(info *atproto.LabelSubscribeLabels_Info) error

func (*Stream) LabelLabels

func (s *Stream) LabelLabels(label *atproto.LabelSubscribeLabels_Labels) error

func (*Stream) RepoCommit

func (s *Stream) RepoCommit(evt *atproto.SyncSubscribeRepos_Commit) error

func (*Stream) RepoHandle

func (s *Stream) RepoHandle(handle *atproto.SyncSubscribeRepos_Handle) error

func (*Stream) RepoIdentity

func (s *Stream) RepoIdentity(id *atproto.SyncSubscribeRepos_Identity) error

func (*Stream) RepoInfo

func (s *Stream) RepoInfo(info *atproto.SyncSubscribeRepos_Info) error

func (*Stream) RepoMigrate

func (s *Stream) RepoMigrate(migrate *atproto.SyncSubscribeRepos_Migrate) error

func (*Stream) RepoTombstone

func (s *Stream) RepoTombstone(tomb *atproto.SyncSubscribeRepos_Tombstone) error

func (*Stream) SetSeq

func (s *Stream) SetSeq(seq int64)

func (*Stream) Start

func (s *Stream) Start(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL