ir

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package ir provides types and utilities for the Traffic2OpenAPI Intermediate Representation.

Index

Constants

View Source
const Version = "ir.v1"

Version is the current IR schema version.

Variables

View Source
var ErrChannelClosed = errors.New("channel writer is closed")

ErrChannelClosed is returned when writing to a closed ChannelWriter.

Functions

func StreamNDJSON

func StreamNDJSON(r io.Reader) (<-chan IRRecord, <-chan error)

StreamNDJSON streams NDJSON records through a channel. Useful for processing large files without loading all into memory.

func WriteBatch

func WriteBatch(w io.Writer, records []IRRecord) error

WriteBatch writes records in batch format.

func WriteFile

func WriteFile(path string, records []IRRecord) error

WriteFile writes IR records to a file. Format is determined by file extension: - .ndjson: newline-delimited JSON - .json: batch format

func WriteNDJSON

func WriteNDJSON(w io.Writer, records []IRRecord) error

WriteNDJSON writes records in newline-delimited JSON format.

Types

type AsyncNDJSONWriter

type AsyncNDJSONWriter struct {
	// contains filtered or unexported fields
}

AsyncNDJSONWriter provides async streaming writes for NDJSON format. Records are buffered in a channel and written by a background goroutine. Errors are delivered via an error handler callback.

func NewAsyncNDJSONFileWriter

func NewAsyncNDJSONFileWriter(path string, opts ...AsyncWriterOption) (*AsyncNDJSONWriter, error)

NewAsyncNDJSONFileWriter creates an async writer for streaming to a file.

func NewAsyncNDJSONWriter

func NewAsyncNDJSONWriter(writer *NDJSONWriter, opts ...AsyncWriterOption) *AsyncNDJSONWriter

NewAsyncNDJSONWriter creates an async writer wrapping an existing NDJSONWriter.

func (*AsyncNDJSONWriter) Close

func (w *AsyncNDJSONWriter) Close() error

Close signals the writer to stop, waits for pending writes to complete, and closes the underlying writer.

func (*AsyncNDJSONWriter) Count

func (w *AsyncNDJSONWriter) Count() int

Count returns the number of records written.

func (*AsyncNDJSONWriter) Flush

func (w *AsyncNDJSONWriter) Flush() error

Flush waits for all pending writes to complete and flushes the underlying writer.

func (*AsyncNDJSONWriter) Write

func (w *AsyncNDJSONWriter) Write(record *IRRecord) error

Write queues a record for async writing. Returns nil immediately; errors are delivered via the error handler.

type AsyncWriterOption

type AsyncWriterOption func(*AsyncNDJSONWriter)

AsyncWriterOption configures an AsyncNDJSONWriter.

func WithBufferSize

func WithBufferSize(size int) AsyncWriterOption

WithBufferSize sets the channel buffer size for async writes.

func WithErrorHandler

func WithErrorHandler(handler ErrorHandler) AsyncWriterOption

WithErrorHandler sets the error handler callback.

type Batch

type Batch struct {
	Version  string         `json:"version"`
	Metadata *BatchMetadata `json:"metadata,omitempty"`
	Records  []IRRecord     `json:"records"`
}

Batch represents a collection of IR records with metadata. This is the wrapper format for JSON batch files.

func NewBatch

func NewBatch(records []IRRecord) *Batch

NewBatch creates a new batch with the current version.

type BatchMetadata

type BatchMetadata struct {
	GeneratedAt *time.Time `json:"generatedAt,omitempty"`
	Source      string     `json:"source,omitempty"`
	RecordCount int        `json:"recordCount,omitempty"`
}

BatchMetadata contains optional metadata about a batch of records.

type ChannelProvider

type ChannelProvider struct {
	// contains filtered or unexported fields
}

ChannelProvider provides symmetric read/write access to IR records using Go channels. This is useful for in-memory pipelines, testing, and real-time processing scenarios.

Unlike file-based providers, ChannelProvider connects writers and readers through a shared channel rather than via file paths.

func Channel

func Channel(opts ...ChannelProviderOption) *ChannelProvider

Channel creates a new channel provider with the given options.

func (*ChannelProvider) Chan

func (p *ChannelProvider) Chan() chan *IRRecord

Chan returns the underlying channel. This allows direct access for advanced use cases.

func (*ChannelProvider) NewReader

func (p *ChannelProvider) NewReader(ctx context.Context, _ string) (IRReader, error)

NewReader creates a reader that receives records from the channel. The path parameter is ignored for channel providers.

func (*ChannelProvider) NewStreamReader

func (p *ChannelProvider) NewStreamReader(_ io.Reader) (IRReader, error)

NewStreamReader creates a reader that receives records from the channel. The io.Reader parameter is ignored for channel providers.

func (*ChannelProvider) NewStreamWriter

func (p *ChannelProvider) NewStreamWriter(_ io.Writer) IRWriter

NewStreamWriter creates a writer that sends records to the channel. The io.Writer parameter is ignored for channel providers.

func (*ChannelProvider) NewWriter

func (p *ChannelProvider) NewWriter(ctx context.Context, _ string) (IRWriter, error)

NewWriter creates a writer that sends records to the channel. The path parameter is ignored for channel providers.

type ChannelProviderOption

type ChannelProviderOption func(*ChannelProvider)

ChannelProviderOption configures a ChannelProvider.

func WithChannelProviderBufferSize

func WithChannelProviderBufferSize(size int) ChannelProviderOption

WithChannelProviderBufferSize sets the channel buffer size. Default is 0 (unbuffered).

func WithExistingChannel

func WithExistingChannel(ch chan *IRRecord) ChannelProviderOption

WithExistingChannel uses an existing channel instead of creating a new one. This allows connecting to external channel sources.

type ChannelReader

type ChannelReader struct {
	// contains filtered or unexported fields
}

ChannelReader reads IR records from a channel. Useful for consuming records from a ChannelWriter or other channel-based sources.

func NewChannelReader

func NewChannelReader(ch <-chan *IRRecord) *ChannelReader

NewChannelReader creates a reader that consumes from the given channel.

func NewChannelReaderFromWriter

func NewChannelReaderFromWriter(w *ChannelWriter) *ChannelReader

NewChannelReaderFromWriter creates a reader that consumes from a ChannelWriter. This enables pipelines like: LoggingTransport → ChannelWriter → ChannelReader → Engine

func (*ChannelReader) Close

func (r *ChannelReader) Close() error

Close marks the reader as closed. Note: This does not close the underlying channel. The channel should be closed by the writer.

func (*ChannelReader) Read

func (r *ChannelReader) Read() (*IRRecord, error)

Read reads the next IR record from the channel. Returns io.EOF when the channel is closed. Blocks if no record is available.

type ChannelWriter

type ChannelWriter struct {
	// contains filtered or unexported fields
}

ChannelWriter writes IR records to a channel for in-memory processing. Useful for testing, pipelines, and real-time processing scenarios.

func NewChannelWriter

func NewChannelWriter(opts ...ChannelWriterOption) *ChannelWriter

NewChannelWriter creates a new ChannelWriter with default unbuffered channel.

func NewChannelWriterWithChan

func NewChannelWriterWithChan(ch chan *IRRecord) *ChannelWriter

NewChannelWriterWithChan creates a ChannelWriter using an existing channel. This allows the caller to provide their own channel for more control.

func (*ChannelWriter) Channel

func (w *ChannelWriter) Channel() <-chan *IRRecord

Channel returns the underlying channel for reading records. Consumers should range over this channel to receive records.

func (*ChannelWriter) Close

func (w *ChannelWriter) Close() error

Close closes the underlying channel. After Close, Write will return ErrChannelClosed.

func (*ChannelWriter) Flush

func (w *ChannelWriter) Flush() error

Flush is a no-op for ChannelWriter since writes go directly to the channel.

func (*ChannelWriter) Write

func (w *ChannelWriter) Write(record *IRRecord) error

Write sends a record to the channel. Returns ErrChannelClosed if the writer has been closed. May block if the channel buffer is full (or unbuffered).

type ChannelWriterOption

type ChannelWriterOption func(*ChannelWriter)

ChannelWriterOption configures a ChannelWriter.

func WithChannelBufferSize

func WithChannelBufferSize(size int) ChannelWriterOption

WithChannelBufferSize sets the channel buffer size.

type ErrorHandler

type ErrorHandler func(err error)

ErrorHandler is a callback function for handling async write errors.

type GzipNDJSONOption

type GzipNDJSONOption func(*GzipNDJSONProvider)

GzipNDJSONOption configures a GzipNDJSONProvider.

func WithGzipCompressionLevel

func WithGzipCompressionLevel(level int) GzipNDJSONOption

WithGzipCompressionLevel sets the gzip compression level. Valid levels: gzip.NoCompression, gzip.BestSpeed, gzip.BestCompression, gzip.DefaultCompression, or 1-9.

type GzipNDJSONProvider

type GzipNDJSONProvider struct {
	// contains filtered or unexported fields
}

GzipNDJSONProvider provides symmetric read/write access to gzip-compressed NDJSON-formatted IR records.

func GzipNDJSON

func GzipNDJSON(opts ...GzipNDJSONOption) *GzipNDJSONProvider

GzipNDJSON creates a new gzip-compressed NDJSON provider.

func (*GzipNDJSONProvider) NewReader

func (p *GzipNDJSONProvider) NewReader(ctx context.Context, path string) (IRReader, error)

NewReader creates a reader for the given file path.

func (*GzipNDJSONProvider) NewStreamReader

func (p *GzipNDJSONProvider) NewStreamReader(r io.Reader) (IRReader, error)

NewStreamReader creates a reader that reads from the given io.Reader.

func (*GzipNDJSONProvider) NewStreamWriter

func (p *GzipNDJSONProvider) NewStreamWriter(w io.Writer) IRWriter

NewStreamWriter creates a writer that writes to the given io.Writer.

func (*GzipNDJSONProvider) NewWriter

func (p *GzipNDJSONProvider) NewWriter(ctx context.Context, path string) (IRWriter, error)

NewWriter creates a writer for the given file path.

type GzipNDJSONReader

type GzipNDJSONReader struct {
	// contains filtered or unexported fields
}

GzipNDJSONReader reads IR records from gzip-compressed NDJSON format.

func NewGzipNDJSONFileReader

func NewGzipNDJSONFileReader(path string) (*GzipNDJSONReader, error)

NewGzipNDJSONFileReader creates a reader for streaming from a gzip-compressed file.

func NewGzipNDJSONReader

func NewGzipNDJSONReader(r io.Reader) (*GzipNDJSONReader, error)

NewGzipNDJSONReader creates a reader for streaming gzip-compressed NDJSON input.

func (*GzipNDJSONReader) Close

func (r *GzipNDJSONReader) Close() error

Close closes the gzip reader and underlying file if applicable.

func (*GzipNDJSONReader) LineNumber

func (r *GzipNDJSONReader) LineNumber() int

LineNumber returns the current line number (useful for error reporting).

func (*GzipNDJSONReader) Read

func (r *GzipNDJSONReader) Read() (*IRRecord, error)

Read reads the next IR record. Returns io.EOF when no more records are available.

type GzipNDJSONWriter

type GzipNDJSONWriter struct {
	// contains filtered or unexported fields
}

GzipNDJSONWriter provides streaming writes for gzip-compressed NDJSON format. Each record is JSON-encoded and written as a newline-delimited line. The output is gzip-compressed for storage efficiency.

func NewGzipNDJSONFileWriter

func NewGzipNDJSONFileWriter(path string) (*GzipNDJSONWriter, error)

NewGzipNDJSONFileWriter creates a writer for streaming to a gzip-compressed file. The file should typically have a .ndjson.gz extension.

func NewGzipNDJSONFileWriterLevel

func NewGzipNDJSONFileWriterLevel(path string, level int) (*GzipNDJSONWriter, error)

NewGzipNDJSONFileWriterLevel creates a writer for streaming to a gzip-compressed file with a specific compression level.

func NewGzipNDJSONWriter

func NewGzipNDJSONWriter(w io.Writer) *GzipNDJSONWriter

NewGzipNDJSONWriter creates a writer for streaming gzip-compressed NDJSON output.

func NewGzipNDJSONWriterLevel

func NewGzipNDJSONWriterLevel(w io.Writer, level int) (*GzipNDJSONWriter, error)

NewGzipNDJSONWriterLevel creates a writer with a specific compression level.

func (*GzipNDJSONWriter) Close

func (w *GzipNDJSONWriter) Close() error

Close flushes and closes the gzip writer and underlying file if applicable.

func (*GzipNDJSONWriter) Count

func (w *GzipNDJSONWriter) Count() int

Count returns the number of records written.

func (*GzipNDJSONWriter) Flush

func (w *GzipNDJSONWriter) Flush() error

Flush flushes buffered data to the underlying gzip stream.

func (*GzipNDJSONWriter) Write

func (w *GzipNDJSONWriter) Write(record *IRRecord) error

Write writes a single record.

type GzipWriterOption

type GzipWriterOption func(*GzipNDJSONWriter)

GzipWriterOption configures a GzipNDJSONWriter.

func WithGzipLevel

func WithGzipLevel(level int) GzipWriterOption

WithGzipLevel sets the gzip compression level. Valid levels are gzip.DefaultCompression, gzip.NoCompression, gzip.BestSpeed, gzip.BestCompression, or any integer from 1 to 9.

type IRReader

type IRReader interface {
	// Read reads the next IR record.
	// Returns io.EOF when no more records are available.
	Read() (*IRRecord, error)

	// Close releases any resources held by the reader.
	Close() error
}

IRReader is the interface for reading IR records from any source. Implementations should return io.EOF when no more records are available.

type IRRecord

type IRRecord struct {
	// Round-trip time in milliseconds.
	DurationMs *float64 `json:"durationMs,omitempty" yaml:"durationMs,omitempty" mapstructure:"durationMs,omitempty"`

	// Unique identifier for this record (e.g., UUID).
	Id *string `json:"id,omitempty" yaml:"id,omitempty" mapstructure:"id,omitempty"`

	// Request corresponds to the JSON schema field "request".
	Request Request `json:"request" yaml:"request" mapstructure:"request"`

	// Response corresponds to the JSON schema field "response".
	Response Response `json:"response" yaml:"response" mapstructure:"response"`

	// Adapter/source that generated this record.
	Source *IRRecordSource `json:"source,omitempty" yaml:"source,omitempty" mapstructure:"source,omitempty"`

	// ISO 8601 timestamp of the request capture.
	Timestamp *time.Time `json:"timestamp,omitempty" yaml:"timestamp,omitempty" mapstructure:"timestamp,omitempty"`
}

A single HTTP request/response capture.

func NewRecord

func NewRecord(method RequestMethod, path string, status int) *IRRecord

NewRecord creates a new IR record with required fields.

func ReadBatch

func ReadBatch(r io.Reader) ([]IRRecord, error)

ReadBatch reads a batch-format JSON file.

func ReadDir

func ReadDir(dir string) ([]IRRecord, error)

ReadDir reads all IR files from a directory.

func ReadFile

func ReadFile(path string) ([]IRRecord, error)

ReadFile reads IR records from a file. Automatically detects format based on file extension: - .ndjson: newline-delimited JSON (one record per line) - .json: batch format with version and records array

func ReadNDJSON

func ReadNDJSON(r io.Reader) ([]IRRecord, error)

ReadNDJSON reads newline-delimited JSON records.

func (*IRRecord) EffectivePathTemplate

func (r *IRRecord) EffectivePathTemplate() string

EffectivePathTemplate returns pathTemplate if set, otherwise returns path.

func (*IRRecord) MethodString

func (r *IRRecord) MethodString() string

MethodString returns the method as a string.

func (*IRRecord) SetDuration

func (r *IRRecord) SetDuration(ms float64) *IRRecord

SetDuration sets the duration in milliseconds.

func (*IRRecord) SetHost

func (r *IRRecord) SetHost(host string) *IRRecord

SetHost sets the request host and returns the record for chaining.

func (*IRRecord) SetID

func (r *IRRecord) SetID(id string) *IRRecord

SetID sets the record ID and returns the record for chaining.

func (*IRRecord) SetPathTemplate

func (r *IRRecord) SetPathTemplate(template string, params map[string]string) *IRRecord

SetPathTemplate sets the path template and parameters.

func (*IRRecord) SetQuery

func (r *IRRecord) SetQuery(query map[string]interface{}) *IRRecord

SetQuery sets query parameters and returns the record for chaining.

func (*IRRecord) SetRequestBody

func (r *IRRecord) SetRequestBody(body interface{}) *IRRecord

SetRequestBody sets the request body and returns the record for chaining.

func (*IRRecord) SetResponseBody

func (r *IRRecord) SetResponseBody(body interface{}) *IRRecord

SetResponseBody sets the response body and returns the record for chaining.

func (*IRRecord) SetSource

func (r *IRRecord) SetSource(source IRRecordSource) *IRRecord

SetSource sets the source adapter type and returns the record for chaining.

func (*IRRecord) SetTimestamp

func (r *IRRecord) SetTimestamp(t time.Time) *IRRecord

SetTimestamp sets the timestamp and returns the record for chaining.

func (*IRRecord) UnmarshalJSON

func (j *IRRecord) UnmarshalJSON(value []byte) error

UnmarshalJSON implements json.Unmarshaler.

type IRRecordSource

