concurrent

package
v0.1.50 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: LGPL-2.1 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors describing Future failure conditions.
var (
	ErrFuturesClosed           = errors.New("futures already closed")                   // the Futures controller is already closed
	ErrFutureNotFound          = errors.New("future not found")                         // the Future was not found
	ErrFutureCanceled          = errors.New("future canceled")                          // the Future was canceled
	ErrFutureTimeout           = errors.New("future timeout")                           // the Future timed out
	ErrFutureRespIncorrectType = errors.New("future response has incorrect value type") // the Future response carries a return value of the wrong type
	ErrFutureReplyClosed       = errors.New("future reply closed")                      // the Future reply is closed
)

Functions

func MakeFutureRespAsyncRet

func MakeFutureRespAsyncRet(fs IFutures, ctx context.Context, timeout ...time.Duration) (Future, RespAsyncRet)

MakeFutureRespAsyncRet 创建future与接收响应返回值的异步调用结果

func MakeFutureRespChan

func MakeFutureRespChan[T any](fs IFutures, ctx context.Context, timeout ...time.Duration) (Future, RespChan[T])

MakeFutureRespChan 创建future与接收响应返回值的channel

Types

type Deduplication

// Deduplication is a deduplicator used to preserve idempotence.
type Deduplication struct {
	// contains filtered or unexported fields
}

Deduplication 去重器,用于保持幂等性

func MakeDeduplication

func MakeDeduplication() Deduplication

MakeDeduplication 创建去重器

func (*Deduplication) Make added in v0.1.33

func (d *Deduplication) Make() int64

Make 创建序号

func (*Deduplication) Remove

func (d *Deduplication) Remove(remote string)

Remove 删除对端

func (*Deduplication) Validate added in v0.1.33

func (d *Deduplication) Validate(remote string, seq int64) (passed bool)

Validate 验证序号

type Future

// Future is the Future of the asynchronous model.
type Future struct {
	Finish context.Context // context
	Id     int64           // Id
	// contains filtered or unexported fields
}

Future 异步模型Future

func MakeFuture

func MakeFuture[T Resp](fs IFutures, ctx context.Context, resp T, timeout ...time.Duration) Future

MakeFuture 创建Future

func (Future) Cancel

func (f Future) Cancel(err error)

Cancel 取消

func (Future) Wait

func (f Future) Wait(ctx context.Context)

Wait 等待

type Futures

// Futures is the Future controller.
type Futures struct {
	Ctx     context.Context // context
	Id      int64           // request id generator
	Timeout time.Duration   // request timeout
	// contains filtered or unexported fields
}

Futures Future控制器

func MakeFutures

func MakeFutures(ctx context.Context, timeout time.Duration) Futures

MakeFutures 创建Future控制器

func (*Futures) Make

func (fs *Futures) Make(ctx context.Context, resp Resp, timeout ...time.Duration) Future

Make 创建Future

func (*Futures) Request

func (fs *Futures) Request(ctx context.Context, handler RequestHandler, timeout ...time.Duration) runtime.AsyncRet

Request 请求

func (*Futures) Resolve

func (fs *Futures) Resolve(id int64, ret Ret[any]) error

Resolve 解决

type IDeduplication

// IDeduplication is the deduplicator interface.
type IDeduplication interface {
	// Make creates a sequence number.
	Make() int64
	// Validate validates a sequence number for the given remote.
	Validate(remote string, seq int64) bool
	// Remove removes a remote peer.
	Remove(remote string)
}

IDeduplication 去重器接口

type IFutures

// IFutures is the Future controller interface.
type IFutures interface {
	// Make creates a Future.
	Make(ctx context.Context, resp Resp, timeout ...time.Duration) Future
	// Request issues a request.
	Request(ctx context.Context, handler RequestHandler, timeout ...time.Duration) runtime.AsyncRet
	// Resolve resolves the Future identified by id with ret.
	Resolve(id int64, ret Ret[any]) error
	// contains filtered or unexported methods
}

IFutures Future控制器接口

type IMapEachElement added in v0.1.32

// IMapEachElement is satisfied by any type that provides an Each method
// accepting a generic.Action2[K, V].
type IMapEachElement[K comparable, V any] interface {
	Each(fun generic.Action2[K, V])
}

type ISliceEachElement added in v0.1.32

// ISliceEachElement is satisfied by any type that provides an Each method
// accepting a generic.Action1[T].
type ISliceEachElement[T any] interface {
	Each(fun generic.Action1[T])
}

type Locked

// Locked is a generic type over T whose fields are all unexported.
// NOTE(review): presumably guards a value of type T with a mutex (see
// AutoLock) — confirm against the implementation.
type Locked[T any] struct {
	// contains filtered or unexported fields
}

func MakeLocked

func MakeLocked[T any](obj T) Locked[T]

func NewLocked

func NewLocked[T any](obj T) *Locked[T]

func (*Locked[T]) AutoLock

func (l *Locked[T]) AutoLock(fun generic.Action1[*T])

type LockedMap

// LockedMap embeds an RWLocked wrapping a map[K]V.
type LockedMap[K comparable, V any] struct {
	RWLocked[map[K]V]
}

func MakeLockedMap

func MakeLockedMap[K comparable, V any](size int) LockedMap[K, V]

func NewLockedMap

func NewLockedMap[K comparable, V any](size int) *LockedMap[K, V]

func (*LockedMap[K, V]) Delete

func (lm *LockedMap[K, V]) Delete(k K)

func (*LockedMap[K, V]) Each added in v0.1.32

func (lm *LockedMap[K, V]) Each(fun generic.Action2[K, V])

func (*LockedMap[K, V]) Get

func (lm *LockedMap[K, V]) Get(k K) (v V, ok bool)

func (*LockedMap[K, V]) Insert

func (lm *LockedMap[K, V]) Insert(k K, v V)

func (*LockedMap[K, V]) Len

func (lm *LockedMap[K, V]) Len() (l int)

func (*LockedMap[K, V]) Range added in v0.1.33

func (lm *LockedMap[K, V]) Range(fun generic.Func2[K, V, bool])

type LockedSlice

// LockedSlice embeds an RWLocked wrapping a []T.
type LockedSlice[T any] struct {
	RWLocked[[]T]
}

func MakeLockedSlice

func MakeLockedSlice[T any](len, cap int) LockedSlice[T]

func NewLockedSlice

func NewLockedSlice[T any](len, cap int) *LockedSlice[T]

func (*LockedSlice[T]) Append

func (ls *LockedSlice[T]) Append(values ...T)

func (*LockedSlice[T]) Delete

func (ls *LockedSlice[T]) Delete(idx ...int)

func (*LockedSlice[T]) Each added in v0.1.32

func (ls *LockedSlice[T]) Each(fun generic.Action1[T])

func (*LockedSlice[T]) Insert

func (ls *LockedSlice[T]) Insert(idx int, values ...T)

func (*LockedSlice[T]) Len

func (ls *LockedSlice[T]) Len() (l int)

func (*LockedSlice[T]) Range added in v0.1.33

func (ls *LockedSlice[T]) Range(fun generic.Func1[T, bool])

type RWLocked

// RWLocked is a generic type over T whose fields are all unexported.
// NOTE(review): presumably guards a value of type T with a read/write lock
// (see AutoLock and AutoRLock) — confirm against the implementation.
type RWLocked[T any] struct {
	// contains filtered or unexported fields
}

func MakeRWLocked

func MakeRWLocked[T any](obj T) RWLocked[T]

func NewRWLocked

func NewRWLocked[T any](obj T) *RWLocked[T]

func (*RWLocked[T]) AutoLock

func (l *RWLocked[T]) AutoLock(fun generic.Action1[*T])

func (*RWLocked[T]) AutoRLock

func (l *RWLocked[T]) AutoRLock(fun generic.Action1[*T])

type Reply

// Reply is an asynchronous reply: a receive-only channel of Ret[T].
type Reply[T any] <-chan Ret[T]

Reply 异步答复

func (Reply[T]) Wait

func (reply Reply[T]) Wait(ctx context.Context) Ret[T]

Wait 等待

type RequestHandler

type RequestHandler = generic.Action1[Future] // handler for Future requests

type Resp

// Resp is the response interface.
type Resp interface {
	// Push fills in the return result.
	Push(ret Ret[any]) error
}

Resp 响应接口

type RespAsyncRet

// RespAsyncRet is an asynchronous call result that receives the response
// return value; it is a channel of runtime.Ret.
type RespAsyncRet chan runtime.Ret

RespAsyncRet 接收响应返回值的异步调用结果

func MakeRespAsyncRet

func MakeRespAsyncRet() RespAsyncRet

MakeRespAsyncRet 创建接收响应返回值的异步调用结果

func (RespAsyncRet) CastAsyncRet

func (resp RespAsyncRet) CastAsyncRet() runtime.AsyncRet

CastAsyncRet 转换为异步调用结果

func (RespAsyncRet) Push

func (resp RespAsyncRet) Push(ret Ret[any]) error

Push 填入返回结果

type RespChan

// RespChan is a channel that receives the response return value as a Ret[T].
type RespChan[T any] chan Ret[T]

RespChan 接收响应返回值的channel

func MakeRespChan

func MakeRespChan[T any]() RespChan[T]

MakeRespChan 创建接收响应返回值的channel

func (RespChan[T]) CastReply

func (resp RespChan[T]) CastReply() Reply[T]

CastReply 转换为异步答复

func (RespChan[T]) Push

func (resp RespChan[T]) Push(ret Ret[any]) error

Push 填入返回结果

type RespDelegate

// RespDelegate is a delegate that receives the response return value.
type RespDelegate[T any] generic.DelegateAction1[Ret[T]]

RespDelegate 接收响应返回值的委托

func (RespDelegate[T]) Push

func (resp RespDelegate[T]) Push(ret Ret[any]) error

Push 填入返回结果

type RespFunc

// RespFunc is a function that receives the response return value.
type RespFunc[T any] generic.Action1[Ret[T]]

RespFunc 接收响应返回值的函数

func (RespFunc[T]) Push

func (resp RespFunc[T]) Push(ret Ret[any]) error

Push 填入返回结果

type Ret

// Ret is a return result pairing a value with an error.
type Ret[T any] struct {
	Value T     // return value
	Error error // returned error
}

Ret 返回结果

func MakeRet

func MakeRet[T any](val T, err error) Ret[T]

MakeRet 创建结果

func (Ret[T]) OK

func (ret Ret[T]) OK() bool

OK 是否成功

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL