subscriber

package
v1.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: MIT Imports: 22 Imported by: 8

Documentation

Index

Constants

Variables

View Source
// Sentinel errors exported by this package.
var (
	// ErrUnknownEventType is a sentinel error whose message is
	// "pipeline: unknown event type".
	// NOTE(review): the call sites that return it are not visible here;
	// presumably it is produced while processing events — confirm.
	ErrUnknownEventType = errors.New("pipeline: unknown event type")
)
View Source
// PipelineStatusNames maps a PipelineStatus value to a human-readable label.
// The numeric keys line up with the iota constants declared for
// PipelineStatus: 0 is PIPELINE_STATUS_OPEN, 1 is PIPELINE_STATUS_HALF_OPEN
// and 2 is PIPELINE_STATUS_CLOSE.
var PipelineStatusNames = map[PipelineStatus]string{
	0: "Open",      // PIPELINE_STATUS_OPEN
	1: "Half Open", // PIPELINE_STATUS_HALF_OPEN
	2: "Close",     // PIPELINE_STATUS_CLOSE
}

Functions

This section is empty.

Types

type CollectionSnapshot added in v0.0.13

type CollectionSnapshot struct {
	// contains filtered or unexported fields
}

type DataEvent added in v0.0.40

// DataEvent carries a pipeline ID, a sequence number, a raw byte slice and a
// decoded Record.
// NOTE(review): the per-field meanings below are inferred from names and
// types only — confirm against the code that populates this struct.
type DataEvent struct {
	// PipelineID is presumably the ID of the pipeline the event belongs to.
	PipelineID uint64
	// Sequence is presumably the event's sequence number in that pipeline.
	Sequence   uint64
	// RawData is presumably the undecoded form of Payload — TODO confirm.
	RawData    []byte
	// Commented-out earlier declaration of Payload using the Projection type:
	//	Payload    *gravity_sdk_types_projection.Projection
	// Payload is the event as a Record.
	Payload *gravity_sdk_types_record.Record
}

type EventHandler added in v0.0.40

type EventHandler struct {
	// contains filtered or unexported fields
}

func NewEventHandler added in v0.0.40

func NewEventHandler(subscriber *Subscriber) *EventHandler

func (*EventHandler) ProcessEvent added in v0.0.40

func (eh *EventHandler) ProcessEvent(data []byte) error

type InitialLoadOptions added in v0.0.10

// InitialLoadOptions groups the initial-load settings stored in
// Options.InitialLoad. Its three fields correspond one-to-one with the
// parameters of WithPipelineInitialLoad(enabled, mode, omittedCount).
type InitialLoadOptions struct {
	// Enabled presumably switches the initial load on or off — TODO confirm.
	Enabled      bool
	// Mode is a free-form string; the accepted values are not visible here.
	Mode         string
	// OmittedCount is an unsigned count.
	// NOTE(review): what is being omitted is not shown here — confirm.
	OmittedCount uint64
}

type Message

// Message is the value handed to a MessageHandler. It bundles a pipeline, a
// subscription, a message type, an untyped payload and a callback.
// NewMessage takes the first four as arguments; its parameter list does not
// include a callback.
type Message struct {
	// Pipeline is the pipeline associated with this message.
	Pipeline     *Pipeline
	// Subscription is the subscription associated with this message.
	Subscription Subscription
	// Type tells which kind of message this is (see MessageType).
	Type         MessageType
	// Payload is untyped. Presumably it holds a *DataEvent or a
	// *SnapshotEvent depending on Type — TODO confirm.
	Payload      interface{}
	// Callback receives a *Message.
	// NOTE(review): presumably invoked by Ack — confirm.
	Callback     func(*Message)
}

func NewMessage added in v0.0.40

func NewMessage(pipeline *Pipeline, sub Subscription, msgType MessageType, payload interface{}) *Message

func (*Message) Ack

func (msg *Message) Ack()

type MessageHandler

// MessageHandler is a callback that receives a *Message. It is the handler
// type accepted by Subscriber.SetEventHandler, Subscriber.SetSnapshotHandler,
// WithSubscriptionEventHandler and WithSubscriptionSnapshotHandler.
type MessageHandler func(*Message)

type MessageType added in v0.0.40

// MessageType is the int32-backed kind tag stored in Message.Type.
type MessageType int32
// MessageType values, numbered from zero by iota.
const (
	// MESSAGE_TYPE_EVENT has the value 0 and is therefore the zero value
	// of MessageType.
	MESSAGE_TYPE_EVENT MessageType = iota
	// MESSAGE_TYPE_SNAPSHOT has the value 1.
	MESSAGE_TYPE_SNAPSHOT
)

type Options

// Options is the configuration passed to NewSubscriber and
// NewSubscriberWithClient. NewOptions returns a *Options; the defaults it
// fills in are not visible here.
// NOTE(review): the per-field meanings below are inferred from names and
// types only — confirm against the code that reads them.
type Options struct {
	// Endpoint is a string naming the endpoint to use.
	Endpoint    string
	// Domain is a string naming the domain to use.
	Domain      string
	// Key is the keyring entry; presumably used for authentication and/or
	// encryption — TODO confirm.
	Key         *keyring.KeyInfo
	// WorkerCount presumably feeds WithSubscriptionWorkerCount — TODO confirm.
	WorkerCount int
	// BufferSize presumably feeds WithSubscriptionBufferSize — TODO confirm.
	BufferSize  int
	// ChunkSize presumably feeds WithPipelineChunkSize and/or
	// WithSnapshotChunkSize — TODO confirm.
	ChunkSize   int
	// Verbose presumably enables extra logging — TODO confirm.
	Verbose     bool
	// StateStore supplies per-pipeline state (see StateStore).
	StateStore  StateStore
	// InitialLoad holds the initial-load settings (see InitialLoadOptions).
	InitialLoad InitialLoadOptions
}

func NewOptions

func NewOptions() *Options

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(id uint64, lastSeq uint64, opts ...PipelineOpt) *Pipeline

func (*Pipeline) Awake added in v0.0.13

func (pipeline *Pipeline) Awake() error

func (*Pipeline) Fail added in v0.0.40

func (pipeline *Pipeline) Fail()

func (*Pipeline) Initialize added in v0.0.40

func (pipeline *Pipeline) Initialize() error

func (*Pipeline) SetUpdatedSequence added in v0.0.40

func (pipeline *Pipeline) SetUpdatedSequence(updatedSeq uint64) error

func (*Pipeline) Succeed added in v0.0.40

func (pipeline *Pipeline) Succeed()

func (*Pipeline) UpdateLastSequence

func (pipeline *Pipeline) UpdateLastSequence(sequence uint64)

type PipelineOpt added in v1.0.6

// PipelineOpt is an option function that receives a *Pipeline. NewPipeline
// accepts any number of them as its variadic opts parameter, and the
// WithPipeline* constructors in this package each return one.
type PipelineOpt func(*Pipeline)

func WithPipelineChunkSize added in v1.0.6

func WithPipelineChunkSize(size int) PipelineOpt

func WithPipelineCollections added in v1.0.6

func WithPipelineCollections(cols []string) PipelineOpt

func WithPipelineInitialLoad added in v1.0.6

func WithPipelineInitialLoad(enabled bool, mode string, omittedCount uint64) PipelineOpt

func WithPipelineRemoteLastSeq added in v1.0.6

func WithPipelineRemoteLastSeq(lastSeq uint64) PipelineOpt

func WithPipelineRequest added in v1.0.6

func WithPipelineRequest(pr PipelineRequest) PipelineOpt

func WithPipelineSnapshotLastSeq added in v1.0.6

func WithPipelineSnapshotLastSeq(lastSeq uint64) PipelineOpt

func WithPipelineSnapshotRequest added in v1.0.6

func WithPipelineSnapshotRequest(sr SnapshotRequest) PipelineOpt

func WithPipelineStateStore added in v1.0.6

func WithPipelineStateStore(ss StateStore) PipelineOpt

func WithPipelineSubscriberID added in v1.0.6

func WithPipelineSubscriberID(id string) PipelineOpt

func WithPipelineSubscription added in v1.0.6

func WithPipelineSubscription(s Subscription) PipelineOpt

type PipelineRequest added in v1.0.6

// PipelineRequest is the set of pipeline calls: Pull, Suspend and Awake.
// NewPipelineRequestImpl returns one built from a RequestHandler, and
// WithPipelineRequest accepts one as a pipeline option.
// NOTE(review): sid and pid presumably mean subscriber ID and pipeline ID —
// confirm against the implementation.
type PipelineRequest interface {
	// Pull returns a PullEventsReply for pipeline pid.
	// NOTE(review): startAt, offset and count presumably select the range
	// of events to fetch — confirm exact semantics.
	Pull(sid string, pid uint64, startAt uint64, offset uint64, count int64) (*pipeline_pb.PullEventsReply, error)
	// Suspend returns a SuspendReply for pipeline pid; seq is a sequence
	// number whose exact role is not visible here.
	Suspend(sid string, pid uint64, seq uint64) (*pipeline_pb.SuspendReply, error)
	// Awake returns an AwakeReply for pipeline pid.
	Awake(sid string, pid uint64) (*pipeline_pb.AwakeReply, error)
}

func NewPipelineRequestImpl added in v1.0.6

func NewPipelineRequestImpl(request RequestHandler) PipelineRequest

type PipelineRequestImpl added in v1.0.6

type PipelineRequestImpl struct {
	// contains filtered or unexported fields
}

func (*PipelineRequestImpl) Awake added in v1.0.6

func (pr *PipelineRequestImpl) Awake(sid string, pid uint64) (*pipeline_pb.AwakeReply, error)

func (*PipelineRequestImpl) Pull added in v1.0.6

func (pr *PipelineRequestImpl) Pull(sid string, pid uint64, startAt uint64, offset uint64, count int64) (*pipeline_pb.PullEventsReply, error)

func (*PipelineRequestImpl) Suspend added in v1.0.6

func (pr *PipelineRequestImpl) Suspend(sid string, pid uint64, seq uint64) (*pipeline_pb.SuspendReply, error)

type PipelineState

// PipelineState is the per-pipeline state returned by
// StateStore.GetPipelineState. PipelineStateDummy exposes the same three
// methods.
type PipelineState interface {
	// GetLastSequence returns the stored last sequence number.
	GetLastSequence() uint64
	// UpdateLastSequence stores a new last sequence number and reports
	// any error from doing so.
	UpdateLastSequence(uint64) error
	// Flush reports an error on failure.
	// NOTE(review): presumably persists pending state — confirm.
	Flush() error
}

type PipelineStateDummy added in v1.0.6

type PipelineStateDummy struct {
	// contains filtered or unexported fields
}

func (*PipelineStateDummy) Flush added in v1.0.6

func (psd *PipelineStateDummy) Flush() error

func (*PipelineStateDummy) GetLastSequence added in v1.0.6

func (psd *PipelineStateDummy) GetLastSequence() uint64

func (*PipelineStateDummy) UpdateLastSequence added in v1.0.6

func (psd *PipelineStateDummy) UpdateLastSequence(lastSeq uint64) error

type PipelineStatus added in v0.0.40

// PipelineStatus is an int32-backed status code. PipelineStatusNames maps
// each value to a display string.
// NOTE(review): the Open / Half Open / Close names resemble circuit-breaker
// states — confirm the intended meaning.
type PipelineStatus int32
// PipelineStatus values, numbered from zero by iota.
const (
	// PIPELINE_STATUS_OPEN has the value 0 and is therefore the zero value
	// of PipelineStatus.
	PIPELINE_STATUS_OPEN PipelineStatus = iota
	// PIPELINE_STATUS_HALF_OPEN has the value 1.
	PIPELINE_STATUS_HALF_OPEN
	// PIPELINE_STATUS_CLOSE has the value 2.
	PIPELINE_STATUS_CLOSE
)

type RequestHandler added in v1.0.6

// RequestHandler is the function type that NewPipelineRequestImpl and
// NewSnapshotRequestImpl are built from. It takes a method name, a request
// body and an encrypted flag, and returns the response bytes or an error.
// NOTE(review): whether encrypted describes the input, the transport or the
// reply is not visible here — confirm.
type RequestHandler func(method string, data []byte, encrypted bool) ([]byte, error)

type Runner added in v0.0.40

type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner added in v0.0.40

func NewRunner() *Runner

func (*Runner) AddPipeline added in v0.0.40

func (runner *Runner) AddPipeline(pipeline *Pipeline)

func (*Runner) Awake added in v0.0.40

func (runner *Runner) Awake(pipelineID uint64)

func (*Runner) Start added in v0.0.40

func (runner *Runner) Start()

func (*Runner) Stop added in v0.0.40

func (runner *Runner) Stop()

type Snapshot added in v0.0.10

type Snapshot struct {
	// contains filtered or unexported fields
}

func NewSnapshot added in v0.0.10

func NewSnapshot(opts ...SnapshotOpt) *Snapshot

func (*Snapshot) Close added in v0.0.13

func (snapshot *Snapshot) Close() error

func (*Snapshot) Create added in v0.0.10

func (snapshot *Snapshot) Create() error

func (*Snapshot) Pull added in v0.0.10

func (snapshot *Snapshot) Pull() (string, [][]byte, error)

func (snapshot *Snapshot) Pull() ([]*gravity_sdk_types_snapshot_record.SnapshotRecord, error) {

type SnapshotEvent added in v0.0.40

// SnapshotEvent carries a pipeline ID, a collection name, a raw byte slice
// and a decoded SnapshotRecord.
// NOTE(review): the per-field meanings below are inferred from names and
// types only — confirm against the code that populates this struct.
type SnapshotEvent struct {
	// PipelineID is presumably the ID of the pipeline the snapshot belongs to.
	PipelineID uint64
	// Collection is presumably the collection the record was read from.
	Collection string
	// RawData is presumably the undecoded form of Payload — TODO confirm.
	RawData    []byte
	// Payload is the snapshot entry as a SnapshotRecord.
	Payload    *gravity_sdk_types_snapshot_record.SnapshotRecord
}

type SnapshotOpt added in v1.0.6

// SnapshotOpt is an option function that receives a *Snapshot. NewSnapshot
// accepts any number of them as its variadic opts parameter, and the
// WithSnapshot* constructors in this package each return one.
type SnapshotOpt func(*Snapshot)

func WithSnapshotChunkSize added in v1.0.6

func WithSnapshotChunkSize(size int) SnapshotOpt

func WithSnapshotCollections added in v1.0.6

func WithSnapshotCollections(cols []string) SnapshotOpt

func WithSnapshotPipelineID added in v1.0.6

func WithSnapshotPipelineID(id uint64) SnapshotOpt

func WithSnapshotRequest added in v1.0.6

func WithSnapshotRequest(sr SnapshotRequest) SnapshotOpt

func WithSnapshotSubscriberID added in v1.0.6

func WithSnapshotSubscriberID(id string) SnapshotOpt

type SnapshotRequest added in v1.0.6

// SnapshotRequest is the set of snapshot calls: Pull, Create and Close.
// NewSnapshotRequestImpl returns one built from a RequestHandler, and
// WithSnapshotRequest / WithPipelineSnapshotRequest accept one as an option.
// NOTE(review): id, sid and pid presumably mean snapshot ID, subscriber ID
// and pipeline ID — confirm against the implementation.
type SnapshotRequest interface {
	// Pull returns a PullSnapshotReply for collection col of pipeline pid.
	// NOTE(review): lastKey, offset and count presumably page through the
	// snapshot — confirm exact semantics.
	Pull(id string, sid string, pid uint64, col string, lastKey []byte, offset uint64, count int64) (*pipeline_pb.PullSnapshotReply, error)
	// Create returns a CreateSnapshotReply for pipeline pid.
	Create(id string, pid uint64) (*pipeline_pb.CreateSnapshotReply, error)
	// Close returns a ReleaseSnapshotReply for pipeline pid.
	Close(id string, pid uint64) (*pipeline_pb.ReleaseSnapshotReply, error)
}

func NewSnapshotRequestImpl added in v1.0.6

func NewSnapshotRequestImpl(request RequestHandler) SnapshotRequest

type SnapshotRequestImpl added in v1.0.6

type SnapshotRequestImpl struct {
	// contains filtered or unexported fields
}

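func (*SnapshotRequestImpl) Close added in v1.0.6

func (sr *SnapshotRequestImpl) Close(id string, pid uint64) (*pipeline_pb.ReleaseSnapshotReply, error)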

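func (*SnapshotRequestImpl) Create added in v1.0.6

func (sr *SnapshotRequestImpl) Create(id string, pid uint64) (*pipeline_pb.CreateSnapshotReply, error)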

func (*SnapshotRequestImpl) Pull added in v1.0.6

func (sr *SnapshotRequestImpl) Pull(id string, sid string, pid uint64, col string, lastKey []byte, offset uint64, count int64) (*pipeline_pb.PullSnapshotReply, error)

type StateStore

// StateStore gives access to per-pipeline state. It is the type of
// Options.StateStore and the argument of WithPipelineStateStore;
// NewStateStoreDummy returns one.
type StateStore interface {
	// GetPipelineState returns the PipelineState for the given pipeline
	// ID, or an error.
	GetPipelineState(uint64) (PipelineState, error)
	// GetPipelines returns a list of pipeline IDs.
	// NOTE(review): presumably the IDs the store holds state for — confirm.
	GetPipelines() []uint64
}

func NewStateStoreDummy added in v1.0.6

func NewStateStoreDummy() StateStore

NewStateStoreDummy returns a dummy StateStore intended for testing.

type StateStoreDummy added in v1.0.6

// StateStoreDummy is a field-less type whose methods (GetPipelineState and
// GetPipelines) have the same signatures as the StateStore interface.
// NewStateStoreDummy is documented as a dummy for testing.
type StateStoreDummy struct {
}

func (*StateStoreDummy) GetPipelineState added in v1.0.6

func (ssd *StateStoreDummy) GetPipelineState(id uint64) (PipelineState, error)

func (*StateStoreDummy) GetPipelines added in v1.0.6

func (ssd *StateStoreDummy) GetPipelines() []uint64

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(options *Options) *Subscriber

func NewSubscriberWithClient

func NewSubscriberWithClient(client *core.Client, options *Options) *Subscriber

func (*Subscriber) AddAllPipelines

func (sub *Subscriber) AddAllPipelines() error

func (*Subscriber) AddPipeline

func (sub *Subscriber) AddPipeline(pipeline *Pipeline) error

func (*Subscriber) Connect

func (sub *Subscriber) Connect(host string, options *core.Options) error

func (*Subscriber) Disconnect

func (sub *Subscriber) Disconnect()

func (*Subscriber) GetCollectionInfo

func (sub *Subscriber) GetCollectionInfo(collection string) []string

func (*Subscriber) GetEndpoint added in v0.0.18

func (sub *Subscriber) GetEndpoint() (*core.Endpoint, error)

func (*Subscriber) GetPipeline added in v0.0.13

func (sub *Subscriber) GetPipeline(pipelineID uint64) *Pipeline

func (*Subscriber) GetPipelineCount

func (sub *Subscriber) GetPipelineCount() (uint64, error)

func (*Subscriber) Register

func (sub *Subscriber) Register(subscriberType subscriber_manager_pb.SubscriberType, component string, subscriberID string, name string) error

func (*Subscriber) ReleasePipeline added in v0.0.13

func (sub *Subscriber) ReleasePipeline(pipelineID uint64)

// AwakePipeline forwards pipelineID to the subscriber's scheduler via its
// Awake method. When no scheduler is set (sub.scheduler is nil) the call does
// nothing.
func (sub *Subscriber) AwakePipeline(pipelineID uint64) {
	if sub.scheduler != nil {
		sub.scheduler.Awake(pipelineID)
	}
}

func (*Subscriber) SetEventHandler added in v0.0.13

func (sub *Subscriber) SetEventHandler(cb MessageHandler)

func (*Subscriber) SetSnapshotHandler added in v0.0.13

func (sub *Subscriber) SetSnapshotHandler(cb MessageHandler)

func (*Subscriber) Start added in v0.0.13

func (sub *Subscriber) Start()

func (*Subscriber) SubscribeToCollections

func (sub *Subscriber) SubscribeToCollections(colMap map[string][]string) error

func (*Subscriber) SubscribeToPipelines added in v0.0.20

func (sub *Subscriber) SubscribeToPipelines(pipelines []uint64) error

type Subscription

// Subscription is the type of Message.Subscription and the argument of
// WithPipelineSubscription. SubscriptionImpl exposes the same three methods.
type Subscription interface {
	// Start takes no arguments and returns nothing.
	// NOTE(review): presumably begins message processing — confirm.
	Start()
	// Push hands a message to the subscription.
	Push(msg *Message)
	// Unsubscribe reports an error on failure.
	Unsubscribe() error
}

type SubscriptionImpl added in v1.0.6

type SubscriptionImpl struct {
	// contains filtered or unexported fields
}

func NewSubscriptionImpl added in v1.0.6

func NewSubscriptionImpl(opts ...SubscriptionOpt) *SubscriptionImpl

func (*SubscriptionImpl) Push added in v1.0.6

func (s *SubscriptionImpl) Push(msg *Message)

func (*SubscriptionImpl) Start added in v1.0.6

func (s *SubscriptionImpl) Start()

func (*SubscriptionImpl) Unsubscribe added in v1.0.6

func (s *SubscriptionImpl) Unsubscribe() error

type SubscriptionOpt added in v1.0.6

// SubscriptionOpt is an option function that receives a *SubscriptionImpl.
// NewSubscriptionImpl accepts any number of them as its variadic opts
// parameter, and the WithSubscription* constructors in this package each
// return one.
type SubscriptionOpt func(s *SubscriptionImpl)

func WithSubscriptionBufferSize added in v1.0.6

func WithSubscriptionBufferSize(size int) SubscriptionOpt

func WithSubscriptionEventHandler added in v1.0.6

func WithSubscriptionEventHandler(fn MessageHandler) SubscriptionOpt

func WithSubscriptionSnapshotHandler added in v1.0.6

func WithSubscriptionSnapshotHandler(fn MessageHandler) SubscriptionOpt

func WithSubscriptionWorkerCount added in v1.0.6

func WithSubscriptionWorkerCount(count int) SubscriptionOpt

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL