Documentation
¶
Index ¶
- Constants
- Variables
- func AsActor(ctx context.Context, actorID string, metadata Metadata) context.Context
- type AnthropicErrorResponse
- type AnthropicMessagesBlockingInterception
- type AnthropicMessagesInterceptionBase
- type AnthropicMessagesStreamingInterception
- type AnthropicProvider
- func (p *AnthropicProvider) AuthHeader() string
- func (p *AnthropicProvider) BaseURL() string
- func (p *AnthropicProvider) BridgedRoutes() []string
- func (p *AnthropicProvider) CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)
- func (p *AnthropicProvider) InjectAuthHeader(headers *http.Header)
- func (p *AnthropicProvider) Name() string
- func (p *AnthropicProvider) PassthroughRoutes() []string
- type AsyncRecorder
- func (a *AsyncRecorder) RecordInterception(ctx context.Context, req *InterceptionRecord) error
- func (a *AsyncRecorder) RecordPromptUsage(_ context.Context, req *PromptUsageRecord) error
- func (a *AsyncRecorder) RecordTokenUsage(_ context.Context, req *TokenUsageRecord) error
- func (a *AsyncRecorder) RecordToolUsage(_ context.Context, req *ToolUsageRecord) error
- func (a *AsyncRecorder) Wait()
- type ChatCompletionNewParamsWrapper
- type Config
- type InterceptionRecord
- type Interceptor
- type MessageNewParamsWrapper
- type Metadata
- type OpenAIBlockingChatInterception
- type OpenAIChatInterceptionBase
- type OpenAIErrorResponse
- type OpenAIProvider
- func (p *OpenAIProvider) AuthHeader() string
- func (p *OpenAIProvider) BaseURL() string
- func (p *OpenAIProvider) BridgedRoutes() []string
- func (p *OpenAIProvider) CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)
- func (p *OpenAIProvider) InjectAuthHeader(headers *http.Header)
- func (p *OpenAIProvider) Name() string
- func (p *OpenAIProvider) PassthroughRoutes() []string
- type OpenAIStreamingChatInterception
- type PromptUsageRecord
- type Provider
- type ProviderConfig
- type Recorder
- type RecorderWrapper
- func (r *RecorderWrapper) RecordInterception(ctx context.Context, req *InterceptionRecord) error
- func (r *RecorderWrapper) RecordPromptUsage(ctx context.Context, req *PromptUsageRecord) error
- func (r *RecorderWrapper) RecordTokenUsage(ctx context.Context, req *TokenUsageRecord) error
- func (r *RecorderWrapper) RecordToolUsage(ctx context.Context, req *ToolUsageRecord) error
- type RequestBridge
- type SSEEvent
- type SSEParser
- type TokenUsageRecord
- type ToolArgs
- type ToolUsageRecord
Constants ¶
const ( SSEEventTypeMessage = "message" SSEEventTypeError = "error" SSEEventTypePing = "ping" )
const (
ProviderAnthropic = "anthropic"
)
const (
ProviderOpenAI = "openai"
)
Variables ¶
var ErrEventStreamClosed = errors.New("event stream closed")
var UnknownRoute = errors.New("unknown route")
Functions ¶
Types ¶
type AnthropicErrorResponse ¶
type AnthropicErrorResponse struct {
*anthropic.ErrorResponse
StatusCode int `json:"-"`
}
func (*AnthropicErrorResponse) Error ¶
func (a *AnthropicErrorResponse) Error() string
type AnthropicMessagesBlockingInterception ¶
type AnthropicMessagesBlockingInterception struct {
AnthropicMessagesInterceptionBase
}
func NewAnthropicMessagesBlockingInterception ¶
func NewAnthropicMessagesBlockingInterception(id uuid.UUID, req *MessageNewParamsWrapper, baseURL, key string) *AnthropicMessagesBlockingInterception
func (*AnthropicMessagesBlockingInterception) ProcessRequest ¶
func (i *AnthropicMessagesBlockingInterception) ProcessRequest(w http.ResponseWriter, r *http.Request) error
func (*AnthropicMessagesBlockingInterception) Setup ¶
func (s *AnthropicMessagesBlockingInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)
type AnthropicMessagesInterceptionBase ¶
type AnthropicMessagesInterceptionBase struct {
// contains filtered or unexported fields
}
func (*AnthropicMessagesInterceptionBase) ID ¶
func (i *AnthropicMessagesInterceptionBase) ID() uuid.UUID
func (*AnthropicMessagesInterceptionBase) Model ¶
func (i *AnthropicMessagesInterceptionBase) Model() string
func (*AnthropicMessagesInterceptionBase) Setup ¶
func (i *AnthropicMessagesInterceptionBase) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)
type AnthropicMessagesStreamingInterception ¶
type AnthropicMessagesStreamingInterception struct {
AnthropicMessagesInterceptionBase
}
func NewAnthropicMessagesStreamingInterception ¶
func NewAnthropicMessagesStreamingInterception(id uuid.UUID, req *MessageNewParamsWrapper, baseURL, key string) *AnthropicMessagesStreamingInterception
func (*AnthropicMessagesStreamingInterception) ProcessRequest ¶
func (i *AnthropicMessagesStreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Request) error
ProcessRequest handles a request to /v1/messages. This API has a state-machine behind it, which is described in https://docs.claude.com/en/docs/build-with-claude/streaming#event-types.
Each stream uses the following event flow: - `message_start`: contains a Message object with empty content. - A series of content blocks, each of which have a `content_block_start`, one or more `content_block_delta` events, and a `content_block_stop` event. - Each content block will have an index that corresponds to its index in the final Message content array. - One or more `message_delta` events, indicating top-level changes to the final Message object. - A final `message_stop` event.
It will inject any tools which have been provided by the mcp.ServerProxier.
When a response from the server includes an event indicating that a tool must be invoked, a conditional flow takes place:
a) if the tool is not injected (i.e. defined by the client), relay the event unmodified b) if the tool is injected, it will be invoked by the mcp.ServerProxier in the remote MCP server, and its results relayed to the SERVER. The response from the server will be handled synchronously, and this loop can continue until all injected tool invocations are completed and the response is relayed to the client.
func (*AnthropicMessagesStreamingInterception) Setup ¶
func (s *AnthropicMessagesStreamingInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)
type AnthropicProvider ¶
type AnthropicProvider struct {
// contains filtered or unexported fields
}
AnthropicProvider allows for interactions with the Anthropic API.
func NewAnthropicProvider ¶
func NewAnthropicProvider(cfg ProviderConfig) *AnthropicProvider
func (*AnthropicProvider) AuthHeader ¶
func (p *AnthropicProvider) AuthHeader() string
func (*AnthropicProvider) BaseURL ¶
func (p *AnthropicProvider) BaseURL() string
func (*AnthropicProvider) BridgedRoutes ¶
func (p *AnthropicProvider) BridgedRoutes() []string
func (*AnthropicProvider) CreateInterceptor ¶
func (p *AnthropicProvider) CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)
func (*AnthropicProvider) InjectAuthHeader ¶
func (p *AnthropicProvider) InjectAuthHeader(headers *http.Header)
func (*AnthropicProvider) Name ¶
func (p *AnthropicProvider) Name() string
func (*AnthropicProvider) PassthroughRoutes ¶
func (p *AnthropicProvider) PassthroughRoutes() []string
type AsyncRecorder ¶
type AsyncRecorder struct {
// contains filtered or unexported fields
}
AsyncRecorder calls Recorder methods asynchronously and logs any errors which may occur.
func NewAsyncRecorder ¶
func (*AsyncRecorder) RecordInterception ¶
func (a *AsyncRecorder) RecordInterception(ctx context.Context, req *InterceptionRecord) error
RecordInterception must NOT be called asynchronously. If an interception cannot be recorded, the whole request should fail.
func (*AsyncRecorder) RecordPromptUsage ¶
func (a *AsyncRecorder) RecordPromptUsage(_ context.Context, req *PromptUsageRecord) error
func (*AsyncRecorder) RecordTokenUsage ¶
func (a *AsyncRecorder) RecordTokenUsage(_ context.Context, req *TokenUsageRecord) error
func (*AsyncRecorder) RecordToolUsage ¶
func (a *AsyncRecorder) RecordToolUsage(_ context.Context, req *ToolUsageRecord) error
func (*AsyncRecorder) Wait ¶
func (a *AsyncRecorder) Wait()
type ChatCompletionNewParamsWrapper ¶
type ChatCompletionNewParamsWrapper struct {
openai.ChatCompletionNewParams `json:""`
Stream bool `json:"stream,omitempty"`
}
ChatCompletionNewParamsWrapper exists because the "stream" param is not included in openai.ChatCompletionNewParams.
func (*ChatCompletionNewParamsWrapper) LastUserPrompt ¶
func (c *ChatCompletionNewParamsWrapper) LastUserPrompt() (*string, error)
func (ChatCompletionNewParamsWrapper) MarshalJSON ¶
func (c ChatCompletionNewParamsWrapper) MarshalJSON() ([]byte, error)
func (*ChatCompletionNewParamsWrapper) UnmarshalJSON ¶
func (c *ChatCompletionNewParamsWrapper) UnmarshalJSON(raw []byte) error
type Config ¶
type Config struct {
OpenAI ProviderConfig
Anthropic ProviderConfig
}
type InterceptionRecord ¶
type Interceptor ¶
type Interceptor interface {
// ID returns the unique identifier for this interception.
ID() uuid.UUID
// Setup injects some required dependencies. This MUST be called before using the interceptor
// to process requests.
Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)
// Model returns the model in use for this [Interceptor].
Model() string
// ProcessRequest handles the HTTP request.
ProcessRequest(w http.ResponseWriter, r *http.Request) error
}
Interceptor describes a (potentially) stateful interaction with an AI provider.
type MessageNewParamsWrapper ¶
type MessageNewParamsWrapper struct {
anthropic.MessageNewParams `json:""`
Stream bool `json:"stream,omitempty"`
}
MessageNewParamsWrapper exists because the "stream" param is not included in anthropic.MessageNewParams.
func (*MessageNewParamsWrapper) LastUserPrompt ¶
func (b *MessageNewParamsWrapper) LastUserPrompt() (*string, error)
func (MessageNewParamsWrapper) MarshalJSON ¶
func (b MessageNewParamsWrapper) MarshalJSON() ([]byte, error)
func (*MessageNewParamsWrapper) UnmarshalJSON ¶
func (b *MessageNewParamsWrapper) UnmarshalJSON(raw []byte) error
func (*MessageNewParamsWrapper) UseStreaming ¶
func (b *MessageNewParamsWrapper) UseStreaming() bool
type OpenAIBlockingChatInterception ¶
type OpenAIBlockingChatInterception struct {
OpenAIChatInterceptionBase
}
func NewOpenAIBlockingChatInterception ¶
func NewOpenAIBlockingChatInterception(id uuid.UUID, req *ChatCompletionNewParamsWrapper, baseURL, key string) *OpenAIBlockingChatInterception
func (*OpenAIBlockingChatInterception) ProcessRequest ¶
func (i *OpenAIBlockingChatInterception) ProcessRequest(w http.ResponseWriter, r *http.Request) error
func (*OpenAIBlockingChatInterception) Setup ¶
func (s *OpenAIBlockingChatInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)
type OpenAIChatInterceptionBase ¶
type OpenAIChatInterceptionBase struct {
// contains filtered or unexported fields
}
func (*OpenAIChatInterceptionBase) ID ¶
func (i *OpenAIChatInterceptionBase) ID() uuid.UUID
func (*OpenAIChatInterceptionBase) Model ¶
func (i *OpenAIChatInterceptionBase) Model() string
func (*OpenAIChatInterceptionBase) Setup ¶
func (i *OpenAIChatInterceptionBase) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)
type OpenAIErrorResponse ¶
type OpenAIErrorResponse struct {
*shared.ErrorResponse
StatusCode int `json:"-"`
}
func (*OpenAIErrorResponse) Error ¶
func (a *OpenAIErrorResponse) Error() string
type OpenAIProvider ¶
type OpenAIProvider struct {
// contains filtered or unexported fields
}
OpenAIProvider allows for interactions with the OpenAI API.
func NewOpenAIProvider ¶
func NewOpenAIProvider(cfg ProviderConfig) *OpenAIProvider
func (*OpenAIProvider) AuthHeader ¶
func (p *OpenAIProvider) AuthHeader() string
func (*OpenAIProvider) BaseURL ¶
func (p *OpenAIProvider) BaseURL() string
func (*OpenAIProvider) BridgedRoutes ¶
func (p *OpenAIProvider) BridgedRoutes() []string
func (*OpenAIProvider) CreateInterceptor ¶
func (p *OpenAIProvider) CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)
func (*OpenAIProvider) InjectAuthHeader ¶
func (p *OpenAIProvider) InjectAuthHeader(headers *http.Header)
func (*OpenAIProvider) Name ¶
func (p *OpenAIProvider) Name() string
func (*OpenAIProvider) PassthroughRoutes ¶
func (p *OpenAIProvider) PassthroughRoutes() []string
PassthroughRoutes define the routes which are not currently intercepted but must be passed through to the upstream. The /v1/completions legacy API is deprecated and will not be passed through. See https://platform.openai.com/docs/api-reference/completions.
type OpenAIStreamingChatInterception ¶
type OpenAIStreamingChatInterception struct {
OpenAIChatInterceptionBase
}
func NewOpenAIStreamingChatInterception ¶
func NewOpenAIStreamingChatInterception(id uuid.UUID, req *ChatCompletionNewParamsWrapper, baseURL, key string) *OpenAIStreamingChatInterception
func (*OpenAIStreamingChatInterception) ProcessRequest ¶
func (i *OpenAIStreamingChatInterception) ProcessRequest(w http.ResponseWriter, r *http.Request) error
ProcessRequest handles a request to /v1/chat/completions. See https://platform.openai.com/docs/api-reference/chat-streaming/streaming.
It will inject any tools which have been provided by the mcp.ServerProxier.
When a response from the server includes an event indicating that a tool must be invoked, a conditional flow takes place:
a) if the tool is not injected (i.e. defined by the client), relay the event unmodified b) if the tool is injected, it will be invoked by the mcp.ServerProxier in the remote MCP server, and its results relayed to the SERVER. The response from the server will be handled synchronously, and this loop can continue until all injected tool invocations are completed and the response is relayed to the client.
func (*OpenAIStreamingChatInterception) Setup ¶
func (i *OpenAIStreamingChatInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)
type PromptUsageRecord ¶
type Provider ¶
type Provider interface {
// Name returns the provider's name.
Name() string
// BaseURL defines the base URL endpoint for this provider's API.
BaseURL() string
// CreateInterceptor starts a new [Interceptor] which is responsible for intercepting requests,
// communicating with the upstream provider and formulating a response to be sent to the requesting client.
CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)
// BridgedRoutes returns a slice of [http.ServeMux]-compatible routes which will have special handling.
// See https://pkg.go.dev/net/http#hdr-Patterns-ServeMux.
BridgedRoutes() []string
// PassthroughRoutes returns a slice of whitelisted [http.ServeMux]-compatible* routes which are
// not currently intercepted and must be handled by the upstream directly.
//
// * only path routes can be specified, not ones containing HTTP methods. (i.e. GET /route).
// By default, these passthrough routes will accept any HTTP method.
PassthroughRoutes() []string
// AuthHeader returns the name of the header which the provider expects to find its authentication
// token in.
AuthHeader() string
// InjectAuthHeader allows [Provider]s to set its authentication header.
InjectAuthHeader(*http.Header)
}
Provider describes an AI provider client's behaviour. Provider clients are responsible for interacting with upstream AI providers.
type ProviderConfig ¶
type ProviderConfig struct {
BaseURL, Key string
}
type Recorder ¶
type Recorder interface {
// RecordInterception records metadata about an interception with an upstream AI provider.
RecordInterception(ctx context.Context, req *InterceptionRecord) error
// RecordTokenUsage records the tokens used in an interception with an upstream AI provider.
RecordTokenUsage(ctx context.Context, req *TokenUsageRecord) error
// RecordPromptUsage records the prompts used in an interception with an upstream AI provider.
RecordPromptUsage(ctx context.Context, req *PromptUsageRecord) error
// RecordToolUsage records the tools used in an interception with an upstream AI provider.
RecordToolUsage(ctx context.Context, req *ToolUsageRecord) error
}
Recorder describes all the possible usage information we need to capture during interactions with AI providers. Additionally, it introduces the concept of an "Interception", which includes information about which provider/model was used and by whom. All usage records should reference this Interception by ID.
type RecorderWrapper ¶
type RecorderWrapper struct {
// contains filtered or unexported fields
}
RecorderWrapper is a convenience struct which implements RecorderClient and resolves a client before calling each method. It also sets the start/creation time of each record.
func NewRecorder ¶
func NewRecorder(logger slog.Logger, clientFn func() (Recorder, error)) *RecorderWrapper
func (*RecorderWrapper) RecordInterception ¶
func (r *RecorderWrapper) RecordInterception(ctx context.Context, req *InterceptionRecord) error
func (*RecorderWrapper) RecordPromptUsage ¶
func (r *RecorderWrapper) RecordPromptUsage(ctx context.Context, req *PromptUsageRecord) error
func (*RecorderWrapper) RecordTokenUsage ¶
func (r *RecorderWrapper) RecordTokenUsage(ctx context.Context, req *TokenUsageRecord) error
func (*RecorderWrapper) RecordToolUsage ¶
func (r *RecorderWrapper) RecordToolUsage(ctx context.Context, req *ToolUsageRecord) error
type RequestBridge ¶
type RequestBridge struct {
// contains filtered or unexported fields
}
RequestBridge is an http.Handler which is capable of masquerading as AI providers' APIs; specifically, OpenAI's & Anthropic's at present. RequestBridge intercepts requests to - and responses from - these upstream services to provide a centralized governance layer.
RequestBridge has no concept of authentication or authorization. It does have a concept of identity, in the narrow sense that it expects an [actor] to be defined in the context, to record the initiator of each interception.
RequestBridge is safe for concurrent use.
func NewRequestBridge ¶
func NewRequestBridge(ctx context.Context, providers []Provider, logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier) (*RequestBridge, error)
NewRequestBridge creates a new *RequestBridge and registers the HTTP routes defined by the given providers. Any routes which are requested but not registered will be reverse-proxied to the upstream service.
A Recorder is also required to record prompt, tool, and token use.
mcpProxy will be closed when the RequestBridge is closed.
func (*RequestBridge) InflightRequests ¶
func (b *RequestBridge) InflightRequests() int32
func (*RequestBridge) ServeHTTP ¶
func (b *RequestBridge) ServeHTTP(rw http.ResponseWriter, r *http.Request)
ServeHTTP exposes the internal http.Handler, which has all [Provider]s' routes registered. It also tracks inflight requests.
type SSEParser ¶
type SSEParser struct {
// contains filtered or unexported fields
}
func NewSSEParser ¶
func NewSSEParser() *SSEParser
func (*SSEParser) EventsByType ¶
func (*SSEParser) MessageEvents ¶
type TokenUsageRecord ¶
Source Files
¶
- anthropic.go
- api.go
- bridge.go
- config.go
- context.go
- intercept_anthropic_messages_base.go
- intercept_anthropic_messages_blocking.go
- intercept_anthropic_messages_streaming.go
- intercept_openai_chat_base.go
- intercept_openai_chat_blocking.go
- intercept_openai_chat_streaming.go
- interception.go
- openai.go
- passthrough.go
- provider.go
- provider_anthropic.go
- provider_openai.go
- recorder.go
- sse_parser.go
- streaming.go