stream

package
v0.0.0-...-5107381 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2022 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package stream handles interactions with adapters that allow them to create streams in the API

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildAdapterInventory

func BuildAdapterInventory(
	inventory []AdapterInfo,
	cfg config.Config,
	streamUp chan<- Provider,
	streamDown chan<- int,
	logger *zap.Logger,
) (map[string]Adapter, error)

BuildAdapterInventory creates all of the enabled adapters provided in the inventory list

Types

type Adapter

type Adapter interface {
	suture.Service
}

Adapter defines an interface that translates data from data sources into streams that the core server can consume data from. Each stream provided by the adapter is wrapped in a Provider in order for the core server to operate on them.

The core server is capable of handling multiple streams of data from multiple sources, and they are uniquely identified by stream IDs. These stream IDs can correspond to anything to OS processes or just a unique identifier for a service or a cluster of services.

Adapters must implement the suture.Service interface so that it can itself be started as a long running goroutine. If the adapter must itself parent some services, the adapter should itself be or embed a *suture.Supervisor so that the non-child nodes of the process tree are only supervisors.

type AdapterBuilder

type AdapterBuilder interface {
	// LoadConfig provides the builder with configuration for the adapter. It
	// should return an error if there are any configuration errors other than
	// validation errors. Validation of the config will be handled separately when
	// the configuration is first loaded.
	//
	// If LoadConfig returns an error, Build will not be called for this
	// AdapterBuilder. This error is considered fatal and will cause the server to
	// exit.
	//
	// Adapter authors should add an Enabled field in the adapter configuration
	// struct to conditionally disable the adapter. If the adapter is disabled in
	// the configuration, then this builder is automatically skipped.
	//
	// Adapters are expected to receive their configuration from a section
	// designated by `[adapters.ADAPTER_NAME]`, but it may peek at configuration
	// options outside of this section.
	LoadConfig(config.Config) error

	// Build must return an adapter that is capable of notifying the core
	// with StreamProviders whenever a new stream is to be created. The adapter
	// should also be capable of notifying the core with stream IDs whenever
	// a stream is closed.
	// These StreamProviders should provide the stream's ID and channels
	// that allow the core to consume data from the stream.
	Build(streamUp chan<- Provider, streamDown chan<- int, logger *zap.Logger) Adapter
}

AdapterBuilder represents a set of methods that instantiates the adapter.

type AdapterInfo

type AdapterInfo struct {
	Name    string
	Builder AdapterBuilder
}

AdapterInfo lists information about this Adapter and provides a builder for the Adapter.

type Handler

type Handler interface {
	suture.Service
}

Handler defines the interface that a stream handler must implement

func NewHandler

func NewHandler(args HandlerFactoryArgs) Handler

NewHandler returns a new stream Handler.

type HandlerFactory

type HandlerFactory func(h HandlerFactoryArgs) Handler

HandlerFactory defines a factory for creating a handler capable of processing updates from the stream provider

type HandlerFactoryArgs

type HandlerFactoryArgs struct {
	StreamID    int
	IngressChan <-chan *xivnet.Frame
	EgressChan  <-chan *xivnet.Frame
	UpdateChan  chan<- store.Update
	Generator   update.Generator
	Logger      *zap.Logger
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is a process responsible for watching stream created or stream closed events from all adapters

func NewManager

func NewManager(
	generator update.Generator,
	updateChan chan<- store.Update,
	streamSupervisor *suture.Supervisor,
	handlerFactory HandlerFactory,
	logger *zap.Logger,
) *Manager

NewManager returns a new stream Manager.

func (*Manager) SendRequest

func (m *Manager) SendRequest(streamID int, req []byte) ([]byte, error)

SendRequest forwards a request for a given stream ID to the correct stream Provider.

func (*Manager) Serve

func (m *Manager) Serve()

Serve runs the stream manager. It is responsible for spinning up stream handlers whenever new streams are created and shutting down stream handlers when streams are closed.

func (*Manager) Stop

func (m *Manager) Stop()

Stop will shutdown this service and wait on it to stop before returning

func (*Manager) StreamDown

func (m *Manager) StreamDown() chan<- int

StreamDown returns a channel that allows an upstream service to notify the manager that a stream has closed.

func (*Manager) StreamUp

func (m *Manager) StreamUp() chan<- Provider

StreamUp returns a channel that allows an upstream service to notify the manager that a new stream has been created.

type Provider

type Provider interface {
	// StreamID returns a unique identifier for the stream. This identifier
	// must be unique across all adapters.
	StreamID() int
	// SubscribeIngress notifies the core of network packets in the ingress
	// direction from this stream.
	SubscribeIngress() <-chan *xivnet.Frame
	// SubscribeEgress notifies the core of network packets in the egress
	// direction from this stream.
	SubscribeEgress() <-chan *xivnet.Frame
	// SendRequest provides an interface to allow clients to query or control
	// the adapter.
	SendRequest(req []byte) (resp []byte, err error)
}

Provider defines the public facing interface for a provider of a parsed data stream. It must provide methods for ingesting data and allow some way of controlling the stream.

It is assumed that all blocks in the frames produced by this Provider are already parsed into the correct xivnet datatype. This is to ensure backwards compatibility with older data when the datatype opcodes are updated.

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL