Documentation ¶
Index ¶
- Constants
- Variables
- func InitDatastoreMessage() error
- func InitDatastoreMessageStatus() error
- func InitDatastoreSubscription() error
- func InitDatastoreTopic() error
- type Attributes
- type ByMessageID
- type BySubscriptionName
- type ByTopicName
- type DatastoreMessage
- type DatastoreMessageStatus
- func (d *DatastoreMessageStatus) CollectByIDs(ids ...string) ([]*MessageStatus, error)
- func (d *DatastoreMessageStatus) Delete(key string) error
- func (d *DatastoreMessageStatus) FindByAckID(ackID string) (*MessageStatus, error)
- func (d *DatastoreMessageStatus) FindBySubscriptionIDAndMessageID(subID, msgID string) (*MessageStatus, error)
- func (d *DatastoreMessageStatus) Get(key string) (*MessageStatus, error)
- func (d *DatastoreMessageStatus) List() ([]*MessageStatus, error)
- func (d *DatastoreMessageStatus) ListBySubscriptionID(subID string) ([]*MessageStatus, error)
- func (d *DatastoreMessageStatus) Set(ms *MessageStatus) error
- type DatastoreSubscription
- func (d *DatastoreSubscription) CollectByTopicID(topicID string) ([]*Subscription, error)
- func (d *DatastoreSubscription) Delete(key string) error
- func (d *DatastoreSubscription) Get(key string) (*Subscription, error)
- func (d *DatastoreSubscription) List() ([]*Subscription, error)
- func (d *DatastoreSubscription) Set(sub *Subscription) error
- type DatastoreTopic
- type Message
- type MessageStatus
- type MessageStatusStore
- func (mss *MessageStatusStore) Ack(ackID string) error
- func (mss *MessageStatusStore) CollectAllMessages() ([]*MessageStatus, error)
- func (mss *MessageStatusStore) CollectReadableMessage(size int) ([]*Message, error)
- func (mss *MessageStatusStore) Deliver(msgID, ackID string) error
- func (mss *MessageStatusStore) FindByAckID(ackID string) (*MessageStatus, error)
- func (mss *MessageStatusStore) NewMessageStatus(subID, msgID string, deadline time.Duration) (*MessageStatus, error)
- type PullMessage
- type Push
- type PushRequest
- type SentState
- type Subscription
- func (s *Subscription) Ack(ids ...string) error
- func (s *Subscription) Delete() error
- func (s *Subscription) ModifyAckDeadline(id string, timeout int64) error
- func (s *Subscription) Pull(size int) ([]*PullMessage, error)
- func (s *Subscription) Push(size int) (SentState, error)
- func (s *Subscription) PushLoop() error
- func (s *Subscription) RegisterMessage(msg *Message) error
- func (s *Subscription) Save() error
- func (s *Subscription) SetPushConfig(endpoint string, attribute map[string]string) error
- type Topic
Constants ¶
const ( PushInterval = 10 * time.Second MaxPushSize = 1000 MinPushSize = 1 )
push constants
Variables ¶
var ( ErrAlreadyExistSubscription = errors.New("already exist subscription") ErrNotFoundAckID = errors.New("not found message dependent to ack id") ErrInvalidEndpoint = errors.New("invalid endpoint URL format") )
subscription errors
var ( ErrEmptyMessage = errors.New("empty message") ErrNotYetReceivedAck = errors.New("not yet received ack") ErrAlreadyReadMessage = errors.New("already read message") )
message errors
var ( ErrNotFoundEntry = errors.New("not found entry") ErrInvalidEntry = errors.New("invalid entry") ErrNotMatchTypeMessage = errors.New("not match type message") ErrNotMatchTypeMessageStatus = errors.New("not match type message status") ErrNotMatchTypeSubscription = errors.New("not match type subscription") ErrNotMatchTypeTopic = errors.New("not match type topic") ErrNotSupportOperation = errors.New("not support operation") ErrNotSupportDriver = errors.New("not support driver") )
datastore errors
var (
ErrAlreadyExistTopic = errors.New("already exist topic")
)
topic errors
Functions ¶
func InitDatastoreMessage ¶
func InitDatastoreMessage() error
InitDatastoreMessage initializes the global datastore object.
func InitDatastoreMessageStatus ¶
func InitDatastoreMessageStatus() error
InitDatastoreMessageStatus initialize global datastore object
func InitDatastoreSubscription ¶
func InitDatastoreSubscription() error
InitDatastoreSubscription initialize global datastore object
func InitDatastoreTopic ¶
func InitDatastoreTopic() error
InitDatastoreTopic initialize global datastore object
Types ¶
type Attributes ¶
Attributes is a string key-value map. It is optional for the message, push, etc.
func (*Attributes) Dump ¶
func (a *Attributes) Dump() map[string]string
Dump returns all items from the attributes.
func (*Attributes) Get ¶
func (a *Attributes) Get(key string) (string, bool)
Get return item from attributes
func (*Attributes) String ¶
func (a *Attributes) String() string
type ByMessageID ¶
type ByMessageID []*Message
ByMessageID implements sort.Interface for []*Message based on the ID
func (ByMessageID) Len ¶
func (a ByMessageID) Len() int
func (ByMessageID) Less ¶
func (a ByMessageID) Less(i, j int) bool
func (ByMessageID) Swap ¶
func (a ByMessageID) Swap(i, j int)
type BySubscriptionName ¶
type BySubscriptionName []*Subscription
BySubscriptionName implements sort.Interface for []*Subscription based on the ID
func (BySubscriptionName) Len ¶
func (a BySubscriptionName) Len() int
func (BySubscriptionName) Less ¶
func (a BySubscriptionName) Less(i, j int) bool
func (BySubscriptionName) Swap ¶
func (a BySubscriptionName) Swap(i, j int)
type ByTopicName ¶
type ByTopicName []*Topic
ByTopicName implements sort.Interface for []*Topic based on the ID
func (ByTopicName) Len ¶
func (a ByTopicName) Len() int
func (ByTopicName) Less ¶
func (a ByTopicName) Less(i, j int) bool
func (ByTopicName) Swap ¶
func (a ByTopicName) Swap(i, j int)
type DatastoreMessage ¶
type DatastoreMessage struct {
// contains filtered or unexported fields
}
DatastoreMessage is adapter between actual datastore and datastore client
func NewDatastoreMessage ¶
func NewDatastoreMessage(cfg *datastore.Config) (*DatastoreMessage, error)
NewDatastoreMessage creates a DatastoreMessage object.
func (*DatastoreMessage) Delete ¶
func (d *DatastoreMessage) Delete(key string) error
Delete delete item
func (*DatastoreMessage) Get ¶
func (d *DatastoreMessage) Get(key string) (*Message, error)
Get return item via datastore
func (*DatastoreMessage) Set ¶
func (d *DatastoreMessage) Set(m *Message) error
Set save item to datastore
type DatastoreMessageStatus ¶
type DatastoreMessageStatus struct {
// contains filtered or unexported fields
}
DatastoreMessageStatus is adapter between actual datastore and datastore client
func NewDatastoreMessageStatus ¶
func NewDatastoreMessageStatus(cfg *datastore.Config) (*DatastoreMessageStatus, error)
NewDatastoreMessageStatus creates a DatastoreMessageStatus object.
func (*DatastoreMessageStatus) CollectByIDs ¶
func (d *DatastoreMessageStatus) CollectByIDs(ids ...string) ([]*MessageStatus, error)
CollectByIDs returns all MessageStatus entries for the given ids.
func (*DatastoreMessageStatus) Delete ¶
func (d *DatastoreMessageStatus) Delete(key string) error
Delete delete item
func (*DatastoreMessageStatus) FindByAckID ¶
func (d *DatastoreMessageStatus) FindByAckID(ackID string) (*MessageStatus, error)
FindByAckID return MessageStatus matched AckID
func (*DatastoreMessageStatus) FindBySubscriptionIDAndMessageID ¶
func (d *DatastoreMessageStatus) FindBySubscriptionIDAndMessageID(subID, msgID string) (*MessageStatus, error)
FindBySubscriptionIDAndMessageID returns the MessageStatus matching the given SubscriptionID and MessageID.
func (*DatastoreMessageStatus) Get ¶
func (d *DatastoreMessageStatus) Get(key string) (*MessageStatus, error)
Get return item via datastore
func (*DatastoreMessageStatus) List ¶
func (d *DatastoreMessageStatus) List() ([]*MessageStatus, error)
List return all MessageStatus slice
func (*DatastoreMessageStatus) ListBySubscriptionID ¶
func (d *DatastoreMessageStatus) ListBySubscriptionID(subID string) ([]*MessageStatus, error)
ListBySubscriptionID return all MessageStatus slice matched SubscriptionID
func (*DatastoreMessageStatus) Set ¶
func (d *DatastoreMessageStatus) Set(ms *MessageStatus) error
Set save item to datastore
type DatastoreSubscription ¶
type DatastoreSubscription struct {
// contains filtered or unexported fields
}
DatastoreSubscription is adapter between actual datastore and datastore client
func NewDatastoreSubscription ¶
func NewDatastoreSubscription(cfg *datastore.Config) (*DatastoreSubscription, error)
NewDatastoreSubscription create DatastoreSubscription object
func (*DatastoreSubscription) CollectByTopicID ¶
func (d *DatastoreSubscription) CollectByTopicID(topicID string) ([]*Subscription, error)
CollectByTopicID returns all Subscriptions that depend on the given topic ID.
func (*DatastoreSubscription) Delete ¶
func (d *DatastoreSubscription) Delete(key string) error
Delete delete item
func (*DatastoreSubscription) Get ¶
func (d *DatastoreSubscription) Get(key string) (*Subscription, error)
Get return item via datastore
func (*DatastoreSubscription) List ¶
func (d *DatastoreSubscription) List() ([]*Subscription, error)
List return all Subscription slice
func (*DatastoreSubscription) Set ¶
func (d *DatastoreSubscription) Set(sub *Subscription) error
Set save item to datastore
type DatastoreTopic ¶
type DatastoreTopic struct {
// contains filtered or unexported fields
}
DatastoreTopic is adapter between actual datastore and datastore client
func NewDatastoreTopic ¶
func NewDatastoreTopic(cfg *datastore.Config) (*DatastoreTopic, error)
NewDatastoreTopic create DatastoreTopic object
func (*DatastoreTopic) Delete ¶
func (d *DatastoreTopic) Delete(key string) error
Delete delete item
func (*DatastoreTopic) Get ¶
func (d *DatastoreTopic) Get(key string) (*Topic, error)
Get return item via datastore
func (*DatastoreTopic) List ¶
func (d *DatastoreTopic) List() ([]*Topic, error)
List return all topic slice
func (*DatastoreTopic) Set ¶
func (d *DatastoreTopic) Set(topic *Topic) error
Set save item to datastore
type Message ¶
type Message struct { ID string `json:"message_id"` Data []byte `json:"data"` Attributes map[string]string `json:"attributes"` SubscribeIDs []string `json:"-"` PublishedAt time.Time `json:"publish_time"` }
Message is data object
func NewMessage ¶
NewMessage return initialized Message
func (*Message) AckSubscription ¶
AckSubscription remove Subscription
func (*Message) AddSubscription ¶
AddSubscription add depend subscription
type MessageStatus ¶
type MessageStatus struct { ID string SubscriptionID string MessageID string AckID string AckDeadline time.Duration AckState messageState DeliveredAt time.Time }
MessageStatus holds params for Message.
func (*MessageStatus) Delete ¶
func (ms *MessageStatus) Delete() error
Delete delete MessageStatus from backend datastore
func (*MessageStatus) Deliver ¶
func (ms *MessageStatus) Deliver(ackID string)
Deliver sets the delivered state and the new AckID.
func (*MessageStatus) Readable ¶
func (ms *MessageStatus) Readable() bool
Readable return whether the message can be read
func (*MessageStatus) Save ¶
func (ms *MessageStatus) Save() error
Save save MessageStatus to backend datastore
type MessageStatusStore ¶
MessageStatusStore is a holder of, and adapter for, MessageStatus.
func NewMessageStatusStore ¶
func NewMessageStatusStore(subID string) *MessageStatusStore
NewMessageStatusStore return created MessageStatusStore
func (*MessageStatusStore) Ack ¶
func (mss *MessageStatusStore) Ack(ackID string) error
Ack invisible message depends ackID
func (*MessageStatusStore) CollectAllMessages ¶
func (mss *MessageStatusStore) CollectAllMessages() ([]*MessageStatus, error)
CollectAllMessages returns the MessageStatus of all messages.
func (*MessageStatusStore) CollectReadableMessage ¶
func (mss *MessageStatusStore) CollectReadableMessage(size int) ([]*Message, error)
CollectReadableMessage return readable messages
func (*MessageStatusStore) Deliver ¶
func (mss *MessageStatusStore) Deliver(msgID, ackID string) error
Deliver register AckID to message
func (*MessageStatusStore) FindByAckID ¶
func (mss *MessageStatusStore) FindByAckID(ackID string) (*MessageStatus, error)
FindByAckID return MessageStatus depends AckID
func (*MessageStatusStore) NewMessageStatus ¶
func (mss *MessageStatusStore) NewMessageStatus(subID, msgID string, deadline time.Duration) (*MessageStatus, error)
NewMessageStatus returns a newly created MessageStatus and saves it to the datastore.
type PullMessage ¶
PullMessage represent Message and AckID pair
type Push ¶
type Push struct { Endpoint *url.URL Attributes *Attributes }
Push represents the push configuration of a Subscription.
func (*Push) HasValidEndpoint ¶
HasValidEndpoint reports whether the push has a valid endpoint.
type PushRequest ¶
PushRequest represents an HTTP request used to send a push.
type Subscription ¶
type Subscription struct { Name string `json:"name"` TopicID string `json:"topic"` Message *MessageStatusStore `json:"-"` DefaultAckDeadline time.Duration `json:"ack_deadline_seconds"` PushConfig *Push `json:"push_config"` // push params PushTick time.Duration `json:"-"` AbortPush bool `json:"-"` PushRunning bool `json:"-"` PushSize int `json:"-"` // contains filtered or unexported fields }
Subscription is subscription object
func GetSubscription ¶
func GetSubscription(name string) (*Subscription, error)
GetSubscription return Subscription object
func ListSubscription ¶
func ListSubscription() ([]*Subscription, error)
ListSubscription returns subscription list from globalSubscription
func NewSubscription ¶
func NewSubscription(name, topicName string, timeout int64, endpoint string, attr map[string]string) (*Subscription, error)
NewSubscription returns an initialized Subscription if no Subscription with the same name already exists.
func (*Subscription) Ack ¶
func (s *Subscription) Ack(ids ...string) error
Ack succeed Message delivery. remove sent Message.
func (*Subscription) Delete ¶
func (s *Subscription) Delete() error
Delete deletes the subscription from globalSubscription.
func (*Subscription) ModifyAckDeadline ¶
func (s *Subscription) ModifyAckDeadline(id string, timeout int64) error
ModifyAckDeadline modify message ack deadline seconds
func (*Subscription) Pull ¶
func (s *Subscription) Pull(size int) ([]*PullMessage, error)
Pull returns readable messages, and change message state
func (*Subscription) Push ¶
func (s *Subscription) Push(size int) (SentState, error)
Push sends messages to the push endpoint and returns the sent state and an error.
func (*Subscription) PushLoop ¶
func (s *Subscription) PushLoop() error
PushLoop goroutine that keeps looking for pushable messages
func (*Subscription) RegisterMessage ¶
func (s *Subscription) RegisterMessage(msg *Message) error
RegisterMessage associate Message to Subscription
func (*Subscription) SetPushConfig ¶
func (s *Subscription) SetPushConfig(endpoint string, attribute map[string]string) error
SetPushConfig sets the push endpoint and its attributes.
type Topic ¶
type Topic struct {
Name string `json:"name"`
}
Topic is topic object
func (*Topic) GetSubscriptions ¶
func (t *Topic) GetSubscriptions() ([]*Subscription, error)
GetSubscriptions returns topic dependent Subscription list