Documentation ¶
Index ¶
- func GetNodeID(ctx context.Context) (string, bool)
- func Interrupt(ctx context.Context, message string, reason string, values Requirements) (map[string]string, error)
- func InterruptWithValidation(ctx context.Context, message string, reason string, values Requirements, ...) (map[string]string, error)
- func LoadInterrupt(ctx context.Context, interrupt HITLInterrupt, values map[string]string) context.Context
- func LoadNodeID(ctx context.Context, nodeID string) context.Context
- type CheckpointState
- type ConditionalEdge
- type ConditionalFunction
- type ConditionalNode
- type ConitionalInterrupt
- type ConstEdge
- type EdgeResolver
- type Edger
- type ErrRequirementInvalidValue
- type ErrRequirmentKeyNotFound
- type ExecutionID
- type ExecutionState
- type Flow
- func (f *Flow[T]) Execute(ctx context.Context, state T) (T, error)
- func (f *Flow[T]) Name() string
- func (f *Flow[T]) OnGraphEnd(cb FlowCallback[T]) *Flow[T]
- func (f *Flow[T]) OnInterrupt(cb FlowCallback[T]) *Flow[T]
- func (f *Flow[T]) OnNodeExec(cb FlowCallback[T]) *Flow[T]
- func (f *Flow[T]) WithCheckpoint(cp CheckpointState) *Flow[T]
- type FlowCallback
- type FunctionNode
- type Graph
- type GraphBuilder
- func (gb *GraphBuilder[T]) AddConditionalEdge(start string, end ConditionalNode[T], redirections map[string]string) *GraphBuilder[T]
- func (gb *GraphBuilder[T]) AddEdge(start, end string) *GraphBuilder[T]
- func (gb *GraphBuilder[T]) AddNode(name string, node Node[T]) *GraphBuilder[T]
- func (gb *GraphBuilder[T]) AddNodes(nodes map[string]Node[T]) *GraphBuilder[T]
- func (gb *GraphBuilder[T]) Build() (Graph[T], error)
- func (gb *GraphBuilder[T]) SetStartNode(start string) *GraphBuilder[T]
- type HITLInterrupt
- type InMemoryStore
- type InterruptID
- type Node
- type Pipe
- type Requirement
- type RequirementTypes
- type Requirements
- type ResolvedHITLInterrupt
- type ResumeConfig
- type Store
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Interrupt ¶
func Interrupt(ctx context.Context, message string, reason string, values Requirements) (map[string]string, error)
Interrupt is a helper function which calls InterruptWithValidation with no validation function.
func InterruptWithValidation ¶
func InterruptWithValidation(ctx context.Context, message string, reason string, values Requirements, fn func(map[string]string) error) (map[string]string, error)
InterruptWithValidation checks whether the execution context contains a resolved interrupt for the calling node. If it does, the resolved values are validated and returned; otherwise a newly created interrupt is returned.
Usage: When a node calls this function for the first time (read: no HITL interrupt is present for the graph nodeID), a new interrupt is created and returned as an error. When the same node calls this function again (read: a resolved HITL interrupt is possibly stored in the context), the previously created HITL interrupt is found together with the resolved answer from the user.
Validation: When the resolved interrupt is found, the values submitted by the user are passed through the validation function. Any error it returns is bubbled up as a HITL interrupt with the validation error attached to it.
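The validation hook is an ordinary func(map[string]string) error. The following is a minimal sketch of such a function; the requirement key "environment" and the list of accepted answers are hypothetical and only illustrate the shape of the callback.

```go
package main

import "fmt"

// allowedEnvs is a hypothetical list of accepted answers, invented for this sketch.
var allowedEnvs = []string{"staging", "production"}

// validateEnvironment has the same signature as the fn parameter of
// InterruptWithValidation: it receives the submitted values and returns
// an error when they are not acceptable.
func validateEnvironment(values map[string]string) error {
	env, ok := values["environment"] // "environment" is an assumed requirement key
	if !ok {
		return fmt.Errorf("missing required key %q", "environment")
	}
	for _, allowed := range allowedEnvs {
		if env == allowed {
			return nil
		}
	}
	return fmt.Errorf("invalid value %q for %q, expected one of %v", env, "environment", allowedEnvs)
}

func main() {
	fmt.Println(validateEnvironment(map[string]string{"environment": "staging"}))
	fmt.Println(validateEnvironment(map[string]string{"environment": "dev"}))
}
```

A function like this would be passed as the last argument of InterruptWithValidation; a non-nil return value is what the text above describes as the attached validation error.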
func LoadInterrupt ¶
func LoadInterrupt(ctx context.Context, interrupt HITLInterrupt, values map[string]string) context.Context
LoadInterrupt is used to load the context with a resolved interrupt (original HITLInterrupt and answer values).
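LoadInterrupt returns a derived context, which suggests the usual context-value pattern. The sketch below shows that pattern with local stand-ins; the key type and helper names are hypothetical, and how the package stores the data internally is not shown on this page.

```go
package main

import (
	"context"
	"fmt"
)

// resolvedKey is an unexported key type, so values stored under it cannot
// collide with keys set by other packages.
type resolvedKey struct{}

// withResolved returns a child context carrying the submitted answers.
func withResolved(ctx context.Context, values map[string]string) context.Context {
	return context.WithValue(ctx, resolvedKey{}, values)
}

// resolvedFrom reports the answers stored in ctx, if any.
func resolvedFrom(ctx context.Context) (map[string]string, bool) {
	values, ok := ctx.Value(resolvedKey{}).(map[string]string)
	return values, ok
}

// lookup is a demo helper: it optionally loads answers and reads them back.
func lookup(load bool) (string, bool) {
	ctx := context.Background()
	if load {
		ctx = withResolved(ctx, map[string]string{"environment": "staging"})
	}
	values, ok := resolvedFrom(ctx)
	return values["environment"], ok
}

func main() {
	fmt.Println(lookup(true))
	fmt.Println(lookup(false))
}
```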
Types ¶
type CheckpointState ¶
type CheckpointState struct {
// CheckpointID is the name of the graph node which will be picked up next when
// the flow is executed.
CheckpointID string `json:"checkpoint_id"`
// Visited stores all the visited graph node (node IDs).
Visited []string `json:"visited"`
// Interrupt stores the human-in-the-loop interrupt when any node returns a HITLInterrupt error.
Interrupt HITLInterrupt `json:"interrupt"`
// InterruptHistory stores all the resolved HITL interrupts.
InterruptHistory []ResolvedHITLInterrupt `json:"interrupt_history"`
}
CheckpointState stores flow execution state which will be used to resume when the execution is interrupted.
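Because the fields carry JSON tags, a checkpoint can be persisted between runs. The sketch below round-trips a trimmed stand-in that keeps only the CheckpointID and Visited fields with their tags from the listing above; the node names are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint is a trimmed stand-in for CheckpointState. It keeps only the
// two simplest fields and reuses their JSON tags.
type checkpoint struct {
	CheckpointID string   `json:"checkpoint_id"`
	Visited      []string `json:"visited"`
}

// roundTrip encodes a checkpoint to JSON and decodes it again.
func roundTrip(cp checkpoint) (string, checkpoint, error) {
	raw, err := json.Marshal(cp)
	if err != nil {
		return "", checkpoint{}, err
	}
	var out checkpoint
	if err := json.Unmarshal(raw, &out); err != nil {
		return "", checkpoint{}, err
	}
	return string(raw), out, nil
}

func main() {
	raw, cp, err := roundTrip(checkpoint{
		CheckpointID: "review",
		Visited:      []string{"start", "draft"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(raw) // {"checkpoint_id":"review","visited":["start","draft"]}
	fmt.Println(cp.CheckpointID, len(cp.Visited))
}
```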
type ConditionalEdge ¶
type ConditionalEdge[T any] struct {
// contains filtered or unexported fields
}
ConditionalEdge is used to redirect to different branches based on the value returned by the ConditionalNode.
func (ConditionalEdge[T]) Resolve ¶
func (ce ConditionalEdge[T]) Resolve(ctx context.Context, state T) string
Resolve implements the EdgeResolver interface for ConditionalEdge.
type ConditionalFunction ¶
ConditionalFunction is a function type which implements the ConditionalNode interface. This is used when no pre-defined state is necessary for the conditional node.
func (ConditionalFunction[T]) Execute ¶
func (fn ConditionalFunction[T]) Execute(ctx context.Context, state T) string
Execute implements the ConditionalNode interface for ConditionalFunction.
type ConditionalNode ¶
ConditionalNode is a node type which inspects the state to decide the next Node.
type ConitionalInterrupt ¶
type ConitionalInterrupt struct {
Value string
}
ConditionalInterrupt is used to direct the execution of a flow using an alias value. This value is then used to choose the next edge of the graph.
func (ConitionalInterrupt) Error ¶
func (ci ConitionalInterrupt) Error() string
Error implements the error interface for the conditional interrupt.
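Since the interrupt is an error value carrying an alias, a caller can recover it with errors.As and look the alias up in a redirection table. The sketch below uses a local stand-in with the same shape; the table contents and helper names are hypothetical, and the package's own routing logic is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// aliasInterrupt is a local stand-in with the same shape as ConitionalInterrupt.
type aliasInterrupt struct {
	Value string
}

// Error makes aliasInterrupt usable as an error value.
func (ai aliasInterrupt) Error() string { return "conditional interrupt: " + ai.Value }

// nextNode extracts the alias from err, even when it is wrapped, and maps it
// to a target node name. The second result is false when err carries no
// alias or the alias has no entry in redirections.
func nextNode(err error, redirections map[string]string) (string, bool) {
	var ai aliasInterrupt
	if !errors.As(err, &ai) {
		return "", false
	}
	target, ok := redirections[ai.Value]
	return target, ok
}

// route is a demo helper that wraps an alias in another error and resolves it
// against a hypothetical redirection table.
func route(alias string) (string, bool) {
	redirections := map[string]string{"approved": "publish", "rejected": "revise"}
	err := fmt.Errorf("node failed: %w", aliasInterrupt{Value: alias})
	return nextNode(err, redirections)
}

func main() {
	fmt.Println(route("approved"))
	fmt.Println(route("unknown"))
}
```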
type ConstEdge ¶
ConstEdge is a simple implementation of the EdgeResolver which returns a constant next node id regardless of the current state.
type EdgeResolver ¶
EdgeResolver interface defines the edge routing configuration which returns the next node id for the passed state.
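The method set of EdgeResolver is not listed on this page. The sketch below assumes it matches the Resolve signature shown for ConditionalEdge and contrasts a constant edge with a state-dependent one; all type names here are local stand-ins.

```go
package main

import (
	"context"
	"fmt"
)

// edgeResolver is an assumed shape for EdgeResolver, inferred from the
// Resolve method documented on ConditionalEdge.
type edgeResolver[T any] interface {
	Resolve(ctx context.Context, state T) string
}

// constEdge always resolves to the same node id and ignores the state.
type constEdge[T any] string

func (ce constEdge[T]) Resolve(_ context.Context, _ T) string { return string(ce) }

// thresholdEdge is a hypothetical state-dependent edge over an int state.
type thresholdEdge struct {
	limit       int
	below, over string
}

func (te thresholdEdge) Resolve(_ context.Context, state int) string {
	if state > te.limit {
		return te.over
	}
	return te.below
}

// resolve asks any resolver for the next node id.
func resolve(r edgeResolver[int], state int) string {
	return r.Resolve(context.Background(), state)
}

func main() {
	fmt.Println(resolve(constEdge[int]("finish"), 1))
	fmt.Println(resolve(thresholdEdge{limit: 10, below: "retry", over: "finish"}, 42))
}
```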
type Edger ¶
type Edger interface {
// contains filtered or unexported methods
}
Edger returns the static list of possible target node names for an edge.
type ErrRequirementInvalidValue ¶
func RequirementInvalid ¶
func RequirementInvalid(key string, value string, suggestions []string) ErrRequirementInvalidValue
func (ErrRequirementInvalidValue) Error ¶
func (iv ErrRequirementInvalidValue) Error() string
type ErrRequirmentKeyNotFound ¶
type ErrRequirmentKeyNotFound string
func RequirementKeyNotFound ¶
func RequirementKeyNotFound(key string) ErrRequirmentKeyNotFound
func (ErrRequirmentKeyNotFound) Error ¶
func (k ErrRequirmentKeyNotFound) Error() string
type ExecutionID ¶
ExecutionID is a compound ID made up of a unique ID passed by the module's callers and the name of the flow that is being executed.
type ExecutionState ¶
type ExecutionState[T any] struct {
CheckpointState CheckpointState `json:"checkpoint_state"`
ApplicationState T `json:"application_state"`
}
ExecutionState stores the checkpoint state and the application state between executions.
type Flow ¶
type Flow[T any] struct {
// contains filtered or unexported fields
}
Flow is a construct used to start or resume execution of a graph with the passed initial app state and checkpoint state.
func (*Flow[T]) Execute ¶
Execute executes the graph with the provided initial state and resumes based on the passed checkpoint state configuration.
func (*Flow[T]) OnGraphEnd ¶
func (f *Flow[T]) OnGraphEnd(cb FlowCallback[T]) *Flow[T]
OnGraphEnd sets the callback function to be called after the graph execution is completed.
func (*Flow[T]) OnInterrupt ¶
func (f *Flow[T]) OnInterrupt(cb FlowCallback[T]) *Flow[T]
OnInterrupt sets the callback function to be called when the flow is interrupted.
func (*Flow[T]) OnNodeExec ¶
func (f *Flow[T]) OnNodeExec(cb FlowCallback[T]) *Flow[T]
OnNodeExec sets the callback function to be called after a node is executed.
func (*Flow[T]) WithCheckpoint ¶
func (f *Flow[T]) WithCheckpoint(cp CheckpointState) *Flow[T]
WithCheckpoint is used to set the checkpoint state for this flow execution.
type FlowCallback ¶
type FlowCallback[T any] func(cs CheckpointState, runState T) error
FlowCallback is a helper type for functions that are called at different steps of flow execution.
func (FlowCallback[T]) Call ¶
func (fc FlowCallback[T]) Call(cs CheckpointState, runState T) error
Call is a helper method which checks that the callback function is not nil and then calls it.
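A nil check on a function type can be sketched as below. The types are local stand-ins, and returning nil for an unset callback is an assumption; the page does not state what Call returns in that case.

```go
package main

import "fmt"

// checkpointState is a trimmed stand-in for CheckpointState.
type checkpointState struct {
	CheckpointID string
}

// flowCallback mirrors the documented FlowCallback signature.
type flowCallback[T any] func(cs checkpointState, runState T) error

// Call invokes fc when it is set. Treating a nil callback as a no-op that
// returns nil is an assumption of this sketch.
func (fc flowCallback[T]) Call(cs checkpointState, runState T) error {
	if fc == nil {
		return nil
	}
	return fc(cs, runState)
}

// countCalls runs an unset and a set callback and reports how often the set
// one was invoked.
func countCalls() (int, error) {
	calls := 0
	var unset flowCallback[string]
	set := flowCallback[string](func(checkpointState, string) error {
		calls++
		return nil
	})
	if err := unset.Call(checkpointState{}, "draft"); err != nil {
		return calls, err
	}
	if err := set.Call(checkpointState{CheckpointID: "review"}, "draft"); err != nil {
		return calls, err
	}
	return calls, nil
}

func main() {
	fmt.Println(countCalls())
}
```

The benefit of this shape is that callers never need to guard optional callbacks themselves.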
type FunctionNode ¶
FunctionNode is a function type which implements the Node interface. This is useful when the [Node]s don't require any preloaded state.
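The function-type adapter idea can be sketched as follows. The Node interface is copied from its declaration on this page; the underlying type of FunctionNode is not shown, so functionNode below is an assumed shape, and prefixNode is a hypothetical struct node included for contrast.

```go
package main

import (
	"context"
	"fmt"
)

// node mirrors the Node interface documented on this page.
type node[T any] interface {
	Execute(ctx context.Context, state T) (T, error)
}

// functionNode is an assumed shape for FunctionNode: a plain function that
// satisfies node by calling itself.
type functionNode[T any] func(ctx context.Context, state T) (T, error)

func (fn functionNode[T]) Execute(ctx context.Context, state T) (T, error) {
	return fn(ctx, state)
}

// prefixNode is a hypothetical struct node that carries preloaded state.
type prefixNode struct {
	prefix string
}

func (p prefixNode) Execute(_ context.Context, state string) (string, error) {
	return p.prefix + state, nil
}

// runAll feeds the state through a struct node and a function node in order.
func runAll(state string) (string, error) {
	nodes := []node[string]{
		prefixNode{prefix: "hello, "},
		functionNode[string](func(_ context.Context, s string) (string, error) {
			return s + "!", nil
		}),
	}
	ctx := context.Background()
	for _, n := range nodes {
		var err error
		if state, err = n.Execute(ctx, state); err != nil {
			return state, err
		}
	}
	return state, nil
}

func main() {
	fmt.Println(runAll("world"))
}
```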
type Graph ¶
type Graph[T any] struct {
// contains filtered or unexported fields
}
Graph stores the graph nodes and edge configuration.
type GraphBuilder ¶
type GraphBuilder[T any] struct {
// contains filtered or unexported fields
}
GraphBuilder is a helper type which contains methods to build a graph.
func NewGraphBuilder ¶
func NewGraphBuilder[T any]() *GraphBuilder[T]
NewGraphBuilder returns a new GraphBuilder. Chain the builder's other methods on the returned value and finish with GraphBuilder.Build to construct a graph.
func (*GraphBuilder[T]) AddConditionalEdge ¶
func (gb *GraphBuilder[T]) AddConditionalEdge(start string, end ConditionalNode[T], redirections map[string]string) *GraphBuilder[T]
AddConditionalEdge adds a single edge relation with a conditional redirection.
func (*GraphBuilder[T]) AddEdge ¶
func (gb *GraphBuilder[T]) AddEdge(start, end string) *GraphBuilder[T]
AddEdge adds a single edge relation.
func (*GraphBuilder[T]) AddNode ¶
func (gb *GraphBuilder[T]) AddNode(name string, node Node[T]) *GraphBuilder[T]
AddNode adds a node to the graph.
func (*GraphBuilder[T]) AddNodes ¶
func (gb *GraphBuilder[T]) AddNodes(nodes map[string]Node[T]) *GraphBuilder[T]
AddNodes adds multiple named nodes to the graph.
func (*GraphBuilder[T]) Build ¶
func (gb *GraphBuilder[T]) Build() (Graph[T], error)
Build checks for the validity of the graph and returns the graph.
func (*GraphBuilder[T]) SetStartNode ¶
func (gb *GraphBuilder[T]) SetStartNode(start string) *GraphBuilder[T]
SetStartNode sets the start node of the graph.
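The chaining style, with validation deferred to Build, can be sketched with a small self-contained stand-in. miniBuilder below is hypothetical and much simpler than GraphBuilder; in particular, the two validity rules it checks (a registered start node, and edges that only reference registered nodes) are assumptions, since this page does not say what Build validates.

```go
package main

import (
	"errors"
	"fmt"
)

// miniBuilder is a hypothetical, simplified stand-in for GraphBuilder.
type miniBuilder struct {
	nodes map[string]bool
	edges map[string]string
	start string
}

func newMiniBuilder() *miniBuilder {
	return &miniBuilder{nodes: map[string]bool{}, edges: map[string]string{}}
}

// Each method records its input and returns the builder so calls can be chained.
func (b *miniBuilder) AddNode(name string) *miniBuilder {
	b.nodes[name] = true
	return b
}

func (b *miniBuilder) AddEdge(start, end string) *miniBuilder {
	b.edges[start] = end
	return b
}

func (b *miniBuilder) SetStartNode(start string) *miniBuilder {
	b.start = start
	return b
}

// Build validates everything recorded so far (assumed rules, see lead-in).
func (b *miniBuilder) Build() (map[string]string, error) {
	if !b.nodes[b.start] {
		return nil, errors.New("start node is not a registered node")
	}
	for from, to := range b.edges {
		if !b.nodes[from] || !b.nodes[to] {
			return nil, fmt.Errorf("edge %q -> %q references an unknown node", from, to)
		}
	}
	return b.edges, nil
}

// buildDemo chains the calls and optionally registers the "review" node.
func buildDemo(withReview bool) error {
	b := newMiniBuilder().AddNode("draft").AddEdge("draft", "review").SetStartNode("draft")
	if withReview {
		b.AddNode("review")
	}
	_, err := b.Build()
	return err
}

func main() {
	fmt.Println(buildDemo(true))
	fmt.Println(buildDemo(false))
}
```

Deferring the checks to Build lets nodes and edges be added in any order, at the cost of reporting mistakes only at the end of the chain.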
type HITLInterrupt ¶
type HITLInterrupt struct {
Reason string `json:"reason"`
Message string `json:"message"`
ValidationError error `json:"validation_error"`
Requirements Requirements `json:"requirements"`
InterruptID InterruptID `json:"interrupt_id"`
}
HITLInterrupt is returned by a node to invoke a human-in-the-loop routine as a part of the flow.
func (HITLInterrupt) Error ¶
func (it HITLInterrupt) Error() string
Error implements the error interface for the task interrupt.
type InMemoryStore ¶
type InMemoryStore[T any] struct {
// contains filtered or unexported fields
}
InMemoryStore implements the Store interface to store the checkpointing data in an in-memory map.
func NewInMemoryStore ¶
func NewInMemoryStore[T any]() *InMemoryStore[T]
NewInMemoryStore creates a new InMemoryStore.
func (*InMemoryStore[T]) Get ¶
func (s *InMemoryStore[T]) Get(ctx context.Context, id ExecutionID) (ExecutionState[T], error)
Get implements the [Store.Get] method of the Store interface.
func (*InMemoryStore[T]) Set ¶
func (s *InMemoryStore[T]) Set(ctx context.Context, id ExecutionID, state ExecutionState[T]) error
Set implements the [Store.Set] method of the Store interface.
type InterruptID ¶
InterruptID is used to identify the interrupt against the Node which threw the interrupt.
func (InterruptID) String ¶
func (i InterruptID) String() string
String returns the string representation of the interrupt.
type Node ¶
type Node[T any] interface {
// Execute runs the node logic with the passed in state.
Execute(ctx context.Context, state T) (T, error)
}
Node represents any node of the execution graph.
type Pipe ¶
type Pipe[T any] struct {
// contains filtered or unexported fields
}
Pipe is a graph execution supervisor which loads the necessary data from the checkpoint store and validates any HITL responses during resumption, after which it executes the flow with the right context.
func NewPipe ¶
NewPipe creates a new Pipe state for the passed flow name, graph and store implementation.
type Requirement ¶
type Requirement struct {
// Defines the type of requirement.
Type RequirementTypes `json:"type"`
// Suggestions for the values of the requirement.
Suggestions []string `json:"suggestions"`
}
Requirement defines the constraints for the interrupt requirements.
type RequirementTypes ¶
type RequirementTypes string
RequirementTypes defines the type of the requirement.
const (
// Enum requirements can only choose values from the provided suggestions.
Enum RequirementTypes = "enum"
// Custom requirements can input any text as the interrupt value.
Custom RequirementTypes = "custom"
// CustomWithSuggestions requirements can input any text, with the given suggestions as hints.
CustomWithSuggestions RequirementTypes = "custom_with_suggestions"
)
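The difference between the three requirement types can be sketched as a small check on a submitted value. The types below are local stand-ins that reuse the documented constant values; checkValue is a hypothetical helper, not a function of the package.

```go
package main

import "fmt"

// requirementType and its constants mirror the documented RequirementTypes values.
type requirementType string

const (
	enum                  requirementType = "enum"
	custom                requirementType = "custom"
	customWithSuggestions requirementType = "custom_with_suggestions"
)

// requirement is a stand-in for Requirement.
type requirement struct {
	Type        requirementType
	Suggestions []string
}

// checkValue is a hypothetical helper: only enum requirements restrict the
// value to the suggestions, the other two types accept any text.
func checkValue(req requirement, value string) error {
	if req.Type != enum {
		return nil
	}
	for _, s := range req.Suggestions {
		if s == value {
			return nil
		}
	}
	return fmt.Errorf("value %q is not one of %v", value, req.Suggestions)
}

func main() {
	colors := []string{"red", "green"}
	fmt.Println(checkValue(requirement{Type: enum, Suggestions: colors}, "blue"))
	fmt.Println(checkValue(requirement{Type: customWithSuggestions, Suggestions: colors}, "blue"))
	fmt.Println(checkValue(requirement{Type: custom}, "anything"))
}
```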
type Requirements ¶
type Requirements map[string]Requirement
Requirements is a hash map of all the requirements which need input from the user.
type ResolvedHITLInterrupt ¶
type ResolvedHITLInterrupt struct {
HITLInterrupt
Values map[string]string
}
ResolvedHITLInterrupt contains the original HITL interrupt and the answer values submitted by the user.
type ResumeConfig ¶
type ResumeConfig struct {
// InterruptValues stores answer values provided during the HITL interaction
// for the HITL Interrupt in the previous execution step.
InterruptValues map[string]string
}
ResumeConfig defines the values required for resuming the flow execution.
type Store ¶
type Store[T any] interface {
Get(ctx context.Context, id ExecutionID) (ExecutionState[T], error)
Set(ctx context.Context, id ExecutionID, state ExecutionState[T]) error
}
Store interface defines all the necessary functions used to store the execution and application state.
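A custom implementation only needs the two methods above. The sketch below is a mutex-guarded map store written against local stand-ins: executionID is assumed to be a string here (the real ExecutionID is a compound ID whose definition is not shown), executionState keeps only the application state, and returning errNotFound for a missing ID is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// executionID and executionState are simplified stand-ins for the package types.
type executionID string

type executionState[T any] struct {
	ApplicationState T
}

// store mirrors the method set of the Store interface.
type store[T any] interface {
	Get(ctx context.Context, id executionID) (executionState[T], error)
	Set(ctx context.Context, id executionID, state executionState[T]) error
}

// errNotFound is a hypothetical sentinel for unknown execution IDs.
var errNotFound = errors.New("execution state not found")

// lockedStore keeps states in a map guarded by a read-write mutex.
type lockedStore[T any] struct {
	mu   sync.RWMutex
	data map[executionID]executionState[T]
}

func newLockedStore[T any]() *lockedStore[T] {
	return &lockedStore[T]{data: make(map[executionID]executionState[T])}
}

func (s *lockedStore[T]) Get(_ context.Context, id executionID) (executionState[T], error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	state, ok := s.data[id]
	if !ok {
		return executionState[T]{}, errNotFound
	}
	return state, nil
}

func (s *lockedStore[T]) Set(_ context.Context, id executionID, state executionState[T]) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[id] = state
	return nil
}

// Compile-time check that lockedStore satisfies the interface.
var _ store[string] = (*lockedStore[string])(nil)

// storeDemo reads a missing ID, writes a state, and reads it back.
func storeDemo() (string, bool) {
	ctx := context.Background()
	s := newLockedStore[string]()
	_, err := s.Get(ctx, "run-1/review-flow") // hypothetical ID format
	missing := errors.Is(err, errNotFound)
	_ = s.Set(ctx, "run-1/review-flow", executionState[string]{ApplicationState: "draft"})
	state, _ := s.Get(ctx, "run-1/review-flow")
	return state.ApplicationState, missing
}

func main() {
	fmt.Println(storeDemo())
}
```

A persistent backend would follow the same shape, replacing the map with database or file access inside Get and Set.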