type IRRecordSource string
const IRRecordSourceHar IRRecordSource = "har"
const IRRecordSourceLoggingTransport IRRecordSource = "logging-transport"
const IRRecordSourceManual IRRecordSource = "manual"
const IRRecordSourcePlaywright IRRecordSource = "playwright"
const IRRecordSourceProxy IRRecordSource = "proxy"

func (*IRRecordSource) UnmarshalJSON

func (j *IRRecordSource) UnmarshalJSON(value []byte) error

UnmarshalJSON implements json.Unmarshaler.

type IRWriter

type IRWriter interface {
	// Write writes a single IR record.
	// For sync implementations, returns any write error.
	// For async implementations, returns nil and delivers errors via callback.
	Write(record *IRRecord) error

	// Flush flushes any buffered data to the underlying destination.
	// For async implementations, blocks until all pending writes complete.
	Flush() error

	// Close flushes any buffered data and releases resources.
	// For async implementations, blocks until all pending writes complete.
	Close() error
}

IRWriter is the interface for writing IR records to any destination. Implementations may be synchronous or asynchronous internally. For async implementations, errors are delivered via an error handler callback rather than returned from Write().

type LoggingOptions

type LoggingOptions struct {
	// FilterHeaders are headers to exclude from logging (case-insensitive).
	// Defaults to common sensitive headers if nil.
	FilterHeaders []string

	// IncludeRequestBody controls whether request bodies are captured.
	IncludeRequestBody bool

	// IncludeResponseBody controls whether response bodies are captured.
	IncludeResponseBody bool

	// MaxBodySize limits body capture size. 0 means no limit.
	MaxBodySize int64

	// Source is the source identifier for IR records.
	Source IRRecordSource

	// SkipPaths are path prefixes to skip logging (e.g., "/health", "/metrics").
	// If a request path starts with any of these prefixes, it won't be logged.
	SkipPaths []string

	// AllowMethods limits logging to specific HTTP methods (e.g., "GET", "POST").
	// If empty, all methods are logged.
	AllowMethods []string

	// AllowHosts limits logging to specific hosts.
	// If empty, all hosts are logged.
	AllowHosts []string

	// SkipStatusCodes are status codes to skip logging (e.g., 404, 500).
	SkipStatusCodes []int

	// SampleRate is the percentage of requests to log (0.0 to 1.0).
	// Values > 0.0 and < 1.0 enable probabilistic sampling (e.g., 0.5 = 50%).
	// Values <= 0.0 or >= 1.0 log all requests.
	// The zero value (0.0) means "not configured" and logs all requests,
	// making it safe to use partial LoggingOptions without setting SampleRate.
	SampleRate float64

	// RequestIDHeaders are headers to check for request ID (in order of priority).
	// The first non-empty value found will be used as the record ID.
	// If empty or no header found, a UUID is generated.
	// Common headers: "X-Request-ID", "X-Correlation-ID", "X-Trace-ID"
	RequestIDHeaders []string
}

LoggingOptions configures the LoggingTransport behavior.

func DefaultLoggingOptions

func DefaultLoggingOptions() LoggingOptions

DefaultLoggingOptions returns sensible defaults for logging.

type LoggingTransport

type LoggingTransport struct {
	// Base is the underlying transport. If nil, http.DefaultTransport is used.
	Base http.RoundTripper

	// Writer receives IR records for each request/response.
	Writer IRWriter

	// Options configures logging behavior.
	Options LoggingOptions

	// ErrorHandler is called when writing an IR record fails.
	// If nil, write errors are silently ignored (HTTP request still succeeds).
	ErrorHandler ErrorHandler
}

LoggingTransport is an http.RoundTripper that logs HTTP traffic as IR records.

func NewLoggingTransport

func NewLoggingTransport(writer IRWriter, opts ...LoggingTransportOption) *LoggingTransport

NewLoggingTransport creates a new logging transport.

func (*LoggingTransport) RoundTrip

func (t *LoggingTransport) RoundTrip(req *http.Request) (*http.Response, error)

RoundTrip implements http.RoundTripper.

type LoggingTransportOption

type LoggingTransportOption func(*LoggingTransport)

LoggingTransportOption configures a LoggingTransport.

func WithBase

WithBase sets the base transport.

func WithLoggingOptions

func WithLoggingOptions(opts LoggingOptions) LoggingTransportOption

WithLoggingOptions sets the logging options.

func WithTransportErrorHandler

func WithTransportErrorHandler(handler ErrorHandler) LoggingTransportOption

WithTransportErrorHandler sets the error handler for write failures.

type MultiWriter

type MultiWriter struct {
	// contains filtered or unexported fields
}

MultiWriter fans out writes to multiple IRWriter destinations. Writes are performed sequentially to each writer in order.

func NewMultiWriter

func NewMultiWriter(writers ...IRWriter) (*MultiWriter, error)

NewMultiWriter creates a MultiWriter that writes to all provided writers. At least one writer must be provided.

func (*MultiWriter) Close

func (w *MultiWriter) Close() error

Close closes all underlying writers. If any writer returns an error, it continues to the next writer. Returns a combined error if any closes failed.

func (*MultiWriter) Flush

func (w *MultiWriter) Flush() error

Flush flushes all underlying writers. If any writer returns an error, it continues to the next writer. Returns a combined error if any flushes failed.

func (*MultiWriter) Write

func (w *MultiWriter) Write(record *IRRecord) error

Write writes a record to all underlying writers. If any writer returns an error, it continues to the next writer. Returns a combined error if any writes failed.

type NDJSONProvider

type NDJSONProvider struct {
	// contains filtered or unexported fields
}

NDJSONProvider provides symmetric read/write access to NDJSON-formatted IR records.

func NDJSON

func NDJSON(opts ...ProviderOption) *NDJSONProvider

NDJSON creates a new NDJSON provider with default options.

func (*NDJSONProvider) NewReader

func (p *NDJSONProvider) NewReader(ctx context.Context, path string) (IRReader, error)

NewReader creates a reader for the given file path.

func (*NDJSONProvider) NewStreamReader

func (p *NDJSONProvider) NewStreamReader(r io.Reader) (IRReader, error)

NewStreamReader creates a reader that reads from the given io.Reader.

func (*NDJSONProvider) NewStreamWriter

func (p *NDJSONProvider) NewStreamWriter(w io.Writer) IRWriter

NewStreamWriter creates a writer that writes to the given io.Writer.

func (*NDJSONProvider) NewWriter

func (p *NDJSONProvider) NewWriter(ctx context.Context, path string) (IRWriter, error)

NewWriter creates a writer for the given file path.

type NDJSONReader

type NDJSONReader struct {
	// contains filtered or unexported fields
}

NDJSONReader reads IR records from newline-delimited JSON format.

func NewNDJSONFileReader

func NewNDJSONFileReader(path string) (*NDJSONReader, error)

NewNDJSONFileReader creates a reader for streaming from a file.

func NewNDJSONReader

func NewNDJSONReader(r io.Reader) *NDJSONReader

NewNDJSONReader creates a reader for streaming NDJSON input.

func (*NDJSONReader) Close

func (r *NDJSONReader) Close() error

Close closes the underlying reader if it implements io.Closer.

func (*NDJSONReader) LineNumber

func (r *NDJSONReader) LineNumber() int

LineNumber returns the current line number (useful for error reporting).

func (*NDJSONReader) Read

func (r *NDJSONReader) Read() (*IRRecord, error)

Read reads the next IR record. Returns io.EOF when no more records are available.

type NDJSONWriter

type NDJSONWriter struct {
	// contains filtered or unexported fields
}

NDJSONWriter provides streaming writes for NDJSON format.

func NewNDJSONFileWriter

func NewNDJSONFileWriter(path string) (*NDJSONWriter, error)

NewNDJSONFileWriter creates a writer for streaming to a file.

func NewNDJSONWriter

func NewNDJSONWriter(w io.Writer) *NDJSONWriter

NewNDJSONWriter creates a writer for streaming NDJSON output.

func (*NDJSONWriter) Close

func (w *NDJSONWriter) Close() error

Close flushes and closes the underlying writer if it implements io.Closer.

func (*NDJSONWriter) Count

func (w *NDJSONWriter) Count() int

Count returns the number of records written.

func (*NDJSONWriter) Flush

func (w *NDJSONWriter) Flush() error

Flush flushes buffered data.

func (*NDJSONWriter) Write

func (w *NDJSONWriter) Write(record *IRRecord) error

Write writes a single record.

type Provider

type Provider interface {
	// NewWriter creates a writer for the given path.
	// The path interpretation depends on the provider implementation.
	NewWriter(ctx context.Context, path string) (IRWriter, error)

	// NewReader creates a reader for the given path.
	// The path interpretation depends on the provider implementation.
	NewReader(ctx context.Context, path string) (IRReader, error)
}

Provider defines the interface for IR record storage providers. Providers offer symmetric read/write access to IR records.

type ProviderOption

type ProviderOption func(*ProviderOptions)

ProviderOption configures a Provider.

func WithProviderBufferSize

func WithProviderBufferSize(size int) ProviderOption

WithBufferSize sets the buffer size for I/O operations.

type ProviderOptions

type ProviderOptions struct {
	// BufferSize is the buffer size for I/O operations.
	// 0 means use the provider's default.
	BufferSize int
}

ProviderOptions holds common configuration for providers.

func ApplyProviderOptions

func ApplyProviderOptions(opts ...ProviderOption) *ProviderOptions

ApplyProviderOptions applies options to ProviderOptions.

type Request

type Request struct {
	// Parsed request body. Object/array for JSON, string for other content types,
	// null for no body.
	Body interface{} `json:"body,omitempty" yaml:"body,omitempty" mapstructure:"body,omitempty"`

	// Content-Type header value (e.g., application/json).
	ContentType *string `json:"contentType,omitempty" yaml:"contentType,omitempty" mapstructure:"contentType,omitempty"`

	// Request headers (keys should be lowercase).
	Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty" mapstructure:"headers,omitempty"`

	// Request host (e.g., api.example.com).
	Host *string `json:"host,omitempty" yaml:"host,omitempty" mapstructure:"host,omitempty"`

	// HTTP method.
	Method RequestMethod `json:"method" yaml:"method" mapstructure:"method"`

	// Raw request path without query string (e.g., /users/123).
	Path string `json:"path" yaml:"path" mapstructure:"path"`

	// Extracted path parameter values.
	PathParams map[string]string `json:"pathParams,omitempty" yaml:"pathParams,omitempty" mapstructure:"pathParams,omitempty"`

	// Normalized path template with parameters (e.g., /users/{id}). Optional - can be
	// inferred.
	PathTemplate *string `json:"pathTemplate,omitempty" yaml:"pathTemplate,omitempty" mapstructure:"pathTemplate,omitempty"`

	// Query parameters as key/value pairs.
	Query map[string]interface{} `json:"query,omitempty" yaml:"query,omitempty" mapstructure:"query,omitempty"`

	// URL scheme.
	Scheme RequestScheme `json:"scheme,omitempty" yaml:"scheme,omitempty" mapstructure:"scheme,omitempty"`
}

HTTP request details.

func (*Request) UnmarshalJSON

func (j *Request) UnmarshalJSON(value []byte) error

UnmarshalJSON implements json.Unmarshaler.

type RequestMethod

type RequestMethod string
const RequestMethodDELETE RequestMethod = "DELETE"
const RequestMethodGET RequestMethod = "GET"
const RequestMethodHEAD RequestMethod = "HEAD"
const RequestMethodOPTIONS RequestMethod = "OPTIONS"
const RequestMethodPATCH RequestMethod = "PATCH"
const RequestMethodPOST RequestMethod = "POST"
const RequestMethodPUT RequestMethod = "PUT"
const RequestMethodTRACE RequestMethod = "TRACE"

func (*RequestMethod) UnmarshalJSON

func (j *RequestMethod) UnmarshalJSON(value []byte) error

UnmarshalJSON implements json.Unmarshaler.

type RequestScheme

type RequestScheme string
const RequestSchemeHttp RequestScheme = "http"
const RequestSchemeHttps RequestScheme = "https"

func (*RequestScheme) UnmarshalJSON

func (j *RequestScheme) UnmarshalJSON(value []byte) error

UnmarshalJSON implements json.Unmarshaler.

type Response

type Response struct {
	// Parsed response body. Object/array for JSON, string for other content types,
	// null for no body.
	Body interface{} `json:"body,omitempty" yaml:"body,omitempty" mapstructure:"body,omitempty"`

	// Content-Type header value.
	ContentType *string `json:"contentType,omitempty" yaml:"contentType,omitempty" mapstructure:"contentType,omitempty"`

	// Response headers (keys should be lowercase).
	Headers map[string]string `json:"headers,omitempty" yaml:"headers,omitempty" mapstructure:"headers,omitempty"`

	// HTTP status code.
	Status int `json:"status" yaml:"status" mapstructure:"status"`
}

HTTP response details.

func (*Response) UnmarshalJSON

func (j *Response) UnmarshalJSON(value []byte) error

UnmarshalJSON implements json.Unmarshaler.

type SliceReader

type SliceReader struct {
	// contains filtered or unexported fields
}

SliceReader reads IR records from an in-memory slice. Useful for converting existing []IRRecord data to the IRReader interface.

func NewSliceReader

func NewSliceReader(records []IRRecord) *SliceReader

NewSliceReader creates a reader from an existing slice of records.

func (*SliceReader) Close

func (r *SliceReader) Close() error

Close is a no-op for SliceReader.

func (*SliceReader) Len

func (r *SliceReader) Len() int

Len returns the total number of records.

func (*SliceReader) Read

func (r *SliceReader) Read() (*IRRecord, error)

Read reads the next IR record from the slice. Returns io.EOF when all records have been read.

func (*SliceReader) Remaining

func (r *SliceReader) Remaining() int

Remaining returns the number of unread records.

func (*SliceReader) Reset

func (r *SliceReader) Reset()

Reset resets the reader to the beginning of the slice.

type StorageProvider

type StorageProvider struct {
	// contains filtered or unexported fields
}

StorageProvider provides symmetric read/write access to IR records using an omnistorage backend. It automatically handles compression based on file path extensions.

func Storage

func Storage(backend omnistorage.Backend, opts ...StorageProviderOption) *StorageProvider

Storage creates a new storage provider with the given omnistorage backend.

func (*StorageProvider) Backend

func (p *StorageProvider) Backend() omnistorage.Backend

Backend returns the underlying omnistorage backend.

func (*StorageProvider) NewReader

func (p *StorageProvider) NewReader(ctx context.Context, path string) (IRReader, error)

NewReader creates a reader for the given path. If the path ends with .gz, gzip decompression is automatically applied. Supported path patterns:

  • *.ndjson - plain NDJSON
  • *.ndjson.gz - gzip-compressed NDJSON

func (*StorageProvider) NewWriter

func (p *StorageProvider) NewWriter(ctx context.Context, path string) (IRWriter, error)

NewWriter creates a writer for the given path. If the path ends with .gz, gzip compression is automatically applied. Supported path patterns:

  • *.ndjson - plain NDJSON
  • *.ndjson.gz - gzip-compressed NDJSON

type StorageProviderOption

type StorageProviderOption func(*StorageProvider)

StorageProviderOption configures a StorageProvider.

type StorageReader

type StorageReader struct {
	// contains filtered or unexported fields
}

StorageReader reads IR records from an omnistorage backend. It automatically handles decompression based on the file path extension.

func NewStorageReader

func NewStorageReader(ctx context.Context, backend omnistorage.Backend, path string) (*StorageReader, error)

NewStorageReader creates an IR reader using an omnistorage backend. If the path ends with .gz, gzip decompression is automatically applied. Supported path patterns:

  • *.ndjson - plain NDJSON
  • *.ndjson.gz - gzip-compressed NDJSON

func (*StorageReader) Close

func (r *StorageReader) Close() error

Close closes the reader.

func (*StorageReader) LineNumber

func (r *StorageReader) LineNumber() int

LineNumber returns the current line number (useful for error reporting).

func (*StorageReader) Read

func (r *StorageReader) Read() (*IRRecord, error)

Read reads the next IR record. Returns io.EOF when no more records are available.

type StorageWriter

type StorageWriter struct {
	// contains filtered or unexported fields
}

StorageWriter writes IR records to an omnistorage backend. It automatically handles compression based on the file path extension.

func NewStorageWriter

func NewStorageWriter(ctx context.Context, backend omnistorage.Backend, path string) (*StorageWriter, error)

NewStorageWriter creates an IR writer using an omnistorage backend. If the path ends with .gz, gzip compression is automatically applied. Supported path patterns:

  • *.ndjson - plain NDJSON
  • *.ndjson.gz - gzip-compressed NDJSON

func (*StorageWriter) Close

func (w *StorageWriter) Close() error

Close flushes and closes the writer.

func (*StorageWriter) Count

func (w *StorageWriter) Count() int

Count returns the number of records written.

func (*StorageWriter) Flush

func (w *StorageWriter) Flush() error

Flush flushes any buffered data.

func (*StorageWriter) Write

func (w *StorageWriter) Write(record *IRRecord) error

Write writes a single IR record.

type StreamProvider

type StreamProvider interface {
	// NewStreamWriter creates a writer that writes to the given io.Writer.
	NewStreamWriter(w io.Writer) IRWriter

	// NewStreamReader creates a reader that reads from the given io.Reader.
	// Returns an error if the reader cannot be initialized (e.g., invalid format).
	NewStreamReader(r io.Reader) (IRReader, error)
}

StreamProvider defines the interface for providers that work with io.Reader/Writer rather than paths. This is useful for in-memory or streaming scenarios.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL