ingest

package
v3.8.30 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2024 License: BSD-2-Clause, BSD-2-Clause Imports: 37 Imported by: 3

README

The Gravwell Ingest API

API documentation: https://pkg.go.dev/github.com/gravwell/gravwell/v3/ingest

This package provides methods to build ingesters for the Gravwell analytics platform. Ingesters take raw data from a particular source (pcap, log files, a camera, etc.), bundle it into Entries, and ship the Entries up to the Gravwell indexers.

An Entry (defined in the sub-package github.com/gravwell/gravwell/v3/ingest/entry) looks like this:

type Entry struct {
    TS   Timestamp
    SRC  net.IP
    Tag  EntryTag
    Data []byte
}

The most important element is Data; this is simply a discrete piece of information you want to store as an entry, be it binary or text. The Tag field associates the entry with a specific tag in the indexer, making it easier to search later. The timestamp and source IP give additional information about the entry.

Entries are sent to Gravwell indexers using an IngestMuxer, which can connect to multiple destinations simultaneously to improve ingestion rate. The example below (also found in the file ingester_example) shows the basics of getting Entries into Gravwell using an IngestMuxer. See github.com/gravwell/gravwell/ingesters for open-source real-world examples.

package main

import (
	"fmt"
	"log"
	"net"
	"time"

	"github.com/gravwell/gravwell/v3/ingest"
	"github.com/gravwell/gravwell/v3/ingest/entry"
)

var (
	tags    = []string{"test"}
	targets = []string{"tcp://127.0.0.1:4023"}
	secret  = "IngestSecrets" // set this to the actual ingest secret
)

// Example demonstrates how to write a simple ingester, which generates
// and writes some entries to Gravwell
func main() {
	// Configure the ingester
	ingestConfig := ingest.UniformMuxerConfig{
		Destinations: targets,
		Tags:         tags,
		Auth:         secret,
		PublicKey:    ``,
		PrivateKey:   ``,
		LogLevel:     "WARN",
	}

	// Start the ingester
	igst, err := ingest.NewUniformMuxer(ingestConfig)
	if err != nil {
		log.Fatalf("Failed build our ingest system: %v\n", err)
	}
	defer igst.Close()
	if err := igst.Start(); err != nil {
		log.Fatalf("Failed start our ingest system: %v\n", err)
	}

	// Wait for connection to indexers
	if err := igst.WaitForHot(0); err != nil {
		log.Fatalf("Timedout waiting for backend connections: %v\n", err)
	}

	// Generate and send some entries
	tag, err := igst.GetTag("test")
	if err != nil {
		log.Fatalf("Failed to get tag: %v", err)
	}
	var src net.IP
	if src, err = igst.SourceIP(); err != nil {
		log.Fatalf("failed to get source IP: %v", err)
	}
	for i := 0; i < 100; i++ {
		e := &entry.Entry{
			TS:   entry.Now(),
			SRC:  src,
			Tag:  tag,
			Data: []byte(fmt.Sprintf("test entry %d", i)),
		}
		if err := igst.WriteEntry(e); err != nil {
			log.Printf("Failed to write entry: %v", err)
			break
		}
	}

	// Now shut down
	if err := igst.Sync(time.Second); err != nil {
		log.Printf("Failed to sync: %v", err)
	}
	igst.Close()
}

Documentation

Overview

Example (Simplest)

SimplestExample is the simplest possible example of ingesting a single Entry.

/*************************************************************************
 * Copyright 2017 Gravwell, Inc. All rights reserved.
 * Contact: <legal@gravwell.io>
 *
 * This software may be modified and distributed under the terms of the
 * BSD 2-clause license. See the LICENSE file for details.
 **************************************************************************/

package main

import (
	"log"
	"net"

	"github.com/gravwell/gravwell/v3/ingest"
	"github.com/gravwell/gravwell/v3/ingest/entry"
)

var (
	dst          = "tcp://127.0.0.1:4023"
	sharedSecret = "IngestSecrets"
	simple_tags  = []string{"testtag"}
)

// SimplestExample is the simplest possible example of ingesting a single Entry.
func main() {
	// Get an IngestConnection
	igst, err := ingest.InitializeConnection(dst, sharedSecret, simple_tags, "", "", false)
	if err != nil {
		log.Fatalf("Couldn't open connection to ingester: %v", err)
	}
	defer igst.Close()

	// We need to get the numeric value for the tag we're using
	tagid, ok := igst.GetTag(simple_tags[0])
	if !ok {
		log.Fatal("couldn't look up tag")
	}

	// Now we'll create an Entry
	ent := entry.Entry{
		TS:   entry.Now(),
		SRC:  net.ParseIP("127.0.0.1"),
		Tag:  tagid,
		Data: []byte("This is my test data!"),
	}

	// And finally write the Entry
	igst.WriteEntry(&ent)
}
Output:

Index

Examples

Constants

View Source
const (
	//MAJOR API VERSIONS should always be compatible, there just may be additional features
	API_VERSION_MAJOR uint32 = 0
	API_VERSION_MINOR uint32 = uint32(VERSION)
)
View Source
const (
	// The number of times to hash the shared secret
	HASH_ITERATIONS uint16 = 16
	// Auth protocol version number
	VERSION uint16 = 0x8
	// Authenticated, but not ready for ingest
	STATE_AUTHENTICATED uint32 = 0xBEEF42
	// Not authenticated
	STATE_NOT_AUTHENTICATED uint32 = 0xFEED51
	// Authenticated and ready for ingest
	STATE_HOT uint32 = 0xCAFE54

	// Minimum auth version supporting tenants
	MinTenantAuthVersion uint16 = 0x7
	MaxTenantNameLength  uint16 = 512 //maximum length of a tenant name in bytes
	SystemTenant         string = ``  // blank string, basically the root/system/infrastructure user

)
View Source
const (
	READ_BUFFER_SIZE int = 4 * 1024 * 1024
	//TODO - we should really discover the MTU of the link and use that
	ACK_WRITER_BUFFER_SIZE int = (ackEncodeSize * MAX_UNCONFIRMED_COUNT)
	ACK_WRITER_CAN_WRITE   int = (ackEncodeSize * (MAX_UNCONFIRMED_COUNT / 2))
)
View Source
const (
	ACK_SIZE int = 12 //ackmagic + entrySendID
	//READ_ENTRY_HEADER_SIZE should be 46 bytes
	//34 + 4 + 4 + 8 (magic, data len, entry ID)
	READ_ENTRY_HEADER_SIZE int = entry.ENTRY_HEADER_SIZE + 12
	MAX_ENTRY_SIZE         int = int(entry.MaxDataSize)
	//TODO: We should make this configurable by configuration
	WRITE_BUFFER_SIZE           int           = 1024 * 1024
	MAX_WRITE_ERROR             int           = 4
	BUFFERED_ACK_READER_SIZE    int           = ACK_SIZE * MAX_UNCONFIRMED_COUNT
	CLOSING_SERVICE_ACK_TIMEOUT time.Duration = 3 * time.Second

	MAX_UNCONFIRMED_COUNT int = 1024 * 4

	MINIMUM_TAG_RENEGOTIATE_VERSION uint16 = 0x2 // minimum server version to renegotiate tags
	MINIMUM_ID_VERSION              uint16 = 0x3 // minimum server version to send ID info
	MINIMUM_INGEST_OK_VERSION       uint16 = 0x4 // minimum server version to ask
	MINIMUM_DYN_CONFIG_VERSION      uint16 = 0x5 // minimum server version to send dynamic config block
	MINIMUM_INGEST_STATE_VERSION    uint16 = 0x6 // minimum server version to send detailed ingester state messages
	MINIMUM_INGEST_EV_VERSION       uint16 = 0x8 // minimum server version to send enumerated values attached to entries

)
View Source
const (
	CacheModeAlways = `always`
	CacheModeFail   = `fail`
)
View Source
const (
	KB       uint64  = 1024
	MB       uint64  = 1024 * KB
	GB       uint64  = 1024 * MB
	TB       uint64  = 1024 * GB
	PB       uint64  = 1024 * TB
	YB       uint64  = 1024 * PB
	K                = 1000.0
	M                = K * 1000.0
	G                = M * 1000.0
	T                = G * 1000.0
	P                = G * 1000.0
	Y                = P * 1000.0
	NsPerSec float64 = 1000000000.0
)
View Source
const (
	MIN_REMOTE_KEYSIZE int           = 16
	DIAL_TIMEOUT       time.Duration = (1 * time.Second)
	CHANNEL_BUFFER     int           = 4096
	DEFAULT_TLS_PORT   int           = 4024
	DEFAULT_CLEAR_PORT int           = 4023
	DEFAULT_PIPE_PATH  string        = "/opt/gravwell/comms/pipe"
	MAX_TAG_LENGTH     int           = 4096 //a 4KB tagname is a short story...

	FORBIDDEN_TAG_SET string = "!@#$%^&*()=+<>,.:;`\"'{[}]|\\ 	" // DEPRECATED - includes space and tab characters at the end
)
View Source
const (
	//This MUST be > 1 or the universe explodes
	//no matter what a user requests, this is the maximum
	//basically a sanity check
	ABSOLUTE_MAX_UNCONFIRMED_WRITES int = 0xffff
)

Variables

View Source
var (
	ErrInvalidBuffer            = errors.New("invalid buffer")
	ErrInvalidIngestStateHeader = errors.New("Invalid ingest state header")
	ErrOversizedConfigBlock     = errors.New("configuration block too large")
	ErrEmptyConfigBlock         = errors.New("configuration block empty")
)
View Source
var (
	ErrInvalidStateResponseLen = errors.New("Invalid state response length")
	ErrInvalidTagRequestLen    = errors.New("Invalid tag request length")
	ErrInvalidTagResponseLen   = errors.New("Invalid tag response length")
	ErrFailedAuthHashGen       = errors.New("Failed to generate authentication hash")
	ErrFailedAuth              = errors.New("Failed authentication, bad secret")
	ErrFailedTagNegotiation    = errors.New("Failed to negotiate tags")
	ErrShortRead               = errors.New("Failed to read complete buffer")
	ErrShortWrite              = errors.New("Failed to write complete buffer")
	ErrInvalidAuthVersion      = errors.New("auth version is invalid")
	ErrInvalidTenantName       = errors.New("auth tenant name is invalid")
	ErrNilChallengeResponse    = errors.New("Got a nil challenge response")
	ErrTenantAuthUnsupported   = errors.New("authentication endpoint does not support tenants")
)
View Source
var (
	ErrEmptyTag     = errors.New("Tag name is empty")
	ErrOversizedTag = errors.New("Tag name is too long")
	ErrForbiddenTag = errors.New("Forbidden character in tag")
)
View Source
var (
	ErrAllConnsDown          = errors.New("All connections down")
	ErrNotRunning            = errors.New("Not running")
	ErrNotReady              = errors.New("Not ready to start")
	ErrTagNotFound           = errors.New("Tag not found")
	ErrTagMapInvalid         = errors.New("Tag map invalid")
	ErrNoTargets             = errors.New("No connections specified")
	ErrConnectionTimeout     = errors.New("Connection timeout")
	ErrSyncTimeout           = errors.New("Sync timeout")
	ErrEmptyAuth             = errors.New("Ingest key is empty")
	ErrEmergencyListOverflow = errors.New("Emergency list overflow")
	ErrTimeout               = errors.New("Timed out waiting for ingesters")
	ErrWriteTimeout          = errors.New("Timed out waiting to write entry")
	ErrInvalidEntry          = errors.New("Invalid entry value")
)
View Source
var (
	ErrFailedParseLocalIP    = errors.New("Failed to parse the local address")
	ErrMalformedDestination  = errors.New("Malformed destination string")
	ErrInvalidCerts          = errors.New("Failed to get certificates")
	ErrInvalidDest           = errors.New("Invalid destination")
	ErrInvalidRemoteKeySize  = errors.New("Invalid remote keysize")
	ErrInvalidConnectionType = errors.New("Invalid connection type")
	ErrInvalidSecret         = errors.New("Failed to login, invalid secret")
)
View Source
var (
	ErrUnderFill       = errors.New("short cryptographic buffer read")
	ErrBadTagCharacter = errors.New("Bad tag remap character")
)
View Source
var (
	ErrInvalidCount = errors.New("invalid buffer size")
)
View Source
var (
	ErrOversizedEntry = errors.New("Entry data exceeds maximum size")
)

Functions

func CheckTag

func CheckTag(tag string) error

CheckTag takes a tag name and returns an error if it contains any characters which are not allowed in tags.

func ConnectionType

func ConnectionType(dst string) (string, string, error)

ConnectionType cracks out the type of connection and returns its type, the target, and/or an error

func EnableKeepAlive added in v3.8.30

func EnableKeepAlive(c net.Conn, period time.Duration)

EnableTCPKeepAlive enables TCP KeepAlive on the given connection, if it's a compatible connection type. If it is not, no action is taken.

func HumanCount

func HumanCount(b uint64) string

HumanCount will take a number and return an appropriately-scaled string, e.g. HumanCount(12500) will return "12.50 K"

func HumanEntryRate

func HumanEntryRate(b uint64, dur time.Duration) string

HumanEntryRate will take an entry count and duration and produce a human readable string in terms of entries per second. e.g. 2400 K entries /s

func HumanLineRate

func HumanLineRate(b uint64, dur time.Duration) string

HumanLineRate will take a byte count and duration and produce a human readable string in terms of bits. e.g. Megabits/s (Mbps)

func HumanRate

func HumanRate(b uint64, dur time.Duration) string

HumanRate will take a byte count and duration and produce a human readable string showing data per second. e.g. Megabytes/s (MB/s)

func HumanSize

func HumanSize(b uint64) string

HumanSize will take a byte count and duration and produce a human readable string showing data per second. e.g. Megabytes/s (MB/s)

func NewInsecureRNG added in v3.8.14

func NewInsecureRNG() *rand.Rand

func NewLockedSource added in v3.8.14

func NewLockedSource(src rand.Source) rand.Source

func NewRNG added in v3.8.14

func NewRNG() (*rand.Rand, error)

func PrintVersion

func PrintVersion(wtr io.Writer)

func RemapTag added in v3.8.19

func RemapTag(tag string, rchar rune) (rtag string, err error)

RemapTag takes a proposed tag string and remaps any forbidden characters to the provided character. err is set if the rchar is forbidden or the resulting tag is not valid.

func SecureSeed added in v3.8.14

func SecureSeed() (int64, error)

func VerifyResponse

func VerifyResponse(auth AuthHash, chal Challenge, resp ChallengeResponse) error

VerifyResponse takes a hash and challenge and computes a completed response. If the computed response does not match an error is returned. If the response matches, nil is returned

Types

type AuthHash

type AuthHash [16]byte

AuthHash represents a hashed shared secret.

func GenAuthHash

func GenAuthHash(password string) (AuthHash, error)

GenAuthHash takes a key and generates a hash using the "password" token we iterate over the value, hashing with MD5 and SHA256. We choose these two algorithms because they aren't too heavy, but the alternating makes it very difficult to optimize in an FPGA or ASIC.

type Challenge

type Challenge struct {
	// Number of times to iterate the hash
	Iterate uint16
	// The random number to be hashed with the secret
	RandChallenge [32]byte
	// Authentication version number
	Version uint16
}

Challenge request, used to validate remote clients. The server generates RandChallenge, a random number which is hashed with the pre-hashed shared secret, then run through Iterate iterations of md5 and sha256 to create the response.

func NewChallenge

func NewChallenge(auth AuthHash) (Challenge, error)

NewChallenge generates a random hash string and a random iteration count

func (*Challenge) Read

func (c *Challenge) Read(r io.Reader) error

Read the challenge from reader

func (*Challenge) Write

func (c *Challenge) Write(w io.Writer) error

Write out the challenge to w

type ChallengeResponse

type ChallengeResponse struct {
	Response [32]byte
	Version  uint16
	Tenant   string
}

ChallengeResponse is the resulting hash sent back as part of the challenge/response process.

func GenerateResponse

func GenerateResponse(auth AuthHash, ch Challenge) (resp *ChallengeResponse, err error)

GenerateResponse creates a ChallengeResponse based on the Challenge and AuthHash

func (*ChallengeResponse) Read

func (cr *ChallengeResponse) Read(r io.Reader) error

Decode the ChallengeResponse from the reader

func (*ChallengeResponse) Write

func (cr *ChallengeResponse) Write(w io.Writer) error

Write the challenge response to the writer

type CircularIndex added in v3.8.5

type CircularIndex struct {
	// contains filtered or unexported fields
}

re-usable circular buffer index

func NewCircularIndex added in v3.8.5

func NewCircularIndex(sz uint) (ci *CircularIndex, err error)

func (*CircularIndex) Add added in v3.8.5

func (cb *CircularIndex) Add() (idx uint)

func (*CircularIndex) Count added in v3.8.5

func (cb *CircularIndex) Count() uint

func (*CircularIndex) Free added in v3.8.5

func (cb *CircularIndex) Free() uint

func (*CircularIndex) Pop added in v3.8.5

func (cb *CircularIndex) Pop() (idx uint, ok bool)

func (*CircularIndex) Size added in v3.8.5

func (cb *CircularIndex) Size() uint

type CompressionType

type CompressionType uint8
const (
	CompressNone   CompressionType = 0
	CompressSnappy CompressionType = 0x10
)

func ParseCompression

func ParseCompression(v string) (ct CompressionType, err error)

type EntryBuffer added in v3.8.5

type EntryBuffer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewEntryBuffer added in v3.8.5

func NewEntryBuffer(sz uint) (stb *EntryBuffer, err error)

func (*EntryBuffer) Add added in v3.8.5

func (stb *EntryBuffer) Add(ste entry.Entry)

func (*EntryBuffer) AddBlock added in v3.8.5

func (stb *EntryBuffer) AddBlock(stes []entry.Entry)

func (*EntryBuffer) Count added in v3.8.5

func (stb *EntryBuffer) Count() (r uint)

func (*EntryBuffer) Drain added in v3.8.5

func (stb *EntryBuffer) Drain() (r []entry.Entry)

func (*EntryBuffer) Free added in v3.8.5

func (stb *EntryBuffer) Free() (r uint)

func (*EntryBuffer) Pop added in v3.8.5

func (stb *EntryBuffer) Pop() (r entry.Entry, ok bool)

func (*EntryBuffer) PopBlock added in v3.8.5

func (stb *EntryBuffer) PopBlock(max uint) (r []entry.Entry)

func (*EntryBuffer) Size added in v3.8.5

func (stb *EntryBuffer) Size() (r uint)

type EntryReader

type EntryReader struct {
	// contains filtered or unexported fields
}

func NewEntryReader

func NewEntryReader(conn net.Conn) (*EntryReader, error)

func NewEntryReaderEx

func NewEntryReaderEx(cfg EntryReaderWriterConfig) (*EntryReader, error)

func (*EntryReader) AddIngesterStateCallback added in v3.6.1

func (er *EntryReader) AddIngesterStateCallback(f IngesterStateCallback)

AddIngesterStateCallback registers a callback function which will be called every time the EntryReader reads an IngesterState message from the client. Calling AddIngesterStateCallback multiple times will add additional callbacks to the list. Warning: If a callback hangs, the entire entry reader will hang.

func (*EntryReader) Close

func (er *EntryReader) Close() error

func (*EntryReader) ConfigureStream

func (er *EntryReader) ConfigureStream() (err error)

configureStream will

func (*EntryReader) GetIngesterAPIVersion

func (er *EntryReader) GetIngesterAPIVersion() uint16

func (*EntryReader) GetIngesterInfo

func (er *EntryReader) GetIngesterInfo() (string, string, string)

func (*EntryReader) GetIngesterState added in v3.6.0

func (er *EntryReader) GetIngesterState() (is IngesterState)

GetIngesterState returns the most recent state object received from the ingester.

func (*EntryReader) IngestOK

func (er *EntryReader) IngestOK(ok bool) (err error)

IngestOK waits for the ingester to send an INGEST_OK message and responds with the argument given. Any other command will make it return an error

func (*EntryReader) Read

func (er *EntryReader) Read() (e *entry.Entry, err error)

func (*EntryReader) SendThrottle

func (er *EntryReader) SendThrottle(d time.Duration) error

func (*EntryReader) SetTagManager

func (er *EntryReader) SetTagManager(tm TagManager)

SetTagManager gives a handle on the instantiator's tag management system. If this is not set, tags cannot be negotiated on the fly

func (*EntryReader) SetupConnection

func (er *EntryReader) SetupConnection() (err error)

SetupConnection negotiations ingester API version and other information It should properly handle old ingesters, too

func (*EntryReader) Start

func (er *EntryReader) Start() error

type EntryReaderWriterConfig

type EntryReaderWriterConfig struct {
	Conn                  net.Conn
	OutstandingEntryCount int
	BufferSize            int
	Timeout               time.Duration
	TagMan                TagManager
}

type EntryWriter

type EntryWriter struct {
	// contains filtered or unexported fields
}

func NewEntryWriter

func NewEntryWriter(conn net.Conn) (*EntryWriter, error)

func NewEntryWriterEx

func NewEntryWriterEx(cfg EntryReaderWriterConfig) (*EntryWriter, error)

func (*EntryWriter) Ack

func (ew *EntryWriter) Ack() error

Ack will block waiting for at least one ack to free up a slot for sending

func (*EntryWriter) Close

func (ew *EntryWriter) Close() (err error)

func (*EntryWriter) ConfigureStream

func (ew *EntryWriter) ConfigureStream(c StreamConfiguration) (err error)

configureStream will

func (*EntryWriter) ForceAck

func (ew *EntryWriter) ForceAck() error

func (*EntryWriter) IdentifyIngester

