models

package
Version: v0.0.0-...-73638bf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2018 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PushInterval = 10 * time.Second
	MaxPushSize  = 1000
	MinPushSize  = 1
)

push variables

Variables

View Source
var (
	ErrAlreadyExistSubscription = errors.New("already exist subscription")
	ErrNotFoundAckID            = errors.New("not found message dependent to ack id")
	ErrInvalidEndpoint          = errors.New("invalid endpoint URL format")
)

subscription errors

View Source
var (
	ErrEmptyMessage       = errors.New("empty message")
	ErrNotYetReceivedAck  = errors.New("not yet received ack")
	ErrAlreadyReadMessage = errors.New("already read message")
)

message errors

View Source
var (
	ErrNotFoundEntry             = errors.New("not found entry")
	ErrInvalidEntry              = errors.New("invalid entry")
	ErrNotMatchTypeMessage       = errors.New("not match type message")
	ErrNotMatchTypeMessageStatus = errors.New("not match type message status")
	ErrNotMatchTypeSubscription  = errors.New("not match type subscription")
	ErrNotMatchTypeTopic         = errors.New("not match type topic")
	ErrNotSupportOperation       = errors.New("not support operation")
	ErrNotSupportDriver          = errors.New("not support driver")
)

datastore errors

View Source
var (
	ErrAlreadyExistTopic = errors.New("already exist topic")
)

topic errors

Functions

func InitDatastoreMessage

func InitDatastoreMessage() error

InitDatastoreMessage initialize global datastore object

func InitDatastoreMessageStatus

func InitDatastoreMessageStatus() error

InitDatastoreMessageStatus initialize global datastore object

func InitDatastoreSubscription

func InitDatastoreSubscription() error

InitDatastoreSubscription initialize global datastore object

func InitDatastoreTopic

func InitDatastoreTopic() error

InitDatastoreTopic initialize global datastore object

Types

type Attributes

type Attributes struct {
	Attr map[string]string
	// contains filtered or unexported fields
}

Attributes is string key-value map. optional for the message, push...

func (*Attributes) Dump

func (a *Attributes) Dump() map[string]string

Dump returns all item from attributes

func (*Attributes) Get

func (a *Attributes) Get(key string) (string, bool)

Get return item from attributes

func (*Attributes) Set

func (a *Attributes) Set(key, value string)

Set set item to attributes

func (*Attributes) String

func (a *Attributes) String() string

type ByMessageID

type ByMessageID []*Message

ByMessageID implements sort.Interface for []*Message based on the ID

func (ByMessageID) Len

func (a ByMessageID) Len() int

func (ByMessageID) Less

func (a ByMessageID) Less(i, j int) bool

func (ByMessageID) Swap

func (a ByMessageID) Swap(i, j int)

type BySubscriptionName

type BySubscriptionName []*Subscription

BySubscriptionName implements sort.Interface for []*Subscription based on the ID

func (BySubscriptionName) Len

func (a BySubscriptionName) Len() int

func (BySubscriptionName) Less

func (a BySubscriptionName) Less(i, j int) bool

func (BySubscriptionName) Swap

func (a BySubscriptionName) Swap(i, j int)

type ByTopicName

type ByTopicName []*Topic

ByTopicName is implements sort.Interface for []*Topic based on the ID

func (ByTopicName) Len

func (a ByTopicName) Len() int

func (ByTopicName) Less

func (a ByTopicName) Less(i, j int) bool

func (ByTopicName) Swap

func (a ByTopicName) Swap(i, j int)

type DatastoreMessage

type DatastoreMessage struct {
	// contains filtered or unexported fields
}

DatastoreMessage is adapter between actual datastore and datastore client

func NewDatastoreMessage

func NewDatastoreMessage(cfg *datastore.Config) (*DatastoreMessage, error)

NewDatastoreMessage create DatastoreTopic object

func (*DatastoreMessage) Delete

func (d *DatastoreMessage) Delete(key string) error

Delete delete item

func (*DatastoreMessage) Get

func (d *DatastoreMessage) Get(key string) (*Message, error)

Get return item via datastore

func (*DatastoreMessage) Set

func (d *DatastoreMessage) Set(m *Message) error

Set save item to datastore

type DatastoreMessageStatus

type DatastoreMessageStatus struct {
	// contains filtered or unexported fields
}

DatastoreMessageStatus is adapter between actual datastore and datastore client

func NewDatastoreMessageStatus

func NewDatastoreMessageStatus(cfg *datastore.Config) (*DatastoreMessageStatus, error)

NewDatastoreMessageStatus create DatastoreTopic object

func (*DatastoreMessageStatus) CollectByIDs

func (d *DatastoreMessageStatus) CollectByIDs(ids ...string) ([]*MessageStatus, error)

CollectByIDs returns all MessageStatus depends ids

func (*DatastoreMessageStatus) Delete

