proxy

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2025 License: GPL-3.0 Imports: 15 Imported by: 0

README

Proxy NATS

This implementation takes over the responsibility for balancing between service instances.

Create a service that uses the server implementation together with Ping, as shown in the tests. Take the client and embed it into your client application. The client application must subscribe to the subject it receives: the server generates the subject to subscribe to and sends it to the client. The server pings the client application and removes it from the queue if the ping fails.

Documentation

Index

Constants

View Source
const (
	HeaderID = "id"

	PingMsg = "PING"
	PongMsg = "PONG"
)

Variables

View Source
var (
	ErrWrongPongResponse   = errors.New("wrong pong response")
	ErrWrongSubject        = errors.New("subscribe on unexpected subject")
	ErrNoAvailableInstance = errors.New("no available instance")
	ErrChooseWrongBucket   = errors.New("error choose wrong bucket")
)

Functions

func DefaultIDGetter

func DefaultIDGetter(msg *nats.Msg) (string, error)

func GetPodSubject

func GetPodSubject(subject string, instance string) string

func GetSubscribeSubject

func GetSubscribeSubject(subject string) string

func GetUnsubscribeSubject

func GetUnsubscribeSubject(subject string) string

Types

type Balancer

type Balancer struct {
	// contains filtered or unexported fields
}

Balancer is responsible for maintaining the right balance between services.

func NewBalancer

func NewBalancer(s Storage) *Balancer

func (*Balancer) AddInstance

func (b *Balancer) AddInstance(subject, instanceID string) error

func (*Balancer) DestroyInstance

func (b *Balancer) DestroyInstance(instanceID string) error

func (*Balancer) Execute

func (b *Balancer) Execute(subject, id string) (string, error)

Execute takes an id and returns the instanceID for the given id.

func (*Balancer) GetAllInstances

func (b *Balancer) GetAllInstances() []string

func (*Balancer) GetSubjectInstances

func (b *Balancer) GetSubjectInstances(subject string) []string

func (*Balancer) RemoveInstance

func (b *Balancer) RemoveInstance(subject, instanceID string) error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(client NATSPopulator, proxyServiceSubject string, instanceGetter func() string) *Client

func (*Client) PongListener

func (c *Client) PongListener(natsHandler *subscriber.Subscriber) error

func (*Client) RunAndWait

func (c *Client) RunAndWait(
	natsHandler *subscriber.Subscriber,
	checkEvery, maxPingDelay time.Duration,
	stopFunction func(),
) error

func (*Client) ServiceHealthcheck

func (c *Client) ServiceHealthcheck(checkEvery, maxPingDelay time.Duration, stopFunction func())

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, subjectToSubscribe string) (string, error)

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ctx context.Context, subjectToUnsubscribe string) error

type EasyJSON_exporter_Subscribe

type EasyJSON_exporter_Subscribe *Subscribe

type EasyJSON_exporter_Unsubscribe

type EasyJSON_exporter_Unsubscribe *Unsubscribe

type NATSPopulator

type NATSPopulator interface {
	NATSPublisher
	NATSRequester
}

type NATSPublisher

type NATSPublisher interface {
	Publish(subj string, msg []byte, headers ...string) error
	PublishWithContext(ctx context.Context, subj string, msg []byte, headers ...string) error
}

type NATSRequester

type NATSRequester interface {
	Requester(subj string, data []byte, timeout time.Duration, headers ...string) ([]byte, error)
	RequesterWithContext(ctx context.Context, subj string, msg []byte, headers ...string) ([]byte, error)
}

type Ping

type Ping struct {
	// contains filtered or unexported fields
}

func NewPing

func NewPing(client NATSRequester, pingEvery time.Duration) *Ping

func (*Ping) Ping

func (p *Ping) Ping(srv *Server, instanceID string) error

func (*Ping) Service

func (p *Ping) Service(ctx context.Context, srv *Server)

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(client NATSPublisher, subject string, subjects []string) *Server

func NewServerWithStorage

func NewServerWithStorage(storage Storage, client NATSPublisher, subject string, subjects []string) *Server

func (*Server) GetInstances

func (s *Server) GetInstances() []string

func (*Server) Process

func (s *Server) Process(natsHandler *subscriber.Subscriber) error

func (*Server) ProcessWithGetter

func (s *Server) ProcessWithGetter(
	natsHandler *subscriber.Subscriber,
	idGetter func(msg *nats.Msg) (string, error),
) error

func (*Server) Proxy

func (s *Server) Proxy(subject string) func(ctx context.Context, msg *nats.Msg) error

func (*Server) ProxyWithGetter

func (s *Server) ProxyWithGetter(
	subject string,
	idGetter func(msg *nats.Msg) (string, error),
) func(ctx context.Context, msg *nats.Msg) error

func (*Server) Subscribe

func (s *Server) Subscribe(_ context.Context, msg *nats.Msg) error

func (*Server) Unsubscribe

func (s *Server) Unsubscribe(_ context.Context, msg *nats.Msg) error

type Storage

type Storage interface {
	Add(group string, id string) error
	Delete(group string, id string) error
	DeleteByID(id string) error
	GetKeys(group string) []string
	GetIDs() []string
	Size(group string) int
}

type Subscribe

type Subscribe struct {
	InstanceID string
	Subject    string
}

func (Subscribe) MarshalEasyJSON

func (Subscribe) MarshalEasyJSON(w *jwriter.Writer)

func (Subscribe) MarshalJSON

func (Subscribe) MarshalJSON() ([]byte, error)

func (*Subscribe) UnmarshalEasyJSON

func (*Subscribe) UnmarshalEasyJSON(l *jlexer.Lexer)

func (*Subscribe) UnmarshalJSON

func (*Subscribe) UnmarshalJSON([]byte) error

type Unsubscribe

type Unsubscribe struct {
	InstanceID string
	Subject    string
}

func (Unsubscribe) MarshalEasyJSON

func (Unsubscribe) MarshalEasyJSON(w *jwriter.Writer)

func (Unsubscribe) MarshalJSON

func (Unsubscribe) MarshalJSON() ([]byte, error)

func (*Unsubscribe) UnmarshalEasyJSON

func (*Unsubscribe) UnmarshalEasyJSON(l *jlexer.Lexer)

func (*Unsubscribe) UnmarshalJSON

func (*Unsubscribe) UnmarshalJSON([]byte) error

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL