terminal

package
v1.6.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: GPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultQueueSize = 50000
	MaxQueueSize     = 1000000
)

Flow Queue Configuration.

View Source
const (
	SusFactorCommon          = 1
	SusFactorWeirdButOK      = 5
	SusFactorQuiteUnusual    = 10
	SusFactorMustBeMalicious = 100
)

Suspicion Factors.

View Source
const CounterOpType string = "debug/count"

CounterOpType is the type ID for the Counter Operation.

View Source
const (
	// UsePriorityDataMsgs defines whether priority data messages should be used.
	UsePriorityDataMsgs = true
)

Variables

View Source
var (
	// ErrUnknownError is the default error.
	ErrUnknownError = registerError(0, errors.New("unknown error"))

	ErrStopping    = registerError(2, errors.New("stopping"))
	ErrExplicitAck = registerError(3, errors.New("explicit ack"))
	ErrNoActivity  = registerError(4, errors.New("no activity"))

	ErrInternalError          = registerError(8, errors.New("internal error"))
	ErrMalformedData          = registerError(9, errors.New("malformed data"))
	ErrUnexpectedMsgType      = registerError(10, errors.New("unexpected message type"))
	ErrUnknownOperationType   = registerError(11, errors.New("unknown operation type"))
	ErrUnknownOperationID     = registerError(12, errors.New("unknown operation id"))
	ErrPermissionDenied       = registerError(13, errors.New("permission denied"))
	ErrIntegrity              = registerError(14, errors.New("integrity violated"))
	ErrInvalidOptions         = registerError(15, errors.New("invalid options"))
	ErrHubNotReady            = registerError(16, errors.New("hub not ready"))
	ErrRateLimited            = registerError(24, errors.New("rate limited"))
	ErrIncorrectUsage         = registerError(22, errors.New("incorrect usage"))
	ErrTimeout                = registerError(62, errors.New("timed out"))
	ErrUnsupportedVersion     = registerError(93, errors.New("unsupported version"))
	ErrHubUnavailable         = registerError(101, errors.New("hub unavailable"))
	ErrAbandonedTerminal      = registerError(102, errors.New("terminal is being abandoned"))
	ErrShipSunk               = registerError(108, errors.New("ship sunk"))
	ErrDestinationUnavailable = registerError(113, errors.New("destination unavailable"))
	ErrTryAgainLater          = registerError(114, errors.New("try again later"))
	ErrConnectionError        = registerError(121, errors.New("connection error"))
	ErrQueueOverflow          = registerError(122, errors.New("queue overflowed"))
	ErrCanceled               = registerError(125, context.Canceled)
)

Terminal Errors.

Functions

func AddIDType

func AddIDType(c *container.Container, id uint32, msgType MsgType)

AddIDType prepends the ID and Type header to the message.

func MakeDirectDeliveryDeliverFunc

func MakeDirectDeliveryDeliverFunc(
	ctx context.Context,
	deliver chan *Msg,
) func(c *Msg) *Error

MakeDirectDeliveryDeliverFunc creates a submit upstream function with the given delivery channel.

func MakeDirectDeliveryRecvFunc

func MakeDirectDeliveryRecvFunc(
	deliver chan *Msg,
) func() <-chan *Msg

MakeDirectDeliveryRecvFunc makes a delivery receive function with the given delivery channel.

func MakeMsg

func MakeMsg(c *container.Container, id uint32, msgType MsgType)

MakeMsg prepends the message header (Length and ID+Type) to the data.

func NewCounterOp

func NewCounterOp(t Terminal, opts CounterOpts) (*CounterOp, *Error)

NewCounterOp returns a new CounterOp.

func NewLocalBaseTerminal

func NewLocalBaseTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	remoteHub *hub.Hub,
	initMsg *TerminalOpts,
	upstream Upstream,
) (
	t *TerminalBase,
	initData *container.Container,
	err *Error,
)

NewLocalBaseTerminal creates a new local terminal base for use with inheriting terminals.

func NewLocalTestTerminal

func NewLocalTestTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	remoteHub *hub.Hub,
	initMsg *TerminalOpts,
	upstream Upstream,
) (*TestTerminal, *container.Container, *Error)

NewLocalTestTerminal returns a new local test terminal.

func NewRemoteBaseTerminal

func NewRemoteBaseTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	identity *cabin.Identity,
	initData *container.Container,
	upstream Upstream,
) (
	t *TerminalBase,
	initMsg *TerminalOpts,
	err *Error,
)

NewRemoteBaseTerminal creates a new remote terminal base for use with inheriting terminals.

func NewRemoteTestTerminal

func NewRemoteTestTerminal(
	ctx context.Context,
	id uint32,
	parentID string,
	identity *cabin.Identity,
	initData *container.Container,
	upstream Upstream,
) (*TestTerminal, *TerminalOpts, *Error)

NewRemoteTestTerminal returns a new remote test terminal.

func ParseTerminalOpts

func ParseTerminalOpts(c *container.Container) (*TerminalOpts, *Error)

ParseTerminalOpts parses terminal options from the container and checks if they are valid.

func RegisterOpType

func RegisterOpType(factory OperationFactory)

RegisterOpType registers a new operation type and may only be called during Go's init and a module's prep phase.

func StopScheduler

func StopScheduler()

StopScheduler stops the unit scheduler.

func TimedOut

func TimedOut(timeout time.Duration) <-chan time.Time

TimedOut returns a channel that triggers when the timeout is reached.

Types

type AuthorizingTerminal

type AuthorizingTerminal interface {
	GrantPermission(grant Permission)
	HasPermission(required Permission) bool
}

AuthorizingTerminal is an interface for terminals that support authorization.

type BareTerminal

type BareTerminal struct{}

BareTerminal is a bare terminal that just returns errors for testing.

func (*BareTerminal) Abandon

func (t *BareTerminal) Abandon(err *Error)

Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon(). Should not be overridden by implementations.

func (*BareTerminal) Ctx

func (t *BareTerminal) Ctx() context.Context

Ctx returns the terminal context.

func (*BareTerminal) Deliver

func (t *BareTerminal) Deliver(msg *Msg) *Error

Deliver delivers a message to the terminal. Should not be overridden by implementations.

func (*BareTerminal) Flush

func (t *BareTerminal) Flush(timeout time.Duration)

Flush sends all messages waiting in the terminal. Should not be overridden by implementations.

func (*BareTerminal) FmtID

func (t *BareTerminal) FmtID() string

FmtID formats the terminal ID (including parent IDs). May be overridden by implementations.

func (*BareTerminal) HandleAbandon

func (t *BareTerminal) HandleAbandon(err *Error) (errorToSend *Error)

HandleAbandon gives the terminal the ability to cleanly shut down. The terminal is still fully functional at this point. The returned error is the error to send to the other side. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.

func (*BareTerminal) HandleDestruction

func (t *BareTerminal) HandleDestruction(err *Error)

HandleDestruction gives the terminal the ability to clean up. The terminal has already fully shut down at this point. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.

func (*BareTerminal) ID

func (t *BareTerminal) ID() uint32

ID returns the terminal ID.

func (*BareTerminal) Send

func (t *BareTerminal) Send(msg *Msg, timeout time.Duration) *Error

Send is used by others to send a message through the terminal. Should not be overridden by implementations.

func (*BareTerminal) StartOperation

func (t *BareTerminal) StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error

StartOperation starts the given operation by assigning it an ID and sending the given operation initialization data. Should not be overridden by implementations.

func (*BareTerminal) StopOperation

func (t *BareTerminal) StopOperation(op Operation, err *Error)

StopOperation stops the given operation. Should not be overridden by implementations.

type CounterOp

type CounterOp struct {
	OperationBase

	ClientCounter uint64
	ServerCounter uint64
	Error         error
	// contains filtered or unexported fields
}

CounterOp sends increasing numbers on both sides.

func (*CounterOp) CounterWorker

func (op *CounterOp) CounterWorker(ctx context.Context) error

CounterWorker is a worker that sends counters.

func (*CounterOp) Deliver