func (d *DatastoreMessageStatus) Delete(key string) error

Delete delete item

func (*DatastoreMessageStatus) FindByAckID

func (d *DatastoreMessageStatus) FindByAckID(ackID string) (*MessageStatus, error)

FindByAckID return MessageStatus matched AckID

func (*DatastoreMessageStatus) FindBySubscriptionIDAndMessageID

func (d *DatastoreMessageStatus) FindBySubscriptionIDAndMessageID(subID, msgID string) (*MessageStatus, error)

FindBySubscriptionIDAndMessageID return MessageStatus matched MessageID

func (*DatastoreMessageStatus) Get

Get return item via datastore

func (*DatastoreMessageStatus) List

func (d *DatastoreMessageStatus) List() ([]*MessageStatus, error)

List return all MessageStatus slice

func (*DatastoreMessageStatus) ListBySubscriptionID

func (d *DatastoreMessageStatus) ListBySubscriptionID(subID string) ([]*MessageStatus, error)

ListBySubscriptionID return all MessageStatus slice matched SubscriptionID

func (*DatastoreMessageStatus) Set

Set save item to datastore

type DatastoreSubscription

type DatastoreSubscription struct {
	// contains filtered or unexported fields
}

DatastoreSubscription is adapter between actual datastore and datastore client

func NewDatastoreSubscription

func NewDatastoreSubscription(cfg *datastore.Config) (*DatastoreSubscription, error)

NewDatastoreSubscription create DatastoreSubscription object

func (*DatastoreSubscription) CollectByTopicID

func (d *DatastoreSubscription) CollectByTopicID(topicID string) ([]*Subscription, error)

CollectByTopicID returns all Subscription depends topic ids

func (*DatastoreSubscription) Delete

func (d *DatastoreSubscription) Delete(key string) error

Delete delete item

func (*DatastoreSubscription) Get

Get return item via datastore

func (*DatastoreSubscription) List

func (d *DatastoreSubscription) List() ([]*Subscription, error)

List return all Subscription slice

func (*DatastoreSubscription) Set

Set save item to datastore

type DatastoreTopic

type DatastoreTopic struct {
	// contains filtered or unexported fields
}

DatastoreTopic is adapter between actual datastore and datastore client

func NewDatastoreTopic

func NewDatastoreTopic(cfg *datastore.Config) (*DatastoreTopic, error)

NewDatastoreTopic create DatastoreTopic object

func (*DatastoreTopic) Delete

func (d *DatastoreTopic) Delete(key string) error

Delete delete item

func (*DatastoreTopic) Get

func (d *DatastoreTopic) Get(key string) (*Topic, error)

Get return item via datastore

func (*DatastoreTopic) List

func (d *DatastoreTopic) List() ([]*Topic, error)

List return all topic slice

func (*DatastoreTopic) Set

func (d *DatastoreTopic) Set(topic *Topic) error

Set save item to datastore

type Message

type Message struct {
	ID           string            `json:"message_id"`
	Data         []byte            `json:"data"`
	Attributes   map[string]string `json:"attributes"`
	SubscribeIDs []string          `json:"-"`
	PublishedAt  time.Time         `json:"publish_time"`
}

Message is data object

func NewMessage

func NewMessage(id string, data []byte, attr map[string]string, subs []*Subscription) *Message

NewMessage return initialized Message

func (*Message) AckSubscription

func (m *Message) AckSubscription(subID string) error

AckSubscription remove Subscription

func (*Message) AddSubscription

func (m *Message) AddSubscription(subID string) error

AddSubscription add depend subscription

func (*Message) Delete

func (m *Message) Delete() error

Delete is received all ack response message to delete

func (*Message) Save

func (m *Message) Save() error

Save is save message to datastore

type MessageStatus

type MessageStatus struct {
	ID             string
	SubscriptionID string
	MessageID      string
	AckID          string
	AckDeadline    time.Duration
	AckState       messageState
	DeliveredAt    time.Time
}

MessageStatus is holds params for Message

func (*MessageStatus) Delete

func (ms *MessageStatus) Delete() error

Delete delete MessageStatus from backend datastore

func (*MessageStatus) Deliver

func (ms *MessageStatus) Deliver(ackID string)

Deliver setting deliver state and new AckID

func (*MessageStatus) Readable

func (ms *MessageStatus) Readable() bool

Readable return whether the message can be read

func (*MessageStatus) Save

func (ms *MessageStatus) Save() error

Save save MessageStatus to backend datastore

type MessageStatusStore

type MessageStatusStore struct {
	SubscriptionID string
	Status         []string
}

MessageStatusStore is holds and adapter for MessageStatus

func NewMessageStatusStore

func NewMessageStatusStore(subID string) *MessageStatusStore

NewMessageStatusStore return created MessageStatusStore

func (*MessageStatusStore) Ack

func (mss *MessageStatusStore) Ack(ackID string) error

Ack invisible message depends ackID

func (*MessageStatusStore) CollectAllMessages

func (mss *MessageStatusStore) CollectAllMessages() ([]*MessageStatus, error)

CollectAllMessages returns all Message

func (*MessageStatusStore) CollectReadableMessage

func (mss *MessageStatusStore) CollectReadableMessage(size int) ([]*Message, error)

CollectReadableMessage return readable messages

func (*MessageStatusStore) Deliver

func (mss *MessageStatusStore) Deliver(msgID, ackID string) error

Deliver register AckID to message

func (*MessageStatusStore) FindByAckID

func (mss *MessageStatusStore) FindByAckID(ackID string) (*MessageStatus, error)

FindByAckID return MessageStatus depends AckID

func (*MessageStatusStore) NewMessageStatus

func (mss *MessageStatusStore) NewMessageStatus(subID, msgID string, deadline time.Duration) (*MessageStatus, error)

NewMessageStatus return created MessageStatus and save datastore

type PullMessage

type PullMessage struct {
	AckID   string   `json:"ack_id"`
	Message *Message `json:"message"`
}

PullMessage represent Message and AckID pair

type Push

type Push struct {
	Endpoint   *url.URL
	Attributes *Attributes
}

Push is represent push message in Subscription

func NewPush

func NewPush(endpoint string, attributes map[string]string) (*Push, error)

NewPush return initialized Push object

func (*Push) HasValidEndpoint

func (p *Push) HasValidEndpoint() bool

HasValidEndpoint return bool push has valid endpoint

type PushRequest

type PushRequest struct {
	Message        *Message
	SubscriptionID string
}

PushRequest is represent a send push http request

type SentState

type SentState int

SentState is state of send push message

func (SentState) String

func (s SentState) String() string

type Subscription

type Subscription struct {
	Name               string              `json:"name"`
	TopicID            string              `json:"topic"`
	Message            *MessageStatusStore `json:"-"`
	DefaultAckDeadline time.Duration       `json:"ack_deadline_seconds"`
	PushConfig         *Push               `json:"push_config"`

	// push params
	PushTick    time.Duration `json:"-"`
	AbortPush   bool          `json:"-"`
	PushRunning bool          `json:"-"`
	PushSize    int           `json:"-"`
	// contains filtered or unexported fields
}

Subscription is subscription object

func GetSubscription

func GetSubscription(name string) (*Subscription, error)

GetSubscription return Subscription object

func ListSubscription

func ListSubscription() ([]*Subscription, error)

ListSubscription returns subscription list from globalSubscription

func NewSubscription

func NewSubscription(name, topicName string, timeout int64, endpoint string, attr map[string]string) (*Subscription, error)

NewSubscription return initialized subscription, if not exist already same name Subscription

func (*Subscription) Ack

func (s *Subscription) Ack(ids ...string) error

Ack succeed Message delivery. remove sent Message.

func (*Subscription) Delete

func (s *Subscription) Delete() error

Delete is delete subscription at globalSubscription

func (*Subscription) ModifyAckDeadline

func (s *Subscription) ModifyAckDeadline(id string, timeout int64) error

ModifyAckDeadline modify message ack deadline seconds

func (*Subscription) Pull

func (s *Subscription) Pull(size int) ([]*PullMessage, error)

Pull returns readable messages, and change message state

func (*Subscription) Push

func (s *Subscription) Push(size int) (SentState, error)

Push send message to push endpoint, returns send flag and error

func (*Subscription) PushLoop

func (s *Subscription) PushLoop() error

PushLoop goroutine that keeps looking for pushable messages

func (*Subscription) RegisterMessage

func (s *Subscription) RegisterMessage(msg *Message) error

RegisterMessage associate Message to Subscription

func (*Subscription) Save

func (s *Subscription) Save() error

Save is save to datastore

func (*Subscription) SetPushConfig

func (s *Subscription) SetPushConfig(endpoint string, attribute map[string]string) error

SetPushConfig setting push endpoint with attributes

type Topic

type Topic struct {
	Name string `json:"name"`
}

Topic is topic object

func GetTopic

func GetTopic(name string) (*Topic, error)

GetTopic return topic object

func ListTopic

func ListTopic() ([]*Topic, error)

ListTopic returns topic list

func NewTopic

func NewTopic(name string) (*Topic, error)

NewTopic return initialized topic, if not exist already topic name in GlobalTopics

func (*Topic) Delete

func (t *Topic) Delete() error

Delete topic object at GlobalTopics

func (*Topic) GetSubscriptions

func (t *Topic) GetSubscriptions() ([]*Subscription, error)

GetSubscriptions returns topic dependent Subscription list

func (*Topic) Publish

func (t *Topic) Publish(data []byte, attr map[string]string) (string, error)

Publish create message and deliver to subscription, and return created message id

func (*Topic) Save

func (t *Topic) Save() error

Save save to datastore

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL