Documentation
¶
Index ¶
- type BoltSnapshotter
- type ClientStatistics
- type Configuration
- type ExponentialBackoff
- type FileConfiguration
- type FileData
- type FileReader
- type FileReaderPool
- func (p *FileReaderPool) Add(reader *FileReader)
- func (p *FileReaderPool) Counts() (available int, locked int)
- func (p *FileReaderPool) IsPathInPool(filePath string) bool
- func (p *FileReaderPool) LockNext() *FileReader
- func (p *FileReaderPool) Remove(reader *FileReader)
- func (p *FileReaderPool) Unlock(reader *FileReader)
- func (p *FileReaderPool) UnlockAll(readers []*FileReader)
- type FileReaderPoolStatistics
- type FileStatistics
- type HighWaterMark
- type MemorySnapshotter
- type NetworkConfiguration
- type ServerConfiguration
- type Snapshotter
- type Spooler
- type Statistics
- func (s *Statistics) DeleteFileStatistics(filePath string)
- func (s *Statistics) IncrementClientLinesSent(clientName string, linesSent int)
- func (s *Statistics) MarshalJSON() ([]byte, error)
- func (s *Statistics) SetClientStatus(clientName string, status string)
- func (s *Statistics) SetFilePosition(filePath string, position int64)
- func (s *Statistics) SetFileSnapshotPosition(filePath string, snapshotPosition int64)
- func (s *Statistics) UpdateFileReaderPoolStatistics(available int, locked int)
- func (s *Statistics) UpdateFileSizeStatistics()
- type StatisticsConfiguration
- type StatisticsServer
- type Supervisor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BoltSnapshotter ¶
func (*BoltSnapshotter) HighWaterMark ¶
func (s *BoltSnapshotter) HighWaterMark(filePath string) (*HighWaterMark, error)
func (*BoltSnapshotter) SetHighWaterMarks ¶
func (s *BoltSnapshotter) SetHighWaterMarks(marks []*HighWaterMark) error
type ClientStatistics ¶
type ClientStatistics struct {
Status string `json:"status"`
// The number of lines sent successfully to the client
LinesSent int `json:"lines_sent"`
// The last time lines were successfully sent to this client
LastSendTime time.Time `json:"last_send_time"`
// The number of lines in the last chunk successfully sent to this client
LastChunkSize int `json:"last_chunk_size"`
}
type Configuration ¶
type Configuration struct {
State string `json:"state"`
Network NetworkConfiguration `json:"network"`
Statistics StatisticsConfiguration `json:"statistics"`
Files []FileConfiguration `json:"files"`
MaxLength int `json:"max_length"`
}
func LoadConfiguration ¶
func LoadConfiguration(configFile string) (*Configuration, error)
func (*Configuration) BuildTLSConfig ¶
func (c *Configuration) BuildTLSConfig() (*tls.Config, error)
type ExponentialBackoff ¶
type ExponentialBackoff struct {
Minimum time.Duration
Maximum time.Duration
// contains filtered or unexported fields
}
func (*ExponentialBackoff) Current ¶
func (b *ExponentialBackoff) Current() time.Duration
func (*ExponentialBackoff) Next ¶
func (b *ExponentialBackoff) Next() time.Duration
func (*ExponentialBackoff) Reset ¶
func (b *ExponentialBackoff) Reset()
type FileConfiguration ¶
type FileData ¶
type FileData struct {
client.Data
*HighWaterMark
}
type FileReader ¶
type FileReader struct {
C chan []*FileData
ChunkSize int
MaxLength int
// contains filtered or unexported fields
}
func NewFileReader ¶
func (*FileReader) FilePath ¶
func (h *FileReader) FilePath() string
type FileReaderPool ¶
type FileReaderPool struct {
// contains filtered or unexported fields
}
func NewFileReaderPool ¶
func NewFileReaderPool() *FileReaderPool
func (*FileReaderPool) Add ¶
func (p *FileReaderPool) Add(reader *FileReader)
func (*FileReaderPool) Counts ¶
func (p *FileReaderPool) Counts() (available int, locked int)
func (*FileReaderPool) IsPathInPool ¶
func (p *FileReaderPool) IsPathInPool(filePath string) bool
func (*FileReaderPool) LockNext ¶
func (p *FileReaderPool) LockNext() *FileReader
TODO: Figure out how to make this call block until a reader is available, rather than returning nil.
func (*FileReaderPool) Remove ¶
func (p *FileReaderPool) Remove(reader *FileReader)
func (*FileReaderPool) Unlock ¶
func (p *FileReaderPool) Unlock(reader *FileReader)
func (*FileReaderPool) UnlockAll ¶
func (p *FileReaderPool) UnlockAll(readers []*FileReader)
type FileStatistics ¶
type FileStatistics struct {
// The current size of the file.
Size int64 `json:"size"`
// The current position (in bytes) up to which the file has been read. This
// might be greater than SnapshotPosition if there are lines buffered in
// memory that haven't been acknowledged by the server.
Position int64 `json:"position"`
// The last time data was read from the file into the in-memory buffer.
LastRead time.Time `json:"last_read"`
// The current position (in bytes) that has been successfully sent and
// acknowledged by the remote server.
SnapshotPosition int64 `json:"snapshot_position"`
// The last time a line from this file was successfully sent and acknowledged
// by the remote server.
LastSnapshot time.Time `json:"last_snapshot"`
}
type HighWaterMark ¶
type MemorySnapshotter ¶
type MemorySnapshotter struct {
// contains filtered or unexported fields
}
func (*MemorySnapshotter) HighWaterMark ¶
func (s *MemorySnapshotter) HighWaterMark(filePath string) (*HighWaterMark, error)
func (*MemorySnapshotter) SetHighWaterMarks ¶
func (s *MemorySnapshotter) SetHighWaterMarks(marks []*HighWaterMark) error
type NetworkConfiguration ¶
type ServerConfiguration ¶
type Snapshotter ¶
type Snapshotter interface {
HighWaterMark(filePath string) (*HighWaterMark, error)
SetHighWaterMarks(marks []*HighWaterMark) error
}
type Spooler ¶
type Spooler struct {
In chan *FileData
Out chan []*FileData
// contains filtered or unexported fields
}
Spooler accepts items on the In channel and chunks them into items on the Out channel.
type Statistics ¶
type Statistics struct {
// contains filtered or unexported fields
}
Statistics keeps stats about the current operation of the program. It is meant to keep snapshot-in-time stats, as opposed to counters or timers that statsd offers.
Statistics may be exposed by APIs that allow human- or machine-readable monitoring.
var GlobalStatistics *Statistics = NewStatistics()
func NewStatistics ¶
func NewStatistics() *Statistics
func (*Statistics) DeleteFileStatistics ¶
func (s *Statistics) DeleteFileStatistics(filePath string)
func (*Statistics) IncrementClientLinesSent ¶
func (s *Statistics) IncrementClientLinesSent(clientName string, linesSent int)
func (*Statistics) MarshalJSON ¶
func (s *Statistics) MarshalJSON() ([]byte, error)
func (*Statistics) SetClientStatus ¶
func (s *Statistics) SetClientStatus(clientName string, status string)
func (*Statistics) SetFilePosition ¶
func (s *Statistics) SetFilePosition(filePath string, position int64)
func (*Statistics) SetFileSnapshotPosition ¶
func (s *Statistics) SetFileSnapshotPosition(filePath string, snapshotPosition int64)
func (*Statistics) UpdateFileReaderPoolStatistics ¶
func (s *Statistics) UpdateFileReaderPoolStatistics(available int, locked int)
func (*Statistics) UpdateFileSizeStatistics ¶
func (s *Statistics) UpdateFileSizeStatistics()
UpdateFileSizeStatistics updates the Size attribute of each file, so it's easier to compare how much progress butteredscones has made through a file.
UpdateFileSizeStatistics should be called before displaying statistics to an end user.
type StatisticsConfiguration ¶
type StatisticsConfiguration struct {
Addr string `json:"addr"`
}
type StatisticsServer ¶
type StatisticsServer struct {
Statistics *Statistics
Addr string
}
StatisticsServer constructs an HTTP server that returns JSON formatted statistics. These statistics can be used for debugging or automated monitoring.
func (*StatisticsServer) ListenAndServe ¶
func (s *StatisticsServer) ListenAndServe() error
type Supervisor ¶
type Supervisor struct {
// Optional settings
SpoolSize int
MaxLength int
// How frequently to glob for new files that may have appeared
GlobRefresh time.Duration
// contains filtered or unexported fields
}
func NewSupervisor ¶
func NewSupervisor(files []FileConfiguration, clients []client.Client, snapshotter Snapshotter, maxLength int) *Supervisor
func (*Supervisor) Start ¶
func (s *Supervisor) Start()
Start pulls things together and plays match-maker.
func (*Supervisor) Stop ¶
func (s *Supervisor) Stop()
Stop stops the supervisor cleanly, making sure all progress has been snapshotted before exiting.