compose

package
v0.5.3
Published: Sep 15, 2025 License: Apache-2.0 Imports: 25 Imported by: 111

Documentation

Constants

const (
	ComponentOfUnknown     component = "Unknown"
	ComponentOfGraph       component = "Graph"
	ComponentOfWorkflow    component = "Workflow"
	ComponentOfChain       component = "Chain"
	ComponentOfPassthrough component = "Passthrough"
	ComponentOfToolsNode   component = "ToolsNode"
	ComponentOfLambda      component = "Lambda"
)

Built-in component types for graph nodes. Each value represents the type of the most primitive executable object provided by the user.

const END = "end"

END is the end node of the graph. You can add your last edge with END.

const START = "start"

START is the start node of the graph. You can add your first edge with START.

Variables

var DAGInvalidLoopErr = errors.New("DAG is invalid, has loop")

DAGInvalidLoopErr reports that the DAG is invalid because it contains a loop.

var ErrChainCompiled = errors.New("chain has been compiled, cannot be modified")

ErrChainCompiled is returned when attempting to modify a chain after it has been compiled

var ErrExceedMaxSteps = errors.New("exceeds max steps")

ErrExceedMaxSteps is returned by a graph when the number of executed steps exceeds the maximum number of steps.

var ErrGraphCompiled = errors.New("graph has been compiled, cannot be modified")

ErrGraphCompiled is returned when attempting to modify a graph after it has been compiled

var InterruptAndRerun = errors.New("interrupt and rerun")

Functions

func GetToolCallID added in v0.3.18

func GetToolCallID(ctx context.Context) string

GetToolCallID gets the current tool call id from the context.

func InitGraphCompileCallbacks

func InitGraphCompileCallbacks(cbs []GraphCompileCallback)

InitGraphCompileCallbacks sets global graph compile callbacks, which are added ONLY to the compile options of the top-level graph.

func IsInterruptRerunError added in v0.3.38

func IsInterruptRerunError(err error) (any, bool)

func NewInterruptAndRerunErr added in v0.3.38

func NewInterruptAndRerunErr(extra any) error

func ProcessState added in v0.3.10

func ProcessState[S any](ctx context.Context, handler func(context.Context, S) error) error

ProcessState processes the state from the context in a concurrency-safe way. This is the recommended way to access and modify state in custom nodes. The provided handler is executed with exclusive access to the state (protected by a mutex). Note: this function returns an error if the state type doesn't match or if no state is found in the context. e.g.

lambdaFunc := func(ctx context.Context, in string) (string, error) {
	err := compose.ProcessState[*testState](ctx, func(ctx context.Context, state *testState) error {
		// do something with state in a concurrency-safe way
		state.Count++
		return nil
	})
	if err != nil {
		return "", err
	}
	return in, nil
}

stateGraph := compose.NewGraph[string, string](compose.WithGenLocalState(genStateFunc))
stateGraph.AddLambdaNode("node1", compose.InvokableLambda(lambdaFunc))

func RegisterInternalType added in v0.3.50

func RegisterInternalType(f func(key string, value any) error) error

func RegisterSerializableType added in v0.3.18

func RegisterSerializableType[T any](name string) error

RegisterSerializableType registers a custom type for eino serialization. This allows eino to properly serialize and deserialize custom types. Both custom interfaces and structs need to be registered using this function. Types only need to be registered once; pointers and other references will be handled automatically. All built-in eino types are already registered.

Parameters:

  • name: A unique identifier for the type being registered (should not start with "_eino")
  • T: The generic type parameter representing the type to register

Returns:

  • error: An error if registration fails (e.g., if the type is already registered)

func RegisterStreamChunkConcatFunc

func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))

RegisterStreamChunkConcatFunc registers a function to concat stream chunks. It's required when you want to concat stream chunks of a specific type, for example when you call Invoke() but the node only implements Stream(). Call it during process initialization; it is not thread-safe. e.g.

type testStruct struct {
	field1 string
	field2 int
}
compose.RegisterStreamChunkConcatFunc(func(items []testStruct) (testStruct, error) {
	return testStruct{
		field1: items[1].field1, // adapt the concat logic to your scenario
		field2: items[0].field2 + items[1].field2,
	}, nil
})
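
The example above reads exactly two items. A concat function that handles any number of chunks could look like the sketch below. The chunk type and the concatChunks name are hypothetical, and treating an empty slice as an error is an assumption of this sketch, not documented library behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// chunk is a hypothetical stream element carrying a text fragment and a token count.
type chunk struct {
	Text   string
	Tokens int
}

// concatChunks has the shape func([]T) (T, error) with T = chunk:
// text fragments are joined in order and token counts are summed.
func concatChunks(items []chunk) (chunk, error) {
	if len(items) == 0 {
		// Assumption: an empty slice is reported as an error.
		return chunk{}, errors.New("no chunks to concat")
	}
	var sb strings.Builder
	total := 0
	for _, it := range items {
		sb.WriteString(it.Text)
		total += it.Tokens
	}
	return chunk{Text: sb.String(), Tokens: total}, nil
}

func main() {
	merged, err := concatChunks([]chunk{{"Hel", 1}, {"lo", 1}, {"!", 1}})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", merged) // prints {Text:Hello! Tokens:3}
}
```

A function of this shape would be passed as `compose.RegisterStreamChunkConcatFunc(concatChunks)` once at process start.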

func RegisterValuesMergeFunc added in v0.3.26

func RegisterValuesMergeFunc[T any](fn func([]T) (T, error))

RegisterValuesMergeFunc registers a function that merges the outputs of multiple nodes at a fan-in point. It defines how values of a specific type are merged. Maps already have a default merge function, so you don't need to register a new one for them unless you want to customize the merge logic.

Types

type AnyGraph

type AnyGraph interface {
	// contains filtered or unexported methods
}

AnyGraph is the common identifier for the composable and compilable graph types in Eino, such as Graph[I, O] and Chain[I, O].

type Chain

type Chain[I, O any] struct {
	// contains filtered or unexported fields
}

Chain is a chain of components. Chain nodes can be parallel, branch, or sequence components. Chain is designed to be used with the builder pattern and must be compiled with Compile() before use. Its interface is chain-style, so calls can be linked: `chain.AppendXX(...).AppendXX(...)`

Normal usage:

  1. create a chain with input/output type: `chain := NewChain[inputType, outputType]()`
  2. add components to the chainable list:
     2.1 add components: `chain.AppendChatTemplate(...).AppendChatModel(...).AppendToolsNode(...)`
     2.2 add parallel or branch nodes if needed: `chain.AppendParallel()`, `chain.AppendBranch()`
  3. compile: `r, err := chain.Compile(ctx)`
  4. run:
     4.1 `one input & one output`: use `r.Invoke(ctx, input)`
     4.2 `one input & multi output chunk`: use `r.Stream(ctx, input)`
     4.3 `multi input chunk & one output`: use `r.Collect(ctx, inputReader)`
     4.4 `multi input chunk & multi output chunk`: use `r.Transform(ctx, inputReader)`

Using in a graph or another chain:

chain1 := NewChain[inputType, outputType]()
graph := NewGraph[inputType, outputType]()
graph.AddGraphNode("key", chain1) // chain is an AnyGraph implementation

// or in another chain:
chain2 := NewChain[inputType, outputType]()
chain2.AppendGraph(chain1)

func NewChain

func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O]

NewChain creates a chain with input/output type.

func (*Chain[I, O]) AppendBranch

func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]

AppendBranch adds a conditional branch to the chain. Each branch within the ChainBranch can be an AnyGraph. All branches should either lead to END, or converge to another node within the Chain. e.g.

cb := compose.NewChainBranch(conditionFunc)
cb.AddChatTemplate("chat_template_key_01", chatTemplate)
cb.AddChatTemplate("chat_template_key_02", chatTemplate2)
chain.AppendBranch(cb)

func (*Chain[I, O]) AppendChatModel

func (c *Chain[I, O]) AppendChatModel(node model.BaseChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendChatModel adds a ChatModel node to the chain. e.g.

model, err := openai.NewChatModel(ctx, config)
if err != nil {...}
chain.AppendChatModel(model)

func (*Chain[I, O]) AppendChatTemplate

func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendChatTemplate adds a ChatTemplate node to the chain. e.g.

chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

chain.AppendChatTemplate(chatTemplate)

func (*Chain[I, O]) AppendDocumentTransformer

func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendDocumentTransformer adds a DocumentTransformer node to the chain. e.g.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

chain.AppendDocumentTransformer(markdownSplitter)

func (*Chain[I, O]) AppendEmbedding

func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendEmbedding adds an Embedding node to the chain. e.g.

embedder, err := openai.NewEmbedder(ctx, config)
if err != nil {...}
chain.AppendEmbedding(embedder)

func (*Chain[I, O]) AppendGraph

func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendGraph adds an AnyGraph node to the chain. AnyGraph can be a chain or a graph. e.g.

graph := compose.NewGraph[string, string]()
chain.AppendGraph(graph)

func (*Chain[I, O]) AppendIndexer

func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendIndexer adds an Indexer node to the chain. Indexer is a node that can store documents. e.g.

vectorStoreImpl, err := vikingdb.NewVectorStorer(ctx, vikingdbConfig) // in components/vectorstore/vikingdb/vectorstore.go
if err != nil {...}

config := vectorstore.IndexerConfig{VectorStore: vectorStoreImpl}
indexer, err := vectorstore.NewIndexer(ctx, config)
if err != nil {...}

chain.AppendIndexer(indexer)

func (*Chain[I, O]) AppendLambda

func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendLambda adds a Lambda node to the chain. Lambda is a node that can be used to implement custom logic. e.g.

lambdaNode := compose.InvokableLambda(func(ctx context.Context, docs []*schema.Document) (string, error) {...})
chain.AppendLambda(lambdaNode)

Note: to create a Lambda node, use `compose.AnyLambda`, `compose.InvokableLambda`, `compose.StreamableLambda`, or `compose.TransformableLambda`. If you want the node to produce real streaming output, use `compose.StreamableLambda` or `compose.TransformableLambda`.

func (*Chain[I, O]) AppendLoader

func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendLoader adds a Loader node to the chain. e.g.

loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{})
if err != nil {...}
chain.AppendLoader(loader)

func (*Chain[I, O]) AppendParallel

func (c *Chain[I, O]) AppendParallel(p *Parallel) *Chain[I, O]

AppendParallel adds a Parallel structure (multiple concurrent nodes) to the chain. e.g.

parallel := compose.NewParallel()
parallel.AddChatModel("openai", model1) // => "openai": *schema.Message{}
parallel.AddChatModel("maas", model2) // => "maas": *schema.Message{}

chain.AppendParallel(parallel) // => multiple concurrent nodes are added to the Chain

The next node in the chain is either END, or a node which accepts a map[string]any whose keys are `openai` and `maas` as specified above.

func (*Chain[I, O]) AppendPassthrough

func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]

AppendPassthrough adds a Passthrough node to the chain. It can be used to connect multiple ChainBranch or Parallel structures. e.g.

chain.AppendPassthrough()

func (*Chain[I, O]) AppendRetriever

func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendRetriever adds a Retriever node to the chain. e.g.

retriever, err := vectorstore.NewRetriever(ctx, config)
if err != nil {...}
chain.AppendRetriever(retriever)

Or use fornax knowledge as the retriever:

config := fornaxknowledge.Config{...}
retriever, err := fornaxknowledge.NewKnowledgeRetriever(ctx, config)
if err != nil {...}
chain.AppendRetriever(retriever)

func (*Chain[I, O]) AppendToolsNode

func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendToolsNode adds a ToolsNode node to the chain. e.g.

toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
	Tools: []tool.BaseTool{...},
})

chain.AppendToolsNode(toolsNode)

func (*Chain[I, O]) Compile

func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)

Compile compiles the chain into a Runnable, which can be used directly. e.g.

chain := NewChain[string, string]()
r, err := chain.Compile(ctx)
if err != nil {...}

r.Invoke(ctx, input) // ping => pong
r.Stream(ctx, input) // ping => stream out
r.Collect(ctx, inputReader) // stream in => pong
r.Transform(ctx, inputReader) // stream in => stream out

type ChainBranch

type ChainBranch struct {
	// contains filtered or unexported fields
}

ChainBranch represents a conditional branch in a chain of operations. It allows for dynamic routing of execution based on a condition. All branches within ChainBranch are expected to either end the Chain, or converge to another node in the Chain.

func NewChainBranch

func NewChainBranch[T any](cond GraphBranchCondition[T]) *ChainBranch

NewChainBranch creates a new ChainBranch instance based on a given condition. It takes a generic type T and a GraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. e.g.

condition := func(ctx context.Context, in string) (endNode string, err error) {
	// logic to determine the next node
	return "next_node_key_01", nil
}

cb := NewChainBranch[string](condition)
cb.AddPassthrough("next_node_key_01") // node in branch, represents one path of the branch
cb.AddPassthrough("next_node_key_02") // node in branch

func NewChainMultiBranch added in v0.3.18

func NewChainMultiBranch[T any](cond GraphMultiBranchCondition[T]) *ChainBranch

func NewStreamChainBranch

func NewStreamChainBranch[T any](cond StreamGraphBranchCondition[T]) *ChainBranch

NewStreamChainBranch creates a new ChainBranch instance based on a given stream condition. It takes a generic type T and a StreamGraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. e.g.

condition := func(ctx context.Context, in *schema.StreamReader[string]) (endNode string, err error) {
	// logic to determine the next node, you can read the stream and make a decision.
	// to save time, usually read the first chunk of stream, then make a decision which path to go.
	return "some_next_node_key", nil
}

cb := NewStreamChainBranch[string](condition)

func NewStreamChainMultiBranch added in v0.3.18

func NewStreamChainMultiBranch[T any](cond StreamGraphMultiBranchCondition[T]) *ChainBranch

func (*ChainBranch) AddChatModel

func (cb *ChainBranch) AddChatModel(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *ChainBranch

AddChatModel adds a ChatModel node to the branch. e.g.

chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})
chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o-mini",
})
cb.AddChatModel("chat_model_key_01", chatModel01)
cb.AddChatModel("chat_model_key_02", chatModel02)

func (*ChainBranch) AddChatTemplate

func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch

AddChatTemplate adds a ChatTemplate node to the branch. e.g.

chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

cb.AddChatTemplate("chat_template_key_01", chatTemplate)

chatTemplate2, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}, you are not allowed to chat in other topics.",
})

cb.AddChatTemplate("chat_template_key_02", chatTemplate2)

func (*ChainBranch) AddDocumentTransformer

func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch

AddDocumentTransformer adds a DocumentTransformer node to the branch. e.g.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

cb.AddDocumentTransformer("document_transformer_node_key", markdownSplitter)

func (*ChainBranch) AddEmbedding

func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch

AddEmbedding adds an Embedding node to the branch. e.g.

embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
	Model: "text-embedding-3-small",
})

cb.AddEmbedding("embedding_node_key", embeddingNode)

func (*ChainBranch) AddGraph

func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch

AddGraph adds a generic Graph node to the branch. e.g.

graph := compose.NewGraph[string, string]()

cb.AddGraph("graph_node_key", graph)

func (*ChainBranch) AddIndexer

func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch

AddIndexer adds an Indexer node to the branch. e.g.

indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{
	Collection: "my_collection",
})

cb.AddIndexer("indexer_node_key", indexer)

func (*ChainBranch) AddLambda

func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch

AddLambda adds a Lambda node to the branch. e.g.

lambdaFunc := func(ctx context.Context, in string) (out string, err error) {
	// logic to process the input
	return "processed_output", nil
}

cb.AddLambda("lambda_node_key", compose.InvokableLambda(lambdaFunc))

func (*ChainBranch) AddLoader

func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch

AddLoader adds a Loader node to the branch. e.g.

pdfParser, err := pdf.NewPDFParser()
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{
	Parser: pdfParser,
})

cb.AddLoader("loader_node_key", loader)

func (*ChainBranch) AddPassthrough

func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch

AddPassthrough adds a Passthrough node to the branch. e.g.

cb.AddPassthrough("passthrough_node_key")

func (*ChainBranch) AddRetriever

func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch

AddRetriever adds a Retriever node to the branch. e.g.

retriever, err := volc_vikingdb.NewRetriever(ctx, &volc_vikingdb.RetrieverConfig{
	Collection: "my_collection",
})

cb.AddRetriever("retriever_node_key", retriever)

func (*ChainBranch) AddToolsNode

func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch

AddToolsNode adds a ToolsNode to the branch. e.g.

toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
	Tools: []tool.BaseTool{...},
})

cb.AddToolsNode("tools_node_key", toolsNode)

type CheckPointStore added in v0.3.18

type CheckPointStore interface {
	Get(ctx context.Context, checkPointID string) ([]byte, bool, error)
	Set(ctx context.Context, checkPointID string, checkPoint []byte) error
}

type Collect

type Collect[I, O, TOption any] func(ctx context.Context,
	input *schema.StreamReader[I], opts ...TOption) (output O, err error)

Collect is the type of the collectable lambda function.

type CollectWOOpt

type CollectWOOpt[I, O any] func(ctx context.Context,
	input *schema.StreamReader[I]) (output O, err error)

CollectWOOpt is the type of the collectable lambda function without options.

type FanInMergeConfig added in v0.3.44

type FanInMergeConfig struct {
	StreamMergeWithSourceEOF bool //indicates whether to emit a SourceEOF error for each stream
}

FanInMergeConfig defines the configuration for fan-in merge operations. It allows specifying how multiple inputs are merged into a single input. StreamMergeWithSourceEOF indicates whether to emit a SourceEOF error for each stream when it ends, before the final merged output is produced. This is useful for tracking the completion of individual input streams in a named stream merge.

type FieldMapping added in v0.3.8

type FieldMapping struct {
	// contains filtered or unexported fields
}

func FromField added in v0.3.8

func FromField(from string) *FieldMapping

FromField creates a FieldMapping that maps a single predecessor field to the entire successor input. This is an exclusive mapping - once set, no other field mappings can be added since the successor input has already been fully mapped. Field: either the field of a struct, or the key of a map.

func FromFieldPath added in v0.3.17

func FromFieldPath(fromFieldPath FieldPath) *FieldMapping

FromFieldPath creates a FieldMapping that maps a single predecessor field path to the entire successor input. This is an exclusive mapping - once set, no other field mappings can be added since the successor input has already been fully mapped.

Example:

// Maps the 'name' field from nested 'user.profile' to the entire successor input
FromFieldPath(FieldPath{"user", "profile", "name"})

Note: The field path elements must not contain the internal path separator character ('\x1F').

func MapFieldPaths added in v0.3.17

func MapFieldPaths(fromFieldPath, toFieldPath FieldPath) *FieldMapping

MapFieldPaths creates a FieldMapping that maps a single predecessor field path to a single successor field path.

Example:

// Maps user.profile.name to response.userName
MapFieldPaths(
    FieldPath{"user", "profile", "name"},
    FieldPath{"response", "userName"},
)

Note: The field path elements must not contain the internal path separator character ('\x1F').

func MapFields added in v0.3.8

func MapFields(from, to string) *FieldMapping

MapFields creates a FieldMapping that maps a single predecessor field to a single successor field. Field: either the field of a struct, or the key of a map.

func ToField added in v0.3.8

func ToField(to string, opts ...FieldMappingOption) *FieldMapping

ToField creates a FieldMapping that maps the entire predecessor output to a single successor field. Field: either the field of a struct, or the key of a map.

func ToFieldPath added in v0.3.17

func ToFieldPath(toFieldPath FieldPath, opts ...FieldMappingOption) *FieldMapping

ToFieldPath creates a FieldMapping that maps the entire predecessor output to a single successor field path.

Example:

// Maps the entire predecessor output to response.data.userName
ToFieldPath(FieldPath{"response", "data", "userName"})

Note: The field path elements must not contain the internal path separator character ('\x1F').

func (*FieldMapping) Equals added in v0.3.48

func (m *FieldMapping) Equals(o *FieldMapping) bool

func (*FieldMapping) FromNodeKey added in v0.3.48

func (m *FieldMapping) FromNodeKey() string

func (*FieldMapping) FromPath added in v0.3.48

func (m *FieldMapping) FromPath() FieldPath

func (*FieldMapping) String added in v0.3.8

func (m *FieldMapping) String() string

String returns the string representation of the FieldMapping.

func (*FieldMapping) ToPath added in v0.3.48

func (m *FieldMapping) ToPath() FieldPath

type FieldMappingOption added in v0.3.48

type FieldMappingOption func(*FieldMapping)

FieldMappingOption is a functional option for configuring a FieldMapping.

func WithCustomExtractor added in v0.3.48

func WithCustomExtractor(extractor func(input any) (any, error)) FieldMappingOption

WithCustomExtractor sets a custom extractor function for the FieldMapping. The extractor function is used to extract a value from the 'source' of the FieldMapping. NOTE: if specified in this way, Eino can only check the validity of the field mapping at request time.

type FieldPath added in v0.3.17

type FieldPath []string

FieldPath represents a path to a nested field in a struct or map. Each element in the path is either:

  • a struct field name
  • a map key

Example paths:

  • []string{"user"} // top-level field
  • []string{"user", "name"} // nested struct field
  • []string{"users", "admin"} // map key access
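
To show how such a path addresses a nested value, the sketch below walks a FieldPath through nested map[string]any values. It is a simplified illustration: lookup is a hypothetical helper, it handles map keys only, and struct fields (which would need reflection) are left out. The FieldPath type is repeated locally so that the sketch compiles on its own.

```go
package main

import "fmt"

// FieldPath repeats the type declared above so the sketch is self-contained.
type FieldPath []string

// lookup follows path through nested map[string]any values and returns the
// value found at the end of the path. An empty path returns root itself.
func lookup(root map[string]any, path FieldPath) (any, error) {
	var cur any = root
	for i, key := range path {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, fmt.Errorf("%v: element %d (%q) is not reachable through a map", path, i, key)
		}
		next, ok := m[key]
		if !ok {
			return nil, fmt.Errorf("%v: key %q not found", path, key)
		}
		cur = next
	}
	return cur, nil
}

func main() {
	data := map[string]any{
		"user": map[string]any{
			"profile": map[string]any{"name": "Ada"},
		},
	}
	name, err := lookup(data, FieldPath{"user", "profile", "name"})
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // prints Ada
}
```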

type GenLocalState

type GenLocalState[S any] func(ctx context.Context) (state S)

GenLocalState is a function that generates the state.

type Graph

type Graph[I, O any] struct {
	// contains filtered or unexported fields
}

Graph is a generic graph that can be used to compose components.

  • I: the input type of the compiled graph
  • O: the output type of the compiled graph

func NewGraph

func NewGraph[I, O any](opts ...NewGraphOption) *Graph[I, O]

NewGraph creates a directed graph that can compose components, lambdas, chains, parallels, etc., while also providing flexible, multi-granular aspect governance capabilities.

  • I: the input type of the compiled graph
  • O: the output type of the compiled graph

To share state between nodes, use WithGenLocalState option:

type testState struct {
	UserInfo *UserInfo
	KVs     map[string]any
}

genStateFunc := func(ctx context.Context) *testState {
	return &testState{}
}

graph := compose.NewGraph[string, string](compose.WithGenLocalState(genStateFunc))

// you can use WithStatePreHandler and WithStatePostHandler to do something with state
graph.AddLambdaNode("node1", someLambda, compose.WithStatePreHandler(func(ctx context.Context, in string, state *testState) (string, error) {
	// do something with state
	return in, nil
}), compose.WithStatePostHandler(func(ctx context.Context, out string, state *testState) (string, error) {
	// do something with state
	return out, nil
}))

func (Graph) AddBranch

func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)

AddBranch adds a branch to the graph. e.g.

condition := func(ctx context.Context, in string) (string, error) {
	return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewGraphBranch(condition, endNodes)

graph.AddBranch("start_node_key", branch)

func (Graph) AddChatModelNode

func (g Graph) AddChatModelNode(key string, node model.BaseChatModel, opts ...GraphAddNodeOpt) error

AddChatModelNode adds a node that implements model.BaseChatModel. e.g.

chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})

graph.AddChatModelNode("chat_model_node_key", chatModel)

func (Graph) AddChatTemplateNode

func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error

AddChatTemplateNode adds a node that implements prompt.ChatTemplate. e.g.

chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

graph.AddChatTemplateNode("chat_template_node_key", chatTemplate)

func (Graph) AddDocumentTransformerNode

func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error

AddDocumentTransformerNode adds a node that implements document.Transformer. e.g.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

graph.AddDocumentTransformerNode("document_transformer_node_key", markdownSplitter)

func (*Graph[I, O]) AddEdge

func (g *Graph[I, O]) AddEdge(startNode, endNode string) (err error)

AddEdge adds an edge to the graph. An edge represents a data flow from startNode to endNode. The output type of the previous node must match the input type of the next node. NOTE: startNode and endNode must have been added to the graph before adding the edge. e.g.

graph.AddPassthroughNode("start_node_key")
graph.AddPassthroughNode("end_node_key")

err := graph.AddEdge("start_node_key", "end_node_key")

func (Graph) AddEmbeddingNode

func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error

AddEmbeddingNode adds a node that implements embedding.Embedder. e.g.

embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
	Model: "text-embedding-3-small",
})

graph.AddEmbeddingNode("embedding_node_key", embeddingNode)

func (Graph) AddGraphNode

func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error

AddGraphNode adds a Graph[I, O], Chain[I, O], or StateChain[I, O, S] as a node.

  • Graph[I, O] comes from NewGraph[I, O]()
  • Chain[I, O] comes from NewChain[I, O]()

func (Graph) AddIndexerNode

func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error

AddIndexerNode adds a node that implements indexer.Indexer. e.g.

indexer, err := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{})

graph.AddIndexerNode("indexer_node_key", indexer)

func (Graph) AddLambdaNode

func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error

AddLambdaNode adds a node that implements at least one of Invoke[I, O], Stream[I, O], Collect[I, O], Transform[I, O]. Because Go does not support generic methods, generic functions are used to build a Lambda that runs as a Runnable[I, O]:

  • for Invoke[I, O], use compose.InvokableLambda()
  • for Stream[I, O], use compose.StreamableLambda()
  • for Collect[I, O], use compose.CollectableLambda()
  • for Transform[I, O], use compose.TransformableLambda()
  • for arbitrary combinations of the 4 kinds of lambda, use compose.AnyLambda()

func (Graph) AddLoaderNode

func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error

AddLoaderNode adds a node that implements document.Loader. e.g.

loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{})

graph.AddLoaderNode("loader_node_key", loader)

func (Graph) AddPassthroughNode

func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error

AddPassthroughNode adds a passthrough node to the graph. mostly used in pregel mode of graph. e.g.

graph.AddPassthroughNode("passthrough_node_key")

func (Graph) AddRetrieverNode

func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error

AddRetrieverNode adds a node that implements retriever.Retriever. e.g.

retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})

graph.AddRetrieverNode("retriever_node_key", retriever)

func (Graph) AddToolsNode

func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error

AddToolsNode adds a ToolsNode to the graph. e.g.

toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{})

graph.AddToolsNode("tools_node_key", toolsNode)

func (*Graph[I, O]) Compile

func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)

Compile takes the raw graph and compiles it into a form ready to be run. e.g.

graph := compose.NewGraph[string, string]()

runnable, err := graph.Compile(ctx, compose.WithGraphName("my_graph"))
if err != nil {...}

runnable.Invoke(ctx, "input") // invoke
runnable.Stream(ctx, "input") // stream
runnable.Collect(ctx, inputReader) // collect
runnable.Transform(ctx, inputReader) // transform

func (Graph) GetType

func (g Graph) GetType() string

type GraphAddNodeOpt

type GraphAddNodeOpt func(o *graphAddNodeOpts)

GraphAddNodeOpt is a functional option type for adding a node to a graph. e.g.

graph.AddLambdaNode("node_name", lambda, compose.WithInputKey("input_key"), compose.WithOutputKey("output_key"))

func WithGraphCompileOptions

func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt

WithGraphCompileOptions sets the compile options for a node that is an AnyGraph. e.g.

graph.AddGraphNode("node_name", subGraph, compose.WithGraphCompileOptions(compose.WithGraphName("my_sub_graph")))

func WithInputKey

func WithInputKey(k string) GraphAddNodeOpt

WithInputKey sets the input key of the node. This changes the input value of the node: for example, if the preceding node's output is map[string]any{"key01": "value01"} and the current node's input key is "key01", then the current node's input value will be "value01".

func WithNodeKey

func WithNodeKey(key string) GraphAddNodeOpt

WithNodeKey sets the node key, which is used to identify the node in the chain. It is only for use in Chain/StateChain.

func WithNodeName

func WithNodeName(n string) GraphAddNodeOpt

WithNodeName sets the name of the node.

func WithOutputKey

func WithOutputKey(k string) GraphAddNodeOpt

WithOutputKey sets the output key of the node. This changes the output value of the node. For example, if the current node's output key is "key01", then the node's output value will be map[string]any{"key01": value}.
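The effect of WithInputKey and WithOutputKey can be pictured with plain maps. The sketch below does not call the compose package; pickInput and wrapOutput are hypothetical helpers that only mimic the key selection and wrapping described above.

```go
package main

import "fmt"

// pickInput mimics WithInputKey: it selects one field from the preceding
// node's map output. Hypothetical helper, not part of compose.
func pickInput(prev map[string]any, key string) (any, bool) {
	v, ok := prev[key]
	return v, ok
}

// wrapOutput mimics WithOutputKey: it wraps a node's output in a map under key.
// Hypothetical helper, not part of compose.
func wrapOutput(key string, value any) map[string]any {
	return map[string]any{key: value}
}

func main() {
	prev := map[string]any{"key01": "value01"}
	in, _ := pickInput(prev, "key01")
	fmt.Println(in)
	fmt.Println(wrapOutput("key01", in))
}
```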

func WithStatePostHandler

func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt

WithStatePostHandler modifies the node's output of type O according to state S, or stores output information into the state. The handler is thread-safe. Note: this option requires the Graph to be created with the WithGenLocalState option. O is the output type of the node (ChatModel, Lambda, Retriever, etc.); S is the state type defined in WithGenLocalState.

func WithStatePreHandler

func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt

WithStatePreHandler modifies the node's input of type I according to state S, or stores input information into the state. The handler is thread-safe. Note: this option requires the Graph to be created with the WithGenLocalState option. I is the input type of the node (ChatModel, Lambda, Retriever, etc.); S is the state type defined in WithGenLocalState.

func WithStreamStatePostHandler

func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt

WithStreamStatePostHandler modifies the node's streaming output of type O according to state S, or stores output information into the state. The handler is thread-safe. Note: this option requires the Graph to be created with the WithGenLocalState option. When to use: the current node's output is an actual stream, and you want the downstream node's input to remain an actual stream after the state post handler. Caution: while StreamStatePostHandler is thread-safe, modifying state within your own goroutine is NOT. O is the output type of the node (ChatModel, Lambda, Retriever, etc.); S is the state type defined in WithGenLocalState.

func WithStreamStatePreHandler

func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt

WithStreamStatePreHandler modifies the node's streaming input of type I according to state S, or stores input information into the state. The handler is thread-safe. Note: this option requires the Graph to be created with the WithGenLocalState option. When to use: the upstream node's output is an actual stream, and you want the current node's input to remain an actual stream after the state pre handler. Caution: while StreamStatePreHandler is thread-safe, modifying state within your own goroutine is NOT. I is the input type of the node (ChatModel, Lambda, Retriever, etc.); S is the state type defined in WithGenLocalState.
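The thread-safety guarantee of the state handlers can be illustrated with a mutex-guarded wrapper. This is a sketch under the assumption that handler calls are serialized by a lock; the documentation does not say how eino implements the guarantee. counterState, preHandler, guarded and runConcurrently are hypothetical names, and preHandler only copies the shape of StatePreHandler[I, S].

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// counterState is a hypothetical state type for this sketch.
type counterState struct{ Calls int }

// preHandler has the same shape as StatePreHandler[I, S].
type preHandler[I, S any] func(ctx context.Context, in I, state S) (I, error)

// guarded serializes handler calls with a mutex. This is an assumption about
// the mechanism, not eino's actual code.
func guarded[I, S any](mu *sync.Mutex, state S, h preHandler[I, S]) func(context.Context, I) (I, error) {
	return func(ctx context.Context, in I) (I, error) {
		mu.Lock()
		defer mu.Unlock()
		return h(ctx, in, state)
	}
}

// runConcurrently calls the guarded handler n times from separate goroutines
// and returns the final call count stored in the state.
func runConcurrently(n int) int {
	var mu sync.Mutex
	st := &counterState{}
	h := guarded[string, *counterState](&mu, st,
		func(_ context.Context, in string, s *counterState) (string, error) {
			s.Calls++ // safe: executed while the lock is held
			return fmt.Sprintf("%s#%d", in, s.Calls), nil
		})

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = h(context.Background(), "input")
		}()
	}
	wg.Wait()
	return st.Calls
}

func main() {
	fmt.Println(runConcurrently(100))
}
```

A goroutine started inside the handler would touch the state after the lock is released, which is the situation the caution above warns about.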

type GraphBranch

type GraphBranch struct {
	// contains filtered or unexported fields
}

GraphBranch is the branch type for the graph. It is used to determine the next node based on the condition.

func NewGraphBranch

func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch

NewGraphBranch creates a new graph branch. It is used to determine the next node based on the condition. e.g.

condition := func(ctx context.Context, in string) (string, error) {
	// logic to determine the next node
	return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewGraphBranch(condition, endNodes)

graph.AddBranch("key_of_node_before_branch", branch)
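How a condition and its endNodes set work together can be sketched without the compose package. The branch type, its next method and route are hypothetical; in particular, rejecting a key that is missing from endNodes is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// branch is a hypothetical stand-in for GraphBranch: a condition plus the set
// of node keys it is allowed to select.
type branch[T any] struct {
	condition func(ctx context.Context, in T) (string, error)
	endNodes  map[string]bool
}

// next evaluates the condition and rejects keys that were not declared in
// endNodes. The rejection is an assumption of this sketch.
func (b branch[T]) next(ctx context.Context, in T) (string, error) {
	key, err := b.condition(ctx, in)
	if err != nil {
		return "", err
	}
	if !b.endNodes[key] {
		return "", fmt.Errorf("branch selected undeclared node %q", key)
	}
	return key, nil
}

// route builds a branch that sends short inputs to "path01" and everything
// else to "path02", then evaluates it for in.
func route(in string) (string, error) {
	b := branch[string]{
		condition: func(_ context.Context, s string) (string, error) {
			if len(s) < 5 {
				return "path01", nil
			}
			return "path02", nil
		},
		endNodes: map[string]bool{"path01": true, "path02": true},
	}
	return b.next(context.Background(), in)
}

func main() {
	fmt.Println(route("hi"))
	fmt.Println(route("a longer input"))
}
```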

func NewGraphMultiBranch added in v0.3.18

func NewGraphMultiBranch[T any](condition GraphMultiBranchCondition[T], endNodes map[string]bool) *GraphBranch

func NewStreamGraphBranch

func NewStreamGraphBranch[T any](condition StreamGraphBranchCondition[T], endNodes map[string]bool) *GraphBranch

NewStreamGraphBranch creates a new stream graph branch. It is used to determine the next node based on the condition of stream input. e.g.

condition := func(ctx context.Context, in *schema.StreamReader[T]) (string, error) {
	// logic to determine the next node.
	// to use the feature of stream, you can use the first chunk to determine the next node.
	return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewStreamGraphBranch(condition, endNodes)

graph.AddBranch("key_of_node_before_branch", branch)

func NewStreamGraphMultiBranch added in v0.3.18

func NewStreamGraphMultiBranch[T any](condition StreamGraphMultiBranchCondition[T],
	endNodes map[string]bool) *GraphBranch

func (*GraphBranch) GetEndNode

func (gb *GraphBranch) GetEndNode() map[string]bool

GetEndNode returns all end nodes of the branch.

type GraphBranchCondition

type GraphBranchCondition[T any] func(ctx context.Context, in T) (endNode string, err error)

GraphBranchCondition is the condition type for the branch.

type GraphCompileCallback

type GraphCompileCallback interface {
	OnFinish(ctx context.Context, info *GraphInfo)
}

GraphCompileCallback is the callback which will be called when graph compilation finishes.

type GraphCompileOption

type GraphCompileOption func(*graphCompileOptions)

GraphCompileOption is an option for compiling an AnyGraph.

func WithCheckPointStore added in v0.3.18

func WithCheckPointStore(store CheckPointStore) GraphCompileOption

func WithEagerExecution added in v0.3.34

func WithEagerExecution() GraphCompileOption

WithEagerExecution enables the eager execution mode for the graph. In eager mode, nodes are executed as soon as they are ready to run, without waiting for the completion of a super step. Ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine

Note: Eager mode is not allowed when the graph's trigger mode is set to AnyPredecessor. Workflow uses eager mode by default.

Deprecated: Eager execution is automatically enabled by default when a node's trigger mode is set to AllPredecessor. If you were using this option previously, it can be safely removed without changing behavior.

func WithEagerExecutionDisabled added in v0.4.0

func WithEagerExecutionDisabled() GraphCompileOption

WithEagerExecutionDisabled disables the eager execution mode for the graph. By default, eager execution is enabled for Workflow and for Graph with the AllPredecessor trigger mode. With this option, nodes wait for the completion of a super step instead of executing as soon as they are ready to run. Ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine

func WithFanInMergeConfig added in v0.3.44

func WithFanInMergeConfig(confs map[string]FanInMergeConfig) GraphCompileOption

WithFanInMergeConfig sets the fan-in merge configurations for the graph nodes that receive inputs from multiple sources.

func WithGraphCompileCallbacks

func WithGraphCompileCallbacks(cbs ...GraphCompileCallback) GraphCompileOption

WithGraphCompileCallbacks sets callbacks for graph compilation.

func WithGraphName

func WithGraphName(graphName string) GraphCompileOption

WithGraphName sets a name for the graph. The name is used for debugging and logging purposes. If not set, a default name will be used.

func WithInterruptAfterNodes added in v0.3.18

func WithInterruptAfterNodes(nodes []string) GraphCompileOption

func WithInterruptBeforeNodes added in v0.3.18

func WithInterruptBeforeNodes(nodes []string) GraphCompileOption

func WithMaxRunSteps

func WithMaxRunSteps(maxSteps int) GraphCompileOption

WithMaxRunSteps sets the maximum number of steps that a graph can run. This is useful to prevent infinite loops in graphs with cycles. If the number of steps exceeds maxSteps, the graph execution will be terminated with an error.
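The guard that a step limit provides for cyclic graphs can be sketched as a bounded loop. runLoop and errExceedMaxSteps are hypothetical names for this illustration; the error text mirrors ErrExceedMaxSteps, but the way eino counts steps may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errExceedMaxSteps mirrors the message of ErrExceedMaxSteps for this sketch.
var errExceedMaxSteps = errors.New("exceeds max steps")

// runLoop applies step until done reports true, executing at most maxSteps
// steps. It returns the last value, the number of executed steps, and an
// error if the limit was reached first.
func runLoop(maxSteps, start int, step func(int) int, done func(int) bool) (int, int, error) {
	v := start
	for steps := 0; ; steps++ {
		if done(v) {
			return v, steps, nil
		}
		if steps == maxSteps {
			return v, steps, errExceedMaxSteps
		}
		v = step(v)
	}
}

func main() {
	inc := func(x int) int { return x + 1 }

	// A loop that terminates on its own.
	fmt.Println(runLoop(10, 0, inc, func(x int) bool { return x >= 3 }))

	// A loop that never terminates is cut off by the limit.
	fmt.Println(runLoop(5, 0, inc, func(int) bool { return false }))
}
```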

func WithNodeTriggerMode

func WithNodeTriggerMode(triggerMode NodeTriggerMode) GraphCompileOption

WithNodeTriggerMode sets the trigger mode for nodes in the graph. The trigger mode determines when a node is triggered during graph execution. The default is AnyPredecessor. Ref: https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine

func WithSerializer added in v0.3.50

func WithSerializer(serializer Serializer) GraphCompileOption

type GraphInfo

type GraphInfo struct {
	CompileOptions        []GraphCompileOption
	Nodes                 map[string]GraphNodeInfo // node key -> node info
	Edges                 map[string][]string      // edge start node key -> edge end node key, control edges
	DataEdges             map[string][]string
	Branches              map[string][]GraphBranch // branch start node key -> branch
	InputType, OutputType reflect.Type
	Name                  string

	NewGraphOptions []NewGraphOption
	GenStateFn      func(context.Context) any
}

GraphInfo holds the information that end users pass in when compiling a graph. It is passed to the compile callback so that the user can obtain the node info and instances, for example when all details of the graph are needed for observation.

type GraphMultiBranchCondition added in v0.3.18

type GraphMultiBranchCondition[T any] func(ctx context.Context, in T) (endNode map[string]bool, err error)

GraphMultiBranchCondition is the condition type for the multi choice branch.

type GraphNodeInfo

type GraphNodeInfo struct {
	Component             components.Component
	Instance              any
	GraphAddNodeOpts      []GraphAddNodeOpt
	InputType, OutputType reflect.Type // mainly for lambda, whose input and output types cannot be inferred by component type
	Name                  string
	InputKey, OutputKey   string
	GraphInfo             *GraphInfo
	Mappings              []*FieldMapping
}

GraphNodeInfo holds the information that end users pass in when adding a node to a graph.

type InterruptInfo added in v0.3.18

type InterruptInfo struct {
	State           any
	BeforeNodes     []string
	AfterNodes      []string
	RerunNodes      []string
	RerunNodesExtra map[string]any
	SubGraphs       map[string]*InterruptInfo
}

func ExtractInterruptInfo added in v0.3.18

func ExtractInterruptInfo(err error) (info *InterruptInfo, existed bool)

type Invoke

type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)

Invoke is the type of the invokable lambda function.

type InvokeWOOpt

type InvokeWOOpt[I, O any] func(ctx context.Context, input I) (output O, err error)

InvokeWOOpt is the type of the invokable lambda function without options.

type Lambda

type Lambda struct {
	// contains filtered or unexported fields
}

Lambda is the node that wraps a user-provided lambda function. It can be used as a node in Graph or Chain (including Parallel and Branch). Create a Lambda by using AnyLambda/InvokableLambda/StreamableLambda/CollectableLambda/TransformableLambda. e.g.

lambda := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
	return input, nil
})

func AnyLambda

func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption],
	c Collect[I, O, TOption], t Transform[I, O, TOption], opts ...LambdaOpt) (*Lambda, error)

AnyLambda creates a Lambda from any combination of the four lambda functions. You only need to implement one or more of them; pass nil for the rest. e.g.

invokeFunc := func(ctx context.Context, input string, opts ...myOption) (output string, err error) {
	// ...
}
streamFunc := func(ctx context.Context, input string, opts ...myOption) (output *schema.StreamReader[string], err error) {
	// ...
}

lambda, err := compose.AnyLambda(invokeFunc, streamFunc, nil, nil)

func CollectableLambda

func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda

CollectableLambda creates a Lambda with collectable lambda function without options.

func CollectableLambdaWithOption

func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda

CollectableLambdaWithOption creates a Lambda with collectable lambda function and options.

func InvokableLambda

func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda

InvokableLambda creates a Lambda with invokable lambda function without options.

func InvokableLambdaWithOption

func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda

InvokableLambdaWithOption creates a Lambda with invokable lambda function and options.

func MessageParser

func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda

MessageParser creates a lambda that parses a message into an object T, usually used after a chatmodel. usage:

parser := schema.NewMessageJSONParser[MyStruct](&schema.MessageJSONParseConfig{
	ParseFrom: schema.MessageParseFromContent,
})
parserLambda := MessageParser(parser)

chain := NewChain[*schema.Message, MyStruct]()
chain.AppendChatModel(chatModel)
chain.AppendLambda(parserLambda)

r, err := chain.Compile(context.Background())

// parsed is a MyStruct object
parsed, err := r.Invoke(context.Background(), &schema.Message{
	Role:    schema.User,
	Content: "return a json string for my struct",
})

func StreamableLambda

func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda

StreamableLambda creates a Lambda with streamable lambda function without options.

func StreamableLambdaWithOption

func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda

StreamableLambdaWithOption creates a Lambda with streamable lambda function and options.

func ToList

func ToList[I any](opts ...LambdaOpt) *Lambda

ToList creates a Lambda that converts input I to a []I. It's useful when you want to convert a single input to a list of inputs. e.g.

lambda := compose.ToList[*schema.Message]()
chain := compose.NewChain[[]*schema.Message, []*schema.Message]()

chain.AppendChatModel(chatModel) // chatModel returns *schema.Message, but we need []*schema.Message
chain.AppendLambda(lambda) // convert *schema.Message to []*schema.Message
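The conversion itself amounts to wrapping one value in a one-element slice. A minimal stdlib sketch, where toList is a hypothetical plain function rather than the Lambda that ToList returns:

```go
package main

import "fmt"

// toList wraps a single value in a one-element slice, which is the
// conversion described for ToList. Hypothetical helper, not part of compose.
func toList[I any](in I) []I {
	return []I{in}
}

func main() {
	fmt.Println(toList("hello"))
	fmt.Println(len(toList(42)))
}
```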

func TransformableLambda

func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda

TransformableLambda creates a Lambda with transformable lambda function without options.

func TransformableLambdaWithOption

func TransformableLambdaWithOption[I, O, TOption any](t Transform[I, O, TOption], opts ...LambdaOpt) *Lambda

TransformableLambdaWithOption creates a Lambda with transformable lambda function and options.

type LambdaOpt

type LambdaOpt func(o *lambdaOpts)

LambdaOpt is the option for creating a Lambda.

func WithLambdaCallbackEnable

func WithLambdaCallbackEnable(y bool) LambdaOpt

WithLambdaCallbackEnable enables the callback aspect of the lambda function.

func WithLambdaType

func WithLambdaType(t string) LambdaOpt

WithLambdaType sets the type of the lambda function.

type NewGraphOption added in v0.3.1

type NewGraphOption func(ngo *newGraphOptions)

func WithGenLocalState added in v0.3.1

func WithGenLocalState[S any](gls GenLocalState[S]) NewGraphOption

type NodePath added in v0.3.4

type NodePath struct {
	// contains filtered or unexported fields
}

func NewNodePath added in v0.3.4

func NewNodePath(path ...string) *NodePath

func (*NodePath) GetPath added in v0.3.22

func (p *NodePath) GetPath() []string

type NodeTriggerMode

type NodeTriggerMode string

NodeTriggerMode controls the triggering mode of graph nodes.

const (
	// AnyPredecessor means that the node will be triggered when any of its predecessors is included in the previous completed super step.
	// Ref:https://www.cloudwego.io/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/#runtime-engine
	AnyPredecessor NodeTriggerMode = "any_predecessor"
	// AllPredecessor means that the current node will only be triggered when all of its predecessor nodes have finished running.
	AllPredecessor NodeTriggerMode = "all_predecessor"
)
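The difference between the two trigger modes can be sketched as a readiness check over a node's predecessors. This is a simplification that ignores super steps and branches; triggerMode, the two constants and ready are hypothetical names that only mirror the values above.

```go
package main

import "fmt"

// triggerMode mirrors NodeTriggerMode for this sketch.
type triggerMode string

const (
	anyPredecessor triggerMode = "any_predecessor"
	allPredecessor triggerMode = "all_predecessor"
)

// ready reports whether a node may run, given its predecessor keys and the
// set of predecessors that have completed. A simplified model: the real
// runtime also accounts for super steps and branches.
func ready(mode triggerMode, preds []string, completed map[string]bool) bool {
	n := 0
	for _, p := range preds {
		if completed[p] {
			n++
		}
	}
	if mode == allPredecessor {
		return len(preds) > 0 && n == len(preds)
	}
	return n > 0
}

func main() {
	preds := []string{"a", "b"}
	done := map[string]bool{"a": true}
	fmt.Println(ready(anyPredecessor, preds, done)) // one finished predecessor is enough
	fmt.Println(ready(allPredecessor, preds, done)) // still waiting for "b"
}
```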

type Option

type Option struct {
	// contains filtered or unexported fields
}

Option is a functional option type for calling a graph.

func WithCallbacks

func WithCallbacks(cbs ...callbacks.Handler) Option

WithCallbacks sets callback handlers for all components in a single call. e.g.

runnable.Invoke(ctx, "input", compose.WithCallbacks(&myCallbacks{}))

func WithChatModelOption

func WithChatModelOption(opts ...model.Option) Option

WithChatModelOption is a functional option type for chat model component. e.g.

chatModelOption := compose.WithChatModelOption(model.WithTemperature(0.7))
runnable.Invoke(ctx, "input", chatModelOption)

func WithChatTemplateOption

func WithChatTemplateOption(opts ...prompt.Option) Option

WithChatTemplateOption is a functional option type for chat template component.

func WithCheckPointID added in v0.3.18

func WithCheckPointID(checkPointID string) Option

func WithDocumentTransformerOption

func WithDocumentTransformerOption(opts ...document.TransformerOption) Option

WithDocumentTransformerOption is a functional option type for document transformer component.

func WithEmbeddingOption

func WithEmbeddingOption(opts ...embedding.Option) Option

WithEmbeddingOption is a functional option type for embedding component. e.g.

embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
runnable.Invoke(ctx, "input", embeddingOption)

func WithForceNewRun added in v0.3.45

func WithForceNewRun() Option

WithForceNewRun forces the graph to run from the beginning, ignoring any checkpoints.

func WithIndexerOption

func WithIndexerOption(opts ...indexer.Option) Option

WithIndexerOption is a functional option type for indexer component. e.g.

indexerOption := compose.WithIndexerOption(indexer.WithSubIndexes([]string{"my_sub_index"}))
runnable.Invoke(ctx, "input", indexerOption)

func WithLambdaOption

func WithLambdaOption(opts ...any) Option

WithLambdaOption is a functional option type for lambda component.

func WithLoaderOption

func WithLoaderOption(opts ...document.LoaderOption) Option

WithLoaderOption is a functional option type for loader component. e.g.

loaderOption := compose.WithLoaderOption(document.WithCollection("my_collection"))
runnable.Invoke(ctx, "input", loaderOption)

func WithRetrieverOption

func WithRetrieverOption(opts ...retriever.Option) Option

WithRetrieverOption is a functional option type for retriever component. e.g.

retrieverOption := compose.WithRetrieverOption(retriever.WithIndex("my_index"))
runnable.Invoke(ctx, "input", retrieverOption)

func WithRuntimeMaxSteps

func WithRuntimeMaxSteps(maxSteps int) Option

WithRuntimeMaxSteps sets the maximum number of steps for the graph runtime. e.g.

runnable.Invoke(ctx, "input", compose.WithRuntimeMaxSteps(20))

func WithStateModifier added in v0.3.18

func WithStateModifier(sm StateModifier) Option

func WithToolsNodeOption

func WithToolsNodeOption(opts ...ToolsNodeOption) Option

WithToolsNodeOption is a functional option type for tools node component.

func WithWriteToCheckPointID added in v0.3.53

func WithWriteToCheckPointID(checkPointID string) Option

WithWriteToCheckPointID specifies a different checkpoint ID to write to. If not provided, the checkpoint ID from WithCheckPointID will be used for writing. This is useful for scenarios where you want to load from an existing checkpoint but save the progress to a new, separate checkpoint.
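The read-from-one-ID, write-to-another behavior can be sketched with an in-memory map. memStore, resolveWriteID and resume are hypothetical; the real CheckPointStore interface and the checkpoint payload format are not shown in this section.

```go
package main

import "fmt"

// memStore is a hypothetical in-memory checkpoint store keyed by checkpoint ID.
type memStore map[string]string

// resolveWriteID returns the ID to write to: the explicit write ID when one
// is given, otherwise the ID that was used for loading.
func resolveWriteID(checkPointID, writeToID string) string {
	if writeToID != "" {
		return writeToID
	}
	return checkPointID
}

// resume loads the state stored under checkPointID, appends progress, and
// saves the result under the resolved write ID.
func resume(store memStore, checkPointID, writeToID, progress string) {
	state := store[checkPointID]
	store[resolveWriteID(checkPointID, writeToID)] = state + progress
}

func main() {
	store := memStore{"cp1": "step1;"}

	// Load from cp1, save to cp2: cp1 stays untouched.
	resume(store, "cp1", "cp2", "step2;")
	fmt.Println(store["cp1"], store["cp2"])

	// No write ID: progress is written back to cp1.
	resume(store, "cp1", "", "step2b;")
	fmt.Println(store["cp1"])
}
```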

func (Option) DesignateNode

func (o Option) DesignateNode(key ...string) Option

DesignateNode sets the key(s) of the node(s) to which the option will be applied. Note: only effective at the top-level graph. e.g.

embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
runnable.Invoke(ctx, "input", embeddingOption.DesignateNode("my_embedding_node"))

func (Option) DesignateNodeWithPath added in v0.3.4

func (o Option) DesignateNodeWithPath(path ...*NodePath) Option

DesignateNodeWithPath sets the path of the node(s) to which the option will be applied. You can make the option take effect in a subgraph by specifying the key of the subgraph. e.g. DesignateNodeWithPath(NewNodePath("sub graph node key", "node key within sub graph"))

type Parallel

type Parallel struct {
	// contains filtered or unexported fields
}

Parallel runs multiple nodes in parallel.

Use `NewParallel()` to create a new Parallel. Example:

parallel := NewParallel()
parallel.AddChatModel("output_key01", chat01)
parallel.AddChatModel("output_key02", chat02)

chain := NewChain[any,any]()
chain.AppendParallel(parallel)

func NewParallel

func NewParallel() *Parallel

NewParallel creates a new parallel type. it is useful when you want to run multiple nodes in parallel in a chain.

func (*Parallel) AddChatModel

func (p *Parallel) AddChatModel(outputKey string, node model.BaseChatModel, opts ...GraphAddNodeOpt) *Parallel

AddChatModel adds a chat model to the parallel. eg.

chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})

chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})

p.AddChatModel("output_key01", chatModel01)
p.AddChatModel("output_key02", chatModel02)

func (*Parallel) AddChatTemplate

func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel

AddChatTemplate adds a chat template to the parallel. eg.

chatTemplate01, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

p.AddChatTemplate("output_key01", chatTemplate01)

func (*Parallel) AddDocumentTransformer

func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel

AddDocumentTransformer adds a document transformer node to the parallel. eg.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

p.AddDocumentTransformer("output_key01", markdownSplitter)

func (*Parallel) AddEmbedding

func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel

AddEmbedding adds an embedding node to the parallel. eg.

embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
	Model: "text-embedding-3-small",
})

p.AddEmbedding("output_key01", embeddingNode)

func (*Parallel) AddGraph

func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel

AddGraph adds a graph node to the parallel. It is useful when you want to use a graph or a chain as a node in the parallel. eg.

chain := compose.NewChain[any, any]()

p.AddGraph("output_key01", chain)

func (*Parallel) AddIndexer

func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel

AddIndexer adds an indexer node to the parallel. eg.

indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{
	Collection: "my_collection",
})

p.AddIndexer("output_key01", indexer)

func (*Parallel) AddLambda

func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel

AddLambda adds a lambda node to the parallel. eg.

lambdaFunc := func(ctx context.Context, input *schema.Message) ([]*schema.Message, error) {
	return []*schema.Message{input}, nil
}

p.AddLambda("output_key01", compose.InvokableLambda(lambdaFunc))

func (*Parallel) AddLoader

func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel

AddLoader adds a loader node to the parallel. eg.

loader, err := file.NewLoader(ctx, &file.LoaderConfig{})

p.AddLoader("output_key01", loader)

func (*Parallel) AddPassthrough

func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel

AddPassthrough adds a passthrough node to the parallel. eg.

p.AddPassthrough("output_key01")

func (*Parallel) AddRetriever

func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel

AddRetriever adds a retriever node to the parallel. eg.

retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})

p.AddRetriever("output_key01", retriever)

func (*Parallel) AddToolsNode

func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel

AddToolsNode adds a tools node to the parallel. eg.

toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
	Tools: []tool.BaseTool{...},
})

p.AddToolsNode("output_key01", toolsNode)

type Runnable

type Runnable[I, O any] interface {
	Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
	Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
	Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
	Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}

Runnable is the interface for an executable object. Graph and Chain can be compiled into a Runnable. Runnable is the core concept of eino: it provides downgrade compatibility across the four data flow patterns, so components that implement only one or more of the methods can be connected automatically. For example, if a component implements only Stream(), you can still call Invoke(); the stream output is converted into an invoke output.
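The downgrade from Stream to Invoke can be sketched with a channel standing in for schema.StreamReader. streamFn, invokeFromStream, wordStream and invokeWords are hypothetical names, and joining chunks by string concatenation is an assumption of this sketch; the merge rule eino applies depends on the element type.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// streamFn is a hypothetical component that only produces a stream of chunks.
type streamFn func(ctx context.Context, in string) (<-chan string, error)

// invokeFromStream derives an invoke-style function from a stream-only
// component by draining the stream and concatenating the chunks.
func invokeFromStream(s streamFn) func(ctx context.Context, in string) (string, error) {
	return func(ctx context.Context, in string) (string, error) {
		ch, err := s(ctx, in)
		if err != nil {
			return "", err
		}
		var sb strings.Builder
		for chunk := range ch {
			sb.WriteString(chunk)
		}
		return sb.String(), nil
	}
}

// wordStream emits each whitespace-separated word of in as one chunk,
// followed by a single space.
func wordStream(_ context.Context, in string) (<-chan string, error) {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, w := range strings.Fields(in) {
			ch <- w + " "
		}
	}()
	return ch, nil
}

// invokeWords runs the derived invoke function on in.
func invokeWords(in string) (string, error) {
	return invokeFromStream(wordStream)(context.Background(), in)
}

func main() {
	out, err := invokeWords("a  b c")
	fmt.Printf("%q %v\n", out, err)
}
```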

type Serializer added in v0.3.50

type Serializer interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

type StateModifier added in v0.3.18

type StateModifier func(ctx context.Context, path NodePath, state any) error

type StatePostHandler

type StatePostHandler[O, S any] func(ctx context.Context, out O, state S) (O, error)

StatePostHandler is a function called after the node is executed. Note: if the caller uses Stream on a node that has a StatePostHandler, the handler reads all stream chunks and merges them into a single object.

type StatePreHandler

type StatePreHandler[I, S any] func(ctx context.Context, in I, state S) (I, error)

StatePreHandler is a function called before the node is executed. Note: if the caller uses Stream on a node that has a StatePreHandler, the handler reads all stream chunks and merges them into a single object.

type Stream

type Stream[I, O, TOption any] func(ctx context.Context,
	input I, opts ...TOption) (output *schema.StreamReader[O], err error)

Stream is the type of the streamable lambda function.

type StreamGraphBranchCondition

type StreamGraphBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNode string, err error)

StreamGraphBranchCondition is the condition type for the stream branch.

type StreamGraphMultiBranchCondition added in v0.3.18

type StreamGraphMultiBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNodes map[string]bool, err error)

StreamGraphMultiBranchCondition is the condition type for the stream multi choice branch.

type StreamStatePostHandler

type StreamStatePostHandler[O, S any] func(ctx context.Context, out *schema.StreamReader[O], state S) (*schema.StreamReader[O], error)

StreamStatePostHandler is a function that is called after the node is executed with stream input and output.

type StreamStatePreHandler

type StreamStatePreHandler[I, S any] func(ctx context.Context, in *schema.StreamReader[I], state S) (*schema.StreamReader[I], error)

StreamStatePreHandler is a function that is called before the node is executed with stream input and output.

type StreamWOOpt

type StreamWOOpt[I, O any] func(ctx context.Context,
	input I) (output *schema.StreamReader[O], err error)

StreamWOOpt is the type of the streamable lambda function without options.

type ToolsInterruptAndRerunExtra added in v0.3.38

type ToolsInterruptAndRerunExtra struct {
	ToolCalls     []schema.ToolCall
	ExecutedTools map[string]string
	RerunTools    []string
	RerunExtraMap map[string]any
}

type ToolsNode

type ToolsNode struct {
	// contains filtered or unexported fields
}

ToolsNode represents a node capable of executing tools within a graph. The Graph Node interface is defined as follows:

Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)

Input: An AssistantMessage containing ToolCalls.

Output: An array of ToolMessage where the order of elements corresponds to the order of ToolCalls in the input.
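Keeping the output order aligned with the input ToolCalls while running tools concurrently can be sketched by writing each result to its own slice index. toolCall, runTools and demoTools are hypothetical and use plain strings instead of schema.Message; this is one way to obtain the ordering, not necessarily how ToolsNode does it.

```go
package main

import (
	"fmt"
	"sync"
)

// toolCall is a hypothetical, simplified stand-in for schema.ToolCall.
type toolCall struct {
	ID   string
	Name string
	Args string
}

// runTools executes all calls concurrently. Each goroutine writes only to its
// own index, so the result order matches the order of calls and no lock is
// needed. Every call name is assumed to exist in tools.
func runTools(calls []toolCall, tools map[string]func(string) string) []string {
	out := make([]string, len(calls))
	var wg sync.WaitGroup
	for i, c := range calls {
		wg.Add(1)
		go func(i int, c toolCall) {
			defer wg.Done()
			out[i] = c.ID + ":" + tools[c.Name](c.Args)
		}(i, c)
	}
	wg.Wait()
	return out
}

// demoTools returns two trivial tools for the example.
func demoTools() map[string]func(string) string {
	return map[string]func(string) string{
		"echo":  func(a string) string { return a },
		"twice": func(a string) string { return a + a },
	}
}

func main() {
	calls := []toolCall{
		{ID: "1", Name: "twice", Args: "ab"},
		{ID: "2", Name: "echo", Args: "x"},
	}
	fmt.Println(runTools(calls, demoTools()))
}
```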

func NewToolNode

func NewToolNode(ctx context.Context, conf *ToolsNodeConfig) (*ToolsNode, error)

NewToolNode creates a new ToolsNode. e.g.

conf := &ToolsNodeConfig{
	Tools: []tool.BaseTool{invokableTool1, streamableTool2},
}
toolsNode, err := NewToolNode(ctx, conf)

func (*ToolsNode) GetType

func (tn *ToolsNode) GetType() string

func (*ToolsNode) Invoke

func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message,
	opts ...ToolsNodeOption) ([]*schema.Message, error)

Invoke calls the tools and collects the results of invokable tools. If the input message contains multiple tool calls, they are executed in parallel unless ExecuteSequentially is set in ToolsNodeConfig.

func (*ToolsNode) Stream

func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message,
	opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)

Stream calls the tools and collects the results of stream readers. If the input message contains multiple tool calls, they are executed in parallel unless ExecuteSequentially is set in ToolsNodeConfig.

type ToolsNodeConfig

type ToolsNodeConfig struct {
	// Tools specifies the list of tools that can be called. Each is declared as a BaseTool but must implement InvokableTool or StreamableTool.
	Tools []tool.BaseTool

	// UnknownToolsHandler handles tool calls for non-existent tools when LLM hallucinates.
	// This field is optional. When not set, calling a non-existent tool will result in an error.
	// When provided, if the LLM attempts to call a tool that doesn't exist in the Tools list,
	// this handler will be invoked instead of returning an error, allowing graceful handling of hallucinated tools.
	// Parameters:
	//   - ctx: The context for the tool call
	//   - name: The name of the non-existent tool
	//   - input: The tool call input generated by llm
	// Returns:
	//   - string: The response to be returned as if the tool was executed
	//   - error: Any error that occurred during handling
	UnknownToolsHandler func(ctx context.Context, name, input string) (string, error)

	// ExecuteSequentially determines whether tool calls should be executed sequentially (in order) or in parallel.
	// When set to true, tool calls will be executed one after another in the order they appear in the input message.
	// When set to false (default), tool calls will be executed in parallel.
	ExecuteSequentially bool

	// ToolArgumentsHandler allows handling of tool arguments before execution.
	// When provided, this function will be called for each tool call to process the arguments.
	// Parameters:
	//   - ctx: The context for the tool call
	//   - name: The name of the tool being called
	//   - arguments: The original arguments string for the tool
	// Returns:
	//   - string: The processed arguments string to be used for tool execution
	//   - error: Any error that occurred during preprocessing
	ToolArgumentsHandler func(ctx context.Context, name, arguments string) (string, error)
}

ToolsNodeConfig is the config for ToolsNode.
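The roles of UnknownToolsHandler and ToolArgumentsHandler can be sketched as a small dispatch function. toolConfig and its call method are hypothetical, the context parameter is omitted for brevity, and running the arguments handler before the tool lookup is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// toolConfig is a hypothetical, simplified stand-in for ToolsNodeConfig.
type toolConfig struct {
	tools                map[string]func(args string) (string, error)
	unknownToolsHandler  func(name, input string) (string, error)
	toolArgumentsHandler func(name, arguments string) (string, error)
}

// call preprocesses the arguments (if a handler is set), then runs the named
// tool. Unknown names go to unknownToolsHandler, or fail if it is nil.
func (c toolConfig) call(name, args string) (string, error) {
	if c.toolArgumentsHandler != nil {
		processed, err := c.toolArgumentsHandler(name, args)
		if err != nil {
			return "", err
		}
		args = processed
	}
	t, ok := c.tools[name]
	if !ok {
		if c.unknownToolsHandler == nil {
			return "", fmt.Errorf("tool %q not found", name)
		}
		return c.unknownToolsHandler(name, args)
	}
	return t(args)
}

// demoConfig builds a config with one tool; withFallback controls whether an
// unknown-tool handler is installed.
func demoConfig(withFallback bool) toolConfig {
	c := toolConfig{
		tools: map[string]func(string) (string, error){
			"shout": func(a string) (string, error) { return strings.ToUpper(a), nil },
		},
		toolArgumentsHandler: func(_, a string) (string, error) { return strings.TrimSpace(a), nil },
	}
	if withFallback {
		c.unknownToolsHandler = func(name, _ string) (string, error) {
			return "no tool named " + name, nil
		}
	}
	return c
}

func main() {
	fmt.Println(demoConfig(true).call("shout", "  hi  "))
	fmt.Println(demoConfig(true).call("missing", "{}"))
	fmt.Println(demoConfig(false).call("missing", "{}"))
}
```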

type ToolsNodeOption

type ToolsNodeOption func(o *toolsNodeOptions)

ToolsNodeOption is the option func type for ToolsNode.

func WithToolList added in v0.3.15

func WithToolList(tool ...tool.BaseTool) ToolsNodeOption

WithToolList sets the tool list for the ToolsNode.

func WithToolOption

func WithToolOption(opts ...tool.Option) ToolsNodeOption

WithToolOption adds tool options to the ToolsNode.

type Transform

type Transform[I, O, TOption any] func(ctx context.Context,
	input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)

Transform is the type of the transformable lambda function.

type TransformWOOpts

type TransformWOOpts[I, O any] func(ctx context.Context,
	input *schema.StreamReader[I]) (output *schema.StreamReader[O], err error)

TransformWOOpts is the type of the transformable lambda function without options.

type Workflow added in v0.3.8

type Workflow[I, O any] struct {
	// contains filtered or unexported fields
}

Workflow is a wrapper around Graph that replaces AddEdge with declared dependencies and field mappings between nodes. Under the hood it uses NodeTriggerMode(AllPredecessor), so it does not support cycles.
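
To illustrate why an "all predecessors" trigger rule excludes cycles, here is a conceptual sketch using only the standard library. It is not this package's implementation: runOrder and errCycle are hypothetical names. A node becomes ready only once every predecessor has finished, so the nodes of a cycle never become ready.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errCycle is a hypothetical error for this sketch, not an error value of the package.
var errCycle = errors.New("dependency graph has a cycle")

// runOrder takes deps[node] = predecessors of node and returns an order in
// which every node appears after all of its predecessors.
func runOrder(deps map[string][]string) ([]string, error) {
	pending := map[string]int{} // number of unfinished predecessors per node
	successors := map[string][]string{}
	for node, preds := range deps {
		if _, ok := pending[node]; !ok {
			pending[node] = 0
		}
		for _, p := range preds {
			if _, ok := pending[p]; !ok {
				pending[p] = 0
			}
			pending[node]++
			successors[p] = append(successors[p], node)
		}
	}
	var ready []string
	for node, n := range pending {
		if n == 0 {
			ready = append(ready, node)
		}
	}
	sort.Strings(ready) // sorted only to make the result deterministic
	var order []string
	for len(ready) > 0 {
		node := ready[0]
		ready = ready[1:]
		order = append(order, node)
		var next []string
		for _, s := range successors[node] {
			pending[s]--
			if pending[s] == 0 {
				next = append(next, s)
			}
		}
		sort.Strings(next)
		ready = append(ready, next...)
	}
	if len(order) != len(pending) {
		return nil, errCycle // some nodes never became ready
	}
	return order, nil
}

func main() {
	order, err := runOrder(map[string][]string{
		"a": {"start"}, "b": {"start"}, "end": {"a", "b"},
	})
	fmt.Println(order, err)
}
```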

func NewWorkflow added in v0.3.8

func NewWorkflow[I, O any](opts ...NewGraphOption) *Workflow[I, O]

NewWorkflow creates a new Workflow.

func (*Workflow[I, O]) AddBranch added in v0.3.18

func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch

AddBranch adds a branch to the workflow.

End Nodes Field Mappings: End nodes of the branch are required to define their own field mappings. This is a key distinction between Graph's Branch and Workflow's Branch:

  • Graph's Branch: Automatically passes its input to the selected node.
  • Workflow's Branch: Does not pass its input to the selected node.

func (*Workflow[I, O]) AddChatModelNode added in v0.3.8

func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddChatTemplateNode added in v0.3.8

func (wf *Workflow[I, O]) AddChatTemplateNode(key string, chatTemplate prompt.ChatTemplate, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddDocumentTransformerNode added in v0.3.8

func (wf *Workflow[I, O]) AddDocumentTransformerNode(key string, transformer document.Transformer, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddEmbeddingNode added in v0.3.8

func (wf *Workflow[I, O]) AddEmbeddingNode(key string, embedding embedding.Embedder, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddEnd deprecated added in v0.3.8

func (wf *Workflow[I, O]) AddEnd(fromNodeKey string, inputs ...*FieldMapping) *Workflow[I, O]

Deprecated: use *Workflow[I,O].End() to obtain a WorkflowNode instance for END, then work with it just like a normal WorkflowNode.

func (*Workflow[I, O]) AddGraphNode added in v0.3.8

func (wf *Workflow[I, O]) AddGraphNode(key string, graph AnyGraph, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddIndexerNode added in v0.3.8

func (wf *Workflow[I, O]) AddIndexerNode(key string, indexer indexer.Indexer, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddLambdaNode added in v0.3.8

func (wf *Workflow[I, O]) AddLambdaNode(key string, lambda *Lambda, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddLoaderNode added in v0.3.8

func (wf *Workflow[I, O]) AddLoaderNode(key string, loader document.Loader, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddPassthroughNode added in v0.3.18

func (wf *Workflow[I, O]) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddRetrieverNode added in v0.3.8

func (wf *Workflow[I, O]) AddRetrieverNode(key string, retriever retriever.Retriever, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) AddToolsNode added in v0.3.8

func (wf *Workflow[I, O]) AddToolsNode(key string, tools *ToolsNode, opts ...GraphAddNodeOpt) *WorkflowNode

func (*Workflow[I, O]) Compile added in v0.3.8

func (wf *Workflow[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)

func (*Workflow[I, O]) End added in v0.3.18

func (wf *Workflow[I, O]) End() *WorkflowNode

End returns the WorkflowNode representing the END node.

type WorkflowAddInputOpt added in v0.3.18

type WorkflowAddInputOpt func(*workflowAddInputOpts)

func WithNoDirectDependency added in v0.3.18

func WithNoDirectDependency() WorkflowAddInputOpt

WithNoDirectDependency creates a data mapping without establishing a direct execution dependency. The predecessor node will still complete before the current node executes, but through indirect execution paths rather than a direct dependency.

In a workflow graph, node dependencies typically serve two purposes:

  1. Execution order: determining when nodes should run
  2. Data flow: specifying how data passes between nodes

This option separates these concerns by:

  • Creating data mapping from the predecessor to the current node
  • Relying on the predecessor's path to reach the current node through other nodes that have direct execution dependencies

Example:

node.AddInputWithOptions("dataNode", mappings, WithNoDirectDependency())

Important:

  1. Branch scenarios: When connecting nodes on different sides of a branch, WithNoDirectDependency MUST be used to let the branch itself handle the execution order, preventing incorrect dependencies that could bypass the branch.

  2. Execution guarantee: The predecessor will still complete before the current node executes because the predecessor must have a path (through other nodes) that eventually reaches the current node.

  3. Graph validity: There MUST be a path from the predecessor that eventually reaches the current node through other nodes with direct dependencies. This ensures the execution order while avoiding redundant direct dependencies.

Common use cases:

  • Cross-branch data access where the branch handles execution order
  • Avoiding redundant dependencies when a path already exists
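
The graph-validity requirement can be pictured as a reachability check over direct execution dependencies. The sketch below is a standard-library illustration of that idea, not the package's own validation logic; reaches and the node names are hypothetical.

```go
package main

import "fmt"

// reaches reports whether target can be reached from start by following
// direct execution dependencies. edges[from] lists the nodes that directly
// depend on from.
func reaches(edges map[string][]string, start, target string) bool {
	visited := map[string]bool{start: true}
	stack := []string{start}
	for len(stack) > 0 {
		node := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		for _, next := range edges[node] {
			if next == target {
				return true
			}
			if !visited[next] {
				visited[next] = true
				stack = append(stack, next)
			}
		}
	}
	return false
}

func main() {
	// "consumer" reads data from "dataNode" without a direct dependency;
	// the execution path dataNode -> router -> consumer already orders them.
	edges := map[string][]string{
		"dataNode": {"router"},
		"router":   {"consumer"},
	}
	fmt.Println(reaches(edges, "dataNode", "consumer"))
	fmt.Println(reaches(edges, "unrelated", "consumer"))
}
```

Under this model, a data-only mapping from a predecessor is acceptable when the check returns true, because the indirect path already guarantees that the predecessor finishes first.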

type WorkflowBranch added in v0.3.18

type WorkflowBranch struct {
	*GraphBranch
	// contains filtered or unexported fields
}

type WorkflowNode added in v0.3.8

type WorkflowNode struct {
	// contains filtered or unexported fields
}

WorkflowNode is the node of the Workflow.

func (*WorkflowNode) AddDependency added in v0.3.18

func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode

AddDependency creates an execution-only dependency between nodes. The current node will wait for the predecessor node to complete before executing, but no data will be passed between them.

Parameters:

  • fromNodeKey: the key of the predecessor node that must complete before this node starts

Example:

// Wait for "setupNode" to complete before executing
node.AddDependency("setupNode")

This is useful when:

  • You need to ensure execution order without data transfer
  • The predecessor performs setup or initialization that must complete first
  • You want to explicitly separate execution dependencies from data flow

Returns the current node for method chaining.
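
As a rough analogy for an execution-only dependency, the standard-library sketch below makes one goroutine wait for another through a channel that carries no data. It only models the ordering guarantee described above; it is not how the package schedules nodes, and runWithDependency is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// runWithDependency runs a "setup" step and a "worker" step concurrently.
// The worker waits on setupDone, which signals completion but passes no value.
func runWithDependency() []string {
	var mu sync.Mutex
	var log []string
	record := func(step string) {
		mu.Lock()
		defer mu.Unlock()
		log = append(log, step)
	}

	setupDone := make(chan struct{}) // closed when setup has finished
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		<-setupDone // execution dependency only: nothing is received
		record("worker")
	}()
	go func() {
		defer wg.Done()
		record("setup")
		close(setupDone)
	}()
	wg.Wait()
	return log
}

func main() {
	fmt.Println(runWithDependency())
}
```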

func (*WorkflowNode) AddInput added in v0.3.8

func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode

AddInput creates both data and execution dependencies between nodes. It configures how data flows from the predecessor node (fromNodeKey) to the current node, and ensures the current node only executes after the predecessor completes.

Parameters:

  • fromNodeKey: the key of the predecessor node
  • inputs: field mappings that specify how data should flow from the predecessor to the current node. If no mappings are provided, the entire output of the predecessor will be used as input.

Example:

// Map a specific field
node.AddInput("userNode", MapFields("user.name", "displayName"))

// Use entire output
node.AddInput("dataNode")

Returns the current node for method chaining.
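
The data side of AddInput can be sketched with plain maps. This is a simplified standard-library model under stated assumptions: fieldMapping and buildInput are hypothetical stand-ins, only flat field names are handled, and the real FieldMapping type works on typed values rather than map[string]any.

```go
package main

import "fmt"

// fieldMapping is a hypothetical stand-in for a mapping from one field of the
// predecessor's output to one field of the current node's input.
type fieldMapping struct {
	From, To string
}

// buildInput derives the current node's input from the predecessor's output.
// With no mappings, the entire output is used as the input.
func buildInput(output map[string]any, mappings ...fieldMapping) (map[string]any, error) {
	if len(mappings) == 0 {
		return output, nil
	}
	input := make(map[string]any, len(mappings))
	for _, m := range mappings {
		v, ok := output[m.From]
		if !ok {
			return nil, fmt.Errorf("predecessor output has no field %q", m.From)
		}
		input[m.To] = v
	}
	return input, nil
}

func main() {
	output := map[string]any{"name": "Ada", "age": 36}

	mapped, err := buildInput(output, fieldMapping{From: "name", To: "displayName"})
	fmt.Println(mapped, err)

	whole, err := buildInput(output)
	fmt.Println(whole, err)
}
```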

func (*WorkflowNode) AddInputWithOptions added in v0.3.18

func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode

AddInputWithOptions creates a dependency between nodes with custom configuration options. It allows fine-grained control over both data flow and execution dependencies.

Parameters:

  • fromNodeKey: the key of the predecessor node
  • inputs: field mappings that specify how data flows from the predecessor to the current node. If no mappings are provided, the entire output of the predecessor will be used as input.
  • opts: configuration options that control how the dependency is established

Example:

// Create data mapping without direct execution dependency
node.AddInputWithOptions("dataNode", mappings, WithNoDirectDependency())

Returns the current node for method chaining.

func (*WorkflowNode) SetStaticValue added in v0.3.18

func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode

SetStaticValue sets a static value for a field path that will be available during workflow execution. These values are determined at compile time and remain constant throughout the workflow's lifecycle.

Example:

node.SetStaticValue(FieldPath{"query"}, "static query")
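
Conceptually, a static value is a constant written at a field path of the node's input. The sketch below models that with nested maps and a path given as a slice of strings, which is an assumption suggested by the FieldPath{"query"} literal above; setPath is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// setPath writes value at the given path inside dst, creating intermediate
// maps as needed. It fails if an intermediate field exists but is not a map.
func setPath(dst map[string]any, path []string, value any) error {
	if len(path) == 0 {
		return errors.New("empty path")
	}
	cur := dst
	for _, key := range path[:len(path)-1] {
		next, ok := cur[key]
		if !ok {
			child := map[string]any{}
			cur[key] = child
			cur = child
			continue
		}
		child, ok := next.(map[string]any)
		if !ok {
			return fmt.Errorf("field %q is not a map", key)
		}
		cur = child
	}
	cur[path[len(path)-1]] = value
	return nil
}

func main() {
	input := map[string]any{"user": "ada"}
	err := setPath(input, []string{"query"}, "static query")
	fmt.Println(input, err)
}
```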
