target

package
v0.0.0-...-ec279c9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	Name  string
	Hosts []Target // ordered by index
	Hm    sync.RWMutex
	Type  string
}

Cluster represents a set of host with some kind of routing (sharding) defined by it's type.

func (*Cluster) GetName

func (cl *Cluster) GetName() string

GetName returns cluster name.

func (*Cluster) PushBytes

func (cl *Cluster) PushBytes(r *rec.RecBytes, metrics *metrics.Prom) error

PushBytes sends single record to cluster. Routing happens based on cluster type.

func (*Cluster) Send

func (cl *Cluster) Send(cwg *sync.WaitGroup, finish chan struct{})

Send starts continuous process of sending data to hosts in the cluster. The sending is stopped on demand. Wait-group should be marked as done after finished.

type ClusterTarget

type ClusterTarget interface {
	PushBytes(*rec.RecBytes, *metrics.Prom) error
	Send(*sync.WaitGroup, chan struct{})
	GetName() string
}

ClusterTarget is abstract notion of a target to send the records to during processing.

type Clusters

type Clusters map[string]*Cluster

Clusters are all clusters mapped by name.

func NewClusters

func NewClusters(mainCfg *conf.Main, cfg *conf.Clusters, lg *zap.Logger, ms *metrics.Prom) (Clusters, error)

NewClusters builds new set of clusters from config.

func (Clusters) Send

func (cls Clusters) Send(finish chan struct{}) chan struct{}

Send continuously sends data to all clusters. Under the hood it delegates this task to separate clusters and manages the lifecycles.

Returned chan is closed when everything was stopped.

type Connection

type Connection struct {
	net.Conn
	sync.Mutex
	LastConnUse time.Time
	W           *bufio.Writer
}

Connection contains all the attributes of the target host connection.

func (*Connection) Close

func (c *Connection) Close() error

Close the connection while mainaining correct internal state. Overrides c.Conn.Close(). Use instead the default close.

func (*Connection) New

func (c *Connection) New(conn net.Conn, bufSize int)

New or updated target connection from existing net.Conn Requires Connection.Mutex lock

type Host

type Host struct {
	Name      string
	Port      uint16
	Ch        chan *rec.RecBytes
	Available atomic.Bool
	Conn      Connection

	Lg *zap.Logger
	Ms *metrics.Prom
	// contains filtered or unexported fields
}

Host represents a single target hosts to send records to.

func ConstructHost

func ConstructHost(clusterName string, mainCfg conf.Main, hostCfg conf.Host, lg *zap.Logger, ms *metrics.Prom) *Host

ConstructHost builds new host object from config.

func (*Host) IsAvailable

func (h *Host) IsAvailable() bool

IsAvailable tells if the host is alive.

func (*Host) Push

func (h *Host) Push(r *rec.RecBytes)

Push adds a new record to send to the host queue.

func (*Host) Stop

func (h *Host) Stop()

Stop brings streaming to a halt.

func (*Host) Stream

func (h *Host) Stream(wg *sync.WaitGroup)

Stream launches the sending to target host. Exits when queue is closed and sending is finished.

func (*Host) String

func (h *Host) String() string

String implements the Stringer interface.

type HostGRPC

type HostGRPC struct {
	Host
}

HostGRPC represents a single target hosts to send records to.

func NewHostGRPC

func NewHostGRPC(clusterName string, mainCfg conf.Main, hostCfg conf.Host, lg *zap.Logger, ms *metrics.Prom) *HostGRPC

NewHostGRPC builds new host object from config.

func (*HostGRPC) Stream

func (h *HostGRPC) Stream(wg *sync.WaitGroup)

Stream ...

type HostTCP

type HostTCP struct {
	Host
}

HostTCP will represent TCP host. It's a stub for now, its role is currently carried out by the Host struct.

func NewHostTCP

func NewHostTCP(clusterName string, mainCfg conf.Main, hostCfg conf.Host, lg *zap.Logger, ms *metrics.Prom) *HostTCP

NewHostTCP constructs a new TCP target host.

type Target

type Target interface {
	Stream(wg *sync.WaitGroup)
	Push(r *rec.RecBytes)
	IsAvailable() bool
	Stop()
}

Target represents target the records are sent to

func NewHost

func NewHost(clusterName string, mainCfg conf.Main, hostCfg conf.Host, lg *zap.Logger, ms *metrics.Prom) Target

NewHost builds a target host of appropriate type doing the polymorphic construction.

type TestTarget

type TestTarget struct {
	Name            string
	ReceivedRecsNum uint64
}

TestTarget mocks a target cluster in tests.

func (*TestTarget) GetName

func (tt *TestTarget) GetName() string

GetName returns cluster name.

func (*TestTarget) PushBytes

func (tt *TestTarget) PushBytes(_ *rec.RecBytes, _ *metrics.Prom) error

PushBytes is a push in tests. It does nothing, just increases the counter.

func (*TestTarget) Send

func (tt *TestTarget) Send(wg *sync.WaitGroup, finish chan struct{})

Send emulates mocks a remote sending routine. Does nothing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL