consumer

package
v0.0.0-...-ec62d97 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	EvtCreateRecord = "c"
	EvtUpdateRecord = "u"
	EvtDeleteRecord = "d"
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	SocketURL string
	Progress  *Progress
	Emit      func(context.Context, Event) error
}

Consumer is the consumer of the firehose

func NewConsumer

func NewConsumer(
	ctx context.Context,
	socketURL string,
	progPath string,
	emit func(context.Context, Event) error,
) (*Consumer, error)

NewConsumer creates a new consumer

func (*Consumer) HandleRepoCommit

func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error

HandleRepoCommit handles a repo commit event from the firehose and processes the records

func (*Consumer) HandleStreamEvent

func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error

HandleStreamEvent handles a stream event from the firehose

func (*Consumer) ReadCursor

func (c *Consumer) ReadCursor(ctx context.Context) error

ReadCursor reads the cursor from file

func (*Consumer) WriteCursor

func (c *Consumer) WriteCursor(ctx context.Context) error

WriteCursor writes the cursor to file

type Event

type Event struct {
	Did        string `json:"did"`
	Seq        int64  `json:"seq"`
	OpType     string `json:"opType"`
	Collection string `json:"collection,omitempty"`
	RKey       string `json:"rkey,omitempty"`

	Record any `json:"record,omitempty"`
}

type Progress

type Progress struct {
	LastSeq            int64     `json:"last_seq"`
	LastSeqProcessedAt time.Time `json:"last_seq_processed_at"`
	// contains filtered or unexported fields
}

Progress is the cursor for the consumer

func (*Progress) Get

func (p *Progress) Get() (int64, time.Time)

func (*Progress) Update

func (p *Progress) Update(seq int64, processedAt time.Time)

type URI

type URI struct {
	Did        string
	RKey       string
	Collection string
}

func GetURI

func GetURI(uri string) (*URI, error)

URI: at://{did}/{namespace}/{rkey}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL