transformer

package
v0.3.9
Published: Jun 22, 2026 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Overview

Package transformer includes ctxio: a context-bound reader wrapper.

Package transformer handles request/response transformation and token counting.

Package transformer handles request and response format conversion between Anthropic Messages API and OpenAI Chat Completions API.

Constants

This section is empty.

Variables

var ErrClientDisconnected = fmt.Errorf("client disconnected")

ErrClientDisconnected is returned when the client disconnects during streaming.

var ErrStreamIdle = fmt.Errorf("upstream stream idle")

ErrStreamIdle is returned when no bytes arrive within idleTimeout on the upstream stream. The connection is stale (e.g. backend hang or network partition). The handler decides whether to fall back to another model.

var ErrStreamReadCanceled = errors.New("stream read canceled by context")

ErrStreamReadCanceled is returned by ctxReader.Read when its context is canceled or its deadline expires.

Functions

func GeminiToNormalized

func GeminiToNormalized(geminiResp *types.GeminiResponse, modelID string) *core.NormalizedResponse

GeminiToNormalized converts a GeminiResponse to NormalizedResponse.

func HasThinkingBlocks

func HasThinkingBlocks(messages []types.Message) bool

HasThinkingBlocks returns true if any assistant message contains thinking content — either as a dedicated `thinking`-typed block, or attached as a non-empty `thinking` field on a `tool_use` block.

Claude Code emits both shapes: dedicated thinking blocks for text-only reasoning, and tool_use blocks with an inline `thinking` field when the assistant turn ends in a tool call. Both forms must mark the conversation as having thinking history so the proxy enables thinking mode on subsequent upstream calls (DeepSeek defaults to thinking mode and demands `reasoning_content` once it's been engaged).
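
The two shapes can be checked in a single pass over the assistant messages. This is a sketch under stated assumptions: contentBlock and message are simplified, hypothetical stand-ins for types.Message, and the real field names and layout may differ.

```go
package main

import "fmt"

// contentBlock and message are hypothetical, simplified stand-ins for the
// package's message types; only the fields needed for the check are modeled.
type contentBlock struct {
	Type     string // e.g. "text", "thinking", "tool_use"
	Thinking string // inline reasoning, possibly attached to a tool_use block
}

type message struct {
	Role    string
	Content []contentBlock
}

// hasThinking reports whether any assistant message carries thinking content
// in either shape: a dedicated thinking block, or a tool_use block with a
// non-empty inline thinking field.
func hasThinking(msgs []message) bool {
	for _, m := range msgs {
		if m.Role != "assistant" {
			continue
		}
		for _, b := range m.Content {
			switch {
			case b.Type == "thinking":
				return true
			case b.Type == "tool_use" && b.Thinking != "":
				return true
			}
		}
	}
	return false
}

func main() {
	msgs := []message{
		{Role: "user", Content: []contentBlock{{Type: "text"}}},
		{Role: "assistant", Content: []contentBlock{{Type: "tool_use", Thinking: "check the file first"}}},
	}
	fmt.Println(hasThinking(msgs)) // true
}
```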

func IsIdleTimeout

func IsIdleTimeout(err error) bool

IsIdleTimeout reports whether err is a read-timeout (network deadline exceeded on an otherwise live stream).

func NewCtxReadCloser added in v0.3.4

func NewCtxReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser

NewCtxReadCloser is like NewCtxReader but preserves the io.Closer on the returned value. If rc is nil, returns nil.

func NewCtxReader added in v0.3.4

func NewCtxReader(ctx context.Context, r io.Reader) io.Reader

NewCtxReader wraps r so that its next Read returns ErrStreamReadCanceled when ctx is canceled or its deadline expires.

func NormalizedToAnthropic

func NormalizedToAnthropic(req *core.NormalizedRequest, model config.ModelConfig) *types.MessageRequest

NormalizedToAnthropic converts a NormalizedRequest to an Anthropic MessageRequest.

func NormalizedToGemini

func NormalizedToGemini(req *core.NormalizedRequest, model config.ModelConfig) *types.GeminiRequest

NormalizedToGemini converts a NormalizedRequest to a GeminiRequest.

func NormalizedToResponses

func NormalizedToResponses(req *core.NormalizedRequest, model config.ModelConfig) *types.ResponsesRequest

NormalizedToResponses converts a NormalizedRequest to a ResponsesRequest.

func OpenAIResponseToNormalized

func OpenAIResponseToNormalized(openaiResp *types.ChatCompletionResponse, modelID string) *core.NormalizedResponse

OpenAIResponseToNormalized converts an OpenAI ChatCompletionResponse to NormalizedResponse.

func ResponsesToNormalized

func ResponsesToNormalized(responsesResp *types.ResponsesResponse, modelID string) *core.NormalizedResponse

ResponsesToNormalized converts an OpenAI ResponsesResponse to NormalizedResponse.

func StartIdleWatchdog

func StartIdleWatchdog(ctx context.Context, cancel context.CancelFunc, idleTimeout time.Duration) func()

StartIdleWatchdog launches a goroutine that calls cancel() if no call to the returned ping function occurs within idleTimeout. The caller must invoke ping() after every successful byte read from the upstream stream.

The watchdog goroutine exits when ctx is done (e.g., the stream completed or the caller cancelled the context). The caller MUST cancel ctx when the stream is finished to avoid leaking the goroutine.

Pass idleTimeout <= 0 to disable the watchdog (the returned ping is a no-op).

Typical usage:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ping := StartIdleWatchdog(ctx, cancel, idleTimeout)
// In the read loop:
for {
    n, err := body.Read(buf)
    if n > 0 {
        ping() // reset the idle timer
        // process buf[:n]
    }
    if err != nil {
        break // io.EOF or a read error ends the stream
    }
}

func TransformErrorResponse

func TransformErrorResponse(statusCode int, message string) map[string]interface{}

TransformErrorResponse converts an HTTP error into an Anthropic-style error map.

func TransformRequestFromNormalized

func TransformRequestFromNormalized(req *core.NormalizedRequest, model config.ModelConfig) *types.ChatCompletionRequest

TransformRequestFromNormalized converts a NormalizedRequest to OpenAI ChatCompletionRequest by first reconstructing the Anthropic format and running it through the existing TransformRequest pipeline.

Types

type RequestTransformer

type RequestTransformer struct{}

RequestTransformer converts Anthropic requests to OpenAI format.

func NewRequestTransformer

func NewRequestTransformer() *RequestTransformer

NewRequestTransformer creates a new request transformer.

func (*RequestTransformer) TransformRequest

func (t *RequestTransformer) TransformRequest(
	anthropicReq *types.MessageRequest,
	model config.ModelConfig,
) (*types.ChatCompletionRequest, error)

TransformRequest converts an Anthropic MessageRequest to OpenAI ChatCompletionRequest.

func (*RequestTransformer) TransformToGemini

func (t *RequestTransformer) TransformToGemini(
	anthropicReq *types.MessageRequest,
	model config.ModelConfig,
) (*types.GeminiRequest, error)

TransformToGemini converts an Anthropic MessageRequest to GeminiRequest.

func (*RequestTransformer) TransformToResponses

func (t *RequestTransformer) TransformToResponses(
	anthropicReq *types.MessageRequest,
	model config.ModelConfig,
) (*types.ResponsesRequest, error)

TransformToResponses converts an Anthropic MessageRequest to OpenAI ResponsesRequest.

type ResponseTransformer

type ResponseTransformer struct{}

ResponseTransformer converts OpenAI responses to Anthropic format.

func NewResponseTransformer

func NewResponseTransformer() *ResponseTransformer

NewResponseTransformer creates a new response transformer.

func (*ResponseTransformer) TransformGeminiResponse

func (t *ResponseTransformer) TransformGeminiResponse(
	geminiResp *types.GeminiResponse,
	originalModel string,
) (*types.MessageResponse, error)

TransformGeminiResponse converts a GeminiResponse to Anthropic MessageResponse.

func (*ResponseTransformer) TransformResponse

func (t *ResponseTransformer) TransformResponse(
	openaiResp *types.ChatCompletionResponse,
	originalModel string,
) (*types.MessageResponse, error)

TransformResponse converts an OpenAI ChatCompletionResponse to Anthropic MessageResponse.

func (*ResponseTransformer) TransformResponsesResponse

func (t *ResponseTransformer) TransformResponsesResponse(
	responsesResp *types.ResponsesResponse,
	originalModel string,
) (*types.MessageResponse, error)

TransformResponsesResponse converts an OpenAI ResponsesResponse to Anthropic MessageResponse.

type StreamHandler

type StreamHandler struct {
	// contains filtered or unexported fields
}

StreamHandler handles streaming SSE transformation from OpenAI to Anthropic format.

func NewStreamHandler

func NewStreamHandler() *StreamHandler

NewStreamHandler creates a new stream handler.

func (*StreamHandler) EmitMessageResponse added in v0.3.5

func (h *StreamHandler) EmitMessageResponse(w http.ResponseWriter, resp *types.MessageResponse) error

EmitMessageResponse synthesizes an Anthropic-format SSE stream from a non-streaming MessageResponse. This is used for vision scenarios where the upstream model does not support streaming — the proxy fetches the full response, then emits it as SSE events so the client's streaming contract is preserved.

func (*StreamHandler) ProxyGeminiStream

func (h *StreamHandler) ProxyGeminiStream(
	w http.ResponseWriter,
	geminiResp io.ReadCloser,
	originalModel string,
	clientCtx context.Context,
	idleTimeout time.Duration,
	cancel context.CancelFunc,
) error

ProxyGeminiStream takes a Gemini streaming response and writes Anthropic-format SSE. streamCtx here means the per-model attempt context (it carries streaming_timeout_ms); the caller should wrap geminiResp with NewCtxReadCloser, bound to that context, so the body read also respects the deadline.

func (*StreamHandler) ProxyResponsesStream

func (h *StreamHandler) ProxyResponsesStream(
	w http.ResponseWriter,
	responsesResp io.ReadCloser,
	originalModel string,
	clientCtx context.Context,
	idleTimeout time.Duration,
	cancel context.CancelFunc,
) error

ProxyResponsesStream takes an OpenAI Responses streaming response and writes Anthropic-format SSE. streamCtx here means the per-model attempt context (it carries streaming_timeout_ms); the caller should wrap responsesResp with NewCtxReadCloser, bound to that context, so the body read also respects the deadline.

func (*StreamHandler) ProxyStream

func (h *StreamHandler) ProxyStream(
	w http.ResponseWriter,
	openaiResp io.ReadCloser,
	originalModel string,
	clientCtx context.Context,
	idleTimeout time.Duration,
	cancel context.CancelFunc,
) error

ProxyStream takes an OpenAI streaming response and writes Anthropic-format SSE to the writer. It reads OpenAI ChatCompletionChunk SSE events and transforms them into Anthropic MessageEvent SSE events. streamCtx here means the per-model attempt context (it carries streaming_timeout_ms); the caller should wrap openaiResp with NewCtxReadCloser, bound to that context, so the body read also respects the deadline.

CRITICAL: This function reads directly from the response body (openaiResp) without buffering to minimize latency. Do not wrap the body in bufio.Scanner or bufio.Reader: either one adds buffering.

idleTimeout is the maximum gap between bytes on the upstream stream. The stream lives as long as data keeps flowing; only an idle period longer than idleTimeout is treated as a stuck connection and surfaces as ErrStreamIdle. Pass 0 to disable (stream lives until EOF or error).
