actors

package
v0.0.0-...-2200cc9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2022 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Actor

type Actor struct {
	// ConfigKV is a handle to the config KV store; it has the same type as
	// ActorConfig.ConfigKV. NOTE(review): presumably populated from
	// ActorConfig.ConfigKV by the constructors — confirm.
	ConfigKV *configkv.ConfigKV
	// contains filtered or unexported fields
}

Actor is the base struct for all actors. It provides common helper functions and conforms to IActor.

func (*Actor) GetStreamName

func (actor *Actor) GetStreamName() string

GetStreamName returns the stream name that the actor subscribed to.

func (*Actor) OnBootLoadConfig

func (actor *Actor) OnBootLoadConfig() error

OnBootLoadConfig loads the configuration to set up the underlying object.

func (*Actor) PublishConfig

func (actor *Actor) PublishConfig(key string, data []byte) error

PublishConfig publishes data into JetStream with a NATS key. The NATS key looks like this: stream-name.optional-key.command:UPDATE|DELETE.

func (*Actor) RunConfigListener

func (actor *Actor) RunConfigListener(ctx context.Context)

RunConfigListener listens for config changes and executes hooks.

func (*Actor) ServeHTTP

func (actor *Actor) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP supports updating and deleting object configuration via HTTP. Supported commands are POST, PUT, DELETE, and UNSUB. HTTP GET should only be supported by the underlying object. Override this method if you want to do something custom.

func (*Actor) SetDownstreams

func (actor *Actor) SetDownstreams(downstreams ...string)

SetDownstreams sets the actor's downstreams.

func (*Actor) SetOnConfigDelete

func (actor *Actor) SetOnConfigDelete(handler func(context.Context, *nats.Msg))

SetOnConfigDelete

func (*Actor) SetOnConfigUpdate

func (actor *Actor) SetOnConfigUpdate(handler func(context.Context, *nats.Msg))

SetOnConfigUpdate

type ActorConfig

type ActorConfig struct {
	// Workers is the number of workers for this actor.
	Workers int

	// HTTPAddr is the address to bind the HTTP server to (e.g. ":3000").
	HTTPAddr string

	// Nats holds the NATS configuration: address, connection, JetStream
	// context and stream settings (see ActorNatsConfig).
	Nats ActorNatsConfig

	// ConfigKV is the KV store available for all actors.
	ConfigKV *configkv.ConfigKV
}

ActorConfig is the config that all actors need

type ActorNatsConfig

type ActorNatsConfig struct {
	// Addr is the NATS address to connect to.
	Addr string

	// Conn is the connection to a NATS cluster.
	Conn *nats.Conn

	// JetStreamContext is the NATS JetStream context.
	JetStreamContext nats.JetStreamContext

	// StreamConfig is the NATS stream configuration.
	StreamConfig *nats.StreamConfig

	// StreamChanBuffer has a minimum of 64000.
	// NOTE(review): presumably the buffer size of the stream channel — confirm.
	StreamChanBuffer int
}

type CronActor

type CronActor struct {
	// Actor is embedded, so CronActor carries the base Actor's exported
	// fields and methods.
	Actor
	// CronCollection is the cron collection owned by this actor.
	// NOTE(review): presumably the schedulers turned on/off by
	// OnBecomingLeaderBlocking / OnBecomingFollowerBlocking — confirm.
	CronCollection *cron.CronCollection
	// IsLeader and IsFollower are bool channels.
	// NOTE(review): presumably used to signal leadership changes to the
	// blocking OnBecoming* methods — confirm against the implementation.
	IsLeader       chan bool
	IsFollower     chan bool
}

func NewCronActor

func NewCronActor(actorConfig ActorConfig) (*CronActor, error)

NewCronActor is the constructor for CronActors

func (*CronActor) OnBecomingFollowerBlocking

func (actor *CronActor) OnBecomingFollowerBlocking(ctx context.Context)

OnBecomingFollowerBlocking turns off all cron schedulers.

func (*CronActor) OnBecomingLeaderBlocking

func (actor *CronActor) OnBecomingLeaderBlocking(ctx context.Context)

OnBecomingLeaderBlocking turns on all cron schedulers.

func (*CronActor) OnBootLoadConfig

func (actor *CronActor) OnBootLoadConfig() error

OnBootLoadConfig loads cron config from the KV store and notifies the listener to set up cron schedulers.

func (*CronActor) ServeHTTP

func (actor *CronActor) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP supports updating and deleting object configuration via HTTP. Supported commands are POST, PUT, DELETE, and UNSUB. HTTP GET should only be supported by the underlying object. Override this method if you want to do something custom.

type IActor

type IActor interface {
	// GetStreamName returns the stream name that the actor subscribed to.
	GetStreamName() string

	// RunConfigListener listens for config changes and executes hooks.
	RunConfigListener(context.Context)

	// PublishConfig publishes data into JetStream under the given key.
	PublishConfig(string, []byte) error

	// ServeHTTP supports updating and deleting object configuration via HTTP.
	ServeHTTP(http.ResponseWriter, *http.Request)

	// OnBootLoadConfig loads the configuration to set up the underlying object.
	OnBootLoadConfig() error

	// SetOnConfigUpdate sets a handler that receives a context and a NATS message.
	// NOTE(review): presumably invoked on UPDATE commands — confirm.
	SetOnConfigUpdate(func(context.Context, *nats.Msg))

	// SetOnConfigDelete sets a handler that receives a context and a NATS message.
	// NOTE(review): presumably invoked on DELETE commands — confirm.
	SetOnConfigDelete(func(context.Context, *nats.Msg))

	// SetDownstreams sets the actor's downstreams.
	SetDownstreams(...string)
	// contains filtered or unexported methods
}

IActor is the interface to conform to for all actors

type RaftActor

type RaftActor struct {
	// Actor is embedded, so RaftActor carries the base Actor's exported
	// fields and methods.
	Actor
	// Raft is the underlying Raft object.
	Raft                *raft.Raft
	// The callbacks below each receive a graft.State.
	// NOTE(review): presumably each is called when the node enters the state
	// named by the field (leader, follower, candidate, closed) — confirm.
	OnBecomingLeader    func(state graft.State)
	OnBecomingFollower  func(state graft.State)
	OnBecomingCandidate func(state graft.State)
	OnClosed            func(state graft.State)
}

func NewRaftActor

func NewRaftActor(actorConfig ActorConfig) (*RaftActor, error)

NewRaftActor is the constructor of RaftActor

Example
// Example: wire a RaftActor to NATS JetStream, expose its config endpoints
// over HTTP, and shut everything down on an interrupt signal.
// NOTE(review): errors are discarded with `_` throughout for brevity; real
// code should check them.
httpAddr := ":3000"

// Every Actor always subscribes to NATS JetStream
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()

jetstreamContext, _ := nc.JetStream()

// Every Actor always stores its config in JetStream's KV store
confkv, _ := configkv.NewConfigKV(jetstreamContext)

raftActorConfig := actors.ActorConfig{
	HTTPAddr: httpAddr,
	ConfigKV: confkv,
	Nats: actors.ActorNatsConfig{
		Addr:             nats.DefaultURL,
		Conn:             nc,
		JetStreamContext: jetstreamContext,
		StreamConfig: &nats.StreamConfig{
			MaxAge: 1 * time.Minute,
		},
	},
}

// Always set up a cancellation context so that the Actor can shut down properly
ctx, done := context.WithCancel(context.Background())
defer done()

// ctx is reassigned to the context derived by the errgroup.
wg, ctx := errgroup.WithContext(ctx)

// RaftActor's job is to receive config (from HTTP)
raftActor, _ := actors.NewRaftActor(raftActorConfig)

// Register callbacks on the actor's leader/follower hooks.
raftActor.OnBecomingLeader = func(state graft.State) {
	fmt.Printf("node is becoming a leader\n")
}
raftActor.OnBecomingFollower = func(state graft.State) {
	fmt.Printf("node is becoming a follower\n")
}

// Run the config listener in the errgroup.
wg.Go(func() error {
	raftActor.RunConfigListener(ctx)
	return nil
})

// NOTE(review): the error returned by OnBootLoadConfig is ignored here.
raftActor.OnBootLoadConfig()

// Running an HTTP server to modify or display config
r := chi.NewRouter()
r.Use(middleware.Logger)

r.Get("/", func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte(`{"msg": "welcome"}`))
})

// POST /api/admin/raft
// POST /api/admin/raft?command=UNSUB
// DELETE /api/admin/raft
r.Method("POST", "/api/admin/raft", raftActor)
r.Method("DELETE", "/api/admin/raft", raftActor)

// GET method for raft metadata is handled by the underlying Raft struct
r.Method("GET", "/api/admin/raft", raft.NewRaftHTTPGet(raftActor.Raft))

r.Method("GET", "/api/admin/configkv", configkv.NewConfigKVHTTPGetAll(confkv))

fmt.Printf("running an HTTP server...\n")
httpServer := &http.Server{Addr: httpAddr, Handler: r}
wg.Go(func() error {
	// http.ErrServerClosed is the expected result of Shutdown, not a failure.
	if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		return err
	}
	return nil
})

// Listen to interrupts to cleanly kill Actors.
wg.Go(func() error {
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

	select {
	case <-signalChannel:
		// On a signal: shut down the HTTP server, then cancel the root context.
		fmt.Printf("signal received\n")
		httpServer.Shutdown(ctx)
		done()

	case <-ctx.Done():
		// The context was cancelled elsewhere; exit this goroutine.
		fmt.Printf("closing signal goroutine\n")
		return ctx.Err()
	}

	return nil
})

// wait for all errgroup goroutines
wg.Wait()
Output:

func (*RaftActor) OnBootLoadConfig

func (actor *RaftActor) OnBootLoadConfig() error

OnBootLoadConfig loads config from the KV store and publishes it so that we can build a consensus.

type WSActorConfig

type WSActorConfig struct {
	// Workers is the number of workers for this actor.
	Workers int

	// WSURL is the ws:// address to connect to.
	WSURL string

	// WSConfig is a recws.RecConn value.
	// NOTE(review): presumably the websocket connection settings used when
	// dialing WSURL — confirm.
	WSConfig recws.RecConn
}

WSActorConfig

type WorkerActor

type WorkerActor struct {
	Actor
}

WorkerActor is a generic Actor. When it receives an UPDATE command, it executes the command with the payload as parameters. DELETE is a no-op because WorkerActor doesn't store its config in the KV store.

func NewWorkerActor

func NewWorkerActor(actorConfig ActorConfig, name string) (*WorkerActor, error)

NewWorkerActor is the constructor for WorkerActor

func (*WorkerActor) WSHandler

func (actor *WorkerActor) WSHandler(w http.ResponseWriter, r *http.Request)

WSHandler is a websocket HTTP handler. It receives websocket connections and then pushes config data to websocket clients.

type WorkerWSActor

type WorkerWSActor struct {
	// contains filtered or unexported fields
}

WorkerWSActor receives parameters over websocket and executes work. It does not inherit from Actor because it doesn't connect to NATS.

func NewWorkerWSActor

func NewWorkerWSActor(config WSActorConfig) (*WorkerWSActor, error)

func (*WorkerWSActor) Close

func (actor *WorkerWSActor) Close()

Close

func (*WorkerWSActor) RunConfigListener

func (actor *WorkerWSActor) RunConfigListener(ctx context.Context)

RunConfigListener listens for config changes and updates the storage.

func (*WorkerWSActor) SetOnConfigDelete

func (actor *WorkerWSActor) SetOnConfigDelete(handler func(context.Context, *nats.Msg))

SetOnConfigDelete

func (*WorkerWSActor) SetOnConfigUpdate

func (actor *WorkerWSActor) SetOnConfigUpdate(handler func(context.Context, *nats.Msg))

SetOnConfigUpdate

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL