Documentation ¶
Overview ¶
Package convert provides utilities for converting structured data to and from JSON, YAML, and Protocol Buffers streams, as well as streaming output as Server-Sent Events. It includes converters for both input (structured data to an encoded stream) and output (encoded streams to structured data).
Index ¶
- func FromJSON(target any) calque.OutputConverter
- func FromJSONSchema[T any](target any) calque.OutputConverter
- func FromProtobuf(target proto.Message) calque.OutputConverter
- func FromProtobufStream(target proto.Message) calque.OutputConverter
- func FromYAML(target any) calque.OutputConverter
- func RawContentFormatter(content string, _ bool) any
- func ToJSON(data any) calque.InputConverter
- func ToJSONSchema(data any) calque.InputConverter
- func ToProtobuf(data proto.Message) calque.InputConverter
- func ToProtobufStream(data proto.Message) calque.InputConverter
- func ToYAML(data any) calque.InputConverter
- type JSONInputConverter
- type JSONOutputConverter
- type ProtobufInputConverter
- type ProtobufOutputConverter
- type ProtobufStreamInputConverter
- type ProtobufStreamOutputConverter
- type SSEChunkMode
- type SSEConverter
- func (s *SSEConverter) Close() error
- func (s *SSEConverter) FromReader(reader io.Reader) error
- func (s *SSEConverter) WithChunkMode(mode SSEChunkMode) *SSEConverter
- func (s *SSEConverter) WithEventFields(fields map[string]any) *SSEConverter
- func (s *SSEConverter) WithKeepAlive(interval time.Duration) *SSEConverter
- func (s *SSEConverter) WriteError(err error) error
- func (s *SSEConverter) WriteEvent(eventType string, data any) error
- type SSEEvent
- type SSEEventFormatter
- type SchemaInputConverter
- type SchemaOutputConverter
- type YAMLInputConverter
- type YAMLOutputConverter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FromJSON ¶ added in v0.2.0
func FromJSON(target any) calque.OutputConverter
FromJSON creates an output converter for parsing JSON streams to structured data.
Input: pointer to target variable for unmarshaling
Output: calque.OutputConverter for pipeline output position
Behavior: STREAMING - uses json.Decoder for automatic streaming/buffering as needed
Parses JSON data from pipeline output into the specified target type. Target must be a pointer to the desired output type. Uses encoding/json for unmarshaling, supporting all standard JSON types and struct tags.
Example usage:
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
var user User
err := pipeline.Run(ctx, input, convert.FromJSON(&user))
fmt.Printf("User: %s, Age: %d\n", user.Name, user.Age)
func FromJSONSchema ¶ added in v0.2.0
func FromJSONSchema[T any](target any) calque.OutputConverter
FromJSONSchema creates an output converter that validates JSON against schema.
Input: pointer to target variable for unmarshaling, generic type parameter for validation
Output: calque.OutputConverter for pipeline output position
Behavior: BUFFERED - reads entire JSON stream, validates against schema, unmarshals to target
Parses JSON data that may contain embedded schema information and unmarshals to the specified target type. Handles both schema-embedded format (from ToJSONSchema) and direct JSON format. Uses the generic type parameter to determine expected structure and validation rules.
The converter attempts multiple unmarshaling strategies:
1. Direct unmarshaling to target type
2. Schema-wrapped format extraction
3. Flexible wrapper format handling
Example usage:
type Task struct {
Type string `json:"type"`
Priority string `json:"priority"`
Hours int `json:"hours"`
}
var task Task
err := pipeline.Run(ctx, schemaInput, convert.FromJSONSchema[Task](&task))
fmt.Printf("Task: %s priority, %d hours\n", task.Priority, task.Hours)
func FromProtobuf ¶ added in v0.3.0
func FromProtobuf(target proto.Message) calque.OutputConverter
FromProtobuf creates an output converter for parsing protobuf streams to structured data.
Input: pointer to target proto.Message for unmarshaling
Output: calque.OutputConverter for pipeline output position
Behavior: STREAMING - uses proto.Unmarshal for efficient binary deserialization
Parses protobuf data from pipeline output into the specified target type. Target must be a pointer to a proto.Message. Uses google.golang.org/protobuf/proto for unmarshaling, supporting all standard protobuf types.
Example usage:
// User is normally generated by protoc from a .proto file; a
// hand-written struct like this does not satisfy proto.Message.
type User struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3"`
Age int32 `protobuf:"varint,2,opt,name=age,proto3"`
}
var user User
err := pipeline.Run(ctx, input, convert.FromProtobuf(&user))
fmt.Printf("User: %s, Age: %d\n", user.Name, user.Age)
func FromProtobufStream ¶ added in v0.3.0
func FromProtobufStream(target proto.Message) calque.OutputConverter
FromProtobufStream creates a streaming output converter for large protobuf messages.
Input: pointer to target proto.Message for unmarshaling
Output: calque.OutputConverter for pipeline output position
Behavior: STREAMING - efficient streaming of large protobuf messages
This is useful for large protobuf messages where you want to avoid loading the entire message into memory at once.
Example usage:
var largeMessage LargeProtobufMessage
err := pipeline.Run(ctx, input, convert.FromProtobufStream(&largeMessage))
func FromYAML ¶ added in v0.2.0
func FromYAML(target any) calque.OutputConverter
FromYAML creates an output converter for parsing YAML streams to structured data.
Input: pointer to target variable for unmarshaling
Output: calque.OutputConverter for pipeline output position
Behavior: STREAMING - uses yaml.Decoder for automatic streaming/buffering as needed
Parses YAML data from pipeline output into the specified target type. Target must be a pointer to the desired output type. Uses goccy/go-yaml for unmarshaling, supporting standard YAML types and struct tags.
Example usage:
type Config struct {
Database struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
} `yaml:"database"`
}
var config Config
err := pipeline.Run(ctx, input, convert.FromYAML(&config))
fmt.Printf("DB: %s:%d\n", config.Database.Host, config.Database.Port)
func RawContentFormatter ¶
func RawContentFormatter(content string, _ bool) any
RawContentFormatter sends content directly without wrapping (default).
Input: content string, done flag (ignored)
Output: content as-is
Behavior: Simple pass-through formatter
Returns content without any wrapper structure. Use for simple text streaming where clients expect raw content.
Example:
formatter := convert.RawContentFormatter
data := formatter("hello", false) // returns "hello"
func ToJSON ¶ added in v0.2.0
func ToJSON(data any) calque.InputConverter
ToJSON creates an input converter for transforming structured data to JSON streams.
Input: any data type (structs, maps, slices, JSON strings, JSON bytes)
Output: calque.InputConverter for pipeline input position
Behavior: STREAMING - uses json.Encoder for automatic streaming optimization
Converts various data types to valid JSON format for pipeline processing:
- Structs/maps/slices: Marshaled using encoding/json
- JSON strings: Validated and passed through
- JSON bytes: Validated and passed through
- Other types: Attempted JSON marshaling
Example usage:
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
user := User{Name: "Alice", Age: 30}
err := pipeline.Run(ctx, convert.ToJSON(user), &result)
func ToJSONSchema ¶ added in v0.2.0
func ToJSONSchema(data any) calque.InputConverter
ToJSONSchema creates an input converter for transforming structured data to JSON streams with embedded schema information.
Input: any data type (structs, maps, slices, JSON strings, JSON bytes)
Output: calque.InputConverter for pipeline input position
Behavior: STREAMING - uses json.Encoder for automatic streaming optimization
Converts various data types to valid JSON format for pipeline processing:
- Structs/maps/slices: Marshaled using encoding/json
- JSON strings: Validated and passed through
- JSON bytes: Validated and passed through
- Other types: Attempted JSON marshaling
Example usage:
type Task struct {
Type string `json:"type" jsonschema:"required,enum=bug,enum=feature"`
Priority string `json:"priority" jsonschema:"required,enum=low,enum=high"`
Hours int `json:"hours" jsonschema:"minimum=1,maximum=40"`
}
task := Task{Type: "feature", Priority: "high", Hours: 8}
err := pipeline.Run(ctx, convert.ToJSONSchema(task), &result)
func ToProtobuf ¶ added in v0.3.0
func ToProtobuf(data proto.Message) calque.InputConverter
ToProtobuf creates an input converter for transforming protobuf messages to binary streams.
Input: proto.Message data type
Output: calque.InputConverter for pipeline input position
Behavior: STREAMING - uses proto.Marshal for efficient binary serialization
Converts protobuf messages to binary format for pipeline processing. Binary protobuf payloads are typically 30-50% smaller than equivalent JSON and serialize 2-3x faster, depending on message shape.
Example usage:
// User is normally generated by protoc from a .proto file; a
// hand-written struct like this does not satisfy proto.Message.
type User struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3"`
Age int32 `protobuf:"varint,2,opt,name=age,proto3"`
}
user := &User{Name: "Alice", Age: 30}
err := pipeline.Run(ctx, convert.ToProtobuf(user), &result)
func ToProtobufStream ¶ added in v0.3.0
func ToProtobufStream(data proto.Message) calque.InputConverter
ToProtobufStream creates a streaming input converter for large protobuf messages.
Input: proto.Message data type
Output: calque.InputConverter for pipeline input position
Behavior: STREAMING - efficient streaming of large protobuf messages
This is useful for large protobuf messages where you want to avoid loading the entire message into memory at once. Uses chunked streaming for very large messages.
Example usage:
largeMessage := &LargeProtobufMessage{...}
err := pipeline.Run(ctx, convert.ToProtobufStream(largeMessage), &result)
func ToYAML ¶ added in v0.2.0
func ToYAML(data any) calque.InputConverter
ToYAML creates an input converter for transforming structured data to YAML streams.
Input: any data type (structs, maps, slices, YAML strings, YAML bytes)
Output: calque.InputConverter for pipeline input position
Behavior: STREAMING - uses yaml.Encoder for automatic streaming optimization
Converts various data types to valid YAML format for pipeline processing:
- Structs/maps/slices: Marshaled using goccy/go-yaml
- YAML strings: Validated and passed through
- YAML bytes: Validated and passed through
- Other types: Attempted YAML marshaling
Example usage:
type Config struct {
Database struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
} `yaml:"database"`
}
var config Config
config.Database.Host = "localhost"
config.Database.Port = 5432
err := pipeline.Run(ctx, convert.ToYAML(config), &result)
Types ¶
type JSONInputConverter ¶ added in v0.2.0
type JSONInputConverter struct {
// contains filtered or unexported fields
}
JSONInputConverter for structured data -> JSON streams
type JSONOutputConverter ¶ added in v0.2.0
type JSONOutputConverter struct {
// contains filtered or unexported fields
}
JSONOutputConverter for JSON streams -> structured data
func (*JSONOutputConverter) FromReader ¶ added in v0.2.0
func (j *JSONOutputConverter) FromReader(reader io.Reader) error
FromReader implements the OutputConverter interface for JSON streams -> structured data.
type ProtobufInputConverter ¶ added in v0.3.0
type ProtobufInputConverter struct {
// contains filtered or unexported fields
}
ProtobufInputConverter for structured data -> protobuf streams
type ProtobufOutputConverter ¶ added in v0.3.0
type ProtobufOutputConverter struct {
// contains filtered or unexported fields
}
ProtobufOutputConverter for protobuf streams -> structured data
func (*ProtobufOutputConverter) FromReader ¶ added in v0.3.0
func (p *ProtobufOutputConverter) FromReader(reader io.Reader) error
FromReader implements the OutputConverter interface for protobuf streams -> structured data.
type ProtobufStreamInputConverter ¶ added in v0.3.0
type ProtobufStreamInputConverter struct {
// contains filtered or unexported fields
}
ProtobufStreamInputConverter for streaming large protobuf messages
type ProtobufStreamOutputConverter ¶ added in v0.3.0
type ProtobufStreamOutputConverter struct {
// contains filtered or unexported fields
}
ProtobufStreamOutputConverter for streaming large protobuf messages
func (*ProtobufStreamOutputConverter) FromReader ¶ added in v0.3.0
func (p *ProtobufStreamOutputConverter) FromReader(reader io.Reader) error
FromReader implements the OutputConverter interface for streaming protobuf streams -> structured data.
type SSEChunkMode ¶
type SSEChunkMode int
SSEChunkMode defines how to chunk the streaming data.
Controls granularity of streaming: by character, word, line, or complete. Affects real-time user experience and bandwidth usage.
const (
	// SSEChunkByWord streams content word by word (default mode)
	SSEChunkByWord SSEChunkMode = iota
	// SSEChunkByChar streams content character by character
	SSEChunkByChar
	// SSEChunkByLine streams content line by line
	SSEChunkByLine
	// SSEChunkNone streams entire response as single event
	SSEChunkNone
)
type SSEConverter ¶
type SSEConverter struct {
// contains filtered or unexported fields
}
SSEConverter streams data as Server-Sent Events to an HTTP response writer.
Handles real-time streaming of data chunks with configurable chunking modes and event formatting. Automatically sets SSE headers and manages flushing.
Example:
sse := convert.ToSSE(w).WithChunkMode(convert.SSEChunkByWord)
err := sse.FromReader(dataReader)
func ToSSE ¶
func ToSSE(w http.ResponseWriter) *SSEConverter
ToSSE creates an SSE converter that streams to the given HTTP response writer.
Input: HTTP response writer
Output: configured SSEConverter
Behavior: Sets SSE headers and configures streaming
Automatically sets required SSE headers (Content-Type, Cache-Control, etc.) and configures default word-by-word chunking with raw content formatting.
Example:
sse := convert.ToSSE(w)
err := sse.FromReader(reader)
func (*SSEConverter) Close ¶ added in v0.3.0
func (s *SSEConverter) Close() error
Close forcefully terminates the SSE connection and releases resources.
Input: none
Output: error if cleanup fails
Behavior: Flushes data, hijacks connection, forces close
Performs aggressive cleanup of SSE streaming connection. First attempts to hijack the underlying HTTP connection for immediate termination, then falls back to io.Closer if available. Use for force-close scenarios or when handlers don't return immediately after streaming.
Note: Most SSE streams close naturally when the HTTP handler returns. This method is primarily for edge cases requiring explicit connection termination (misbehaving clients, long-running handlers, etc.).
Example:
defer sse.Close() // Force cleanup on function exit
if err := sse.Close(); err != nil {
log.Printf("SSE close error: %v", err)
}
func (*SSEConverter) FromReader ¶
func (s *SSEConverter) FromReader(reader io.Reader) error
FromReader implements OutputConverter interface for streaming SSE responses.
Input: io.Reader data source
Output: error if streaming fails
Behavior: STREAMING - reads and sends data in real-time chunks
Streams data according to configured chunk mode, sending SSE events as data arrives. Handles completion and error events automatically. Starts keep-alive if enabled.
Example:
err := sse.FromReader(llmResponseStream)
func (*SSEConverter) WithChunkMode ¶
func (s *SSEConverter) WithChunkMode(mode SSEChunkMode) *SSEConverter
WithChunkMode sets how the data should be chunked for streaming.
Input: SSEChunkMode enum value
Output: *SSEConverter for chaining
Behavior: Configures streaming granularity
Example:
sse.WithChunkMode(convert.SSEChunkByChar) // Character-by-character
sse.WithChunkMode(convert.SSEChunkByLine) // Line-by-line
func (*SSEConverter) WithEventFields ¶
func (s *SSEConverter) WithEventFields(fields map[string]any) *SSEConverter
WithEventFields sets additional fields to include in each event.
Input: map of additional fields
Output: *SSEConverter for chaining
Behavior: Switches to map formatter with custom fields
Automatically switches from raw content to structured map format with the provided base fields plus content and done status.
Example:
sse.WithEventFields(map[string]any{
"stream_id": "abc123",
"model": "gpt-4",
})
func (*SSEConverter) WithKeepAlive ¶ added in v0.4.0
func (s *SSEConverter) WithKeepAlive(interval time.Duration) *SSEConverter
WithKeepAlive enables periodic keep-alive messages to prevent connection timeouts.
Input: keep-alive interval (recommended: 30 * time.Second)
Output: *SSEConverter for chaining
Behavior: Sends periodic comment messages to maintain connection
Keep-alive messages are sent as SSE comments (": keep-alive\n\n") which are ignored by clients but prevent proxy/firewall timeouts. Common interval is 30 seconds to handle most proxy timeout configurations.
Example:
sse.WithKeepAlive(30 * time.Second) // Recommended interval
sse.WithKeepAlive(15 * time.Second) // More frequent
func (*SSEConverter) WriteError ¶
func (s *SSEConverter) WriteError(err error) error
WriteError sends an error event (public method for external use).
Input: error to send
Output: error if writing fails
Behavior: Sends structured error event to client
Example:
if err := processData(); err != nil {
sse.WriteError(err)
}
func (*SSEConverter) WriteEvent ¶ added in v0.4.0
func (s *SSEConverter) WriteEvent(eventType string, data any) error
WriteEvent sends a custom SSE event with specified type and data.
Input: event type string, arbitrary data
Output: error if write fails
Behavior: Marshals data to JSON and sends as SSE event
Provides direct control over SSE event type and payload for custom streaming scenarios. Data is automatically JSON-marshaled.
Example:
sse.WriteEvent("progress", map[string]any{"percent": 75})
sse.WriteEvent("notification", "Task completed")
type SSEEvent ¶
type SSEEvent struct {
Event string `json:"event,omitempty"`
Data any `json:"data"`
ID string `json:"id,omitempty"`
Retry int `json:"retry,omitempty"`
}
SSEEvent represents a Server-Sent Event.
Defines the structure for SSE messages with optional event type, data payload, ID for reconnection, and retry timing.
Example:
event := SSEEvent{
Event: "message",
Data: "Hello world",
ID: "123",
}
type SSEEventFormatter ¶
type SSEEventFormatter func(content string, done bool) any
SSEEventFormatter defines how to format SSE event data.
Input: content string, done flag
Output: formatted data for SSE event
Behavior: Transforms content into desired event format
Example:
formatter := func(content string, done bool) any {
return map[string]any{"text": content, "finished": done}
}
func MapEventFormatter ¶
func MapEventFormatter(baseFields map[string]any) SSEEventFormatter
MapEventFormatter creates events as maps with the provided base fields.
Input: base fields map
Output: SSEEventFormatter function
Behavior: Returns formatter that wraps content in structured map
Creates structured events with custom fields plus content and done status. Useful for clients expecting consistent JSON structure.
Example:
formatter := convert.MapEventFormatter(map[string]any{
"type": "chat", "user_id": "123",
})
// Results in: {"type": "chat", "user_id": "123", "content": "...", "done": false}
type SchemaInputConverter ¶ added in v0.2.0
type SchemaInputConverter struct {
// contains filtered or unexported fields
}
SchemaInputConverter for structured data -> JSON Schema validated data
type SchemaOutputConverter ¶ added in v0.2.0
type SchemaOutputConverter[T any] struct {
	// contains filtered or unexported fields
}
SchemaOutputConverter for JSON Schema validated data -> structured data
func (*SchemaOutputConverter[T]) FromReader ¶ added in v0.2.0
func (j *SchemaOutputConverter[T]) FromReader(reader io.Reader) error
FromReader implements the OutputConverter interface for schema-validated JSON streams -> structured data.
type YAMLInputConverter ¶ added in v0.2.0
type YAMLInputConverter struct {
// contains filtered or unexported fields
}
YAMLInputConverter is an input converter for transforming structured data to YAML streams.
type YAMLOutputConverter ¶ added in v0.2.0
type YAMLOutputConverter struct {
// contains filtered or unexported fields
}
YAMLOutputConverter is an output converter for parsing YAML streams to structured data.
func (*YAMLOutputConverter) FromReader ¶ added in v0.2.0
func (y *YAMLOutputConverter) FromReader(reader io.Reader) error
FromReader reads YAML data from an io.Reader into the target variable.