core

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrNotfoundVersion is the sentinel error carrying the message "not found version".
// NOTE(review): presumably returned by GetVerStrategy when no strategy is registered
// for the requested version — confirm against the implementation.
var ErrNotfoundVersion = errors.New("not found version")

Functions

func GetNextVersion

func GetNextVersion(ver int) int

func RegisterVersionStrategy

func RegisterVersionStrategy(s VerStrategy)

RegisterVersionStrategy registers the strategy function that is executed when an entity's version changes.

Types

type ActorConstructor

// ActorConstructor bundles the settings for an actor together with the
// CreateFunc used to construct it.
type ActorConstructor struct {
	// ID and Name identify the actor.
	// NOTE(review): which of the two is the instance ID and which is the type name
	// is not visible here — confirm against the factory implementation.
	ID   string
	Name string

	// Weight occupied by the actor, weight algorithm reference: 2c4g (pod = 2 * 4 * 1000)
	Weight int

	// Dynamic is a boolean flag on the actor.
	// NOTE(review): presumably marks actors that are created on demand — confirm.
	Dynamic bool

	// Constructor function
	Constructor CreateFunc

	// NodeUnique indicates whether this actor is unique within the current node
	NodeUnique bool

	// Global quantity limit for the current actor type that can be registered
	GlobalQuantityLimit int

	// Options holds free-form string key/value settings for the actor.
	Options map[string]string
}

type ActorContext

// ActorContext is the set of operations available to an actor: messaging
// (Call, CallBy, ReenterCall, Send, Pub), access to the system, address book
// and loader, identity accessors, and key/value state storage.
type ActorContext interface {
	// Call issues a call that is initiated by the actor itself.
	Call(tar router.Target, mw *msg.Wrapper) error
	// CallBy issues a call addressed by an id and an event name.
	// NOTE(review): presumably id is the target actor's ID and ev the event to invoke — confirm.
	CallBy(id string, ev string, mw *msg.Wrapper) error

	// ReenterCall issues a ReenterCall that is initiated by the actor itself
	// and returns an IFuture for the result.
	ReenterCall(ctx context.Context, tar router.Target, mw *msg.Wrapper) IFuture

	// Send sends an event to another actor
	// Asynchronous call semantics, does not block the current goroutine, used for long-running RPC calls
	Send(tar router.Target, mw *msg.Wrapper) error

	// Pub semantics for pubsub, used to publish messages to an actor's message cache queue
	Pub(topic string, msg *router.Message) error

	// AddressBook returns the object that manages global actor addresses;
	// it is normally invoked under the system's control.
	AddressBook() IAddressBook
	// System returns the ISystem associated with this context.
	System() ISystem

	// Loader returns the actor loader
	// NOTE(review): the return type is IActorBuilder; the string argument is
	// presumably the actor type to build — confirm.
	Loader(string) IActorBuilder

	// Unregister unregisters an actor
	Unregister(id, ty string) error

	// ID returns an identifier string.
	// NOTE(review): presumably the ID of the actor that owns this context — confirm.
	ID() string
	// Type returns a type string.
	// NOTE(review): presumably the type of the actor that owns this context — confirm.
	Type() string

	// WithValue stores the given state under key.
	// It allows you to embed any state information into the context for later retrieval.
	// NOTE(review): the method has no return value, so it presumably mutates this
	// context in place rather than returning a new one — confirm.
	//
	// Parameters:
	//   - key: The key for the type to be set in the context (can be defined using the form: type StateKey struct{})
	//   - value: The corresponding value
	WithValue(key, value interface{})

	// GetValue retrieves a value from the context based on the provided key.
	//
	// Parameters:
	//   - key: The key used to store the value in the context
	//
	// Returns:
	//   - The value associated with the key.
	//     NOTE(review): the signature has a single result, so there is no separate
	//     "found" flag; presumably nil is returned when the key is absent — confirm.
	GetValue(key interface{}) interface{}
}

type AddressInfo

// AddressInfo is the JSON-serializable address record returned by IAddressBook
// lookups: an actor's ID and type plus node, service, IP and port fields.
type AddressInfo struct {
	ActorId string `json:"actor_id"` // actor ID
	ActorTy string `json:"actor_ty"` // actor type
	Node    string `json:"node"`     // NOTE(review): presumably the ID of the hosting node — confirm
	Service string `json:"service"`  // NOTE(review): presumably the service name — confirm
	Ip      string `json:"ip"`
	Port    int    `json:"port"`
}

type CreateFunc

// CreateFunc is a constructor function: it receives an IActorBuilder and
// returns the IActor built from it.
type CreateFunc func(IActorBuilder) IActor

type IActor

// IActor is the interface implemented by every actor: lifecycle (Init, Update,
// Exit), identity, mailbox delivery, event/timer/subscription registration,
// and outgoing calls.
type IActor interface {
	// Init initializes the actor with the given context.
	Init(ctx context.Context)

	// ID returns the actor's ID.
	ID() string
	// Type returns the actor's type.
	Type() string

	// Received pushes a message into the actor's mailbox
	Received(mw *msg.Wrapper) error

	// RegisterEvent registers an event handling chain for the actor
	//  ev: event name
	//  createChainF: factory that builds the IChain from an ActorContext
	RegisterEvent(ev string, createChainF func(ActorContext) IChain) error

	// RegisterTimer registers a timer function for the actor (Note: all times used here are in milliseconds)
	//  dueTime: delay before execution, 0 for immediate execution
	//  interval: time between each tick
	//  f: callback function
	//  args: can be used to pass the actor entity to the timer callback
	RegisterTimer(dueTime int64, interval int64, f func(interface{}) error, args interface{}) *timewheel.Timer

	// SubscriptionEvent subscribes to a message
	//  If this is the first subscription to this topic, opts will take effect (you can set some options for the topic, such as ttl)
	//  topic: A subject that contains a group of channels (e.g., if topic = offline messages, channel = actorId, then each actor can get its own offline messages in this topic)
	//  channel: Represents different categories within a topic
	//  succ: Callback function for successful subscription
	SubscriptionEvent(topic string, channel string, succ func(), opts ...pubsub.TopicOption) error

	// Update is the main loop of the Actor, running in a separate goroutine
	Update()

	// Call sends an event to another actor
	Call(tar router.Target, mw *msg.Wrapper) error

	// ReenterCall sends mw to tar and returns an IFuture.
	// NOTE(review): presumably the future completes when the reply arrives, so the
	// actor is not blocked while waiting — confirm.
	ReenterCall(ctx context.Context, tar router.Target, mw *msg.Wrapper) IFuture

	// Context returns the actor's ActorContext.
	Context() ActorContext

	// Exit is the actor's exit hook.
	// NOTE(review): presumably stops the actor and releases its resources — confirm.
	Exit()
}

IActor is an abstraction of threads (goroutines). In a Node (process), 1 to N actors execute specific business logic.

Each actor object represents a logical computation unit that interacts with the outside world through a mailbox.

type IActorBuilder

// IActorBuilder exposes an actor's construction settings through getters,
// lets callers override some of them with chainable With* methods, and
// finishes with Register or Picker.
type IActorBuilder interface {
	// Read-only accessors for the builder's current settings.
	GetID() string
	GetType() string
	GetGlobalQuantityLimit() int
	GetNodeUnique() bool
	GetWeight() int
	// GetOpt returns the option value for key.
	// NOTE(review): behavior for a missing key is not visible here; presumably "" — confirm.
	GetOpt(key string) string
	GetOptions() map[string]string

	// Accessors for the collaborators attached to the builder.
	GetSystem() ISystem
	GetLoader() IActorLoader
	GetConstructor() CreateFunc

	// Setters; each returns an IActorBuilder so calls can be chained.
	WithID(string) IActorBuilder
	WithType(string) IActorBuilder
	// WithOpt takes two strings.
	// NOTE(review): presumably an option key followed by its value — confirm.
	WithOpt(string, string) IActorBuilder

	// Terminal operations.
	// Register returns the resulting IActor or an error.
	// NOTE(review): presumably registers the actor with the builder's ISystem — confirm.
	Register() (IActor, error)
	// Picker returns only an error.
	// NOTE(review): presumably delegates to IActorLoader.Pick to choose a node — confirm.
	Picker() error
}

type IActorFactory

// IActorFactory provides lookup of ActorConstructor values.
type IActorFactory interface {
	// Get returns the ActorConstructor for actor type ty.
	// NOTE(review): presumably nil when ty is unknown — confirm.
	Get(ty string) *ActorConstructor
	// GetActors returns a slice of ActorConstructor pointers.
	// NOTE(review): presumably every constructor the factory knows about — confirm.
	GetActors() []*ActorConstructor
}

type IActorLoader

// IActorLoader produces actor builders and picks a node for them.
type IActorLoader interface {

	// Builder selects an actor from the factory and provides a builder
	// NOTE(review): the string argument is presumably the actor type — confirm.
	Builder(string, ISystem) IActorBuilder

	// Pick selects an appropriate node for the actor builder to register
	Pick(IActorBuilder) error

	// AssignToNode hands an INode to the loader.
	// NOTE(review): presumably binds the loader to that node — confirm.
	AssignToNode(INode)
}

type IAddressBook

// IAddressBook is the registry of actor addresses: it supports registering and
// unregistering actors, lookup by ID or type, and selecting a low-weight node.
type IAddressBook interface {
	// NOTE(review): the parameters are unnamed; presumably (ctx, actor type, actor ID, weight)
	// for Register and (ctx, actor ID, weight) for Unregister — confirm the order against the implementation.
	Register(context.Context, string, string, int) error // registers an actor in the address book
	Unregister(context.Context, string, int) error

	// GetByID returns a single AddressInfo; GetByType returns a slice of them.
	// NOTE(review): the string arguments are presumably an actor ID and an actor type — confirm.
	GetByID(context.Context, string) (AddressInfo, error)
	GetByType(context.Context, string) ([]AddressInfo, error)

	// GetLowWeightNodeForActor returns an AddressInfo for the given actor type.
	// NOTE(review): presumably picks the least-loaded node able to host that type — confirm.
	GetLowWeightNodeForActor(ctx context.Context, actorType string) (AddressInfo, error)

	// Clear returns only an error.
	// NOTE(review): the scope of what is cleared (this node's entries or all of them) is not visible here — confirm.
	Clear(context.Context) error
}

type ICacheStrategy

// ICacheStrategy defines how data moves between memory, the cache layer and
// the database.
type ICacheStrategy interface {
	// Load - Loads data, prioritizing retrieval from the cache layer. If not found in cache, it pulls from the database, stores in cache, and then returns.
	Load(ctx context.Context) error

	// Sync - Synchronizes memory data back to the cache
	// NOTE(review): forceUpdate presumably syncs even when the data is not dirty — confirm.
	Sync(ctx context.Context, forceUpdate bool) error

	// Store - Stores data to the database and clears the cache
	Store(ctx context.Context) error

	// IsDirty - Checks if the data is dirty
	IsDirty() bool

	// IsExist - Reports whether the data exists.
	// NOTE(review): whether this checks the cache, the database, or both is not visible here — confirm.
	IsExist(context.Context) bool
}

type IChain

// IChain is an event handling chain; IActor.RegisterEvent takes a factory
// that creates one.
type IChain interface {
	// Execute runs the chain on the given message wrapper and returns an error.
	Execute(*msg.Wrapper) error
}

type IEntity

// IEntity is an identifiable object that stores modules keyed by reflect.Type.
type IEntity interface {
	// GetID returns the entity's ID.
	GetID() string
	// SetModule stores module under the key typ.
	SetModule(typ reflect.Type, module interface{})
	// GetModule returns the module stored under typ.
	// NOTE(review): presumably nil when nothing is stored for typ — confirm.
	GetModule(typ reflect.Type) interface{}
}

type IFuture

// IFuture is the result handle returned by ReenterCall.
type IFuture interface {
	// Complete takes a message wrapper.
	// NOTE(review): presumably marks the future as completed with that result — confirm.
	Complete(*msg.Wrapper)
	// IsCompleted returns a bool.
	// NOTE(review): presumably true once Complete has been called — confirm.
	IsCompleted() bool

	// Then takes a callback that receives a message wrapper and returns an
	// IFuture, so calls can be chained.
	// NOTE(review): presumably the callback runs with the completed result — confirm.
	Then(func(*msg.Wrapper)) IFuture
}

type INode

// INode represents a node. IActor's documentation describes a node as a
// process that hosts 1 to N actors.
type INode interface {
	// Init configures the node from the supplied NodeOption values.
	Init(...NodeOption) error
	// Update is the node's update hook.
	// NOTE(review): presumably the node's main loop — confirm.
	Update()
	// WaitClose takes no arguments and returns nothing.
	// NOTE(review): presumably blocks until the node is shut down — confirm.
	WaitClose()

	// ID returns the node's ID.
	ID() string
	// System returns the node's ISystem.
	System() ISystem
}

type ISystem

// ISystem manages actors: registration and lookup, message delivery
// (Call, Send), pub/sub, and access to the loader and address book.
type ISystem interface {
	// Register takes an IActorBuilder and returns the resulting IActor or an error.
	Register(IActorBuilder) (IActor, error)
	// Unregister takes an actor ID and type and returns an error.
	Unregister(id, ty string) error

	// Actors returns a slice of IActor.
	// NOTE(review): presumably the actors currently registered in this system — confirm.
	Actors() []IActor

	// FindActor looks up an actor by ID.
	FindActor(ctx context.Context, id string) (IActor, error)

	// Call sends an event to another actor
	// Synchronous call semantics (actual implementation is asynchronous, each call is in a separate goroutine)
	Call(tar router.Target, mw *msg.Wrapper) error

	// Send sends an event to another actor
	// Asynchronous call semantics, does not block the current goroutine, used for long-running RPC calls
	Send(tar router.Target, mw *msg.Wrapper) error

	// Pub semantics for pubsub, used to publish messages to an actor's message cache queue
	Pub(topic string, m *router.Message) error

	// Sub listens to messages in a channel within a specific topic
	//  opts can be used to set initial values on first listen, such as setting the TTL for messages in this topic
	Sub(topic string, channel string, opts ...pubsub.TopicOption) (*pubsub.Channel, error)

	// Loader returns the actor loader
	// NOTE(review): the return type is IActorBuilder; the string argument is
	// presumably the actor type to build — confirm.
	Loader(string) IActorBuilder

	// AddressBook returns the system's IAddressBook.
	AddressBook() IAddressBook

	// Update is the system's update hook.
	// NOTE(review): presumably the system's main loop — confirm.
	Update()
	// Exit takes a *sync.WaitGroup.
	// NOTE(review): presumably shuts the system down and uses the WaitGroup to
	// signal completion — confirm.
	Exit(*sync.WaitGroup)
}

type NodeInfo

// NodeInfo holds a node's ID, IP, port and weight.
type NodeInfo struct {
	NodeID string
	Ip     string
	Port   int
	Weight int // NOTE(review): presumably the same weight unit described on ActorConstructor.Weight — confirm
}

type NodeOption

// NodeOption is a functional option that receives a *NodeParm. The NodeWith*
// functions return values of this type, and INode.Init accepts them.
type NodeOption func(*NodeParm)

func NodeWithFactory

func NodeWithFactory(factory IActorFactory) NodeOption

func NodeWithID

func NodeWithID(id string) NodeOption

func NodeWithIP

func NodeWithIP(ip string) NodeOption

func NodeWithLoader

func NodeWithLoader(load IActorLoader) NodeOption

func NodeWithPort

func NodeWithPort(port int) NodeOption

func NodeWithServiceInfo

func NodeWithServiceInfo(ip string, port int) NodeOption

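NodeWithServiceInfo returns a NodeOption built from the given ip and port. (The upstream doc comment is only the placeholder "tmp", which may indicate that this option is temporary.)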

func NodeWithTracer

func NodeWithTracer(t tracer.ITracer) NodeOption

func NodeWithWeight

func NodeWithWeight(weight int) NodeOption

type NodeParm

// NodeParm is the parameter set that NodeOption functions receive.
type NodeParm struct {
	ID     string // node's globally unique ID
	Weight int

	// Ip and Port hold the node's network address settings.
	Ip   string
	Port int

	// Tracer is the tracer.ITracer configured for the node.
	Tracer tracer.ITracer

	// Loader and Factory hold the actor loader and the actor factory.
	Loader  IActorLoader
	Factory IActorFactory
}

type VerStrategy

// VerStrategy groups a version number, a reason string and a function that
// takes an entity. Values are registered with RegisterVersionStrategy and
// looked up with GetVerStrategy.
type VerStrategy struct {
	// Version is an integer version number.
	// NOTE(review): presumably the version this strategy applies to — confirm.
	Version int
	// Reason is a free-form string.
	// NOTE(review): presumably a description of why the version changed — confirm.
	Reason  string
	// Func takes the entity and returns an error.
	// NOTE(review): presumably performs the upgrade for this version — confirm.
	Func    func(entity interface{}) error
}

func GetVerStrategy

func GetVerStrategy(ver int) (VerStrategy, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL