Documentation
¶
Index ¶
- Constants
- Variables
- func GetToolCallID(ctx context.Context) string
- func InitGraphCompileCallbacks(cbs []GraphCompileCallback)
- func IsInterruptRerunError(err error) (any, bool)
- func NewInterruptAndRerunErr(extra any) error
- func ProcessState[S any](ctx context.Context, handler func(context.Context, S) error) error
- func RegisterInternalType(f func(key string, value any) error) error
- func RegisterSerializableType[T any](name string) error
- func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))
- func RegisterValuesMergeFunc[T any](fn func([]T) (T, error))
- type AnyGraph
- type Chain
- func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]
- func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendParallel(p *Parallel) *Chain[I, O]
- func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- type ChainBranch
- func NewChainBranch[T any](cond GraphBranchCondition[T]) *ChainBranch
- func NewChainMultiBranch[T any](cond GraphMultiBranchCondition[T]) *ChainBranch
- func NewStreamChainBranch[T any](cond StreamGraphBranchCondition[T]) *ChainBranch
- func NewStreamChainMultiBranch[T any](cond StreamGraphMultiBranchCondition[T]) *ChainBranch
- func (cb *ChainBranch) AddChatModel(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch
- type CheckPointStore
- type Collect
- type CollectWOOpt
- type FanInMergeConfig
- type FieldMapping
- func FromField(from string) *FieldMapping
- func FromFieldPath(fromFieldPath FieldPath) *FieldMapping
- func MapFieldPaths(fromFieldPath, toFieldPath FieldPath) *FieldMapping
- func MapFields(from, to string) *FieldMapping
- func ToField(to string, opts ...FieldMappingOption) *FieldMapping
- func ToFieldPath(toFieldPath FieldPath, opts ...FieldMappingOption) *FieldMapping
- type FieldMappingOption
- type FieldPath
- type GenLocalState
- type Graph
- func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)
- func (g Graph) AddChatModelNode(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) error
- func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error
- func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error
- func (g *Graph[I, O]) AddEdge(startNode, endNode string) (err error)
- func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error
- func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error
- func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error
- func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error
- func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error
- func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error
- func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error
- func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error
- func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- func (g Graph) GetType() string
- type GraphAddNodeOpt
- func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt
- func WithInputKey(k string) GraphAddNodeOpt
- func WithNodeKey(key string) GraphAddNodeOpt
- func WithNodeName(n string) GraphAddNodeOpt
- func WithOutputKey(k string) GraphAddNodeOpt
- func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt
- func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt
- func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt
- func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt
- type GraphBranch
- func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
- func NewGraphMultiBranch[T any](condition GraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
- func NewStreamGraphBranch[T any](condition StreamGraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
- func NewStreamGraphMultiBranch[T any](condition StreamGraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
- type GraphBranchCondition
- type GraphCompileCallback
- type GraphCompileOption
- func WithCheckPointStore(store CheckPointStore) GraphCompileOption
- func WithEagerExecution() GraphCompileOption
- func WithEagerExecutionDisabled() GraphCompileOption
- func WithFanInMergeConfig(confs map[string]FanInMergeConfig) GraphCompileOption
- func WithGraphCompileCallbacks(cbs ...GraphCompileCallback) GraphCompileOption
- func WithGraphName(graphName string) GraphCompileOption
- func WithInterruptAfterNodes(nodes []string) GraphCompileOption
- func WithInterruptBeforeNodes(nodes []string) GraphCompileOption
- func WithMaxRunSteps(maxSteps int) GraphCompileOption
- func WithNodeTriggerMode(triggerMode NodeTriggerMode) GraphCompileOption
- func WithSerializer(serializer Serializer) GraphCompileOption
- type GraphInfo
- type GraphMultiBranchCondition
- type GraphNodeInfo
- type InterruptInfo
- type Invoke
- type InvokeWOOpt
- type Lambda
- func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption], c Collect[I, O, TOption], ...) (*Lambda, error)
- func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda
- func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda
- func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda
- func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda
- func ToList[I any](opts ...LambdaOpt) *Lambda
- func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda
- func TransformableLambdaWithOption[I, O, TOption any](t Transform[I, O, TOption], opts ...LambdaOpt) *Lambda
- type LambdaOpt
- type NewGraphOption
- type NodePath
- type NodeTriggerMode
- type Option
- func WithCallbacks(cbs ...callbacks.Handler) Option
- func WithChatModelOption(opts ...model.Option) Option
- func WithChatTemplateOption(opts ...prompt.Option) Option
- func WithCheckPointID(checkPointID string) Option
- func WithDocumentTransformerOption(opts ...document.TransformerOption) Option
- func WithEmbeddingOption(opts ...embedding.Option) Option
- func WithForceNewRun() Option
- func WithIndexerOption(opts ...indexer.Option) Option
- func WithLambdaOption(opts ...any) Option
- func WithLoaderOption(opts ...document.LoaderOption) Option
- func WithRetrieverOption(opts ...retriever.Option) Option
- func WithRuntimeMaxSteps(maxSteps int) Option
- func WithStateModifier(sm StateModifier) Option
- func WithToolsNodeOption(opts ...ToolsNodeOption) Option
- func WithWriteToCheckPointID(checkPointID string) Option
- type Parallel
- func (p *Parallel) AddChatModel(outputKey string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel
- type Runnable
- type Serializer
- type StateModifier
- type StatePostHandler
- type StatePreHandler
- type Stream
- type StreamGraphBranchCondition
- type StreamGraphMultiBranchCondition
- type StreamStatePostHandler
- type StreamStatePreHandler
- type StreamWOOpt
- type ToolsInterruptAndRerunExtra
- type ToolsNode
- func (tn *ToolsNode) GetType() string
- func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
- func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
- type ToolsNodeConfig
- type ToolsNodeOption
- type Transform
- type TransformWOOpts
- type Workflow
- func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch
- func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddChatTemplateNode(key string, chatTemplate prompt.ChatTemplate, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddDocumentTransformerNode(key string, transformer document.Transformer, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddEmbeddingNode(key string, embedding embedding.Embedder, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddEnd(fromNodeKey string, inputs ...*FieldMapping) *Workflow[I, O]deprecated
- func (wf *Workflow[I, O]) AddGraphNode(key string, graph AnyGraph, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddIndexerNode(key string, indexer indexer.Indexer, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddLambdaNode(key string, lambda *Lambda, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddLoaderNode(key string, loader document.Loader, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddRetrieverNode(key string, retriever retriever.Retriever, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) AddToolsNode(key string, tools *ToolsNode, opts ...GraphAddNodeOpt) *WorkflowNode
- func (wf *Workflow[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- func (wf *Workflow[I, O]) End() *WorkflowNode
- type WorkflowAddInputOpt
- type WorkflowBranch
- type WorkflowNode
- func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode
- func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode
- func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode
- func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode
Constants ¶
const ( ComponentOfUnknown component = "Unknown" ComponentOfGraph component = "Graph" ComponentOfWorkflow component = "Workflow" ComponentOfChain component = "Chain" ComponentOfPassthrough component = "Passthrough" ComponentOfToolsNode component = "ToolsNode" ComponentOfLambda component = "Lambda" )
built-in component types in graph node. it represents the type of the most primitive executable object provided by the user.
const END = "end"
END is the end node of the graph. You can add your last edge with END.
const START = "start"
START is the start node of the graph. You can add your first edge with START.
Variables ¶
var DAGInvalidLoopErr = errors.New("DAG is invalid, has loop")
var ErrChainCompiled = errors.New("chain has been compiled, cannot be modified")
ErrChainCompiled is returned when attempting to modify a chain after it has been compiled
var ErrExceedMaxSteps = errors.New("exceeds max steps")
ErrExceedMaxSteps graph will throw this error when the number of steps exceeds the maximum number of steps.
var ErrGraphCompiled = errors.New("graph has been compiled, cannot be modified")
ErrGraphCompiled is returned when attempting to modify a graph after it has been compiled
var InterruptAndRerun = errors.New("interrupt and rerun")
Functions ¶
func GetToolCallID ¶ added in v0.3.18
GetToolCallID gets the current tool call id from the context.
func InitGraphCompileCallbacks ¶
func InitGraphCompileCallbacks(cbs []GraphCompileCallback)
InitGraphCompileCallbacks set global graph compile callbacks, which ONLY will be added to top level graph compile options
func IsInterruptRerunError ¶ added in v0.3.38
func NewInterruptAndRerunErr ¶ added in v0.3.38
func ProcessState ¶ added in v0.3.10
ProcessState processes the state from the context in a concurrency-safe way. This is the recommended way to access and modify state in custom nodes. The provided function handler will be executed with exclusive access to the state (protected by mutex). note: this method will report error if state type doesn't match or state is not found in context e.g.
lambdaFunc := func(ctx context.Context, in string, opts ...any) (string, error) { err := compose.ProcessState[*testState](ctx, func(state *testState) error { // do something with state in a concurrency-safe way state.Count++ return nil }) if err != nil { return "", err } return in, nil } stateGraph := compose.NewStateGraph[string, string, testState](genStateFunc) stateGraph.AddNode("node1", lambdaFunc)
func RegisterInternalType ¶ added in v0.3.50
func RegisterSerializableType ¶ added in v0.3.18
RegisterSerializableType registers a custom type for eino serialization. This allows eino to properly serialize and deserialize custom types. Both custom interfaces and structs need to be registered using this function. Types only need to be registered once - pointers and other references will be handled automatically. All built-in eino types are already registered. Parameters: - name: A unique identifier for the type being registered (should not start with "_eino") - T: The generic type parameter representing the type to register Returns: - error: An error if registration fails (e.g., if the type is already registered)
func RegisterStreamChunkConcatFunc ¶
RegisterStreamChunkConcatFunc registers a function to concat stream chunks. It's required when you want to concat stream chunks of a specific type. for example you call Invoke() but node only implements Stream(). call at process init not thread safe eg.
type testStruct struct { field1 string field2 int } compose.RegisterStreamChunkConcatFunc(func(items []testStruct) (testStruct, error) { return testStruct{ field1: items[1].field1, // may implement inplace logic by your scenario field2: items[0].field2 + items[1].field2, }, nil })
func RegisterValuesMergeFunc ¶ added in v0.3.26
RegisterValuesMergeFunc registers a function to merge outputs from multiple nodes when fan-in. It's used to define how to merge for a specific type. For maps that already have a default merge function, you don't need to register a new one unless you want to customize the merge logic.
Types ¶
type AnyGraph ¶
type AnyGraph interface {
// contains filtered or unexported methods
}
AnyGraph the identifiers for composable and compilable Graph[I, O]、Chain[I, O] in Eino.
type Chain ¶
type Chain[I, O any] struct { // contains filtered or unexported fields }
Chain is a chain of components. Chain nodes can be parallel / branch / sequence components. Chain is designed to be used in a builder pattern (should Compile() before use). And the interface is `Chain style`, you can use it like: `chain.AppendXX(...).AppendXX(...)`
Normal usage:
- create a chain with input/output type: `chain := NewChain[inputType, outputType]()`
- add components to chainable list: 2.1 add components: `chain.AppendChatTemplate(...).AppendChatModel(...).AppendToolsNode(...)` 2.2 add parallel or branch node if needed: `chain.AppendParallel()`, `chain.AppendBranch()`
- compile: `r, err := c.Compile()`
- run: 4.1 `one input & one output` use `r.Invoke(ctx, input)` 4.2 `one input & multi output chunk` use `r.Stream(ctx, input)` 4.3 `multi input chunk & one output` use `r.Collect(ctx, inputReader)` 4.4 `multi input chunk & multi output chunk` use `r.Transform(ctx, inputReader)`
Using in graph or other chain: chain1 := NewChain[inputType, outputType]() graph := NewGraph[](runTypePregel) graph.AddGraph("key", chain1) // chain is an AnyGraph implementation
// or in another chain: chain2 := NewChain[inputType, outputType]() chain2.AppendGraph(chain1)
func NewChain ¶
func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O]
NewChain create a chain with input/output type.
func (*Chain[I, O]) AppendBranch ¶
func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]
AppendBranch add a conditional branch to chain. Each branch within the ChainBranch can be an AnyGraph. All branches should either lead to END, or converge to another node within the Chain. e.g.
cb := compose.NewChainBranch(conditionFunc) cb.AddChatTemplate("chat_template_key_01", chatTemplate) cb.AddChatTemplate("chat_template_key_02", chatTemplate2) chain.AppendBranch(cb)
func (*Chain[I, O]) AppendChatModel ¶
func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendChatModel add a ChatModel node to the chain. e.g.
model, err := openai.NewChatModel(ctx, config) if err != nil {...} chain.AppendChatModel(model)
func (*Chain[I, O]) AppendChatTemplate ¶
func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendChatTemplate add a ChatTemplate node to the chain. eg.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) chain.AppendChatTemplate(chatTemplate)
func (*Chain[I, O]) AppendDocumentTransformer ¶
func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendDocumentTransformer add a DocumentTransformer node to the chain. e.g.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) chain.AppendDocumentTransformer(markdownSplitter)
func (*Chain[I, O]) AppendEmbedding ¶
func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendEmbedding add a Embedding node to the chain. e.g.
embedder, err := openai.NewEmbedder(ctx, config) if err != nil {...} chain.AppendEmbedding(embedder)
func (*Chain[I, O]) AppendGraph ¶
func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendGraph add a AnyGraph node to the chain. AnyGraph can be a chain or a graph. e.g.
graph := compose.NewGraph[string, string]() chain.AppendGraph(graph)
func (*Chain[I, O]) AppendIndexer ¶
func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendIndexer add an Indexer node to the chain. Indexer is a node that can store documents. e.g.
vectorStoreImpl, err := vikingdb.NewVectorStorer(ctx, vikingdbConfig) // in components/vectorstore/vikingdb/vectorstore.go if err != nil {...} config := vectorstore.IndexerConfig{VectorStore: vectorStoreImpl} indexer, err := vectorstore.NewIndexer(ctx, config) if err != nil {...} chain.AppendIndexer(indexer)
func (*Chain[I, O]) AppendLambda ¶
func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendLambda add a Lambda node to the chain. Lambda is a node that can be used to implement custom logic. e.g.
lambdaNode := compose.InvokableLambda(func(ctx context.Context, docs []*schema.Document) (string, error) {...}) chain.AppendLambda(lambdaNode)
Note: to create a Lambda node, you need to use `compose.AnyLambda` or `compose.InvokableLambda` or `compose.StreamableLambda` or `compose.TransformableLambda`. if you want this node has real stream output, you need to use `compose.StreamableLambda` or `compose.TransformableLambda`, for example.
func (*Chain[I, O]) AppendLoader ¶
func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendLoader adds a Loader node to the chain. e.g.
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{}) if err != nil {...} chain.AppendLoader(loader)
func (*Chain[I, O]) AppendParallel ¶
AppendParallel add a Parallel structure (multiple concurrent nodes) to the chain. e.g.
parallel := compose.NewParallel() parallel.AddChatModel("openai", model1) // => "openai": *schema.Message{} parallel.AddChatModel("maas", model2) // => "maas": *schema.Message{} chain.AppendParallel(parallel) // => multiple concurrent nodes are added to the Chain The next node in the chain is either an END, or a node which accepts a map[string]any, where keys are `openai` `maas` as specified above.
func (*Chain[I, O]) AppendPassthrough ¶
func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]
AppendPassthrough add a Passthrough node to the chain. Could be used to connect multiple ChainBranch or Parallel. e.g.
chain.AppendPassthrough()
func (*Chain[I, O]) AppendRetriever ¶
func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendRetriever add a Retriever node to the chain. e.g.
retriever, err := vectorstore.NewRetriever(ctx, config) if err != nil {...} chain.AppendRetriever(retriever) or using fornax knowledge as retriever: config := fornaxknowledge.Config{...} retriever, err := fornaxknowledge.NewKnowledgeRetriever(ctx, config) if err != nil {...} chain.AppendRetriever(retriever)
func (*Chain[I, O]) AppendToolsNode ¶
func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendToolsNode add a ToolsNode node to the chain. e.g.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{ Tools: []tools.Tool{...}, }) chain.AppendToolsNode(toolsNode)
func (*Chain[I, O]) Compile ¶
func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
Compile to a Runnable. Runnable can be used directly. e.g.
chain := NewChain[string, string]() r, err := chain.Compile() if err != nil {} r.Invoke(ctx, input) // ping => pong r.Stream(ctx, input) // ping => stream out r.Collect(ctx, inputReader) // stream in => pong r.Transform(ctx, inputReader) // stream in => stream out
type ChainBranch ¶
type ChainBranch struct {
// contains filtered or unexported fields
}
ChainBranch represents a conditional branch in a chain of operations. It allows for dynamic routing of execution based on a condition. All branches within ChainBranch are expected to either end the Chain, or converge to another node in the Chain.
func NewChainBranch ¶
func NewChainBranch[T any](cond GraphBranchCondition[T]) *ChainBranch
NewChainBranch creates a new ChainBranch instance based on a given condition. It takes a generic type T and a GraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. eg.
condition := func(ctx context.Context, in string, opts ...any) (endNode string, err error) { // logic to determine the next node return "some_next_node_key", nil } cb := NewChainBranch[string](condition) cb.AddPassthrough("next_node_key_01", xxx) // node in branch, represent one path of branch cb.AddPassthrough("next_node_key_02", xxx) // node in branch
func NewChainMultiBranch ¶ added in v0.3.18
func NewChainMultiBranch[T any](cond GraphMultiBranchCondition[T]) *ChainBranch
func NewStreamChainBranch ¶
func NewStreamChainBranch[T any](cond StreamGraphBranchCondition[T]) *ChainBranch
NewStreamChainBranch creates a new ChainBranch instance based on a given stream condition. It takes a generic type T and a StreamGraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. eg.
condition := func(ctx context.Context, in *schema.StreamReader[string], opts ...any) (endNode string, err error) { // logic to determine the next node, you can read the stream and make a decision. // to save time, usually read the first chunk of stream, then make a decision which path to go. return "some_next_node_key", nil } cb := NewStreamChainBranch[string](condition)
func NewStreamChainMultiBranch ¶ added in v0.3.18
func NewStreamChainMultiBranch[T any](cond StreamGraphMultiBranchCondition[T]) *ChainBranch
func (*ChainBranch) AddChatModel ¶
func (cb *ChainBranch) AddChatModel(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *ChainBranch
AddChatModel adds a ChatModel node to the branch. eg.
chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o-mini", }) cb.AddChatModel("chat_model_key_01", chatModel01) cb.AddChatModel("chat_model_key_02", chatModel02)
func (*ChainBranch) AddChatTemplate ¶
func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch
AddChatTemplate adds a ChatTemplate node to the branch. eg.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) cb.AddChatTemplate("chat_template_key_01", chatTemplate) chatTemplate2, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}, you are not allowed to chat in other topics.", }) cb.AddChatTemplate("chat_template_key_02", chatTemplate2)
func (*ChainBranch) AddDocumentTransformer ¶
func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch
AddDocumentTransformer adds an Document Transformer node to the branch. eg.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) cb.AddDocumentTransformer("document_transformer_node_key", markdownSplitter)
func (*ChainBranch) AddEmbedding ¶
func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch
AddEmbedding adds an Embedding node to the branch. eg.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{ Model: "text-embedding-3-small", }) cb.AddEmbedding("embedding_node_key", embeddingNode)
func (*ChainBranch) AddGraph ¶
func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch
AddGraph adds a generic Graph node to the branch. eg.
graph, err := compose.NewGraph[string, string]() cb.AddGraph("graph_node_key", graph)
func (*ChainBranch) AddIndexer ¶
func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch
AddIndexer adds an Indexer node to the branch. eg.
indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{ Collection: "my_collection", }) cb.AddIndexer("indexer_node_key", indexer)
func (*ChainBranch) AddLambda ¶
func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch
AddLambda adds a Lambda node to the branch. eg.
lambdaFunc := func(ctx context.Context, in string, opts ...any) (out string, err error) { // logic to process the input return "processed_output", nil } cb.AddLambda("lambda_node_key", compose.InvokeLambda(lambdaFunc))
func (*ChainBranch) AddLoader ¶
func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch
AddLoader adds a Loader node to the branch. eg.
pdfParser, err := pdf.NewPDFParser() loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{ Parser: pdfParser, }) cb.AddLoader("loader_node_key", loader)
func (*ChainBranch) AddPassthrough ¶
func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch
AddPassthrough adds a Passthrough node to the branch. eg.
cb.AddPassthrough("passthrough_node_key")
func (*ChainBranch) AddRetriever ¶
func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch
AddRetriever adds a Retriever node to the branch. eg.
retriever, err := volc_vikingdb.NewRetriever(ctx, &volc_vikingdb.RetrieverConfig{ Collection: "my_collection", }) cb.AddRetriever("retriever_node_key", retriever)
func (*ChainBranch) AddToolsNode ¶
func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch
AddToolsNode adds a ToolsNode to the branch. eg.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{ Tools: []tools.Tool{...}, }) cb.AddToolsNode("tools_node_key", toolsNode)
type CheckPointStore ¶ added in v0.3.18
type Collect ¶
type Collect[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output O, err error)
Collect is the type of the collectable lambda function.
type CollectWOOpt ¶
type CollectWOOpt[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output O, err error)
CollectWOOpt is the type of the collectable lambda function without options.
type FanInMergeConfig ¶ added in v0.3.44
type FanInMergeConfig struct {
StreamMergeWithSourceEOF bool //indicates whether to emit a SourceEOF error for each stream
}
FanInMergeConfig defines the configuration for fan-in merge operations. It allows specifying how multiple inputs are merged into a single input. StreamMergeWithSourceEOF indicates whether to emit a SourceEOF error for each stream when it ends, before the final merged output is produced. This is useful for tracking the completion of individual input streams in a named stream merge.
type FieldMapping ¶ added in v0.3.8
type FieldMapping struct {
// contains filtered or unexported fields
}
func FromField ¶ added in v0.3.8
func FromField(from string) *FieldMapping
FromField creates a FieldMapping that maps a single predecessor field to the entire successor input. This is an exclusive mapping - once set, no other field mappings can be added since the successor input has already been fully mapped. Field: either the field of a struct, or the key of a map.
func FromFieldPath ¶ added in v0.3.17
func FromFieldPath(fromFieldPath FieldPath) *FieldMapping
FromFieldPath creates a FieldMapping that maps a single predecessor field path to the entire successor input. This is an exclusive mapping - once set, no other field mappings can be added since the successor input has already been fully mapped.
Example:
// Maps the 'name' field from nested 'user.profile' to the entire successor input FromFieldPath(FieldPath{"user", "profile", "name"})
Note: The field path elements must not contain the internal path separator character ('\x1F').
func MapFieldPaths ¶ added in v0.3.17
func MapFieldPaths(fromFieldPath, toFieldPath FieldPath) *FieldMapping
MapFieldPaths creates a FieldMapping that maps a single predecessor field path to a single successor field path.
Example:
// Maps user.profile.name to response.userName MapFieldPaths( FieldPath{"user", "profile", "name"}, FieldPath{"response", "userName"}, )
Note: The field path elements must not contain the internal path separator character ('\x1F').
func MapFields ¶ added in v0.3.8
func MapFields(from, to string) *FieldMapping
MapFields creates a FieldMapping that maps a single predecessor field to a single successor field. Field: either the field of a struct, or the key of a map.
func ToField ¶ added in v0.3.8
func ToField(to string, opts ...FieldMappingOption) *FieldMapping
ToField creates a FieldMapping that maps the entire predecessor output to a single successor field. Field: either the field of a struct, or the key of a map.
func ToFieldPath ¶ added in v0.3.17
func ToFieldPath(toFieldPath FieldPath, opts ...FieldMappingOption) *FieldMapping
ToFieldPath creates a FieldMapping that maps the entire predecessor output to a single successor field path.
Example:
// Maps the entire predecessor output to response.data.userName ToFieldPath(FieldPath{"response", "data", "userName"})
Note: The field path elements must not contain the internal path separator character ('\x1F').
func (*FieldMapping) Equals ¶ added in v0.3.48
func (m *FieldMapping) Equals(o *FieldMapping) bool
func (*FieldMapping) FromNodeKey ¶ added in v0.3.48
func (m *FieldMapping) FromNodeKey() string
func (*FieldMapping) FromPath ¶ added in v0.3.48
func (m *FieldMapping) FromPath() FieldPath
func (*FieldMapping) String ¶ added in v0.3.8
func (m *FieldMapping) String() string
String returns the string representation of the FieldMapping.
func (*FieldMapping) ToPath ¶ added in v0.3.48
func (m *FieldMapping) ToPath() FieldPath
type FieldMappingOption ¶ added in v0.3.48
type FieldMappingOption func(*FieldMapping)
FieldMappingOption is a functional option for configuring a FieldMapping.
func WithCustomExtractor ¶ added in v0.3.48
func WithCustomExtractor(extractor func(input any) (any, error)) FieldMappingOption
WithCustomExtractor sets a custom extractor function for the FieldMapping. The extractor function is used to extract a value from the 'source' of the FieldMapping. NOTE: if specified in this way, Eino can only check the validity of the field mapping at request time..
type FieldPath ¶ added in v0.3.17
type FieldPath []string
FieldPath represents a path to a nested field in a struct or map. Each element in the path is either: - a struct field name - a map key
Example paths:
- []string{"user"} // top-level field
- []string{"user", "name"} // nested struct field
- []string{"users", "admin"} // map key access
type GenLocalState ¶
GenLocalState is a function that generates the state.
type Graph ¶
type Graph[I, O any] struct { // contains filtered or unexported fields }
Graph is a generic graph that can be used to compose components. I: the input type of graph compiled product O: the output type of graph compiled product
func NewGraph ¶
func NewGraph[I, O any](opts ...NewGraphOption) *Graph[I, O]
NewGraph create a directed graph that can compose components, lambda, chain, parallel etc. simultaneously provide flexible and multi-granular aspect governance capabilities. I: the input type of graph compiled product O: the output type of graph compiled product
To share state between nodes, use WithGenLocalState option:
type testState struct { UserInfo *UserInfo KVs map[string]any } genStateFunc := func(ctx context.Context) *testState { return &testState{} } graph := compose.NewGraph[string, string](WithGenLocalState(genStateFunc)) // you can use WithStatePreHandler and WithStatePostHandler to do something with state graph.AddNode("node1", someNode, compose.WithPreHandler(func(ctx context.Context, in string, state *testState) (string, error) { // do something with state return in, nil }), compose.WithPostHandler(func(ctx context.Context, out string, state *testState) (string, error) { // do something with state return out, nil }))
func (Graph) AddBranch ¶
func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)
AddBranch adds a branch to the graph. e.g.
condition := func(ctx context.Context, in string) (string, error) { return "next_node_key", nil } endNodes := map[string]bool{"path01": true, "path02": true} branch := compose.NewGraphBranch(condition, endNodes) graph.AddBranch("start_node_key", branch)
func (Graph) AddChatModelNode ¶
func (g Graph) AddChatModelNode(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) error
AddChatModelNode add node that implements model.BaseChatModel. e.g.
chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) graph.AddChatModelNode("chat_model_node_key", chatModel)
func (Graph) AddChatTemplateNode ¶
func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error
AddChatTemplateNode add node that implements prompt.ChatTemplate. e.g.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) graph.AddChatTemplateNode("chat_template_node_key", chatTemplate)
func (Graph) AddDocumentTransformerNode ¶
func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error
AddDocumentTransformerNode adds a node that implements document.Transformer. e.g.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) graph.AddDocumentTransformerNode("document_transformer_node_key", markdownSplitter)
func (*Graph[I, O]) AddEdge ¶
AddEdge adds an edge to the graph, edge means a data flow from startNode to endNode. the previous node's output type must be set to the next node's input type. NOTE: startNode and endNode must have been added to the graph before adding edge. e.g.
graph.AddNode("start_node_key", compose.NewPassthroughNode()) graph.AddNode("end_node_key", compose.NewPassthroughNode()) err := graph.AddEdge("start_node_key", "end_node_key")
func (Graph) AddEmbeddingNode ¶
func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error
AddEmbeddingNode adds a node that implements embedding.Embedder. e.g.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{ Model: "text-embedding-3-small", }) graph.AddEmbeddingNode("embedding_node_key", embeddingNode)
func (Graph) AddGraphNode ¶
func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error
AddGraphNode add one kind of Graph[I, O]、Chain[I, O]、StateChain[I, O, S] as a node. for Graph[I, O], comes from NewGraph[I, O]() for Chain[I, O], comes from NewChain[I, O]()
func (Graph) AddIndexerNode ¶
func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error
AddIndexerNode adds a node that implements indexer.Indexer. e.g.
indexer, err := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{}) graph.AddIndexerNode("indexer_node_key", indexer)
func (Graph) AddLambdaNode ¶
func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error
AddLambdaNode add node that implements at least one of Invoke[I, O], Stream[I, O], Collect[I, O], Transform[I, O]. due to the lack of supporting method generics, we need to use function generics to generate Lambda run as Runnable[I, O]. for Invoke[I, O], use compose.InvokableLambda() for Stream[I, O], use compose.StreamableLambda() for Collect[I, O], use compose.CollectableLambda() for Transform[I, O], use compose.TransformableLambda() for arbitrary combinations of 4 kinds of lambda, use compose.AnyLambda()
func (Graph) AddLoaderNode ¶
func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error
AddLoaderNode adds a node that implements document.Loader. e.g.
loader, err := file.NewLoader(ctx, &file.LoaderConfig{}) graph.AddLoaderNode("loader_node_key", loader)
func (Graph) AddPassthroughNode ¶
func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error
AddPassthroughNode adds a passthrough node to the graph. mostly used in pregel mode of graph. e.g.
graph.AddPassthroughNode("passthrough_node_key")
func (Graph) AddRetrieverNode ¶
func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error
AddRetrieverNode adds a node that implements retriever.Retriever. e.g.
retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{}) graph.AddRetrieverNode("retriever_node_key", retrieverNode)
func (Graph) AddToolsNode ¶
func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error
AddToolsNode adds a node that implements tools.ToolsNode. e.g.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{}) graph.AddToolsNode("tools_node_key", toolsNode)
func (*Graph[I, O]) Compile ¶
func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
Compile take the raw graph and compile it into a form ready to be run. e.g.
graph, err := compose.NewGraph[string, string]() if err != nil {...} runnable, err := graph.Compile(ctx, compose.WithGraphName("my_graph")) if err != nil {...} runnable.Invoke(ctx, "input") // invoke runnable.Stream(ctx, "input") // stream runnable.Collect(ctx, inputReader) // collect runnable.Transform(ctx, inputReader) // transform
type GraphAddNodeOpt ¶
type GraphAddNodeOpt func(o *graphAddNodeOpts)
GraphAddNodeOpt is a functional option type for adding a node to a graph. e.g.
graph.AddNode("node_name", node, compose.WithInputKey("input_key"), compose.WithOutputKey("output_key"))
func WithGraphCompileOptions ¶
func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt
WithGraphCompileOptions when the node is an AnyGraph, use this option to set compile option for the node. e.g.
graph.AddNode("node_name", node, compose.WithGraphCompileOptions(compose.WithGraphName("my_sub_graph")))
func WithInputKey ¶
func WithInputKey(k string) GraphAddNodeOpt
WithInputKey sets the input key of the node. this will change the input value of the node, for example, if the pre node's output is map[string]any{"key01": "value01"}, and the current node's input key is "key01", then the current node's input value will be "value01".
func WithNodeKey ¶
func WithNodeKey(key string) GraphAddNodeOpt
WithNodeKey set the node key, which is used to identify the node in the chain. only for use in Chain/StateChain.
func WithNodeName ¶
func WithNodeName(n string) GraphAddNodeOpt
WithNodeName sets the name of the node.
func WithOutputKey ¶
func WithOutputKey(k string) GraphAddNodeOpt
WithOutputKey sets the output key of the node. this will change the output value of the node, for example, if the current node's output key is "key01", then the node's output value will be map[string]any{"key01": value}.
func WithStatePostHandler ¶
func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt
WithStatePostHandler modify node's output of O according to state S and output or store output information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. O: output type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
func WithStatePreHandler ¶
func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt
WithStatePreHandler modify node's input of I according to state S and input or store input information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. I: input type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
func WithStreamStatePostHandler ¶
func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt
WithStreamStatePostHandler modify node's streaming output of O according to state S and output or store output information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. when to use: when current node's output is an actual stream, and you want the downstream node's input to remain an actual stream after state post handler. caution: while StreamStatePostHandler is thread safe, modifying state within your own goroutine is NOT. O: output type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
func WithStreamStatePreHandler ¶
func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt
WithStreamStatePreHandler modify node's streaming input of I according to state S and input or store input information into state, and it's thread-safe. notice: this option requires Graph to be created with WithGenLocalState option. when to use: when upstream node's output is an actual stream, and you want the current node's input to remain an actual stream after state pre handler. caution: while StreamStatePreHandler is thread safe, modifying state within your own goroutine is NOT. I: input type of the Node like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState
type GraphBranch ¶
type GraphBranch struct {
// contains filtered or unexported fields
}
GraphBranch is the branch type for the graph. It is used to determine the next node based on the condition.
func NewGraphBranch ¶
func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
NewGraphBranch creates a new graph branch. It is used to determine the next node based on the condition. e.g.
condition := func(ctx context.Context, in string) (string, error) { // logic to determine the next node return "next_node_key", nil } endNodes := map[string]bool{"path01": true, "path02": true} branch := compose.NewGraphBranch(condition, endNodes) graph.AddBranch("key_of_node_before_branch", branch)
func NewGraphMultiBranch ¶ added in v0.3.18
func NewGraphMultiBranch[T any](condition GraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
func NewStreamGraphBranch ¶
func NewStreamGraphBranch[T any](condition StreamGraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
NewStreamGraphBranch creates a new stream graph branch. It is used to determine the next node based on the condition of stream input. e.g.
condition := func(ctx context.Context, in *schema.StreamReader[T]) (string, error) { // logic to determine the next node. // to use the feature of stream, you can use the first chunk to determine the next node. return "next_node_key", nil } endNodes := map[string]bool{"path01": true, "path02": true} branch := compose.NewStreamGraphBranch(condition, endNodes) graph.AddBranch("key_of_node_before_branch", branch)
func NewStreamGraphMultiBranch ¶ added in v0.3.18
func NewStreamGraphMultiBranch[T any](condition StreamGraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch
func (*GraphBranch) GetEndNode ¶
func (gb *GraphBranch) GetEndNode() map[string]bool
GetEndNode returns the all end nodes of the branch.
type GraphBranchCondition ¶
GraphBranchCondition is the condition type for the branch.
type GraphCompileCallback ¶
GraphCompileCallback is the callback which will be called when graph compilation finishes.
type GraphCompileOption ¶
type GraphCompileOption func(*graphCompileOptions)
GraphCompileOption options for compiling AnyGraph.
func WithCheckPointStore ¶ added in v0.3.18
func WithCheckPointStore(store CheckPointStore) GraphCompileOption
func WithEagerExecution ¶ added in v0.3.34
func WithEagerExecution() GraphCompileOption
WithEagerExecution enables the eager execution mode for the graph. In eager mode, nodes will be executed immediately once they are ready to run, without waiting for the completion of a super step, ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine Note: Eager mode is not allowed when the graph's trigger mode is set to AnyPredecessor. Workflow uses eager mode by default. Deprecated: Eager execution is automatically enabled by default when a node's trigger mode is set to AllPredecessor. If you were using this option previously, it can be safely removed without changing behavior.
func WithEagerExecutionDisabled ¶ added in v0.4.0
func WithEagerExecutionDisabled() GraphCompileOption
WithEagerExecutionDisabled disables the eager execution mode for the graph. By default, eager execution is enabled for Workflow and Graph with the AllPredecessor trigger mode. After using this option, nodes will wait for the completion of a super step instead of execute immediately once they are ready to run. ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine
func WithFanInMergeConfig ¶ added in v0.3.44
func WithFanInMergeConfig(confs map[string]FanInMergeConfig) GraphCompileOption
WithFanInMergeConfig sets the fan-in merge configurations for the graph nodes that receive inputs from multiple sources.
func WithGraphCompileCallbacks ¶
func WithGraphCompileCallbacks(cbs ...GraphCompileCallback) GraphCompileOption
WithGraphCompileCallbacks sets callbacks for graph compilation.
func WithGraphName ¶
func WithGraphName(graphName string) GraphCompileOption
WithGraphName sets a name for the graph. The name is used for debugging and logging purposes. If not set, a default name will be used.
func WithInterruptAfterNodes ¶ added in v0.3.18
func WithInterruptAfterNodes(nodes []string) GraphCompileOption
func WithInterruptBeforeNodes ¶ added in v0.3.18
func WithInterruptBeforeNodes(nodes []string) GraphCompileOption
func WithMaxRunSteps ¶
func WithMaxRunSteps(maxSteps int) GraphCompileOption
WithMaxRunSteps sets the maximum number of steps that a graph can run. This is useful to prevent infinite loops in graphs with cycles. If the number of steps exceeds maxSteps, the graph execution will be terminated with an error.
func WithNodeTriggerMode ¶
func WithNodeTriggerMode(triggerMode NodeTriggerMode) GraphCompileOption
WithNodeTriggerMode sets the trigger mode for nodes in the graph. The trigger mode determines when a node is triggered during graph execution, ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine AnyPredecessor by default.
func WithSerializer ¶ added in v0.3.50
func WithSerializer(serializer Serializer) GraphCompileOption
type GraphInfo ¶
type GraphInfo struct { CompileOptions []GraphCompileOption Nodes map[string]GraphNodeInfo // node key -> node info Edges map[string][]string // edge start node key -> edge end node key, control edges DataEdges map[string][]string Branches map[string][]GraphBranch // branch start node key -> branch InputType, OutputType reflect.Type Name string NewGraphOptions []NewGraphOption GenStateFn func(context.Context) any }
GraphInfo the info which end users pass in when they are compiling a graph. it is used in compile callback for user to get the node info and instance. you may need all details info of the graph for observation.
type GraphMultiBranchCondition ¶ added in v0.3.18
type GraphMultiBranchCondition[T any] func(ctx context.Context, in T) (endNode map[string]bool, err error)
GraphMultiBranchCondition is the condition type for the multi choice branch.
type GraphNodeInfo ¶
type GraphNodeInfo struct { Component components.Component Instance any GraphAddNodeOpts []GraphAddNodeOpt InputType, OutputType reflect.Type // mainly for lambda, whose input and output types cannot be inferred by component type Name string InputKey, OutputKey string GraphInfo *GraphInfo Mappings []*FieldMapping }
GraphNodeInfo the info which end users pass in when they are adding nodes to graph.
type InterruptInfo ¶ added in v0.3.18
type InterruptInfo struct { State any BeforeNodes []string AfterNodes []string RerunNodes []string RerunNodesExtra map[string]any SubGraphs map[string]*InterruptInfo }
func ExtractInterruptInfo ¶ added in v0.3.18
func ExtractInterruptInfo(err error) (info *InterruptInfo, existed bool)
type Invoke ¶
type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)
Invoke is the type of the invokable lambda function.
type InvokeWOOpt ¶
InvokeWOOpt is the type of the invokable lambda function without options.
type Lambda ¶
type Lambda struct {
// contains filtered or unexported fields
}
Lambda is the node that wraps the user provided lambda function. It can be used as a node in Graph or Chain (include Parallel and Branch). Create a Lambda by using AnyLambda/InvokableLambda/StreamableLambda/CollectableLambda/TransformableLambda. eg.
lambda := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) { return input, nil })
func AnyLambda ¶
func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption], c Collect[I, O, TOption], t Transform[I, O, TOption], opts ...LambdaOpt) (*Lambda, error)
AnyLambda creates a Lambda with any lambda function. you can only implement one or more of the four lambda functions, and the rest use nil. eg.
invokeFunc := func(ctx context.Context, input string, opts ...myOption) (output string, err error) { // ... } streamFunc := func(ctx context.Context, input string, opts ...myOption) (output *schema.StreamReader[string], err error) { // ... }
lambda := compose.AnyLambda(invokeFunc, streamFunc, nil, nil)
func CollectableLambda ¶
func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda
CollectableLambda creates a Lambda with collectable lambda function without options.
func CollectableLambdaWithOption ¶
func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda
CollectableLambdaWithOption creates a Lambda with collectable lambda function and options.
func InvokableLambda ¶
func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda
InvokableLambda creates a Lambda with invokable lambda function without options.
func InvokableLambdaWithOption ¶
func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda
InvokableLambdaWithOption creates a Lambda with invokable lambda function and options.
func MessageParser ¶
func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda
MessageParser creates a lambda that parses a message into an object T, usually used after a chatmodel. usage:
parser := schema.NewMessageJSONParser[MyStruct](&schema.MessageJSONParseConfig{ ParseFrom: schema.MessageParseFromContent, }) parserLambda := MessageParser(parser) chain := NewChain[*schema.Message, MyStruct]() chain.AppendChatModel(chatModel) chain.AppendLambda(parserLambda) r, err := chain.Compile(context.Background()) // parsed is a MyStruct object parsed, err := r.Invoke(context.Background(), &schema.Message{ Role: schema.MessageRoleUser, Content: "return a json string for my struct", })
func StreamableLambda ¶
func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda
StreamableLambda creates a Lambda with streamable lambda function without options.
func StreamableLambdaWithOption ¶
func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda
StreamableLambdaWithOption creates a Lambda with streamable lambda function and options.
func ToList ¶
ToList creates a Lambda that converts input I to a []I. It's useful when you want to convert a single input to a list of inputs. eg.
lambda := compose.ToList[*schema.Message]() chain := compose.NewChain[[]*schema.Message, []*schema.Message]() chain.AddChatModel(chatModel) // chatModel returns *schema.Message, but we need []*schema.Message chain.AddLambda(lambda) // convert *schema.Message to []*schema.Message
func TransformableLambda ¶
func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda
TransformableLambda creates a Lambda with transformable lambda function without options.
type LambdaOpt ¶
type LambdaOpt func(o *lambdaOpts)
LambdaOpt is the option for creating a Lambda.
func WithLambdaCallbackEnable ¶
WithLambdaCallbackEnable enables the callback aspect of the lambda function.
func WithLambdaType ¶
WithLambdaType sets the type of the lambda function.
type NewGraphOption ¶ added in v0.3.1
type NewGraphOption func(ngo *newGraphOptions)
func WithGenLocalState ¶ added in v0.3.1
func WithGenLocalState[S any](gls GenLocalState[S]) NewGraphOption
type NodePath ¶ added in v0.3.4
type NodePath struct {
// contains filtered or unexported fields
}
func NewNodePath ¶ added in v0.3.4
type NodeTriggerMode ¶
type NodeTriggerMode string
NodeTriggerMode controls the triggering mode of graph nodes.
const ( // AnyPredecessor means that the node will be triggered when any of its predecessors is included in the previous completed super step. // Ref:https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine AnyPredecessor NodeTriggerMode = "any_predecessor" // AllPredecessor means that the current node will only be triggered when all of its predecessor nodes have finished running. AllPredecessor NodeTriggerMode = "all_predecessor" )
type Option ¶
type Option struct {
// contains filtered or unexported fields
}
Option is a functional option type for calling a graph.
func WithCallbacks ¶
WithCallbacks set callback handlers for all components in a single call. e.g.
runnable.Invoke(ctx, "input", compose.WithCallbacks(&myCallbacks{}))
func WithChatModelOption ¶
WithChatModelOption is a functional option type for chat model component. e.g.
chatModelOption := compose.WithChatModelOption(model.WithTemperature(0.7)) runnable.Invoke(ctx, "input", chatModelOption)
func WithChatTemplateOption ¶
WithChatTemplateOption is a functional option type for chat template component.
func WithCheckPointID ¶ added in v0.3.18
func WithDocumentTransformerOption ¶
func WithDocumentTransformerOption(opts ...document.TransformerOption) Option
WithDocumentTransformerOption is a functional option type for document transformer component.
func WithEmbeddingOption ¶
WithEmbeddingOption is a functional option type for embedding component. e.g.
embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small")) runnable.Invoke(ctx, "input", embeddingOption)
func WithForceNewRun ¶ added in v0.3.45
func WithForceNewRun() Option
WithForceNewRun forces the graph to run from the beginning, ignoring any checkpoints.
func WithIndexerOption ¶
WithIndexerOption is a functional option type for indexer component. e.g.
indexerOption := compose.WithIndexerOption(indexer.WithSubIndexes([]string{"my_sub_index"})) runnable.Invoke(ctx, "input", indexerOption)
func WithLambdaOption ¶
WithLambdaOption is a functional option type for lambda component.
func WithLoaderOption ¶
func WithLoaderOption(opts ...document.LoaderOption) Option
WithLoaderOption is a functional option type for loader component. e.g.
loaderOption := compose.WithLoaderOption(document.WithCollection("my_collection")) runnable.Invoke(ctx, "input", loaderOption)
func WithRetrieverOption ¶
WithRetrieverOption is a functional option type for retriever component. e.g.
retrieverOption := compose.WithRetrieverOption(retriever.WithIndex("my_index")) runnable.Invoke(ctx, "input", retrieverOption)
func WithRuntimeMaxSteps ¶
WithRuntimeMaxSteps sets the maximum number of steps for the graph runtime. e.g.
runnable.Invoke(ctx, "input", compose.WithRuntimeMaxSteps(20))
func WithStateModifier ¶ added in v0.3.18
func WithStateModifier(sm StateModifier) Option
func WithToolsNodeOption ¶
func WithToolsNodeOption(opts ...ToolsNodeOption) Option
WithToolsNodeOption is a functional option type for tools node component.
func WithWriteToCheckPointID ¶ added in v0.3.53
WithWriteToCheckPointID specifies a different checkpoint ID to write to. If not provided, the checkpoint ID from WithCheckPointID will be used for writing. This is useful for scenarios where you want to load from an existed checkpoint but save the progress to a new, separate checkpoint.
func (Option) DesignateNode ¶
DesignateNode set the key of the node which will the option be applied to. notice: only effective at the top graph. e.g.
embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small")) runnable.Invoke(ctx, "input", embeddingOption.DesignateNode("my_embedding_node"))
func (Option) DesignateNodeWithPath ¶ added in v0.3.4
DesignateNodeWithPath sets the path of the node(s) to which the option will be applied to. You can make the option take effect in the subgraph by specifying the key of the subgraph. e.g. DesignateNodeWithPath({"sub graph node key", "node key within sub graph"})
type Parallel ¶
type Parallel struct {
// contains filtered or unexported fields
}
Parallel run multiple nodes in parallel
use `NewParallel()` to create a new parallel type Example:
parallel := NewParallel() parallel.AddChatModel("output_key01", chat01) parallel.AddChatModel("output_key01", chat02) chain := NewChain[any,any]() chain.AppendParallel(parallel)
func NewParallel ¶
func NewParallel() *Parallel
NewParallel creates a new parallel type. it is useful when you want to run multiple nodes in parallel in a chain.
func (*Parallel) AddChatModel ¶
func (p *Parallel) AddChatModel(outputKey string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *Parallel
AddChatModel adds a chat model to the parallel. eg.
chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) p.AddChatModel("output_key01", chatModel01) p.AddChatModel("output_key02", chatModel02)
func (*Parallel) AddChatTemplate ¶
func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel
AddChatTemplate adds a chat template to the parallel. eg.
chatTemplate01, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) p.AddChatTemplate("output_key01", chatTemplate01)
func (*Parallel) AddDocumentTransformer ¶
func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel
AddDocumentTransformer adds an Document Transformer node to the parallel. eg.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) p.AddDocumentTransformer("output_key01", markdownSplitter)
func (*Parallel) AddEmbedding ¶
func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel
AddEmbedding adds an embedding node to the parallel. eg.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{ Model: "text-embedding-3-small", }) p.AddEmbedding("output_key01", embeddingNode)
func (*Parallel) AddGraph ¶
func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel
AddGraph adds a graph node to the parallel. It is useful when you want to use a graph or a chain as a node in the parallel. eg.
graph, err := compose.NewChain[any,any]() p.AddGraph("output_key01", graph)
func (*Parallel) AddIndexer ¶
func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel
AddIndexer adds an indexer node to the parallel. eg.
indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{ Collection: "my_collection", }) p.AddIndexer("output_key01", indexer)
func (*Parallel) AddLambda ¶
func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel
AddLambda adds a lambda node to the parallel. eg.
lambdaFunc := func(ctx context.Context, input *schema.Message) ([]*schema.Message, error) { return []*schema.Message{input}, nil } p.AddLambda("output_key01", compose.InvokeLambda(lambdaFunc))
func (*Parallel) AddLoader ¶
func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel
AddLoader adds a loader node to the parallel. eg.
loader, err := file.NewLoader(ctx, &file.LoaderConfig{}) p.AddLoader("output_key01", loader)
func (*Parallel) AddPassthrough ¶
func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel
AddPassthrough adds a passthrough node to the parallel. eg.
p.AddPassthrough("output_key01")
func (*Parallel) AddRetriever ¶
func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel
AddRetriever adds a retriever node to the parallel. eg.
retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})
p.AddRetriever("output_key01", retriever)
func (*Parallel) AddToolsNode ¶
func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel
AddToolsNode adds a tools node to the parallel. eg.
toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{ Tools: []tool.BaseTool{...}, }) p.AddToolsNode("output_key01", toolsNode)
type Runnable ¶
type Runnable[I, O any] interface { Invoke(ctx context.Context, input I, opts ...Option) (output O, err error) Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error) Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error) Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error) }
Runnable is the interface for an executable object. Graph, Chain can be compiled into Runnable. runnable is the core conception of eino, we do downgrade compatibility for four data flow patterns, and can automatically connect components that only implement one or more methods. eg, if a component only implements Stream() method, you can still call Invoke() to convert stream output to invoke output.
type Serializer ¶ added in v0.3.50
type StateModifier ¶ added in v0.3.18
type StatePostHandler ¶
StatePostHandler is a function called after the node is executed. Notice: if user called Stream but with StatePostHandler, the StatePostHandler will read all stream chunks and merge them into a single object.
type StatePreHandler ¶
StatePreHandler is a function called before the node is executed. Notice: if user called Stream but with StatePreHandler, the StatePreHandler will read all stream chunks and merge them into a single object.
type Stream ¶
type Stream[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output *schema.StreamReader[O], err error)
Stream is the type of the streamable lambda function.
type StreamGraphBranchCondition ¶
type StreamGraphBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNode string, err error)
StreamGraphBranchCondition is the condition type for the stream branch.
type StreamGraphMultiBranchCondition ¶ added in v0.3.18
type StreamGraphMultiBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNodes map[string]bool, err error)
StreamGraphMultiBranchCondition is the condition type for the stream multi choice branch.
type StreamStatePostHandler ¶
type StreamStatePostHandler[O, S any] func(ctx context.Context, out *schema.StreamReader[O], state S) (*schema.StreamReader[O], error)
StreamStatePostHandler is a function that is called after the node is executed with stream input and output.
type StreamStatePreHandler ¶
type StreamStatePreHandler[I, S any] func(ctx context.Context, in *schema.StreamReader[I], state S) (*schema.StreamReader[I], error)
StreamStatePreHandler is a function that is called before the node is executed with stream input and output.
type StreamWOOpt ¶
type StreamWOOpt[I, O any] func(ctx context.Context, input I) (output *schema.StreamReader[O], err error)
StreamWOOpt is the type of the streamable lambda function without options.
type ToolsInterruptAndRerunExtra ¶ added in v0.3.38
type ToolsNode ¶
type ToolsNode struct {
// contains filtered or unexported fields
}
ToolsNode represents a node capable of executing tools within a graph. The Graph Node interface is defined as follows:
Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
Input: An AssistantMessage containing ToolCalls Output: An array of ToolMessage where the order of elements corresponds to the order of ToolCalls in the input
func NewToolNode ¶
func NewToolNode(ctx context.Context, conf *ToolsNodeConfig) (*ToolsNode, error)
NewToolNode creates a new ToolsNode. e.g.
conf := &ToolsNodeConfig{ Tools: []tool.BaseTool{invokableTool1, streamableTool2}, } toolsNode, err := NewToolNode(ctx, conf)
func (*ToolsNode) Invoke ¶
func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
Invoke calls the tools and collects the results of invokable tools. it's parallel if there are multiple tool calls in the input message.
func (*ToolsNode) Stream ¶
func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
Stream calls the tools and collects the results of stream readers. it's parallel if there are multiple tool calls in the input message.
type ToolsNodeConfig ¶
type ToolsNodeConfig struct { // Tools specify the list of tools can be called which are BaseTool but must implement InvokableTool or StreamableTool. Tools []tool.BaseTool // UnknownToolsHandler handles tool calls for non-existent tools when LLM hallucinates. // This field is optional. When not set, calling a non-existent tool will result in an error. // When provided, if the LLM attempts to call a tool that doesn't exist in the Tools list, // this handler will be invoked instead of returning an error, allowing graceful handling of hallucinated tools. // Parameters: // - ctx: The context for the tool call // - name: The name of the non-existent tool // - input: The tool call input generated by llm // Returns: // - string: The response to be returned as if the tool was executed // - error: Any error that occurred during handling UnknownToolsHandler func(ctx context.Context, name, input string) (string, error) // ExecuteSequentially determines whether tool calls should be executed sequentially (in order) or in parallel. // When set to true, tool calls will be executed one after another in the order they appear in the input message. // When set to false (default), tool calls will be executed in parallel. ExecuteSequentially bool // ToolArgumentsHandler allows handling of tool arguments before execution. // When provided, this function will be called for each tool call to process the arguments. // Parameters: // - ctx: The context for the tool call // - name: The name of the tool being called // - arguments: The original arguments string for the tool // Returns: // - string: The processed arguments string to be used for tool execution // - error: Any error that occurred during preprocessing ToolArgumentsHandler func(ctx context.Context, name, arguments string) (string, error) }
ToolsNodeConfig is the config for ToolsNode.
type ToolsNodeOption ¶
type ToolsNodeOption func(o *toolsNodeOptions)
ToolsNodeOption is the option func type for ToolsNode.
func WithToolList ¶ added in v0.3.15
func WithToolList(tool ...tool.BaseTool) ToolsNodeOption
WithToolList sets the tool list for the ToolsNode.
func WithToolOption ¶
func WithToolOption(opts ...tool.Option) ToolsNodeOption
WithToolOption adds tool options to the ToolsNode.
type Transform ¶
type Transform[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)
Transform is the type of the transformable lambda function.
type TransformWOOpts ¶
type TransformWOOpts[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output *schema.StreamReader[O], err error)
TransformWOOpts is the type of the transformable lambda function without options.
type Workflow ¶ added in v0.3.8
type Workflow[I, O any] struct { // contains filtered or unexported fields }
Workflow is wrapper of graph, replacing AddEdge with declaring dependencies and field mappings between nodes. Under the hood it uses NodeTriggerMode(AllPredecessor), so does not support cycles.
func NewWorkflow ¶ added in v0.3.8
func NewWorkflow[I, O any](opts ...NewGraphOption) *Workflow[I, O]
NewWorkflow creates a new Workflow.
func (*Workflow[I, O]) AddBranch ¶ added in v0.3.18
func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch
AddBranch adds a branch to the workflow.
End Nodes Field Mappings: End nodes of the branch are required to define their own field mappings. This is a key distinction between Graph's Branch and Workflow's Branch: - Graph's Branch: Automatically passes its input to the selected node. - Workflow's Branch: Does not pass its input to the selected node.
func (*Workflow[I, O]) AddChatModelNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddChatTemplateNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddChatTemplateNode(key string, chatTemplate prompt.ChatTemplate, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddDocumentTransformerNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddDocumentTransformerNode(key string, transformer document.Transformer, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddEmbeddingNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddEmbeddingNode(key string, embedding embedding.Embedder, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddEnd
deprecated
added in
v0.3.8
func (wf *Workflow[I, O]) AddEnd(fromNodeKey string, inputs ...*FieldMapping) *Workflow[I, O]
Deprecated: use *Workflow[I,O].End() to obtain a WorkflowNode instance for END, then work with it just like a normal WorkflowNode.
func (*Workflow[I, O]) AddGraphNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddGraphNode(key string, graph AnyGraph, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddIndexerNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddIndexerNode(key string, indexer indexer.Indexer, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddLambdaNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddLambdaNode(key string, lambda *Lambda, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddLoaderNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddLoaderNode(key string, loader document.Loader, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddPassthroughNode ¶ added in v0.3.18
func (wf *Workflow[I, O]) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddRetrieverNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddRetrieverNode(key string, retriever retriever.Retriever, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) AddToolsNode ¶ added in v0.3.8
func (wf *Workflow[I, O]) AddToolsNode(key string, tools *ToolsNode, opts ...GraphAddNodeOpt) *WorkflowNode
func (*Workflow[I, O]) End ¶ added in v0.3.18
func (wf *Workflow[I, O]) End() *WorkflowNode
End returns the WorkflowNode representing END node.
type WorkflowAddInputOpt ¶ added in v0.3.18
type WorkflowAddInputOpt func(*workflowAddInputOpts)
func WithNoDirectDependency ¶ added in v0.3.18
func WithNoDirectDependency() WorkflowAddInputOpt
WithNoDirectDependency creates a data mapping without establishing a direct execution dependency. The predecessor node will still complete before the current node executes, but through indirect execution paths rather than a direct dependency.
In a workflow graph, node dependencies typically serve two purposes: 1. Execution order: determining when nodes should run 2. Data flow: specifying how data passes between nodes
This option separates these concerns by:
- Creating data mapping from the predecessor to the current node
- Relying on the predecessor's path to reach the current node through other nodes that have direct execution dependencies
Example:
node.AddInputWithOptions("dataNode", mappings, WithNoDirectDependency())
Important:
Branch scenarios: When connecting nodes on different sides of a branch, WithNoDirectDependency MUST be used to let the branch itself handle the execution order, preventing incorrect dependencies that could bypass the branch.
Execution guarantee: The predecessor will still complete before the current node executes because the predecessor must have a path (through other nodes) that eventually reaches the current node.
Graph validity: There MUST be a path from the predecessor that eventually reaches the current node through other nodes with direct dependencies. This ensures the execution order while avoiding redundant direct dependencies.
Common use cases: - Cross-branch data access where the branch handles execution order - Avoiding redundant dependencies when a path already exists
type WorkflowBranch ¶ added in v0.3.18
type WorkflowBranch struct { *GraphBranch // contains filtered or unexported fields }
type WorkflowNode ¶ added in v0.3.8
type WorkflowNode struct {
// contains filtered or unexported fields
}
WorkflowNode is the node of the Workflow.
func (*WorkflowNode) AddDependency ¶ added in v0.3.18
func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode
AddDependency creates an execution-only dependency between nodes. The current node will wait for the predecessor node to complete before executing, but no data will be passed between them.
Parameters:
- fromNodeKey: the key of the predecessor node that must complete before this node starts
Example:
// Wait for "setupNode" to complete before executing node.AddDependency("setupNode")
This is useful when: - You need to ensure execution order without data transfer - The predecessor performs setup or initialization that must complete first - You want to explicitly separate execution dependencies from data flow
Returns the current node for method chaining.
func (*WorkflowNode) AddInput ¶ added in v0.3.8
func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode
AddInput creates both data and execution dependencies between nodes. It configures how data flows from the predecessor node (fromNodeKey) to the current node, and ensures the current node only executes after the predecessor completes.
Parameters:
- fromNodeKey: the key of the predecessor node
- inputs: field mappings that specify how data should flow from the predecessor to the current node. If no mappings are provided, the entire output of the predecessor will be used as input.
Example:
// Map between specific field node.AddInput("userNode", MapFields("user.name", "displayName")) // Use entire output node.AddInput("dataNode")
Returns the current node for method chaining.
func (*WorkflowNode) AddInputWithOptions ¶ added in v0.3.18
func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode
AddInputWithOptions creates a dependency between nodes with custom configuration options. It allows fine-grained control over both data flow and execution dependencies.
Parameters:
- fromNodeKey: the key of the predecessor node
- inputs: field mappings that specify how data flows from the predecessor to the current node. If no mappings are provided, the entire output of the predecessor will be used as input.
- opts: configuration options that control how the dependency is established
Example:
// Create data mapping without direct execution dependency node.AddInputWithOptions("dataNode", mappings, WithNoDirectDependency())
Returns the current node for method chaining.
func (*WorkflowNode) SetStaticValue ¶ added in v0.3.18
func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode
SetStaticValue sets a static value for a field path that will be available during workflow execution. These values are determined at compile time and remain constant throughout the workflow's lifecycle.
Example:
node.SetStaticValue(FieldPath{"query"}, "static query")
Source Files
¶
- branch.go
- chain.go
- chain_branch.go
- chain_parallel.go
- checkpoint.go
- component_to_graph_node.go
- dag.go
- doc.go
- error.go
- field_mapping.go
- generic_graph.go
- generic_helper.go
- graph.go
- graph_add_node_options.go
- graph_call_options.go
- graph_compile_options.go
- graph_manager.go
- graph_node.go
- graph_run.go
- interrupt.go
- introspect.go
- pregel.go
- runnable.go
- state.go
- stream_concat.go
- stream_reader.go
- tool_node.go
- types.go
- types_composable.go
- types_lambda.go
- utils.go
- values_merge.go
- workflow.go