func (op *CounterOp) Deliver(msg *Msg) *Error

Deliver delivers data to the operation.

func (*CounterOp) HandleStop

func (op *CounterOp) HandleStop(err *Error) (errorToSend *Error)

HandleStop handles stopping the operation.

func (*CounterOp) SendCounter

func (op *CounterOp) SendCounter() *Error

SendCounter sends the next counter.

func (*CounterOp) Type

func (op *CounterOp) Type() string

Type returns the operation's type ID.

func (*CounterOp) Wait

func (op *CounterOp) Wait()

Wait waits for the Counter Op to finish.

type CounterOpts

type CounterOpts struct {
	ClientCountTo uint64
	ServerCountTo uint64
	Wait          time.Duration
	Flush         bool
	// contains filtered or unexported fields
}

CounterOpts holds the options for CounterOp.

type CustomTerminalIDFormatting

type CustomTerminalIDFormatting interface {
	CustomIDFormat() string
}

CustomTerminalIDFormatting defines an interface for terminals to define their custom ID format.

type DuplexFlowQueue

type DuplexFlowQueue struct {
	// contains filtered or unexported fields
}

DuplexFlowQueue is a duplex flow control mechanism using queues.

func NewDuplexFlowQueue

func NewDuplexFlowQueue(
	ctx context.Context,
	queueSize uint32,
	submitUpstream func(msg *Msg, timeout time.Duration),
) *DuplexFlowQueue

NewDuplexFlowQueue returns a new duplex flow queue.

func (*DuplexFlowQueue) Deliver

func (dfq *DuplexFlowQueue) Deliver(msg *Msg) *Error

Deliver submits a container for receiving from upstream.

func (*DuplexFlowQueue) FlowHandler

func (dfq *DuplexFlowQueue) FlowHandler(_ context.Context) error

FlowHandler handles all flow queue internals and must be started as a worker in the module where it is used.

func (*DuplexFlowQueue) FlowStats

func (dfq *DuplexFlowQueue) FlowStats() string

FlowStats returns a k=v formatted string of internal stats.

func (*DuplexFlowQueue) Flush

func (dfq *DuplexFlowQueue) Flush(timeout time.Duration)

Flush waits for all waiting data to be sent.

func (*DuplexFlowQueue) ReadyToSend

func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{}

ReadyToSend returns a channel that can be read when data can be sent.

func (*DuplexFlowQueue) Receive

func (dfq *DuplexFlowQueue) Receive() <-chan *Msg

Receive receives a container from the recv queue.

func (*DuplexFlowQueue) RecvQueueLen

func (dfq *DuplexFlowQueue) RecvQueueLen() int

RecvQueueLen returns the current length of the receive queue.

func (*DuplexFlowQueue) Send

func (dfq *DuplexFlowQueue) Send(msg *Msg, timeout time.Duration) *Error

Send adds the given container to the send queue.

func (*DuplexFlowQueue) SendQueueLen

func (dfq *DuplexFlowQueue) SendQueueLen() int

SendQueueLen returns the current length of the send queue.

func (*DuplexFlowQueue) StartWorkers

func (dfq *DuplexFlowQueue) StartWorkers(m *modules.Module, terminalName string)

StartWorkers starts the necessary workers to operate the flow queue.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is a terminal error.

func NewExternalError

func NewExternalError(id uint8) *Error

NewExternalError creates an external error based on the given ID.

func ParseExternalError

func ParseExternalError(id []byte) (*Error, error)

ParseExternalError parses an external error.

func (*Error) AsExternal

func (e *Error) AsExternal() *Error

AsExternal creates and returns an external version of the error.

func (*Error) Error

func (e *Error) Error() string

Error returns the human readable format of the error.

func (*Error) ID

func (e *Error) ID() uint8

ID returns the internal ID of the error.

func (*Error) Is

func (e *Error) Is(target error) bool

Is returns whether the given error is of the same type.

func (*Error) IsError

func (e *Error) IsError() bool

IsError returns whether the error represents an erroneous condition.

func (*Error) IsExternal

func (e *Error) IsExternal() bool

IsExternal returns whether the error occurred externally.

func (*Error) IsOK

func (e *Error) IsOK() bool

IsOK returns whether the error represents an "OK" or success status.

func (*Error) Pack

func (e *Error) Pack() []byte

Pack returns the serialized internal error ID. The additional message is lost and is replaced with the default message upon parsing.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the wrapped error.

func (*Error) With

func (e *Error) With(format string, a ...interface{}) *Error

With adds context and details where the error occurred. The provided message is appended to the error. A new error with the same ID is returned and must be compared with errors.Is().

func (*Error) Wrap

func (e *Error) Wrap(format string, a ...interface{}) *Error

Wrap adds context higher up in the call chain. The provided message is prepended to the error. A new error with the same ID is returned and must be compared with errors.Is().

type FlowControl

type FlowControl interface {
	Deliver(msg *Msg) *Error
	Receive() <-chan *Msg
	Send(msg *Msg, timeout time.Duration) *Error
	ReadyToSend() <-chan struct{}
	Flush(timeout time.Duration)
	StartWorkers(m *modules.Module, terminalName string)
	RecvQueueLen() int
	SendQueueLen() int
}

FlowControl defines the flow control interface.

type FlowControlType

type FlowControlType uint8

FlowControlType represents a flow control type.

const (
	FlowControlDefault FlowControlType = 0
	FlowControlDFQ     FlowControlType = 1
	FlowControlNone    FlowControlType = 2
)

Flow Control Types.

func (FlowControlType) DefaultSize

func (fct FlowControlType) DefaultSize() uint32

DefaultSize returns the default flow control size.

type MessageStreamOperationBase

type MessageStreamOperationBase struct {
	OperationBase

	Delivered chan *Msg
	Ended     chan *Error
}

MessageStreamOperationBase is an operation base for receiving a message stream. Every received message must be finished by the implementing operation.

func (*MessageStreamOperationBase) Deliver

func (op *MessageStreamOperationBase) Deliver(msg *Msg) *Error

Deliver delivers data to the operation.

func (*MessageStreamOperationBase) HandleStop

func (op *MessageStreamOperationBase) HandleStop(err *Error) (errorToSend *Error)

HandleStop gives the operation the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Stop() instead.

func (*MessageStreamOperationBase) Init

func (op *MessageStreamOperationBase) Init(deliverQueueSize int)

Init initializes the operation base.

type Msg

type Msg struct {
	FlowID uint32
	Type   MsgType
	Data   *container.Container

	// Unit scheduling.
	// Note: With just 100B per packet, a uint64 (the Unit ID) is enough for
	// over 1800 Exabyte. No need for overflow support.
	Unit *unit.Unit
}

Msg is a message within the SPN network stack. It includes metadata and unit scheduling.

func NewEmptyMsg

func NewEmptyMsg() *Msg

NewEmptyMsg returns a new empty msg with an initialized Unit. The FlowID is unset. The Type is Data. The Data is unset.

func NewMsg

func NewMsg(data []byte) *Msg

NewMsg returns a new msg. The FlowID is unset. The Type is Data.

func (*Msg) Consume

func (msg *Msg) Consume(other *Msg)

Consume adds another Message to itself. The given Msg is packed before adding it to the data. The data is moved - not copied! High priority mark is inherited.

func (*Msg) Debug

func (msg *Msg) Debug()

Debug registers the unit for debug output with the given source. Additional calls on the same unit update the unit source. StartDebugLog() must be called before calling DebugUnit().

func (*Msg) Finish

func (msg *Msg) Finish()

Finish signals the unit scheduler that this unit has finished processing. Will no-op if called on a nil Msg.

func (*Msg) Pack

func (msg *Msg) Pack()

Pack prepends the message header (Length and ID+Type) to the data.

type MsgType

type MsgType uint8

MsgType is the message type for both terminals and operations.

const (
	// MsgTypeInit is used to establish a new terminal or run a new operation.
	MsgTypeInit MsgType = 1

	// MsgTypeData is used to send data to a terminal or operation.
	MsgTypeData MsgType = 2

	// MsgTypePriorityData is used to send prioritized data to a terminal or operation.
	MsgTypePriorityData MsgType = 0

	// MsgTypeStop is used to abandon a terminal or end an operation, with an optional error.
	MsgTypeStop MsgType = 3
)

func ParseIDType

func ParseIDType(c *container.Container) (id uint32, msgType MsgType, err error)

ParseIDType parses the combined message ID and type.

type OneOffOperationBase

type OneOffOperationBase struct {
	OperationBase

	Result chan *Error
}

OneOffOperationBase is an operation base for operations that just have one message and an error return.

func (*OneOffOperationBase) HandleStop

func (op *OneOffOperationBase) HandleStop(err *Error) (errorToSend *Error)

HandleStop gives the operation the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Stop() instead.

func (*OneOffOperationBase) Init

func (op *OneOffOperationBase) Init()

Init initializes the single operation base.

type Operation

type Operation interface {
	// InitOperationBase initializes the operation with the ID and attached terminal.
	// Should not be overridden by implementations.
	InitOperationBase(t Terminal, opID uint32)

	// ID returns the ID of the operation.
	// Should not be overridden by implementations.
	ID() uint32

	// Type returns the operation's type ID.
	// Should be overridden by implementations to return correct type ID.
	Type() string

	// Deliver delivers a message to the operation.
	// Meant to be overridden by implementations.
	Deliver(msg *Msg) *Error

	// NewMsg creates a new message from this operation.
	// Should not be overridden by implementations.
	NewMsg(data []byte) *Msg

	// Send sends a message to the other side.
	// Should not be overridden by implementations.
	Send(msg *Msg, timeout time.Duration) *Error

	// Flush sends all messages waiting in the terminal.
	// Should not be overridden by implementations.
	Flush(timeout time.Duration)

	// Stopped returns whether the operation has stopped.
	// Should not be overridden by implementations.
	Stopped() bool

	// Stop stops the operation by unregistering it from the terminal and calling HandleStop().
	// Should not be overridden by implementations.
	Stop(self Operation, err *Error)

	// HandleStop gives the operation the ability to cleanly shut down.
	// The returned error is the error to send to the other side.
	// Should never be called directly. Call Stop() instead.
	// Meant to be overridden by implementations.
	HandleStop(err *Error) (errorToSend *Error)

	// Terminal returns the terminal the operation is linked to.
	// Should not be overridden by implementations.
	Terminal() Terminal
	// contains filtered or unexported methods
}

Operation is an interface for all operations.

type OperationBase

type OperationBase struct {
	// contains filtered or unexported fields
}

OperationBase provides the basic operation functionality.

func (*OperationBase) Deliver

func (op *OperationBase) Deliver(_ *Msg) *Error

Deliver delivers a message to the operation. Meant to be overridden by implementations.

func (*OperationBase) Flush

func (op *OperationBase) Flush(timeout time.Duration)

Flush sends all messages waiting in the terminal. Should not be overridden by implementations.

func (*OperationBase) HandleStop

func (op *OperationBase) HandleStop(err *Error) (errorToSend *Error)

HandleStop gives the operation the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Stop() instead. Meant to be overridden by implementations.

func (*OperationBase) ID

func (op *OperationBase) ID() uint32

ID returns the ID of the operation. Should not be overridden by implementations.

func (*OperationBase) InitOperationBase

func (op *OperationBase) InitOperationBase(t Terminal, opID uint32)

InitOperationBase initializes the operation with the ID and attached terminal. Should not be overridden by implementations.

func (*OperationBase) NewEmptyMsg

func (op *OperationBase) NewEmptyMsg() *Msg

NewEmptyMsg creates a new empty message from this operation. Should not be overridden by implementations.

func (*OperationBase) NewMsg

func (op *OperationBase) NewMsg(data []byte) *Msg

NewMsg creates a new message from this operation. Should not be overridden by implementations.

func (*OperationBase) Send

func (op *OperationBase) Send(msg *Msg, timeout time.Duration) *Error

Send sends a message to the other side. Should not be overridden by implementations.

func (*OperationBase) Stop

func (op *OperationBase) Stop(self Operation, err *Error)

Stop stops the operation by unregistering it from the terminal and calling HandleStop(). Should not be overridden by implementations.

func (*OperationBase) Stopped

func (op *OperationBase) Stopped() bool

Stopped returns whether the operation has stopped. Should not be overridden by implementations.

func (*OperationBase) Terminal

func (op *OperationBase) Terminal() Terminal

Terminal returns the terminal the operation is linked to. Should not be overridden by implementations.

func (*OperationBase) Type

func (op *OperationBase) Type() string

Type returns the operation's type ID. Should be overridden by implementations to return correct type ID.

type OperationFactory

type OperationFactory struct {
	// Type is the type id of an operation.
	Type string
	// Requires defines the required permissions to run an operation.
	Requires Permission
	// Start is the function that starts a new operation.
	Start OperationStarter
}

OperationFactory defines an operation factory.

type OperationStarter

type OperationStarter func(attachedTerminal Terminal, opID uint32, initData *container.Container) (Operation, *Error)

OperationStarter is used to initialize operations remotely.

type Permission

type Permission uint16

Permission is a bit-map of granted permissions.

const (
	NoPermission      Permission = 0x0
	MayExpand         Permission = 0x1
	MayConnect        Permission = 0x2
	IsHubOwner        Permission = 0x100
	IsHubAdvisor      Permission = 0x200
	IsCraneController Permission = 0x8000
)

Permissions.

func AddPermissions

func AddPermissions(perms ...Permission) Permission

AddPermissions combines multiple permissions.

func (Permission) Has

func (p Permission) Has(required Permission) bool

Has returns if the permission includes the specified permission.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter is a data flow rate limiter.

func NewRateLimiter

func NewRateLimiter(mbits uint64) *RateLimiter

NewRateLimiter returns a new rate limiter. The given MBit/s are transformed to bytes, so giving a multiple of 8 is advised for accurate results.

func (*RateLimiter) Limit

func (rl *RateLimiter) Limit(xferBytes uint64)

Limit is given the current transferred bytes and blocks until they may be sent.

type Session

type Session struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Session holds terminal metadata for operations.

func NewSession

func NewSession() *Session

NewSession returns a new session.

func (*Session) LimitConcurrency

func (s *Session) LimitConcurrency(ctx context.Context, f func()) error

LimitConcurrency limits concurrent executions. If over the limit, waiting goroutines are selected randomly. It returns the context error if it was canceled.

func (*Session) RateLimit

func (s *Session) RateLimit() *Error

RateLimit enforces a rate and suspicion limit.

func (*Session) RateLimitInfo

func (s *Session) RateLimitInfo() string

RateLimitInfo returns some basic information about the status of the rate limiter.

func (*Session) ReportSuspiciousActivity

func (s *Session) ReportSuspiciousActivity(factor int64)

ReportSuspiciousActivity reports suspicious activity of the terminal.

type SessionAddOn

type SessionAddOn struct {
	// contains filtered or unexported fields
}

SessionAddOn can be inherited by terminals to add support for sessions.

func (*SessionAddOn) GetSession

func (t *SessionAddOn) GetSession() *Session

GetSession returns the terminal's session.

type SessionTerminal

type SessionTerminal interface {
	GetSession() *Session
}

SessionTerminal is an interface for terminals that support sessions.

type Terminal

type Terminal interface {
	// ID returns the terminal ID.
	ID() uint32
	// Ctx returns the terminal context.
	Ctx() context.Context

	// Deliver delivers a message to the terminal.
	// Should not be overridden by implementations.
	Deliver(msg *Msg) *Error
	// Send is used by others to send a message through the terminal.
	// Should not be overridden by implementations.
	Send(msg *Msg, timeout time.Duration) *Error
	// Flush sends all messages waiting in the terminal.
	// Should not be overridden by implementations.
	Flush(timeout time.Duration)

	// StartOperation starts the given operation by assigning it an ID and sending the given operation initialization data.
	// Should not be overridden by implementations.
	StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error
	// StopOperation stops the given operation.
	// Should not be overridden by implementations.
	StopOperation(op Operation, err *Error)

	// Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon().
	// Should not be overridden by implementations.
	Abandon(err *Error)
	// HandleAbandon gives the terminal the ability to cleanly shut down.
	// The terminal is still fully functional at this point.
	// The returned error is the error to send to the other side.
	// Should never be called directly. Call Abandon() instead.
	// Meant to be overridden by implementations.
	HandleAbandon(err *Error) (errorToSend *Error)
	// HandleDestruction gives the terminal the ability to clean up.
	// The terminal has already fully shut down at this point.
	// Should never be called directly. Call Abandon() instead.
	// Meant to be overridden by implementations.
	HandleDestruction(err *Error)

	// FmtID formats the terminal ID (including parent IDs).
	// May be overridden by implementations.
	FmtID() string
}

Terminal represents a terminal.

type TerminalBase

type TerminalBase struct {
	// TODO: Fix maligned.
	Terminal // Interface check.

	// Abandoning indicates if the Terminal is being abandoned. The main handlers
	// will keep running until the context has been canceled by the abandon
	// procedure.
	// No new operations should be started.
	// Whoever initiates the abandoning must also start the abandon procedure.
	Abandoning *abool.AtomicBool
	// contains filtered or unexported fields
}

TerminalBase contains the basic functions of a terminal.

func (*TerminalBase) Abandon

func (t *TerminalBase) Abandon(err *Error)

Abandon shuts down the terminal unregistering it from upstream and calling HandleAbandon(). Should not be overridden by implementations.

func (*TerminalBase) Ctx

func (t *TerminalBase) Ctx() context.Context

Ctx returns the Terminal's context.

func (*TerminalBase) DeleteActiveOp

func (t *TerminalBase) DeleteActiveOp(opID uint32)

DeleteActiveOp deletes an active operation from the Terminal state.

func (*TerminalBase) Deliver

func (t *TerminalBase) Deliver(msg *Msg) *Error

Deliver on TerminalBase only exists to conform to the interface. It must be overridden by an actual implementation.

func (*TerminalBase) Flush

func (t *TerminalBase) Flush(timeout time.Duration)

Flush sends all data waiting to be sent.

func (*TerminalBase) FmtID

func (t *TerminalBase) FmtID() string

FmtID formats the terminal ID together with the parent's ID.

func (*TerminalBase) GetActiveOp

func (t *TerminalBase) GetActiveOp(opID uint32) (op Operation, ok bool)

GetActiveOp returns the active operation with the given ID from the Terminal state.

func (*TerminalBase) GetActiveOpCount

func (t *TerminalBase) GetActiveOpCount() int

GetActiveOpCount returns the number of active operations.

func (*TerminalBase) GrantPermission

func (t *TerminalBase) GrantPermission(grant Permission)

GrantPermission grants the specified permissions to the Terminal.

func (*TerminalBase) HandleAbandon

func (t *TerminalBase) HandleAbandon(err *Error) (errorToSend *Error)

HandleAbandon gives the terminal the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.

func (*TerminalBase) HandleDestruction

func (t *TerminalBase) HandleDestruction(err *Error)

HandleDestruction gives the terminal the ability to clean up. The terminal has already fully shut down at this point. Should never be called directly. Call Abandon() instead. Meant to be overridden by implementations.

func (*TerminalBase) Handler

func (t *TerminalBase) Handler(_ context.Context) error

Handler receives and handles messages and must be started as a worker in the module where the Terminal is used.

func (*TerminalBase) HasPermission

func (t *TerminalBase) HasPermission(required Permission) bool

HasPermission returns if the Terminal has the specified permission.

func (*TerminalBase) ID

func (t *TerminalBase) ID() uint32

ID returns the Terminal's ID.

func (*TerminalBase) Send

func (t *TerminalBase) Send(msg *Msg, timeout time.Duration) *Error

Send sends data via this terminal. If a timeout is set, sending will fail after the given timeout has passed.

func (*TerminalBase) Sender

func (t *TerminalBase) Sender(_ context.Context) error

Sender handles sending messages and must be started as a worker in the module where the Terminal is used.

func (*TerminalBase) SetActiveOp

func (t *TerminalBase) SetActiveOp(opID uint32, op Operation)

SetActiveOp saves an active operation to the Terminal state.

func (*TerminalBase) SetTerminalExtension

func (t *TerminalBase) SetTerminalExtension(ext Terminal)

SetTerminalExtension sets the Terminal's extension. This function is not guarded and may only be used during initialization.

func (*TerminalBase) SetTimeout

func (t *TerminalBase) SetTimeout(d time.Duration)

SetTimeout sets the Terminal's idle timeout duration. It is broken down into slots internally.

func (*TerminalBase) StartOperation

func (t *TerminalBase) StartOperation(op Operation, initData *container.Container, timeout time.Duration) *Error

StartOperation starts the given operation by assigning it an ID and sending the given operation initialization data.

func (*TerminalBase) StartWorkers

func (t *TerminalBase) StartWorkers(m *modules.Module, terminalName string)

StartWorkers starts the necessary workers to operate the Terminal.

func (*TerminalBase) StopOperation

func (t *TerminalBase) StopOperation(op Operation, err *Error)

StopOperation sends the end signal with an optional error and then deletes the operation from the Terminal state and calls HandleStop() on the Operation.

func (*TerminalBase) WaitForFlush

func (t *TerminalBase) WaitForFlush()

WaitForFlush makes the terminal pause all sending until the next call to Flush().

type TerminalOpts

type TerminalOpts struct {
	Version uint8  `json:"-"`
	Encrypt bool   `json:"e,omitempty"`
	Padding uint16 `json:"p,omitempty"`

	FlowControl     FlowControlType `json:"fc,omitempty"`
	FlowControlSize uint32          `json:"qs,omitempty"` // Previously was "QueueSize".

	UsePriorityDataMsgs bool `json:"pr,omitempty"`
}

TerminalOpts holds configuration for the terminal.

func DefaultCraneControllerOpts

func DefaultCraneControllerOpts() *TerminalOpts

DefaultCraneControllerOpts returns the default terminal options for a crane controller terminal.

func DefaultExpansionTerminalOpts

func DefaultExpansionTerminalOpts() *TerminalOpts

DefaultExpansionTerminalOpts returns the default terminal options for an expansion terminal.

func DefaultHomeHubTerminalOpts

func DefaultHomeHubTerminalOpts() *TerminalOpts

DefaultHomeHubTerminalOpts returns the default terminal options for a crane terminal used for the home hub.

func (*TerminalOpts) Check

func (opts *TerminalOpts) Check(useDefaultsForRequired bool) *Error

Check checks if terminal options are valid.

func (*TerminalOpts) Pack

func (opts *TerminalOpts) Pack() (*container.Container, *Error)

Pack serializes the terminal options and checks if they are valid.

type TestTerminal

type TestTerminal struct {
	*TerminalBase
}

TestTerminal is a terminal for running tests.

func NewSimpleTestTerminalPair

func NewSimpleTestTerminalPair(delay time.Duration, delayQueueSize int, opts *TerminalOpts) (a, b *TestTerminal, err error)

NewSimpleTestTerminalPair provides a simple connected terminal pair for tests.

func (*TestTerminal) HandleAbandon

func (t *TestTerminal) HandleAbandon(err *Error) (errorToSend *Error)

HandleAbandon gives the terminal the ability to cleanly shut down. The returned error is the error to send to the other side. Should never be called directly. Call Abandon() instead.

type Upstream

type Upstream interface {
	Send(msg *Msg, timeout time.Duration) *Error
}

Upstream defines the interface for upstream (parent) components.

type UpstreamSendFunc

type UpstreamSendFunc func(msg *Msg, timeout time.Duration) *Error

UpstreamSendFunc is a helper to be able to satisfy the Upstream interface.

func (UpstreamSendFunc) Send

func (fn UpstreamSendFunc) Send(msg *Msg, timeout time.Duration) *Error

Send is used to send a message through this upstream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL