cherami

package
v2.6.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2017 License: MIT Imports: 28 Imported by: 22

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrMessageTimedout = errors.New("Timed out.")

ErrMessageTimedout is returned by Publish when no ack is received within timeout interval

Functions

This section is empty.

Types

type AuthProvider added in v1.20.0

type AuthProvider interface {
	// CreateSecurityContext creates a security context for the given ctx and
	// returns the resulting thrift.Context, or an error.
	CreateSecurityContext(ctx thrift.Context) (thrift.Context, error)
}

AuthProvider provides authentication information on the client side.

type BypassAuthProvider added in v1.20.0

type BypassAuthProvider struct{}

BypassAuthProvider is a dummy implementation for AuthProvider

func NewBypassAuthProvider added in v1.20.0

func NewBypassAuthProvider() *BypassAuthProvider

NewBypassAuthProvider creates a dummy AuthProvider instance

func (*BypassAuthProvider) CreateSecurityContext added in v1.20.0

func (t *BypassAuthProvider) CreateSecurityContext(ctx thrift.Context) (thrift.Context, error)

CreateSecurityContext creates security context

type Client

// Client exposes the API for destination and consumer group CRUD, and the
// capability to publish and consume messages.
type Client interface {
	// Close closes the client.
	// NOTE(review): what Close releases is not visible in this declaration — confirm.
	Close()
	// CreateConsumerGroup and CreateDestination return the description of the
	// created entity, or an error.
	CreateConsumerGroup(request *cherami.CreateConsumerGroupRequest) (*cherami.ConsumerGroupDescription, error)
	CreateDestination(request *cherami.CreateDestinationRequest) (*cherami.DestinationDescription, error)
	// CreateConsumer returns a Consumer, which an application uses to receive
	// messages. It does not return an error.
	CreateConsumer(request *CreateConsumerRequest) Consumer
	// CreatePublisher returns a Publisher, which an application uses to publish
	// messages. It does not return an error.
	CreatePublisher(request *CreatePublisherRequest) Publisher
	// Delete, list, read and update operations for consumer groups and destinations.
	DeleteConsumerGroup(request *cherami.DeleteConsumerGroupRequest) error
	DeleteDestination(request *cherami.DeleteDestinationRequest) error
	ListConsumerGroups(request *cherami.ListConsumerGroupRequest) (*cherami.ListConsumerGroupResult_, error)
	ListDestinations(request *cherami.ListDestinationsRequest) (*cherami.ListDestinationsResult_, error)
	ReadConsumerGroup(request *cherami.ReadConsumerGroupRequest) (*cherami.ConsumerGroupDescription, error)
	ReadDestination(request *cherami.ReadDestinationRequest) (*cherami.DestinationDescription, error)
	UpdateConsumerGroup(request *cherami.UpdateConsumerGroupRequest) (*cherami.ConsumerGroupDescription, error)
	UpdateDestination(request *cherami.UpdateDestinationRequest) (*cherami.DestinationDescription, error)
	// NOTE(review): the semantics of the methods below are not documented in
	// this declaration; consult the cherami request/result types before relying
	// on them.
	GetQueueDepthInfo(request *cherami.GetQueueDepthInfoRequest) (*cherami.GetQueueDepthInfoResult_, error)
	MergeDLQForConsumerGroup(request *cherami.MergeDLQForConsumerGroupRequest) error
	PurgeDLQForConsumerGroup(request *cherami.PurgeDLQForConsumerGroupRequest) error
	ReadPublisherOptions(path string) (*cherami.ReadPublisherOptionsResult_, error)
	ReadConsumerGroupHosts(path string, consumerGroupName string) (*cherami.ReadConsumerGroupHostsResult_, error)
}

Client exposes API for destination and consumer group CRUD and capability to publish and consume messages

func NewClient

func NewClient(serviceName string, host string, port int, options *ClientOptions) (Client, error)

NewClient returns the singleton Cherami client used for communicating with the service at given port

func NewClientWithFEClient added in v1.20.0

func NewClientWithFEClient(feClient cherami.TChanBFrontend, options *ClientOptions) (Client, error)

NewClientWithFEClient is used by Frontend to create a Cherami client for itself. It is used by non-streaming publish/consume APIs. ** Internal Cherami Use Only **

func NewHyperbahnClient

func NewHyperbahnClient(serviceName string, bootstrapFile string, options *ClientOptions) (Client, error)

NewHyperbahnClient returns the singleton Cherami client used for communicating with the service via Hyperbahn or Muttley. Streaming methods (for LOG/Consistent destinations) will not work.

type ClientOptions

type ClientOptions struct {
	// Timeout is a duration option for the client.
	// NOTE(review): which operations this timeout bounds is not visible here — confirm.
	Timeout time.Duration
	// DeploymentStr specifies which deployment (staging, prod, dev, etc.) the
	// client should connect to:
	//   - empty string: the client connects to prod
	//   - "prod": the client connects to prod
	//   - "staging" or "staging2": the client connects to staging or staging2
	//   - "dev": the client connects to the dev server
	DeploymentStr string
	// MetricsReporter is the reporter object.
	MetricsReporter metrics.Reporter
	// Logger is the logger object.
	Logger bark.Logger
	// ReconfigurationPollingInterval is the interval for polling input/output
	// host updates. Clients normally do not need to set this explicitly because
	// the default should work fine; it is only useful in testing or other edge
	// scenarios.
	ReconfigurationPollingInterval time.Duration
	// AuthProvider provides the authentication information on the client side.
	AuthProvider AuthProvider
}

ClientOptions used by Cherami client

type Consumer

type Consumer interface {
	// Open connects to Cherami nodes and starts delivering messages to the
	// provided Delivery channel for the registered consumer group.
	//
	// It is ADVISED that deliveryCh's buffer size be bigger than the total
	// PrefetchCount in CreateConsumerRequest of the consumers writing to this
	// channel.
	Open(deliveryCh chan Delivery) (chan Delivery, error)
	// Close closes all the connections to Cherami nodes for this consumer.
	Close()
	// Pause pauses consuming messages.
	Pause()
	// Resume resumes consuming messages.
	Resume()
	// AckDelivery can be used by the application to Ack a message so that it is
	// not delivered to any other consumer.
	AckDelivery(deliveryToken string) error
	// NackDelivery can be used by the application to Nack a message so that it
	// can be delivered to another consumer immediately, without waiting for the
	// timeout to expire.
	NackDelivery(deliveryToken string) error
}

Consumer is used by an application to receive messages from Cherami service

type CreateConsumerRequest

type CreateConsumerRequest struct {
	// Path is the path to the destination the consumer wants to consume messages from.
	Path string
	// ConsumerGroupName is the consumer group registered with Cherami for a
	// particular destination.
	ConsumerGroupName string
	// ConsumerName is the name of the consumer (worker) connecting to Cherami.
	ConsumerName string
	// PrefetchCount is the number of messages to buffer locally. Clients which
	// process messages very fast may want to specify a larger value for faster
	// throughput. On the flip side, larger values result in more messages being
	// buffered locally, causing a higher memory footprint.
	PrefetchCount int
	// Options are the options used for making API calls to Cherami services.
	// This field is now deprecated. If you need to specify any option, specify
	// it when you call NewClient().
	Options *ClientOptions
}

CreateConsumerRequest struct is used to call Client.CreateConsumer to create an object used by application to consume messages

type CreatePublisherRequest

type CreatePublisherRequest struct {
	// Path and MaxInflightMessagesPerConnection are undocumented in this declaration.
	// NOTE(review): presumably Path is the destination path to publish to and
	// MaxInflightMessagesPerConnection is the number of messages pending
	// confirmation per connection — confirm.
	Path                             string
	MaxInflightMessagesPerConnection int
	// PublisherType represents the mode in which publishing should be done,
	// i.e. either through websocket streaming or through the tchannel batch API.
	// It defaults to websocket streaming (PublisherTypeStreaming is the zero
	// value). Choose the non-streaming batch API for low-throughput publishing.
	PublisherType PublisherType
}

CreatePublisherRequest struct is used to call Client.CreatePublisher to create an object used by application to publish messages

type CreateTaskExecutorRequest

type CreateTaskExecutorRequest struct {
	// Concurrency is the number of concurrent workers used to execute tasks.
	Concurrency int
	// Path is the path to the destination from which tasks are dequeued.
	Path string
	// ConsumerGroupName is the consumer group registered with Cherami for a
	// particular destination.
	ConsumerGroupName string
	// ConsumerName is the name of the consumer (worker) connecting to Cherami.
	ConsumerName string
	// PrefetchCount is the number of messages to buffer locally.
	PrefetchCount int
	// Timeout is the timeout used when sending an ack/nack back to Cherami.
	Timeout time.Duration
}

CreateTaskExecutorRequest is passed to NewTaskExecutor to create a task executor

type CreateTaskSchedulerRequest

type CreateTaskSchedulerRequest struct {
	// Path is the path to the destination into which tasks are enqueued.
	Path string
	// MaxInflightMessagesPerConnection is the number of messages pending
	// confirmation per connection.
	MaxInflightMessagesPerConnection int
}

CreateTaskSchedulerRequest is passed to NewTaskScheduler to create a task scheduler

type Delivery

type Delivery interface {
	// GetMessage returns the message returned by Cherami.
	GetMessage() *cherami.ConsumerMessage
	// GetDeliveryToken returns a delivery token which can be used to Ack/Nack
	// the delivery using the Consumer API.
	// A consumer has two options to Ack/Nack a delivery:
	// 1) Simply call the Ack/Nack API on the delivery after processing the message.
	// 2) If the consumer wants to forward the message to a downstream component
	// for processing, it can get the delivery token by calling this function
	// and pass it along. Later the downstream component can call the API on
	// the Consumer with this token to Ack/Nack the message.
	GetDeliveryToken() string
	// Ack acks this delivery.
	Ack() error
	// Nack nacks this delivery.
	Nack() error
	// VerifyChecksum verifies the checksum of the message, if one exists.
	// The consumer needs to perform this verification and decide what to do
	// based on the returned result.
	VerifyChecksum() bool
}

Delivery is the container which has the actual message returned by Cherami

type OpenConsumerOutWebsocketStream

type OpenConsumerOutWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenConsumerOutWebsocketStream is a wrapper for websocket to work with OpenConsumerStream

func (*OpenConsumerOutWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenConsumerOutWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenConsumerOutWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenConsumerOutWebsocketStream) ResponseHeaders

func (s *OpenConsumerOutWebsocketStream) ResponseHeaders() (map[string]string, error)

ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface

func (*OpenConsumerOutWebsocketStream) Write

Write writes a result to the response stream

type OpenPublisherOutWebsocketStream

type OpenPublisherOutWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenPublisherOutWebsocketStream is a wrapper for websocket to work with OpenPublisherStream

func (*OpenPublisherOutWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenPublisherOutWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenPublisherOutWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenPublisherOutWebsocketStream) ResponseHeaders

func (s *OpenPublisherOutWebsocketStream) ResponseHeaders() (map[string]string, error)

ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface

func (*OpenPublisherOutWebsocketStream) Write

Write writes a result to the response stream

type Publisher

type Publisher interface {
	// Open opens the publisher and returns an error on failure.
	// NOTE(review): what Open connects to is not visible in this declaration — confirm.
	Open() error
	// Close closes the publisher.
	Close()
	// Pause pauses publishing. All publishing will fail until Resume() is called.
	// Note: the Pause/Resume APIs only work for streaming publishers (i.e. the
	// publisher type is PublisherTypeStreaming). For non-streaming publishers,
	// Pause/Resume are no-ops.
	Pause()
	// Resume resumes publishing.
	Resume()
	// Publish publishes message and returns a PublisherReceipt for it.
	// NOTE(review): presumably failures, including ErrMessageTimedout when no
	// ack arrives within the timeout interval, are reported through the
	// receipt's Error field — confirm.
	Publish(message *PublisherMessage) *PublisherReceipt
	// PublishAsync publishes message and takes a done channel of PublisherReceipt.
	// NOTE(review): presumably the receipt is sent on done and the returned
	// string is the message id — confirm.
	PublishAsync(message *PublisherMessage, done chan<- *PublisherReceipt) (string, error)
}

Publisher is used by an application to publish messages to Cherami service

func NewPublisher

func NewPublisher(client *clientImpl, path string, maxInflightMessagesPerConnection int) Publisher

NewPublisher constructs a new Publisher object.

Deprecated: NewPublisher is deprecated, please use NewPublisherWithReporter.

func NewPublisherWithReporter added in v1.20.0

func NewPublisherWithReporter(client *clientImpl, path string, maxInflightMessagesPerConnection int, reporter metrics.Reporter) Publisher

NewPublisherWithReporter constructs a new Publisher object

type PublisherMessage

type PublisherMessage struct {
	// Data is the message payload and Delay is the delay time duration.
	Data  []byte
	Delay time.Duration
	// UserContext is the user-specified context to pass through.
	UserContext map[string]string
}

PublisherMessage is a struct that wraps the message payload and a delay time duration.

type PublisherReceipt

type PublisherReceipt struct {
	// ID is the message id passed with the message when it was published.
	ID string
	// Receipt is a token that contains information about where the message is stored.
	Receipt string
	// Error is the error, if any, associated with the publishing of this message.
	Error error
	// UserContext is the user-specified context to pass through.
	UserContext map[string]string
}

PublisherReceipt is a token given to the publisher as proof that the message has been durably stored.

type PublisherType

type PublisherType int

PublisherType represents the type of publisher viz. streaming/non-streaming

const (
	// PublisherTypeStreaming indicates a publisher that uses websocket streaming.
	// It is the zero value of PublisherType (iota starts at 0).
	PublisherTypeStreaming PublisherType = iota
	// PublisherTypeNonStreaming indicates a publisher that uses the tchannel
	// batch API.
	PublisherTypeNonStreaming
)

type ScheduleTaskRequest

type ScheduleTaskRequest struct {
	// TaskType is the unique type name which is used to register the task
	// handler with the task executor.
	TaskType string
	// TaskID is the unique identifier of this specific task.
	TaskID string
	// TaskValue can be anything that represents the task.
	TaskValue interface{}
	// Context is the key-value context associated with the task.
	Context map[string]string
	// Delay is the time duration before the task can be executed.
	Delay time.Duration
}

ScheduleTaskRequest is used to call TaskScheduler.ScheduleTask to schedule a new task

type Task

type Task interface {
	// GetType returns the unique type name that can be used to identify the
	// corresponding task handler.
	GetType() string
	// GetID returns the unique identifier of this specific task.
	GetID() string
	// GetValue deserializes the task value into the given struct, which should
	// match the type used to publish the task.
	GetValue(instance interface{}) error
	// GetContext returns the key-value context associated with the task when
	// it was published.
	GetContext() map[string]string
}

Task represents the task queued in Cherami

type TaskExecutor

type TaskExecutor interface {
	// Register registers a task handler with its *unique* task type.
	Register(taskType string, taskFunc TaskFunc)
	// Start starts dequeuing tasks and executing them.
	Start() error
	// Stop stops the dequeuing/execution of tasks.
	// There is no guarantee that scheduled tasks are drained when Stop is invoked.
	Stop()
}

TaskExecutor is used to pull tasks from Cherami and execute their task handlers accordingly

func NewTaskExecutor

func NewTaskExecutor(client Client, request *CreateTaskExecutorRequest) TaskExecutor

NewTaskExecutor creates a task executor

type TaskFunc

type TaskFunc func(task Task) error

TaskFunc is function signature of task handler

type TaskScheduler

type TaskScheduler interface {
	// Open gets the TaskScheduler ready for scheduling tasks.
	Open() error
	// Close makes sure resources are released.
	Close()
	// ScheduleTask enqueues a task.
	ScheduleTask(request *ScheduleTaskRequest) error
}

TaskScheduler is used to put tasks into Cherami

func NewTaskScheduler

func NewTaskScheduler(client Client, request *CreateTaskSchedulerRequest) TaskScheduler

NewTaskScheduler creates a task scheduler

type WSConnector

type WSConnector interface {
	// OpenPublisherStream takes a hostPort and request headers and returns a
	// stream.BInOpenPublisherStreamOutCall, or an error.
	OpenPublisherStream(hostPort string, requestHeader http.Header) (stream.BInOpenPublisherStreamOutCall, error)
	// OpenConsumerStream takes a hostPort and request headers and returns a
	// stream.BOutOpenConsumerStreamOutCall, or an error.
	OpenConsumerStream(hostPort string, requestHeader http.Header) (stream.BOutOpenConsumerStreamOutCall, error)
}

WSConnector takes care of establishing connection via websocket stream

func NewWSConnector

func NewWSConnector() WSConnector

NewWSConnector creates a WSConnector

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL