Documentation ¶
Index ¶
- Variables
- func BytesToString(b []byte) string
- func ConsumerName() string
- func DurEqual(d1, d2 time.Duration, threshold int) bool
- func ErrorHandler(msg *Message, msgErr error, retries *uint32) error
- func FormatHandlerError(msg *Message, retrylimit int) error
- func Scheduler(name string, c *redis.Client, fn func(ctx context.Context) (int, error))
- func SetLogger(logger *log.Logger)
- func StringToBytes(s string) []byte
- func UnixMs(tm time.Time) int64
- func Version() string
- func WithRedisLock(ctx context.Context, name string, redis Redis, ...) error
- type Broker
- type Config
- type DisqError
- type Error
- type Handler
- type HandlerFunc
- type Message
- type MessageRaw
- type Redis
- type Stats
- type Task
- type TaskMap
- type TaskOptions
Constants ¶
This section is empty.
Variables ¶
View Source
var Logger *log.Logger
Functions ¶
func BytesToString ¶ added in v0.1.4
BytesToString converts byte slice to string.
func ConsumerName ¶
func ConsumerName() string
func ErrorHandler ¶ added in v0.1.6
func FormatHandlerError ¶ added in v0.1.6
func StringToBytes ¶ added in v0.1.4
StringToBytes converts string to byte slice.
Types ¶
type Error ¶ added in v0.1.6
func (*Error) GetRateLimit ¶ added in v0.1.6
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct {
	Ctx  context.Context `msgpack:"-"`
	ID   string          `msgpack:"ID"`
	Name string          `msgpack:"Name"`
	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay      time.Duration `msgpack:"Delay"`
	Args       []interface{} `msgpack:"Args"`
	ArgsBin    []byte        `msgpack:"ArgsBin"`
	TaskName   string        `msgpack:"TaskName"`
	RetryCount int           `msgpack:"RetryCount"`
	// Execution time is needed for localstorage delays.
	ExecutionTime time.Time `msgpack:"ExecutionTime"`
	Err           error     `msgpack:"Err"`
}
Message is used as a uniform object for publishing and consuming messages from a queue.
func NewMessage ¶
func (*Message) MarshalArgs ¶ added in v0.1.5
func (*Message) MarshalBinary ¶ added in v0.1.1
func (*Message) UnmarshalBinary ¶ added in v0.1.1
type MessageRaw ¶
type MessageRaw Message
type Redis ¶
type Redis interface {
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
	Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(ctx context.Context, script string) *redis.StringCmd

	// Stream and ZSET methods
	TxPipeline() redis.Pipeliner
	XAdd(ctx context.Context, a *redis.XAddArgs) *redis.StringCmd
	XDel(ctx context.Context, stream string, ids ...string) *redis.IntCmd
	XLen(ctx context.Context, stream string) *redis.IntCmd
	XRangeN(ctx context.Context, stream, start, stop string, count int64) *redis.XMessageSliceCmd
	XRange(ctx context.Context, stream, start, stop string) *redis.XMessageSliceCmd
	XGroupCreateMkStream(ctx context.Context, stream, group, start string) *redis.StatusCmd
	XReadGroup(ctx context.Context, a *redis.XReadGroupArgs) *redis.XStreamSliceCmd
	XAck(ctx context.Context, stream, group string, ids ...string) *redis.IntCmd
	XInfoStream(ctx context.Context, key string) *redis.XInfoStreamCmd
	XPendingExt(ctx context.Context, a *redis.XPendingExtArgs) *redis.XPendingExtCmd
	XPending(ctx context.Context, stream, group string) *redis.XPendingCmd
	XTrim(ctx context.Context, key string, maxLen int64) *redis.IntCmd
	XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *redis.IntCmd
	ZAdd(ctx context.Context, key string, members ...*redis.Z) *redis.IntCmd
	ZRangeByScore(ctx context.Context, key string, opt *redis.ZRangeBy) *redis.StringSliceCmd
	ZRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
	XInfoConsumers(ctx context.Context, key string, group string) *redis.XInfoConsumersCmd

	// List methods
	LIndex(ctx context.Context, key string, index int64) *redis.StringCmd
	LLen(ctx context.Context, key string) *redis.IntCmd
	LPop(ctx context.Context, key string) *redis.StringCmd
	LPopCount(ctx context.Context, key string, count int) *redis.StringSliceCmd
	LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
	LRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd
	LRem(ctx context.Context, key string, count int64, value interface{}) *redis.IntCmd
	LTrim(ctx context.Context, key string, start, stop int64) *redis.StatusCmd
}
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func NewTask ¶
func NewTask(opt *TaskOptions) *Task
func RegisterTask ¶
func RegisterTask(opt *TaskOptions) (*Task, error)
func (*Task) HandleMessage ¶
func (*Task) RetryLimit ¶
type TaskMap ¶
type TaskMap struct {
// contains filtered or unexported fields
}
var Tasks TaskMap
func (*TaskMap) RegisterTasks ¶
func (s *TaskMap) RegisterTasks(opts *TaskOptions) (*Task, error)
type TaskOptions ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.