Documentation ¶
Overview ¶
Package filestream implements routines necessary for communicating with the W&B backend filestream service.
Internally there are three goroutines spun up:
process: process records into an appropriate format to transmit transmit: collect and transmit messages to the filestream service feedback: process feedback from the filestream service
Below demonstrates a common execution flow through this package:
{caller}: - filestream.go: NewFileStream - create service - filestream.go: FileStream.Start - spin up worker goroutines - filestream.go: FileStream.StreamRecord - add a record to be processed and sent - loop_process.go: Filestream.addProcess - add to process channel {goroutine process}: - loop_process.go: Filestream.loopProcess - loop acting on process channel - loop_transmit.go: Filestream.addTransmit - add to transmit channel {goroutine transmit}: - loop_transmit.go: Filestream.loopTransmit - loop acting on transmit channel - collector.go: chunkCollector - class to coordinate collecting work from transmit channel - collector.go: chunkCollector.read - read the first transmit work from transmit channel - collector.go: chunkCollector.readMore - keep reading until we have enough or hit timeout - collector.go: chunkCollector.dump - create a blob to be used to serialize into json to send - loop_transmit.go: Filestream.send - send json to backend filestream service - loop_feedback.go: Filestream.add_feedback - add to feedback channel {goroutine feedback} - loop_feedback.go: Filestream.loopFeedback - loop acting on feedback channel {caller} - filestream.go: FileStream.Close - graceful shutdown of worker goroutines
Index ¶
- Constants
- type ChunkTypeEnum
- type FileStream
- type FileStreamOffsetMap
- type FileStreamOption
- func WithDelayProcess(delayProcess time.Duration) FileStreamOption
- func WithHeartbeatTime(heartbeatTime time.Duration) FileStreamOption
- func WithHttpClient(client *retryablehttp.Client) FileStreamOption
- func WithLogger(logger *observability.NexusLogger) FileStreamOption
- func WithMaxItemsPerPush(maxItemsPerPush int) FileStreamOption
- func WithOffsets(offsetMap FileStreamOffsetMap) FileStreamOption
- func WithPath(path string) FileStreamOption
- func WithSettings(settings *service.Settings) FileStreamOption
- type FsTransmitData
Constants ¶
View Source
const ( BufferSize = 32 EventsFileName = "wandb-events.jsonl" HistoryFileName = "wandb-history.jsonl" SummaryFileName = "wandb-summary.json" OutputFileName = "output.log" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChunkTypeEnum ¶
type ChunkTypeEnum int8
const ( NoneChunk ChunkTypeEnum = iota HistoryChunk OutputChunk EventsChunk SummaryChunk )
type FileStream ¶
type FileStream struct {
// contains filtered or unexported fields
}
FileStream is a stream of data to the server
func NewFileStream ¶
func NewFileStream(opts ...FileStreamOption) *FileStream
func (*FileStream) Close ¶
func (fs *FileStream) Close()
Close gracefully shuts down the goroutines created by Start
func (*FileStream) GetInputChan ¶
func (fs *FileStream) GetInputChan() chan protoreflect.ProtoMessage
func (*FileStream) Start ¶
func (fs *FileStream) Start()
Start creates process, transmit, and feedback goroutines
func (*FileStream) StreamRecord ¶
func (fs *FileStream) StreamRecord(rec *service.Record)
StreamRecord is the main entry point for callers to add data to be sent
type FileStreamOffsetMap ¶
type FileStreamOffsetMap map[ChunkTypeEnum]int
type FileStreamOption ¶
type FileStreamOption func(fs *FileStream)
func WithDelayProcess ¶
func WithDelayProcess(delayProcess time.Duration) FileStreamOption
func WithHeartbeatTime ¶
func WithHeartbeatTime(heartbeatTime time.Duration) FileStreamOption
func WithHttpClient ¶
func WithHttpClient(client *retryablehttp.Client) FileStreamOption
func WithLogger ¶
func WithLogger(logger *observability.NexusLogger) FileStreamOption
func WithMaxItemsPerPush ¶
func WithMaxItemsPerPush(maxItemsPerPush int) FileStreamOption
func WithOffsets ¶
func WithOffsets(offsetMap FileStreamOffsetMap) FileStreamOption
func WithPath ¶
func WithPath(path string) FileStreamOption
func WithSettings ¶
func WithSettings(settings *service.Settings) FileStreamOption
type FsTransmitData ¶
type FsTransmitData struct { Files map[string]fsTransmitFileData `json:"files,omitempty"` Complete *bool `json:"complete,omitempty"` Exitcode *int32 `json:"exitcode,omitempty"` Preempting bool `json:"preempting,omitempty"` Dropped int32 `json:"dropped,omitempty"` Uploaded []string `json:"uploaded,omitempty"` }
FsTransmitData is serialized and sent to a W&B server
Click to show internal directories.
Click to hide internal directories.