Documentation ¶
Overview ¶
Package transformer includes ctxio: a context-bound reader wrapper.
Package transformer handles request/response transformation and token counting.
Package transformer handles request and response format conversion between the Anthropic Messages API and the OpenAI Chat Completions API.
Index ¶
- Variables
- func GeminiToNormalized(geminiResp *types.GeminiResponse, modelID string) *core.NormalizedResponse
- func HasThinkingBlocks(messages []types.Message) bool
- func IsIdleTimeout(err error) bool
- func NewCtxReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser
- func NewCtxReader(ctx context.Context, r io.Reader) io.Reader
- func NormalizedToAnthropic(req *core.NormalizedRequest, model config.ModelConfig) *types.MessageRequest
- func NormalizedToGemini(req *core.NormalizedRequest, model config.ModelConfig) *types.GeminiRequest
- func NormalizedToResponses(req *core.NormalizedRequest, model config.ModelConfig) *types.ResponsesRequest
- func OpenAIResponseToNormalized(openaiResp *types.ChatCompletionResponse, modelID string) *core.NormalizedResponse
- func ResponsesToNormalized(responsesResp *types.ResponsesResponse, modelID string) *core.NormalizedResponse
- func StartIdleWatchdog(ctx context.Context, cancel context.CancelFunc, idleTimeout time.Duration) func()
- func TransformErrorResponse(statusCode int, message string) map[string]interface{}
- func TransformRequestFromNormalized(req *core.NormalizedRequest, model config.ModelConfig) *types.ChatCompletionRequest
- type RequestTransformer
- func NewRequestTransformer() *RequestTransformer
- func (t *RequestTransformer) TransformRequest(anthropicReq *types.MessageRequest, model config.ModelConfig) (*types.ChatCompletionRequest, error)
- func (t *RequestTransformer) TransformToGemini(anthropicReq *types.MessageRequest, model config.ModelConfig) (*types.GeminiRequest, error)
- func (t *RequestTransformer) TransformToResponses(anthropicReq *types.MessageRequest, model config.ModelConfig) (*types.ResponsesRequest, error)
- type ResponseTransformer
- func NewResponseTransformer() *ResponseTransformer
- func (t *ResponseTransformer) TransformGeminiResponse(geminiResp *types.GeminiResponse, originalModel string) (*types.MessageResponse, error)
- func (t *ResponseTransformer) TransformResponse(openaiResp *types.ChatCompletionResponse, originalModel string) (*types.MessageResponse, error)
- func (t *ResponseTransformer) TransformResponsesResponse(responsesResp *types.ResponsesResponse, originalModel string) (*types.MessageResponse, error)
- type StreamHandler
- func NewStreamHandler() *StreamHandler
- func (h *StreamHandler) EmitMessageResponse(w http.ResponseWriter, resp *types.MessageResponse) error
- func (h *StreamHandler) ProxyGeminiStream(w http.ResponseWriter, geminiResp io.ReadCloser, originalModel string, ...) error
- func (h *StreamHandler) ProxyResponsesStream(w http.ResponseWriter, responsesResp io.ReadCloser, originalModel string, ...) error
- func (h *StreamHandler) ProxyStream(w http.ResponseWriter, openaiResp io.ReadCloser, originalModel string, ...) error
Constants ¶
This section is empty.
Variables ¶
var ErrClientDisconnected = fmt.Errorf("client disconnected")
ErrClientDisconnected is returned when the client disconnects during streaming.
var ErrStreamIdle = fmt.Errorf("upstream stream idle")
ErrStreamIdle is returned when no bytes arrive on the upstream stream within idleTimeout. The connection is treated as stale (e.g., a hung backend or a network partition); the handler decides whether to fall back to another model.
var ErrStreamReadCanceled = errors.New("stream read canceled by context")
ErrStreamReadCanceled is returned by ctxReader.Read when its context is canceled or its deadline expires.
Functions ¶
func GeminiToNormalized ¶
func GeminiToNormalized(geminiResp *types.GeminiResponse, modelID string) *core.NormalizedResponse
GeminiToNormalized converts a GeminiResponse to NormalizedResponse.
func HasThinkingBlocks ¶
func HasThinkingBlocks(messages []types.Message) bool
HasThinkingBlocks returns true if any assistant message contains thinking content, either as a dedicated `thinking`-typed block or as a non-empty `thinking` field on a `tool_use` block.
Claude Code emits both shapes: dedicated thinking blocks for text-only reasoning, and tool_use blocks with an inline `thinking` field when the assistant turn ends in a tool call. Both forms must mark the conversation as having thinking history so that the proxy enables thinking mode on subsequent upstream calls (DeepSeek defaults to thinking mode and requires `reasoning_content` once it has been engaged).
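The two-shape rule can be sketched as a single scan over assistant messages. The Message and ContentBlock types below are hypothetical, pared-down stand-ins; the real types.Message carries more fields and may represent thinking differently.

```go
package main

import "fmt"

// Hypothetical, simplified shapes for illustration only.
type ContentBlock struct {
	Type     string // "text", "thinking", "tool_use", ...
	Thinking string // reasoning text, when present
}

type Message struct {
	Role    string
	Content []ContentBlock
}

// hasThinkingBlocks applies the documented rule: an assistant message counts
// if it holds a "thinking" block, or a "tool_use" block whose Thinking field
// is non-empty. Messages from other roles are ignored.
func hasThinkingBlocks(messages []Message) bool {
	for _, m := range messages {
		if m.Role != "assistant" {
			continue
		}
		for _, b := range m.Content {
			if b.Type == "thinking" || (b.Type == "tool_use" && b.Thinking != "") {
				return true
			}
		}
	}
	return false
}

func main() {
	history := []Message{
		{Role: "user", Content: []ContentBlock{{Type: "text"}}},
		{Role: "assistant", Content: []ContentBlock{{Type: "tool_use", Thinking: "need to read the file first"}}},
	}
	fmt.Println(hasThinkingBlocks(history)) // true
}
```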
func IsIdleTimeout ¶
func IsIdleTimeout(err error) bool
IsIdleTimeout reports whether err is a read timeout (a network deadline exceeded on an otherwise live stream).
func NewCtxReadCloser ¶ added in v0.3.4
func NewCtxReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser
NewCtxReadCloser is like NewCtxReader but preserves the io.Closer on the returned value. If rc is nil, it returns nil.
func NewCtxReader ¶ added in v0.3.4
func NewCtxReader(ctx context.Context, r io.Reader) io.Reader
NewCtxReader wraps r so that its next Read returns ErrStreamReadCanceled when ctx is canceled or its deadline expires.
func NormalizedToAnthropic ¶
func NormalizedToAnthropic(req *core.NormalizedRequest, model config.ModelConfig) *types.MessageRequest
NormalizedToAnthropic converts a NormalizedRequest to an Anthropic MessageRequest.
func NormalizedToGemini ¶
func NormalizedToGemini(req *core.NormalizedRequest, model config.ModelConfig) *types.GeminiRequest
NormalizedToGemini converts a NormalizedRequest to a GeminiRequest.
func NormalizedToResponses ¶
func NormalizedToResponses(req *core.NormalizedRequest, model config.ModelConfig) *types.ResponsesRequest
NormalizedToResponses converts a NormalizedRequest to a ResponsesRequest.
func OpenAIResponseToNormalized ¶
func OpenAIResponseToNormalized(openaiResp *types.ChatCompletionResponse, modelID string) *core.NormalizedResponse
OpenAIResponseToNormalized converts an OpenAI ChatCompletionResponse to NormalizedResponse.
func ResponsesToNormalized ¶
func ResponsesToNormalized(responsesResp *types.ResponsesResponse, modelID string) *core.NormalizedResponse
ResponsesToNormalized converts an OpenAI ResponsesResponse to NormalizedResponse.
func StartIdleWatchdog ¶
func StartIdleWatchdog(ctx context.Context, cancel context.CancelFunc, idleTimeout time.Duration) func()
StartIdleWatchdog launches a goroutine that calls cancel() if no call to the returned ping function occurs within idleTimeout. The caller must invoke ping() after every Read that returns bytes from the upstream stream.
The watchdog goroutine exits when ctx is done (e.g., the stream completed or the caller canceled the context). The caller MUST cancel ctx when the stream is finished to avoid leaking the goroutine.
Pass idleTimeout <= 0 to disable the watchdog (the returned ping is a no-op).
Typical usage:
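	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ping := StartIdleWatchdog(ctx, cancel, idleTimeout)
	buf := make([]byte, 4096)
	for {
		// Read loop: ping after every read that returns bytes.
		n, err := body.Read(buf)
		if n > 0 {
			ping()
			// process buf[:n]
		}
		if err != nil {
			break // io.EOF, or a failed or canceled read
		}
	}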
func TransformErrorResponse ¶
func TransformErrorResponse(statusCode int, message string) map[string]interface{}
TransformErrorResponse converts an HTTP error into an Anthropic-style error map.
func TransformRequestFromNormalized ¶
func TransformRequestFromNormalized(req *core.NormalizedRequest, model config.ModelConfig) *types.ChatCompletionRequest
TransformRequestFromNormalized converts a NormalizedRequest to OpenAI ChatCompletionRequest by first reconstructing the Anthropic format and running it through the existing TransformRequest pipeline.
Types ¶
type RequestTransformer ¶
type RequestTransformer struct{}
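RequestTransformer converts Anthropic requests to upstream formats: OpenAI Chat Completions (TransformRequest), OpenAI Responses (TransformToResponses), and Gemini (TransformToGemini).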
func NewRequestTransformer ¶
func NewRequestTransformer() *RequestTransformer
NewRequestTransformer creates a new request transformer.
func (*RequestTransformer) TransformRequest ¶
func (t *RequestTransformer) TransformRequest(anthropicReq *types.MessageRequest, model config.ModelConfig) (*types.ChatCompletionRequest, error)
TransformRequest converts an Anthropic MessageRequest to OpenAI ChatCompletionRequest.
func (*RequestTransformer) TransformToGemini ¶
func (t *RequestTransformer) TransformToGemini(anthropicReq *types.MessageRequest, model config.ModelConfig) (*types.GeminiRequest, error)
TransformToGemini converts an Anthropic MessageRequest to GeminiRequest.
func (*RequestTransformer) TransformToResponses ¶
func (t *RequestTransformer) TransformToResponses(anthropicReq *types.MessageRequest, model config.ModelConfig) (*types.ResponsesRequest, error)
TransformToResponses converts an Anthropic MessageRequest to OpenAI ResponsesRequest.
type ResponseTransformer ¶
type ResponseTransformer struct{}
ResponseTransformer converts upstream responses (OpenAI Chat Completions, OpenAI Responses, and Gemini) to Anthropic format.
func NewResponseTransformer ¶
func NewResponseTransformer() *ResponseTransformer
NewResponseTransformer creates a new response transformer.
func (*ResponseTransformer) TransformGeminiResponse ¶
func (t *ResponseTransformer) TransformGeminiResponse(geminiResp *types.GeminiResponse, originalModel string) (*types.MessageResponse, error)
TransformGeminiResponse converts a GeminiResponse to Anthropic MessageResponse.
func (*ResponseTransformer) TransformResponse ¶
func (t *ResponseTransformer) TransformResponse(openaiResp *types.ChatCompletionResponse, originalModel string) (*types.MessageResponse, error)
TransformResponse converts an OpenAI ChatCompletionResponse to Anthropic MessageResponse.
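One step any Chat Completions to Messages conversion has to perform is translating OpenAI's finish_reason into an Anthropic stop_reason. The table below is a hypothetical sketch of that step, not this package's code.

```go
package main

import "fmt"

// mapFinishReason is a hypothetical sketch; the mapping is an assumption.
func mapFinishReason(finishReason string) string {
	switch finishReason {
	case "length":
		return "max_tokens"
	case "tool_calls", "function_call":
		return "tool_use"
	default:
		// "stop" and anything unrecognized. OpenAI reports "stop" both for a
		// natural end and for a matched stop sequence, so "stop_sequence"
		// cannot be recovered from finish_reason alone.
		return "end_turn"
	}
}

func main() {
	for _, fr := range []string{"stop", "length", "tool_calls"} {
		fmt.Printf("%s -> %s\n", fr, mapFinishReason(fr))
	}
}
```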
func (*ResponseTransformer) TransformResponsesResponse ¶
func (t *ResponseTransformer) TransformResponsesResponse(responsesResp *types.ResponsesResponse, originalModel string) (*types.MessageResponse, error)
TransformResponsesResponse converts an OpenAI ResponsesResponse to Anthropic MessageResponse.
type StreamHandler ¶
type StreamHandler struct {
// contains filtered or unexported fields
}
StreamHandler transforms upstream streaming responses (OpenAI Chat Completions, OpenAI Responses, and Gemini) into Anthropic-format SSE.
func NewStreamHandler ¶
func NewStreamHandler() *StreamHandler
NewStreamHandler creates a new stream handler.
func (*StreamHandler) EmitMessageResponse ¶ added in v0.3.5
func (h *StreamHandler) EmitMessageResponse(w http.ResponseWriter, resp *types.MessageResponse) error
EmitMessageResponse synthesizes an Anthropic-format SSE stream from a non-streaming MessageResponse. It is used in vision scenarios where the upstream model does not support streaming: the proxy fetches the full response, then emits it as SSE events so that the client's streaming contract is preserved.
func (*StreamHandler) ProxyGeminiStream ¶
func (h *StreamHandler) ProxyGeminiStream(w http.ResponseWriter, geminiResp io.ReadCloser, originalModel string, clientCtx context.Context, idleTimeout time.Duration, cancel context.CancelFunc) error
ProxyGeminiStream takes a Gemini streaming response and writes Anthropic-format SSE. The caller should wrap geminiResp with NewCtxReadCloser using the per-model attempt context (streamCtx, which carries streaming_timeout_ms), so that body reads also respect that deadline.
func (*StreamHandler) ProxyResponsesStream ¶
func (h *StreamHandler) ProxyResponsesStream(w http.ResponseWriter, responsesResp io.ReadCloser, originalModel string, clientCtx context.Context, idleTimeout time.Duration, cancel context.CancelFunc) error
ProxyResponsesStream takes an OpenAI Responses streaming response and writes Anthropic-format SSE. The caller should wrap responsesResp with NewCtxReadCloser using the per-model attempt context (streamCtx, which carries streaming_timeout_ms), so that body reads also respect that deadline.
func (*StreamHandler) ProxyStream ¶
func (h *StreamHandler) ProxyStream(w http.ResponseWriter, openaiResp io.ReadCloser, originalModel string, clientCtx context.Context, idleTimeout time.Duration, cancel context.CancelFunc) error
ProxyStream takes an OpenAI streaming response and writes Anthropic-format SSE to w. It reads OpenAI ChatCompletionChunk SSE events and transforms them into Anthropic MessageEvent SSE events. The caller should wrap openaiResp with NewCtxReadCloser using the per-model attempt context (streamCtx, which carries streaming_timeout_ms), so that body reads also respect that deadline.
CRITICAL: This function reads directly from the response body (openaiResp) without buffering, to minimize latency. Per deep research: "Don't use bufio.Scanner or bufio.Reader on the response body; it adds buffering."
idleTimeout is the maximum gap between bytes on the upstream stream. The stream lives as long as data keeps flowing; only an idle period longer than idleTimeout is treated as a stuck connection and surfaces as ErrStreamIdle. Pass 0 to disable (stream lives until EOF or error).