aibridge

package module
v0.1.4
Warning

This package is not in the latest version of its module.

Published: Oct 22, 2025 License: AGPL-3.0 Imports: 35 Imported by: 2

Documentation

Constants

const (
	SSEEventTypeMessage = "message"
	SSEEventTypeError   = "error"
	SSEEventTypePing    = "ping"
)

const (
	ProviderAnthropic = "anthropic"
)

const (
	ProviderOpenAI = "openai"
)

Variables

var ErrEventStreamClosed = errors.New("event stream closed")

var UnknownRoute = errors.New("unknown route")

Functions

func AsActor

func AsActor(ctx context.Context, actorID string, metadata Metadata) context.Context

Types

type AnthropicErrorResponse

type AnthropicErrorResponse struct {
	*anthropic.ErrorResponse

	StatusCode int `json:"-"`
}

func (*AnthropicErrorResponse) Error

func (a *AnthropicErrorResponse) Error() string

type AnthropicMessagesBlockingInterception

type AnthropicMessagesBlockingInterception struct {
	AnthropicMessagesInterceptionBase
}

func NewAnthropicMessagesBlockingInterception

func NewAnthropicMessagesBlockingInterception(id uuid.UUID, req *MessageNewParamsWrapper, baseURL, key string) *AnthropicMessagesBlockingInterception

func (*AnthropicMessagesBlockingInterception) ProcessRequest

func (*AnthropicMessagesBlockingInterception) Setup

func (s *AnthropicMessagesBlockingInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)

type AnthropicMessagesInterceptionBase

type AnthropicMessagesInterceptionBase struct {
	// contains filtered or unexported fields
}

func (*AnthropicMessagesInterceptionBase) ID

func (*AnthropicMessagesInterceptionBase) Model

func (*AnthropicMessagesInterceptionBase) Setup

func (i *AnthropicMessagesInterceptionBase) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)

type AnthropicMessagesStreamingInterception

type AnthropicMessagesStreamingInterception struct {
	AnthropicMessagesInterceptionBase
}

func NewAnthropicMessagesStreamingInterception

func NewAnthropicMessagesStreamingInterception(id uuid.UUID, req *MessageNewParamsWrapper, baseURL, key string) *AnthropicMessagesStreamingInterception

func (*AnthropicMessagesStreamingInterception) ProcessRequest

ProcessRequest handles a request to /v1/messages. This API is backed by a state machine, which is described in https://docs.claude.com/en/docs/build-with-claude/streaming#event-types.

Each stream uses the following event flow:
- `message_start`: contains a Message object with empty content.
- A series of content blocks, each of which has a `content_block_start`, one or more `content_block_delta` events, and a `content_block_stop` event.
  - Each content block will have an index that corresponds to its index in the final Message content array.
- One or more `message_delta` events, indicating top-level changes to the final Message object.
- A final `message_stop` event.

It will inject any tools which have been provided by the mcp.ServerProxier.

When a response from the server includes an event indicating that a tool must be invoked, a conditional flow takes place:

a) if the tool is not injected (i.e. defined by the client), relay the event unmodified
b) if the tool is injected, it will be invoked by the mcp.ServerProxier in the remote MCP server, and its results relayed to the SERVER. The response from the server will be handled synchronously, and this loop can continue until all injected tool invocations are completed and the response is relayed to the client.
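
The event flow described for /v1/messages can be expressed as a small state machine. The sketch below checks that a sequence of event types follows that flow; `validateFlow` is a hypothetical helper written for illustration and is not part of aibridge. It assumes `ping` events may appear anywhere in the stream.

```go
package main

import (
	"errors"
	"fmt"
)

// validateFlow checks that event types arrive in the documented order.
// It is a hypothetical helper, not the interception's actual implementation.
func validateFlow(events []string) error {
	const (
		stateInit    = iota // nothing received yet
		stateMessage        // after message_start, outside any content block
		stateBlock          // inside a content block
		stateDone           // after message_stop
	)
	state := stateInit
	for i, ev := range events {
		if ev == "ping" {
			continue // assumption: keep-alive events may appear anywhere
		}
		switch {
		case state == stateInit && ev == "message_start":
			state = stateMessage
		case state == stateMessage && ev == "content_block_start":
			state = stateBlock
		case state == stateBlock && ev == "content_block_delta":
			// stay inside the current content block
		case state == stateBlock && ev == "content_block_stop":
			state = stateMessage
		case state == stateMessage && ev == "message_delta":
			// top-level change to the final Message object
		case state == stateMessage && ev == "message_stop":
			state = stateDone
		default:
			return fmt.Errorf("event %d: unexpected %q", i, ev)
		}
	}
	if state != stateDone {
		return errors.New("stream ended before message_stop")
	}
	return nil
}

func main() {
	stream := []string{
		"message_start",
		"content_block_start", "content_block_delta", "content_block_stop",
		"message_delta",
		"message_stop",
	}
	fmt.Println(validateFlow(stream))
}
```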

func (*AnthropicMessagesStreamingInterception) Setup

func (s *AnthropicMessagesStreamingInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)

type AnthropicProvider

type AnthropicProvider struct {
	// contains filtered or unexported fields
}

AnthropicProvider allows for interactions with the Anthropic API.

func NewAnthropicProvider

func NewAnthropicProvider(cfg ProviderConfig) *AnthropicProvider

func (*AnthropicProvider) AuthHeader

func (p *AnthropicProvider) AuthHeader() string

func (*AnthropicProvider) BaseURL

func (p *AnthropicProvider) BaseURL() string

func (*AnthropicProvider) BridgedRoutes

func (p *AnthropicProvider) BridgedRoutes() []string

func (*AnthropicProvider) CreateInterceptor

func (p *AnthropicProvider) CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)

func (*AnthropicProvider) InjectAuthHeader

func (p *AnthropicProvider) InjectAuthHeader(headers *http.Header)

func (*AnthropicProvider) Name

func (p *AnthropicProvider) Name() string

func (*AnthropicProvider) PassthroughRoutes

func (p *AnthropicProvider) PassthroughRoutes() []string

type AsyncRecorder

type AsyncRecorder struct {
	// contains filtered or unexported fields
}

AsyncRecorder calls Recorder methods asynchronously and logs any errors which may occur.

func NewAsyncRecorder

func NewAsyncRecorder(logger slog.Logger, wrapped Recorder, timeout time.Duration) *AsyncRecorder

func (*AsyncRecorder) RecordInterception

func (a *AsyncRecorder) RecordInterception(ctx context.Context, req *InterceptionRecord) error

RecordInterception must NOT be called asynchronously. If an interception cannot be recorded, the whole request should fail.

func (*AsyncRecorder) RecordPromptUsage

func (a *AsyncRecorder) RecordPromptUsage(_ context.Context, req *PromptUsageRecord) error

func (*AsyncRecorder) RecordTokenUsage

func (a *AsyncRecorder) RecordTokenUsage(_ context.Context, req *TokenUsageRecord) error

func (*AsyncRecorder) RecordToolUsage

func (a *AsyncRecorder) RecordToolUsage(_ context.Context, req *ToolUsageRecord) error

func (*AsyncRecorder) Wait

func (a *AsyncRecorder) Wait()

type ChatCompletionNewParamsWrapper

type ChatCompletionNewParamsWrapper struct {
	openai.ChatCompletionNewParams `json:""`
	Stream                         bool `json:"stream,omitempty"`
}

ChatCompletionNewParamsWrapper exists because the "stream" param is not included in openai.ChatCompletionNewParams.

func (*ChatCompletionNewParamsWrapper) LastUserPrompt

func (c *ChatCompletionNewParamsWrapper) LastUserPrompt() (*string, error)

func (ChatCompletionNewParamsWrapper) MarshalJSON

func (c ChatCompletionNewParamsWrapper) MarshalJSON() ([]byte, error)

func (*ChatCompletionNewParamsWrapper) UnmarshalJSON

func (c *ChatCompletionNewParamsWrapper) UnmarshalJSON(raw []byte) error

type Config

type Config struct {
	OpenAI    ProviderConfig
	Anthropic ProviderConfig
}

type InterceptionRecord

type InterceptionRecord struct {
	ID                           string
	InitiatorID, Provider, Model string
	Metadata                     Metadata
	StartedAt                    time.Time
}

type Interceptor

type Interceptor interface {
	// ID returns the unique identifier for this interception.
	ID() uuid.UUID
	// Setup injects some required dependencies. This MUST be called before using the interceptor
	// to process requests.
	Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)

	// Model returns the model in use for this [Interceptor].
	Model() string
	// ProcessRequest handles the HTTP request.
	ProcessRequest(w http.ResponseWriter, r *http.Request) error
}

Interceptor describes a (potentially) stateful interaction with an AI provider.

type MessageNewParamsWrapper

type MessageNewParamsWrapper struct {
	anthropic.MessageNewParams `json:""`
	Stream                     bool `json:"stream,omitempty"`
}

MessageNewParamsWrapper exists because the "stream" param is not included in anthropic.MessageNewParams.

func (*MessageNewParamsWrapper) LastUserPrompt

func (b *MessageNewParamsWrapper) LastUserPrompt() (*string, error)

func (MessageNewParamsWrapper) MarshalJSON

func (b MessageNewParamsWrapper) MarshalJSON() ([]byte, error)

func (*MessageNewParamsWrapper) UnmarshalJSON

func (b *MessageNewParamsWrapper) UnmarshalJSON(raw []byte) error

func (*MessageNewParamsWrapper) UseStreaming

func (b *MessageNewParamsWrapper) UseStreaming() bool

type Metadata

type Metadata map[string]any

type OpenAIBlockingChatInterception

type OpenAIBlockingChatInterception struct {
	OpenAIChatInterceptionBase
}

func NewOpenAIBlockingChatInterception

func NewOpenAIBlockingChatInterception(id uuid.UUID, req *ChatCompletionNewParamsWrapper, baseURL, key string) *OpenAIBlockingChatInterception

func (*OpenAIBlockingChatInterception) ProcessRequest

func (*OpenAIBlockingChatInterception) Setup

func (s *OpenAIBlockingChatInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)

type OpenAIChatInterceptionBase

type OpenAIChatInterceptionBase struct {
	// contains filtered or unexported fields
}

func (*OpenAIChatInterceptionBase) ID

func (*OpenAIChatInterceptionBase) Model

func (*OpenAIChatInterceptionBase) Setup

func (i *OpenAIChatInterceptionBase) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)

type OpenAIErrorResponse

type OpenAIErrorResponse struct {
	*shared.ErrorResponse

	StatusCode int `json:"-"`
}

func (*OpenAIErrorResponse) Error

func (a *OpenAIErrorResponse) Error() string

type OpenAIProvider

type OpenAIProvider struct {
	// contains filtered or unexported fields
}

OpenAIProvider allows for interactions with the OpenAI API.

func NewOpenAIProvider

func NewOpenAIProvider(cfg ProviderConfig) *OpenAIProvider

func (*OpenAIProvider) AuthHeader

func (p *OpenAIProvider) AuthHeader() string

func (*OpenAIProvider) BaseURL

func (p *OpenAIProvider) BaseURL() string

func (*OpenAIProvider) BridgedRoutes

func (p *OpenAIProvider) BridgedRoutes() []string

func (*OpenAIProvider) CreateInterceptor

func (p *OpenAIProvider) CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)

func (*OpenAIProvider) InjectAuthHeader

func (p *OpenAIProvider) InjectAuthHeader(headers *http.Header)

func (*OpenAIProvider) Name

func (p *OpenAIProvider) Name() string

func (*OpenAIProvider) PassthroughRoutes

func (p *OpenAIProvider) PassthroughRoutes() []string

PassthroughRoutes defines the routes which are not currently intercepted but must be passed through to the upstream. The /v1/completions legacy API is deprecated and will not be passed through. See https://platform.openai.com/docs/api-reference/completions.

type OpenAIStreamingChatInterception

type OpenAIStreamingChatInterception struct {
	OpenAIChatInterceptionBase
}

func NewOpenAIStreamingChatInterception

func NewOpenAIStreamingChatInterception(id uuid.UUID, req *ChatCompletionNewParamsWrapper, baseURL, key string) *OpenAIStreamingChatInterception

func (*OpenAIStreamingChatInterception) ProcessRequest

ProcessRequest handles a request to /v1/chat/completions. See https://platform.openai.com/docs/api-reference/chat-streaming/streaming.

It will inject any tools which have been provided by the mcp.ServerProxier.

When a response from the server includes an event indicating that a tool must be invoked, a conditional flow takes place:

a) if the tool is not injected (i.e. defined by the client), relay the event unmodified
b) if the tool is injected, it will be invoked by the mcp.ServerProxier in the remote MCP server, and its results relayed to the SERVER. The response from the server will be handled synchronously, and this loop can continue until all injected tool invocations are completed and the response is relayed to the client.
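
The conditional flow for tool invocations can be sketched as a loop. In this illustration every type and function (`toolCall`, `response`, `upstream`, `runLoop`) is hypothetical, and the handling of a response that mixes injected and client-defined tools is an assumption; the real interception operates on streamed provider events.

```go
package main

import "fmt"

// toolCall and response are hypothetical stand-ins for provider payloads.
type toolCall struct {
	Name string
	Args string
}

type response struct {
	Text      string
	ToolCalls []toolCall
}

// upstream sends the conversation so far and returns the provider's reply.
type upstream func(history []string) response

// runLoop returns the first response that contains no injected tool calls.
// Injected tools are invoked locally and their results are appended to the
// history that is sent back to the provider.
func runLoop(send upstream, injected map[string]func(args string) string, history []string) response {
	for {
		resp := send(history)
		pending := false
		for _, tc := range resp.ToolCalls {
			invoke, ok := injected[tc.Name]
			if !ok {
				continue // client-defined tool: left in the response for the client
			}
			pending = true
			history = append(history, "tool_result:"+tc.Name+"="+invoke(tc.Args))
		}
		if !pending {
			return resp // nothing left to invoke: relay to the client
		}
	}
}

func main() {
	// Fake provider: asks for the injected tool once, then answers.
	send := func(history []string) response {
		if len(history) == 1 {
			return response{ToolCalls: []toolCall{{Name: "lookup", Args: "42"}}}
		}
		return response{Text: "answer based on " + history[len(history)-1]}
	}
	injected := map[string]func(string) string{
		"lookup": func(args string) string { return "value-for-" + args },
	}
	fmt.Println(runLoop(send, injected, []string{"user: hi"}).Text)
}
```

A production loop would also want an upper bound on the number of round trips, so that a provider which keeps requesting injected tools cannot hold the request open indefinitely.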

func (*OpenAIStreamingChatInterception) Setup

func (i *OpenAIStreamingChatInterception) Setup(logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier)

type PromptUsageRecord

type PromptUsageRecord struct {
	InterceptionID string
	MsgID, Prompt  string
	Metadata       Metadata
	CreatedAt      time.Time
}

type Provider

type Provider interface {
	// Name returns the provider's name.
	Name() string
	// BaseURL defines the base URL endpoint for this provider's API.
	BaseURL() string

	// CreateInterceptor starts a new [Interceptor] which is responsible for intercepting requests,
	// communicating with the upstream provider and formulating a response to be sent to the requesting client.
	CreateInterceptor(w http.ResponseWriter, r *http.Request) (Interceptor, error)

	// BridgedRoutes returns a slice of [http.ServeMux]-compatible routes which will have special handling.
	// See https://pkg.go.dev/net/http#hdr-Patterns-ServeMux.
	BridgedRoutes() []string
	// PassthroughRoutes returns a slice of whitelisted [http.ServeMux]-compatible* routes which are
	// not currently intercepted and must be handled by the upstream directly.
	//
	// * only path routes can be specified, not ones containing HTTP methods (e.g. GET /route).
	// By default, these passthrough routes will accept any HTTP method.
	PassthroughRoutes() []string

	// AuthHeader returns the name of the header which the provider expects to find its authentication
	// token in.
	AuthHeader() string
	// InjectAuthHeader allows a [Provider] to set its authentication header.
	InjectAuthHeader(*http.Header)
}

Provider describes an AI provider client's behaviour. Provider clients are responsible for interacting with upstream AI providers.

type ProviderConfig

type ProviderConfig struct {
	BaseURL, Key string
}

type Recorder

type Recorder interface {
	// RecordInterception records metadata about an interception with an upstream AI provider.
	RecordInterception(ctx context.Context, req *InterceptionRecord) error
	// RecordTokenUsage records the tokens used in an interception with an upstream AI provider.
	RecordTokenUsage(ctx context.Context, req *TokenUsageRecord) error
	// RecordPromptUsage records the prompts used in an interception with an upstream AI provider.
	RecordPromptUsage(ctx context.Context, req *PromptUsageRecord) error
	// RecordToolUsage records the tools used in an interception with an upstream AI provider.
	RecordToolUsage(ctx context.Context, req *ToolUsageRecord) error
}

Recorder describes all the possible usage information we need to capture during interactions with AI providers. Additionally, it introduces the concept of an "Interception", which includes information about which provider/model was used and by whom. All usage records should reference this Interception by ID.

type RecorderWrapper

type RecorderWrapper struct {
	// contains filtered or unexported fields
}

RecorderWrapper is a convenience struct which implements RecorderClient and resolves a client before calling each method. It also sets the start/creation time of each record.

func NewRecorder

func NewRecorder(logger slog.Logger, clientFn func() (Recorder, error)) *RecorderWrapper

func (*RecorderWrapper) RecordInterception

func (r *RecorderWrapper) RecordInterception(ctx context.Context, req *InterceptionRecord) error

func (*RecorderWrapper) RecordPromptUsage

func (r *RecorderWrapper) RecordPromptUsage(ctx context.Context, req *PromptUsageRecord) error

func (*RecorderWrapper) RecordTokenUsage

func (r *RecorderWrapper) RecordTokenUsage(ctx context.Context, req *TokenUsageRecord) error

func (*RecorderWrapper) RecordToolUsage

func (r *RecorderWrapper) RecordToolUsage(ctx context.Context, req *ToolUsageRecord) error

type RequestBridge

type RequestBridge struct {
	// contains filtered or unexported fields
}

RequestBridge is an http.Handler which is capable of masquerading as AI providers' APIs; specifically, OpenAI's & Anthropic's at present. RequestBridge intercepts requests to - and responses from - these upstream services to provide a centralized governance layer.

RequestBridge has no concept of authentication or authorization. It does have a concept of identity, in the narrow sense that it expects an [actor] to be defined in the context, to record the initiator of each interception.

RequestBridge is safe for concurrent use.

func NewRequestBridge

func NewRequestBridge(ctx context.Context, providers []Provider, logger slog.Logger, recorder Recorder, mcpProxy mcp.ServerProxier) (*RequestBridge, error)

NewRequestBridge creates a new *RequestBridge and registers the HTTP routes defined by the given providers. Any routes which are requested but not registered will be reverse-proxied to the upstream service.

A Recorder is also required to record prompt, tool, and token use.

mcpProxy will be closed when the RequestBridge is closed.

func (*RequestBridge) InflightRequests

func (b *RequestBridge) InflightRequests() int32

func (*RequestBridge) ServeHTTP

func (b *RequestBridge) ServeHTTP(rw http.ResponseWriter, r *http.Request)

ServeHTTP exposes the internal http.Handler, which has all [Provider]s' routes registered. It also tracks inflight requests.

func (*RequestBridge) Shutdown

func (b *RequestBridge) Shutdown(ctx context.Context) error

Shutdown attempts a graceful shutdown: it waits for all in-flight requests to complete and shuts down the MCP server proxier. TODO: add tests.

type SSEEvent

type SSEEvent struct {
	Type  string
	Data  string
	ID    string
	Retry int
}

type SSEParser

type SSEParser struct {
	// contains filtered or unexported fields
}

func NewSSEParser

func NewSSEParser() *SSEParser

func (*SSEParser) AllEvents

func (p *SSEParser) AllEvents() map[string][]SSEEvent

func (*SSEParser) EventsByType

func (p *SSEParser) EventsByType(eventType string) []SSEEvent

func (*SSEParser) MessageEvents

func (p *SSEParser) MessageEvents() []SSEEvent

func (*SSEParser) Parse

func (p *SSEParser) Parse(reader io.Reader) error

type TokenUsageRecord

type TokenUsageRecord struct {
	InterceptionID string
	MsgID          string
	Input, Output  int64
	Metadata       Metadata
	CreatedAt      time.Time
}

type ToolArgs

type ToolArgs any

type ToolUsageRecord

type ToolUsageRecord struct {
	InterceptionID  string
	MsgID, Tool     string
	ServerURL       *string
	Args            ToolArgs
	Injected        bool
	InvocationError error
	Metadata        Metadata
	CreatedAt       time.Time
}

Directories

Path Synopsis
Package mcpmock is a generated GoMock package.