func (ew *EntryWriter) IdentifyIngester(name string, version string, id string) (err error)

func (*EntryWriter) IngestOK

func (ew *EntryWriter) IngestOK() (ok bool, err error)

func (*EntryWriter) NegotiateTag

func (ew *EntryWriter) NegotiateTag(name string) (tg entry.EntryTag, err error)

func (*EntryWriter) OpenSlots

func (ew *EntryWriter) OpenSlots(ent *entry.Entry) int

OpenSlots informs the caller how many slots are available before we must service acks. This is used for mostly in a multiplexing system where we want to know how much we can write before we need to service acks and move on.

func (EntryWriter) OptimalBatchWriteSize

func (ew EntryWriter) OptimalBatchWriteSize() int

func (*EntryWriter) OverrideAckTimeout

func (ew *EntryWriter) OverrideAckTimeout(t time.Duration) error

func (*EntryWriter) Ping

func (ew *EntryWriter) Ping() (err error)

Ping is essentially a force ack, we send a PING command, which will cause the server to flush all acks and a PONG command. We read until we get the PONG

func (*EntryWriter) SendIngesterAPIVersion

func (ew *EntryWriter) SendIngesterAPIVersion() (err error)

func (*EntryWriter) SendIngesterState added in v3.6.0

func (ew *EntryWriter) SendIngesterState(state IngesterState) (err error)

SendIngesterState sends a whole lot of information to the indexer about the state of the ingester.

func (*EntryWriter) Write

func (ew *EntryWriter) Write(ent *entry.Entry) error

Write expects to have exclusive control over the entry and all its buffers from the period of write and forever after. This is because it needs to be able to resend the entry if it fails to confirm. If a buffer is re-used and the entry fails to confirm we will send the new modified buffer which may not have the original data.

func (*EntryWriter) WriteBatch

func (ew *EntryWriter) WriteBatch(ents [](*entry.Entry)) (int, error)

WriteBatch takes a slice of entries and writes them, this function is useful in multithreaded environments where we want to lessen the impact of hits on a channel by threads

func (*EntryWriter) WriteSync

func (ew *EntryWriter) WriteSync(ent *entry.Entry) error

func (*EntryWriter) WriteWithHint

func (ew *EntryWriter) WriteWithHint(ent *entry.Entry) (bool, error)

WriteWithHint behaves exactly like Write but also returns a bool which indicates whether or not the flush was required. This function method is primarily used when muxing across multiple indexers, so the muxer knows when to transition to the next indexer

type IngestCommand

type IngestCommand uint32
const (
	//ingester commands
	INVALID_MAGIC                IngestCommand = 0x00000000
	NEW_ENTRY_MAGIC              IngestCommand = 0xC7C95ACB
	FORCE_ACK_MAGIC              IngestCommand = 0x1ADF7350
	CONFIRM_ENTRY_MAGIC          IngestCommand = 0xF6E0307E
	THROTTLE_MAGIC               IngestCommand = 0xBDEACC1E
	PING_MAGIC                   IngestCommand = 0x88770001
	PONG_MAGIC                   IngestCommand = 0x88770008
	TAG_MAGIC                    IngestCommand = 0x18675300
	CONFIRM_TAG_MAGIC            IngestCommand = 0x18675301
	ERROR_TAG_MAGIC              IngestCommand = 0x18675302
	ID_MAGIC                     IngestCommand = 0x22793400
	CONFIRM_ID_MAGIC             IngestCommand = 0x22793401
	API_VER_MAGIC                IngestCommand = 0x22334400
	CONFIRM_API_VER_MAGIC        IngestCommand = 0x22334401
	INGEST_OK_MAGIC              IngestCommand = 0x33445500
	CONFIRM_INGEST_OK_MAGIC      IngestCommand = 0x33445501
	INGESTER_STATE_MAGIC         IngestCommand = 0x44556600
	CONFIRM_INGESTER_STATE_MAGIC IngestCommand = 0x44556601
)

func (IngestCommand) Buff

func (ic IngestCommand) Buff() (b []byte)

func (IngestCommand) String

func (ic IngestCommand) String() string

type IngestConnection deprecated

type IngestConnection struct {
	// contains filtered or unexported fields
}

IngestConnection is a lower-level interface for connecting to and communicating with a single indexer. It is kept public for compatibility, but should not be used in new projects.

Deprecated: Use the IngestMuxer instead.

func InitializeConnection deprecated

func InitializeConnection(dst, authString string, tags []string, pubKey, privKey string, verifyRemoteKey bool) (*IngestConnection, error)

InitializeConnection is a simple wrapper to get a line to an ingester. callers can just call this function and get back a hot ingest connection We take care of establishing the connection and shuttling auth around

Deprecated: Use the IngestMuxer instead.

func NewPipeConnection deprecated

func NewPipeConnection(dst string, auth AuthHash, tags []string) (*IngestConnection, error)

This function will create a new NamedPipe connection to a local system. We have NO WAY of knowing which process is REALLY on the other end of the pipe. But it is assumed that gravwell will be running with highly limited privileges, so if the integrity of the local system is compromised, its already over.

Deprecated: Use the IngestMuxer instead.

func NewTCPConnection deprecated

func NewTCPConnection(dst string, auth AuthHash, tags []string) (*IngestConnection, error)

This function will create a new cleartext TCP connection to a remote system. No verification of the server is performed AT ALL. All traffic is snoopable and modifiable. If someone has control of the network, they will be able to inject and monitor this traffic.

dst: should be a address:port pair. For example "ingest.gravwell.com:4042" or "10.0.0.1:4042"

Deprecated: Use the IngestMuxer instead.

func NewTLSConnection deprecated

func NewTLSConnection(dst string, auth AuthHash, certs *TLSCerts, verify bool, tags []string) (*IngestConnection, error)

NewTLSConnection will create a new connection to a remote system using a secure TLS tunnel. If the remotePubKey in TLSCerts is set, we will verify the public key of the remote server and bail if it doesn't match. This is a basic MitM protection. This requires that we HAVE the remote public key, getting that will be done else where.

Deprecated: Use the IngestMuxer instead.

func (*IngestConnection) Close

func (igst *IngestConnection) Close() error

func (*IngestConnection) GetTag

func (igst *IngestConnection) GetTag(name string) (entry.EntryTag, bool)

func (*IngestConnection) IdentifyIngester

func (igst *IngestConnection) IdentifyIngester(name, version, id string) (err error)

func (*IngestConnection) IngestOK

func (igst *IngestConnection) IngestOK() (ok bool, err error)

IngestOK asks the indexer if it is ok to start sending entries yet.

func (*IngestConnection) NegotiateTag

func (igst *IngestConnection) NegotiateTag(name string) (tg entry.EntryTag, err error)

func (*IngestConnection) Running

func (igst *IngestConnection) Running() bool

func (*IngestConnection) SendIngesterState added in v3.6.0

func (igst *IngestConnection) SendIngesterState(state IngesterState) error

func (*IngestConnection) Source

func (igst *IngestConnection) Source() (net.IP, error)

func (*IngestConnection) String

func (igst *IngestConnection) String() (s string)

func (*IngestConnection) Sync

func (igst *IngestConnection) Sync() error
Sync causes the entry writer to force an ack from the server.  This ensures that all

* entries that have been written are flushed and fully acked by the server.

func (*IngestConnection) Write

func (igst *IngestConnection) Write(ts entry.Timestamp, tag entry.EntryTag, data []byte) error

func (*IngestConnection) WriteBatchEntry

func (igst *IngestConnection) WriteBatchEntry(ents []*entry.Entry) (err error)

WriteBatchEntry DOES NOT populate the source on write, the caller must do so

func (*IngestConnection) WriteEntry

func (igst *IngestConnection) WriteEntry(ent *entry.Entry) error

func (*IngestConnection) WriteEntrySync

func (igst *IngestConnection) WriteEntrySync(ent *entry.Entry) error

type IngestLogger

type IngestLogger interface {
	Errorf(string, ...interface{}) error
	Warnf(string, ...interface{}) error
	Infof(string, ...interface{}) error
	Error(string, ...rfc5424.SDParam) error
	Warn(string, ...rfc5424.SDParam) error
	Info(string, ...rfc5424.SDParam) error
}

type IngestMuxer

type IngestMuxer struct {
	// contains filtered or unexported fields
}

func NewIngestMuxer

func NewIngestMuxer(dests []Target, tags []string, pubKey, privKey string) (*IngestMuxer, error)

func NewIngestMuxerExt

func NewIngestMuxerExt(dests []Target, tags []string, pubKey, privKey string, cacheDepth int) (*IngestMuxer, error)

func NewMuxer

func NewMuxer(c MuxerConfig) (*IngestMuxer, error)

func NewUniformIngestMuxer

func NewUniformIngestMuxer(dests, tags []string, authString, pubKey, privKey, remoteKey string) (*IngestMuxer, error)

NewIngestMuxer creates a new muxer that will automatically distribute entries amongst the clients

func NewUniformIngestMuxerExt

func NewUniformIngestMuxerExt(dests, tags []string, authString, pubKey, privKey, remoteKey string, cacheDepth int) (*IngestMuxer, error)

func NewUniformMuxer

func NewUniformMuxer(c UniformMuxerConfig) (*IngestMuxer, error)

func (*IngestMuxer) Close

func (im *IngestMuxer) Close() error

Close the connection

func (*IngestMuxer) Dead

func (im *IngestMuxer) Dead() (int, error)

Dead returns how many connections are currently dead

func (*IngestMuxer) Error

func (im *IngestMuxer) Error(msg string, args ...rfc5424.SDParam) error

Error send an error entry down the line with the gravwell tag

func (*IngestMuxer) Errorf added in v3.8.4

func (im *IngestMuxer) Errorf(format string, args ...interface{}) error

Errorf send an error entry down the line with the gravwell tag

func (*IngestMuxer) GetTag

func (im *IngestMuxer) GetTag(tag string) (tg entry.EntryTag, err error)

GetTag pulls back an intermediary tag id the intermediary tag has NO RELATION to the backend servers tag mapping it is used to speed along tag mappings

func (*IngestMuxer) Hot

func (im *IngestMuxer) Hot() (int, error)

Hot returns how many connections are functioning

func (*IngestMuxer) Info

func (im *IngestMuxer) Info(msg string, args ...rfc5424.SDParam) error

func (*IngestMuxer) Infof added in v3.8.4

func (im *IngestMuxer) Infof(format string, args ...interface{}) error

func (*IngestMuxer) KnownTags added in v3.4.0

func (im *IngestMuxer) KnownTags() (tgs []string)

KnownTags will return a string slice of tags that the muxer actively knows about

func (*IngestMuxer) LookupTag

func (im *IngestMuxer) LookupTag(tg entry.EntryTag) (name string, ok bool)

LookupTag will reverse a tag id into a name, this operation is more expensive than a straight lookup Users that expect to translate a tag repeatedly should maintain their own tag map

func (*IngestMuxer) NegotiateTag

func (im *IngestMuxer) NegotiateTag(name string) (tg entry.EntryTag, err error)

NegotiateTag will attempt to lookup a tag name in the negotiated set The the tag name has not already been negotiated, the muxer will contact each indexer and negotiate it. This call can potentially block and fail

func (*IngestMuxer) RegisterChild added in v3.6.0

func (im *IngestMuxer) RegisterChild(k string, v IngesterState)

func (*IngestMuxer) SetMetadata added in v3.6.0

func (im *IngestMuxer) SetMetadata(obj interface{}) (err error)

func (*IngestMuxer) SetRawConfiguration added in v3.6.0

func (im *IngestMuxer) SetRawConfiguration(obj interface{}) (err error)

func (*IngestMuxer) Size

func (im *IngestMuxer) Size() (int, error)

Size returns the total number of specified connections, hot or dead

func (*IngestMuxer) SourceIP

func (im *IngestMuxer) SourceIP() (net.IP, error)

SourceIP is a convenience function used to pull back a source value

func (*IngestMuxer) Start

func (im *IngestMuxer) Start() error

Start starts the connection process. This will return immediately, and does not mean that connections are ready. Callers should call WaitForHot immediately after to wait for the connections to be ready.

func (*IngestMuxer) Sync

func (im *IngestMuxer) Sync(to time.Duration) error

func (*IngestMuxer) SyncContext

func (im *IngestMuxer) SyncContext(ctx context.Context, to time.Duration) error

func (*IngestMuxer) UnregisterChild added in v3.6.0

func (im *IngestMuxer) UnregisterChild(k string)

func (*IngestMuxer) WaitForHot

func (im *IngestMuxer) WaitForHot(to time.Duration) error

WaitForHot waits until at least one connection goes into the hot state The timeout duration parameter is an optional timeout, if zero, it waits indefinitely

func (*IngestMuxer) WaitForHotContext

func (im *IngestMuxer) WaitForHotContext(ctx context.Context, to time.Duration) error

func (*IngestMuxer) Warn

func (im *IngestMuxer) Warn(msg string, args ...rfc5424.SDParam) error

func (*IngestMuxer) Warnf added in v3.8.4

func (im *IngestMuxer) Warnf(format string, args ...interface{}) error

func (*IngestMuxer) WillBlock added in v3.8.29

func (im *IngestMuxer) WillBlock() bool

returns true if a write to the muxer will block

func (*IngestMuxer) Write

func (im *IngestMuxer) Write(tm entry.Timestamp, tag entry.EntryTag, data []byte) error

Write puts together the arguments to create an entry and writes it to the queue to be sent out by the first available entry writer routine, if all routines are dead, THIS WILL BLOCK once the channel fills up. We figure this is a natural "wait" mechanism

func (*IngestMuxer) WriteBatch

func (im *IngestMuxer) WriteBatch(b []*entry.Entry) error

WriteBatch puts a slice of entries into the queue to be sent out by the first available entry writer routine. The entry writer routines will consume the entire slice, so extremely large slices will go to a single indexer.

func (*IngestMuxer) WriteBatchContext

func (im *IngestMuxer) WriteBatchContext(ctx context.Context, b []*entry.Entry) error

WriteBatchContext puts a slice of entries into the queue to be sent out by the first available entry writer routine. The entry writer routines will consume the entire slice, so extremely large slices will go to a single indexer. if a cancellation context isn't needed, use WriteBatch

func (*IngestMuxer) WriteContext

func (im *IngestMuxer) WriteContext(ctx context.Context, tm entry.Timestamp, tag entry.EntryTag, data []byte) error

WriteContext puts together the arguments to create an entry and writes it to the queue to be sent out by the first available entry writer routine, if all routines are dead, THIS WILL BLOCK once the channel fills up. We figure this is a natural "wait" mechanism if the context isn't needed use Write instead

func (*IngestMuxer) WriteEntry

func (im *IngestMuxer) WriteEntry(e *entry.Entry) error

WriteEntry puts an entry into the queue to be sent out by the first available entry writer routine, if all routines are dead, THIS WILL BLOCK once the channel fills up. We figure this is a natural "wait" mechanism

func (*IngestMuxer) WriteEntryContext

func (im *IngestMuxer) WriteEntryContext(ctx context.Context, e *entry.Entry) error

WriteEntryContext puts an entry into the queue to be sent out by the first available entry writer routine, if all routines are dead, THIS WILL BLOCK once the channel fills up. We figure this is a natural "wait" mechanism if not using a context, use WriteEntry as it is faster due to the lack of a select

func (*IngestMuxer) WriteEntryTimeout

func (im *IngestMuxer) WriteEntryTimeout(e *entry.Entry, d time.Duration) (err error)

WriteEntryTimeout attempts to put an entry into the queue to be sent out of the first available writer routine. This write is opportunistic and contains a timeout. It is therefor every expensive and shouldn't be used for normal writes The typical use case is via the gravwell_log calls

func (*IngestMuxer) WriteLog added in v3.8.5

func (im *IngestMuxer) WriteLog(ts time.Time, b []byte) error

WriteLog writes a log entry to the muxer, making IngestMuxer compatible with the log.Relay interface.

type IngesterState added in v3.6.0

type IngesterState struct {
	UUID          string
	Name          string
	Version       string
	Label         string
	IP            net.IP        //child IP, won't be populated unless in child
	Hostname      string        // whatever the ingester thinks its hostname is
	Entries       uint64        // How many entries the ingester has written
	Size          uint64        // How many bytes the ingester has written
	Uptime        time.Duration // Nanoseconds since the ingest muxer was initialized
	Tags          []string      // The tags registered with the ingester
	CacheState    string
	CacheSize     uint64
	LastSeen      time.Time
	Children      map[string]IngesterState
	Configuration json.RawMessage `json:",omitempty"`
	Metadata      json.RawMessage `json:",omitempty"`
}

func (IngesterState) Copy added in v3.8.8

func (s IngesterState) Copy() (r IngesterState)

Copy creates a deep copy of the ingester state, this is important when handing the data type off to a gob encoder if the server updates the ingester state when it is attempting to encode a state blob we could get a race where the internal map is updated while we are attempting to encode it, this would cause fault

func (IngesterState) MarshalJSON added in v3.8.17

func (s IngesterState) MarshalJSON() ([]byte, error)

func (*IngesterState) Read added in v3.6.0

func (s *IngesterState) Read(rdr io.Reader) (err error)

func (*IngesterState) Write added in v3.6.0

func (s *IngesterState) Write(wtr io.Writer) (err error)

type IngesterStateCallback added in v3.6.1

type IngesterStateCallback func(IngesterState)

type LockedSource added in v3.8.14

type LockedSource struct {
	// contains filtered or unexported fields
}

The implementation of this is actually in the Go stdlib, it's just not exported See math/rand/rand.go in the Go source tree.

func (*LockedSource) Int63 added in v3.8.14

func (r *LockedSource) Int63() (n int64)

func (*LockedSource) Seed added in v3.8.14

func (r *LockedSource) Seed(seed int64)

type Logger

type Logger interface {
	Infof(string, ...interface{}) error
	Warnf(string, ...interface{}) error
	Errorf(string, ...interface{}) error
	Info(string, ...rfc5424.SDParam) error
	Warn(string, ...rfc5424.SDParam) error
	Error(string, ...rfc5424.SDParam) error
	InfofWithDepth(int, string, ...interface{}) error
	WarnfWithDepth(int, string, ...interface{}) error
	ErrorfWithDepth(int, string, ...interface{}) error
	InfoWithDepth(int, string, ...rfc5424.SDParam) error
	WarnWithDepth(int, string, ...rfc5424.SDParam) error
	ErrorWithDepth(int, string, ...rfc5424.SDParam) error
	Hostname() string
	Appname() string
}

func NoLogger

func NoLogger() Logger

type MuxerConfig

type MuxerConfig struct {
	config.IngestStreamConfig
	Destinations      []Target
	Tags              []string
	PublicKey         string
	PrivateKey        string
	VerifyCert        bool
	CacheDepth        int
	CachePath         string
	CacheSize         int
	CacheMode         string
	LogLevel          string // deprecated, no longer used
	Logger            Logger
	IngesterName      string
	IngesterVersion   string
	IngesterUUID      string
	IngesterLabel     string
	RateLimitBps      int64
	LogSourceOverride net.IP
	Attach            attach.AttachConfig
}

type StateResponse

type StateResponse struct {
	ID   uint32
	Info string
}

StateResponse

func (*StateResponse) Read

func (sr *StateResponse) Read(r io.Reader) error

Read reads a state response from the reader

func (*StateResponse) Write

func (sr *StateResponse) Write(w io.Writer) error

Write the StateResponse

type StreamConfiguration

type StreamConfiguration struct {
	Compression CompressionType
}

StreamConfiguration is a structure that can be sent back and

func (*StreamConfiguration) Read

func (c *StreamConfiguration) Read(rdr io.Reader) (err error)

func (StreamConfiguration) Write

func (c StreamConfiguration) Write(wtr io.Writer) (err error)

type TLSCerts

type TLSCerts struct {
	Cert tls.Certificate
}

type TagManager

type TagManager interface {
	GetAndPopulate(string) (entry.EntryTag, error)
}

type TagRequest

type TagRequest struct {
	Count uint32
	Tags  []string
}

TagRequest is used to request tags for the ingester

func (*TagRequest) Read

func (tr *TagRequest) Read(r io.Reader) error

Read a TagRequest structure

func (*TagRequest) Write

func (tr *TagRequest) Write(w io.Writer) error

Write a TagRequest

type TagResponse

type TagResponse struct {
	Count uint32
	Tags  map[string]entry.EntryTag
}

TagResponse represents the Tag Name to Tag Number mapping supported by the ingest server

func (*TagResponse) Read

func (tr *TagResponse) Read(r io.Reader) error

Read a TagResponse

func (*TagResponse) Write

func (tr *TagResponse) Write(w io.Writer) error

Write a TagResponse

type Target

type Target struct {
	Address string
	Tenant  string
	Secret  string
}

type TargetError

type TargetError struct {
	Address string
	Error   error
}

type UniformMuxerConfig

type UniformMuxerConfig struct {
	config.IngestStreamConfig
	Destinations      []string
	Tags              []string
	Tenant            string
	Auth              string
	PublicKey         string
	PrivateKey        string
	VerifyCert        bool
	CacheDepth        int
	CachePath         string
	CacheSize         int
	CacheMode         string
	LogLevel          string // deprecated, no longer used
	Logger            Logger
	IngesterName      string
	IngesterVersion   string
	IngesterUUID      string
	IngesterLabel     string
	RateLimitBps      int64
	LogSourceOverride net.IP
	Attach            attach.AttachConfig
}

Directories

Path Synopsis
Package config provides a common base for Gravwell ingester config files.
Package config provides a common base for Gravwell ingester config files.
log
Package processors implements preprocessors for ingesters.
Package processors implements preprocessors for ingesters.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL