queue

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2025 License: MIT Imports: 10 Imported by: 4

README

queue

infra.Go queue module.

Documentation

Index

Constants

View Source
const (
	NAME = "QUEUE"
)

Variables

View Source
var (
	ErrInvalidConnection = errors.New("Invalid queue connection.")
	ErrInvalidMsg        = errors.New("Invalid queue msg.")
	ErrInvalidDeclare    = errors.New("Invalid queue declare.")
	ErrInvalidWeight     = errors.New("Invalid queue connection weight.")
	ErrQueueUnfinished   = errors.New("queue unfinished.")
)

Functions

func DeferredPublish

func DeferredPublish(name string, value Map, delay time.Duration) error

func DeferredPublishTo

func DeferredPublishTo(conn, name string, value Map, delay time.Duration) error

func Publish

func Publish(name string, values ...Map) error

func PublishTo

func PublishTo(conn, name string, values ...Map) error

Types

type Config

type Config struct {
	Driver   string
	External bool
	Codec    string
	Weight   int
	Prefix   string
	Setting  Map
}

type Configs

type Configs map[string]Config

type Connect

type Connect interface {
	Open() error
	Health() (Health, error)
	Close() error

	Register(string) error

	Start() error
	Stop() error

	Publish(name string, data []byte) error
	DeferredPublish(name string, data []byte, delay time.Duration) error
}

Connect 连接

type Context

type Context struct {
	infra.Meta

	// 以下几个字段必须独立
	// 要不然,Invoke的时候,会被修改掉
	Name    string
	Config  *Queue
	Setting Map

	Value  Map
	Args   Map
	Locals Map

	Body Any
	// contains filtered or unexported fields
}

func (*Context) Denied

func (ctx *Context) Denied(res Res)

func (*Context) Erred

func (ctx *Context) Erred(res Res)

func (*Context) Failed

func (ctx *Context) Failed(res Res)

func (*Context) Finish

func (ctx *Context) Finish()

func (*Context) Found

func (ctx *Context) Found()

func (*Context) Next

func (ctx *Context) Next()

func (*Context) Retry

func (ctx *Context) Retry(delays ...time.Duration)

返回延迟(相当于重试)

type Declare

type Declare struct {
	Alias    []string `json:"alias"`
	Name     string   `json:"name"`
	Text     string   `json:"text"`
	Nullable bool     `json:"-"`
	Args     Vars     `json:"args"`
}

Declare 声明,表示当前节点会发出的队列声明

type Delay

type Delay = []time.Duration

type Driver

type Driver interface {
	Connect(*Instance) (Connect, error)
}

Driver 数据驱动

type Filter

type Filter struct {
	Name     string  `json:"name"`
	Text     string  `json:"text"`
	Serve    ctxFunc `json:"-"`
	Request  ctxFunc `json:"-"`
	Execute  ctxFunc `json:"-"`
	Response ctxFunc `json:"-"`
}

Filter 拦截器

type Handler

type Handler struct {
	Name   string  `json:"name"`
	Text   string  `json:"text"`
	Found  ctxFunc `json:"-"`
	Error  ctxFunc `json:"-"`
	Failed ctxFunc `json:"-"`
	Denied ctxFunc `json:"-"`
}

Handler 处理器

type Health

type Health struct {
	Workload int64
}

type Info

type Info struct {
	Name   string
	Thread int
	Retry  []time.Duration
}

type Instance

type Instance struct {
	Name    string
	Config  Config
	Setting Map
	// contains filtered or unexported fields
}

func (*Instance) Serve

func (this *Instance) Serve(req Request) Response

func (*Instance) Submit

func (this *Instance) Submit(next func())

type Module

type Module struct {
	// contains filtered or unexported fields
}

func (*Module) Config

func (this *Module) Config(name string, config Config)

func (*Module) Configs

func (this *Module) Configs(config Configs)

func (*Module) Configure

func (this *Module) Configure(global Map)

func (*Module) Connect

func (this *Module) Connect()

func (*Module) Declare

func (module *Module) Declare(name string, config Declare)

Declare 声明

func (*Module) DeferredPublish

func (this *Module) DeferredPublish(name string, value Map, delay time.Duration) error

func (*Module) DeferredPublishTo

func (this *Module) DeferredPublishTo(conn, name string, value Map, delay time.Duration) error

func (*Module) Driver

func (module *Module) Driver(name string, driver Driver)

Driver 注册驱动

func (*Module) Filter

func (module *Module) Filter(name string, config Filter)

Filter 注册 拦截器

func (*Module) Handler

func (module *Module) Handler(name string, config Handler)

Handler 注册 处理器

func (*Module) Initialize

func (this *Module) Initialize()

func (*Module) Launch

func (this *Module) Launch()

func (*Module) Publish

func (this *Module) Publish(name string, values ...Map) error

func (*Module) PublishTo

func (this *Module) PublishTo(conn, name string, values ...Map) error

func (*Module) Queue

func (module *Module) Queue(name string, config Queue)

直接使用另外注册,是为了方便alias被替换 要不然有可能会重名,在别名里重名

func (*Module) Register

func (this *Module) Register(name string, value Any)

func (*Module) Terminate

func (this *Module) Terminate()

type Queue

type Queue struct {
	Alias    []string `json:"alias"`
	Name     string   `json:"name"`
	Text     string   `json:"text"`
	Nullable bool     `json:"-"`
	Args     Vars     `json:"args"`
	Setting  Map      `json:"-"`
	Coding   bool     `json:"-"`

	Action  ctxFunc   `json:"-"`
	Actions []ctxFunc `json:"-"`

	// 路由单独可定义的处理器
	Found  ctxFunc `json:"-"`
	Error  ctxFunc `json:"-"`
	Failed ctxFunc `json:"-"`
	Denied ctxFunc `json:"-"`

	Connect string `json:"connect"`

	//Option
	Thread int             `json:"thread"`
	Retry  []time.Duration `json:"delay"`
}

type Request

type Request struct {
	Name      string
	Data      []byte
	Attempt   int
	Timestamp time.Time
}

type Response

type Response struct {
	Retry bool
	Delay time.Duration
}

type Retry

type Retry = []time.Duration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL