Documentation
¶
Index ¶
- Variables
- type APIConnection
- func (ac *APIConnection) Authenticate() (err error)
- func (ac *APIConnection) Close() error
- func (ac *APIConnection) CloseAllQueues()
- func (ac *APIConnection) EnsureAuthenticated() error
- func (ac *APIConnection) GetQueue(name string) (*iop.Queue, bool)
- func (ac *APIConnection) GetSyncedState(endpointName string) (data map[string]map[string]any, err error)
- func (ac *APIConnection) IsAuthExpired() bool
- func (ac *APIConnection) ListEndpoints(patterns ...string) (endpoints Endpoints, err error)
- func (ac *APIConnection) MakeDynamicEndpointIterator(iter *Iterate) (err error)
- func (ac *APIConnection) PutSyncedState(endpointName string, data map[string]map[string]any) (err error)
- func (ac *APIConnection) ReadDataflow(endpointName string, sCfg APIStreamConfig) (df *iop.Dataflow, err error)
- func (ac *APIConnection) RegisterQueue(name string) (*iop.Queue, error)
- func (ac *APIConnection) RemoveQueue(name string) error
- func (ac *APIConnection) RenderDynamicEndpoints() (err error)
- type APIState
- type APIStateAuth
- type APIStreamConfig
- type AggregateState
- type AggregationType
- type AuthType
- type Authentication
- type BackoffType
- type Call
- type DynamicEndpoint
- type DynamicEndpoints
- type Endpoint
- type EndpointMap
- type Endpoints
- type HTTPMethod
- type Iterate
- type Iteration
- type OAuthFlow
- type Pagination
- type Processor
- type Records
- type Request
- type RequestState
- type Response
- type ResponseState
- type Rule
- type RuleType
- type Sequence
- type SingleRequest
- type Spec
- type StateMap
Constants ¶
This section is empty.
Variables ¶
var AggregationTypes = []AggregationType{ AggregationTypeNone, AggregationTypeMaximum, AggregationTypeMinimum, AggregationTypeFlatten, AggregationTypeFirst, AggregationTypeLast, }
Functions ¶
This section is empty.
Types ¶
type APIConnection ¶
type APIConnection struct {
Spec Spec
State *APIState
Context *g.Context
// contains filtered or unexported fields
}
func NewAPIConnection ¶
func NewAPIConnection(ctx context.Context, spec Spec, data map[string]any) (ac *APIConnection, err error)
NewAPIConnection creates an APIConnection from the given context, spec, and data map.
func (*APIConnection) Authenticate ¶
func (ac *APIConnection) Authenticate() (err error)
Authenticate performs the auth workflow if needed, similar to a Connect step. Header-based auths (such as Basic or Bearer) don't need this step. The auth payload is saved in APIState.Auth.
func (*APIConnection) Close ¶
func (ac *APIConnection) Close() error
Close performs cleanup of all resources
func (*APIConnection) CloseAllQueues ¶
func (ac *APIConnection) CloseAllQueues()
CloseAllQueues closes all queues associated with this connection
func (*APIConnection) EnsureAuthenticated ¶ added in v1.4.14
func (ac *APIConnection) EnsureAuthenticated() error
EnsureAuthenticated checks if authentication is valid and re-authenticates if needed. This method ensures thread-safe authentication checks and re-authentication.
func (*APIConnection) GetQueue ¶
func (ac *APIConnection) GetQueue(name string) (*iop.Queue, bool)
GetQueue retrieves a queue by name
func (*APIConnection) GetSyncedState ¶
func (ac *APIConnection) GetSyncedState(endpointName string) (data map[string]map[string]any, err error)
GetSyncedState cycles through each endpoint, and collects the values for each of the Endpoint.Sync values. Output is a map[Endpoint.Name]map[Sync.value] = Endpoint.syncMap[Sync.value]
func (*APIConnection) IsAuthExpired ¶ added in v1.4.14
func (ac *APIConnection) IsAuthExpired() bool
IsAuthExpired checks if the authentication has expired
func (*APIConnection) ListEndpoints ¶
func (ac *APIConnection) ListEndpoints(patterns ...string) (endpoints Endpoints, err error)
func (*APIConnection) MakeDynamicEndpointIterator ¶
func (ac *APIConnection) MakeDynamicEndpointIterator(iter *Iterate) (err error)
func (*APIConnection) PutSyncedState ¶
func (ac *APIConnection) PutSyncedState(endpointName string, data map[string]map[string]any) (err error)
PutSyncedState restores the state from the previous run in each endpoint using the Endpoint.Sync values. Input is a map[Endpoint.Name]map[Sync.value] = Endpoint.syncMap[Sync.value].
func (*APIConnection) ReadDataflow ¶
func (ac *APIConnection) ReadDataflow(endpointName string, sCfg APIStreamConfig) (df *iop.Dataflow, err error)
func (*APIConnection) RegisterQueue ¶
func (ac *APIConnection) RegisterQueue(name string) (*iop.Queue, error)
RegisterQueue creates a new queue with the given name. If a queue with the same name already exists, it is returned.
func (*APIConnection) RemoveQueue ¶
func (ac *APIConnection) RemoveQueue(name string) error
RemoveQueue closes and removes a queue
func (*APIConnection) RenderDynamicEndpoints ¶ added in v1.4.14
func (ac *APIConnection) RenderDynamicEndpoints() (err error)
RenderDynamicEndpoints renders the dynamic objects, basically mutating the spec endpoints. Authentication is needed first.
type APIStateAuth ¶
type APIStateAuth struct {
Authenticated bool `json:"authenticated,omitempty"`
Token string `json:"token,omitempty"` // refresh token?
Headers map[string]string `json:"-"` // to inject
ExpiresAt int64 `json:"expires_at,omitempty"` // Unix timestamp when auth expires
Sign func(context.Context, *http.Request, []byte) error `json:"-"` // for AWS Sigv4
Mutex *sync.Mutex `json:"-" yaml:"-"` // Mutex for auth operations
}
type APIStreamConfig ¶
type AggregateState ¶
type AggregateState struct {
// contains filtered or unexported fields
}
AggregateState stores aggregated values during response processing
type AggregationType ¶
type AggregationType string
const ( AggregationTypeNone AggregationType = "" // No aggregation, apply transformation at record level AggregationTypeMaximum AggregationType = "maximum" // Keep the maximum value across records AggregationTypeMinimum AggregationType = "minimum" // Keep the minimum value across records AggregationTypeFlatten AggregationType = "flatten" // Collect all values into an array AggregationTypeFirst AggregationType = "first" // Keep only the first encountered value AggregationTypeLast AggregationType = "last" // Keep only the last encountered value )
type Authentication ¶
type Authentication struct {
Type AuthType `yaml:"type" json:"type"`
// when set, re-auth after number of seconds
Expires int `yaml:"expires" json:"expires,omitempty"`
// custom authentication workflow
Sequence Sequence `yaml:"sequence" json:"sequence,omitempty"`
// Basic Auth
Username string `yaml:"username,omitempty" json:"username,omitempty"`
Password string `yaml:"password,omitempty" json:"password,omitempty"`
// OAuth
Flow OAuthFlow `yaml:"flow,omitempty" json:"flow,omitempty"`
AuthenticationURL string `yaml:"authentication_url,omitempty" json:"authentication_url,omitempty"`
ClientID string `yaml:"client_id,omitempty" json:"client_id,omitempty"`
ClientSecret string `yaml:"client_secret,omitempty" json:"client_secret,omitempty"`
Token string `yaml:"token,omitempty" json:"token,omitempty"`
Scopes []string `yaml:"scopes,omitempty" json:"scopes,omitempty"`
RedirectURI string `yaml:"redirect_uri,omitempty" json:"redirect_uri,omitempty"`
RefreshToken string `yaml:"refresh_token,omitempty" json:"refresh_token,omitempty"`
RefreshOnExpire bool `yaml:"refresh_on_expire,omitempty" json:"refresh_on_expire,omitempty"`
// AWS
AwsService string `yaml:"aws_service,omitempty" json:"aws_service,omitempty"`
AwsAccessKeyID string `yaml:"aws_access_key_id,omitempty" json:"aws_access_key_id,omitempty"`
AwsSecretAccessKey string `yaml:"aws_secret_access_key,omitempty" json:"aws_secret_access_key,omitempty"`
AwsSessionToken string `yaml:"aws_session_token,omitempty" json:"aws_session_token,omitempty"`
AwsRegion string `yaml:"aws_region,omitempty" json:"aws_region,omitempty"`
AwsProfile string `yaml:"aws_profile,omitempty" json:"aws_profile,omitempty"`
}
Authentication defines how to authenticate with the API
type BackoffType ¶
type BackoffType string
const ( BackoffTypeNone BackoffType = "" // No delay between retries BackoffTypeConstant BackoffType = "constant" // Fixed delay between retries BackoffTypeLinear BackoffType = "linear" // Delay increases linearly with each attempt BackoffTypeExponential BackoffType = "exponential" // Delay increases exponentially (common pattern) BackoffTypeJitter BackoffType = "jitter" // Exponential backoff with randomization to avoid thundering herd )
type Call ¶
type Call struct {
If string `yaml:"if" json:"if"`
Request Request `yaml:"request" json:"request"`
Pagination Pagination `yaml:"pagination" json:"pagination"`
Response Response `yaml:"response" json:"response"`
}
type DynamicEndpoint ¶ added in v1.4.14
type DynamicEndpoints ¶ added in v1.4.14
type DynamicEndpoints []DynamicEndpoint
type Endpoint ¶
type Endpoint struct {
Name string `yaml:"name" json:"name"`
Description string `yaml:"description" json:"description,omitempty"`
Docs string `yaml:"docs" json:"docs,omitempty"`
Disabled bool `yaml:"disabled" json:"disabled"`
State StateMap `yaml:"state" json:"state"`
Sync []string `yaml:"sync" json:"sync,omitempty"`
Request Request `yaml:"request" json:"request"`
Pagination Pagination `yaml:"pagination" json:"pagination"`
Response Response `yaml:"response" json:"response"`
Iterate Iterate `yaml:"iterate" json:"iterate,omitempty"` // state expression to use to loop
Setup Sequence `yaml:"setup" json:"setup,omitempty"`
Teardown Sequence `yaml:"teardown" json:"teardown,omitempty"`
// contains filtered or unexported fields
}
Endpoint is the top-level configuration structure
func (*Endpoint) SetStateVal ¶
type HTTPMethod ¶
type HTTPMethod string
const ( MethodGet HTTPMethod = "GET" MethodHead HTTPMethod = "HEAD" MethodPost HTTPMethod = "POST" MethodPut HTTPMethod = "PUT" MethodPatch HTTPMethod = "PATCH" MethodDelete HTTPMethod = "DELETE" MethodConnect HTTPMethod = "CONNECT" MethodOptions HTTPMethod = "OPTIONS" MethodTrace HTTPMethod = "TRACE" )
type Iterate ¶
type Iterate struct {
Over any `yaml:"over" json:"iterate,omitempty"` // expression
Into string `yaml:"into" json:"into,omitempty"` // state variable
If string `yaml:"id" json:"id,omitempty"` // if we should iterate
Concurrency int `yaml:"concurrency" json:"concurrency,omitempty"`
// contains filtered or unexported fields
}
Iterate is for configuring looping values for requests
type Iteration ¶
type Iteration struct {
// contains filtered or unexported fields
}
func (*Iteration) DetermineStateRenderOrder ¶ added in v1.4.14
type Pagination ¶
type Pagination struct {
NextState map[string]any `yaml:"next_state" json:"next_state,omitempty"`
StopCondition string `yaml:"stop_condition" json:"stop_condition,omitempty"`
}
Pagination configures how to navigate through multiple pages of API results
type Processor ¶
type Processor struct {
Aggregation AggregationType `yaml:"aggregation" json:"aggregation"`
Expression string `yaml:"expression" json:"expression"`
Output string `yaml:"output" json:"output"`
}
Processor represents a way to process data. Without aggregation, it represents a transformation applied at the record level; with aggregation, it reduces/aggregates record data and saves the result into the state.
type Records ¶
type Records struct {
JmesPath string `yaml:"jmespath" json:"jmespath,omitempty"` // for json or xml
PrimaryKey []string `yaml:"primary_key" json:"primary_key,omitempty"`
UpdateKey string `yaml:"update_key" json:"update_key,omitempty"`
Limit int `yaml:"limit" json:"limit,omitempty"` // to limit the records, useful for testing
DuplicateTolerance string `yaml:"duplicate_tolerance" json:"duplicate_tolerance,omitempty"`
}
Records configures how to extract and process data records from a response
type Request ¶
type Request struct {
URL string `yaml:"url" json:"url,omitempty"`
Timeout int `yaml:"timeout" json:"timeout,omitempty"`
Method HTTPMethod `yaml:"method" json:"method,omitempty"`
Headers map[string]any `yaml:"headers" json:"headers,omitempty"`
Parameters map[string]any `yaml:"parameters" json:"parameters,omitempty"`
Payload any `yaml:"payload" json:"payload,omitempty"`
Rate float64 `yaml:"rate" json:"rate,omitempty"` // maximum request per second
Concurrency int `yaml:"concurrency" json:"concurrency,omitempty"` // maximum concurrent requests
}
Request defines how to construct an HTTP request to the API
type RequestState ¶
type RequestState struct {
Method string `yaml:"method" json:"method"`
URL string `yaml:"url" json:"url"`
Headers map[string]any `yaml:"headers" json:"headers"`
Payload any `yaml:"payload" json:"payload"`
Attempts int `yaml:"attempts" json:"attempts"`
}
RequestState captures the state of the HTTP request for reference and debugging
type Response ¶
type Response struct {
Format dbio.FileType `yaml:"format" json:"format,omitempty"` // force response format
Records Records `yaml:"records" json:"records"`
Processors []Processor `yaml:"processors" json:"processors,omitempty"`
Rules []Rule `yaml:"rules" json:"rules,omitempty"`
}
Response defines how to process the API response and extract records
type ResponseState ¶
type ResponseState struct {
Status int `yaml:"status" json:"status"`
Headers map[string]any `yaml:"headers" json:"headers"`
Text string `yaml:"text" json:"text"`
JSON any `yaml:"json" json:"json"`
Records []any `yaml:"records" json:"records"`
}
ResponseState captures the state of the HTTP response for reference and debugging
type Rule ¶
type Rule struct {
Action RuleType `yaml:"action" json:"action"`
Condition string `yaml:"condition" json:"condition"` // an expression
MaxAttempts int `yaml:"max_attempts" json:"max_attempts"`
Backoff BackoffType `yaml:"backoff" json:"backoff"`
BackoffBase int `yaml:"backoff_base" json:"backoff_base"` // base duration, number of seconds. default is 1
Message string `yaml:"message" json:"message"`
}
Rule represents a response rule
type RuleType ¶
type RuleType string
const ( RuleTypeRetry RuleType = "retry" // Retry the request up to MaxAttempts times RuleTypeContinue RuleType = "continue" // Continue processing responses and rules RuleTypeStop RuleType = "stop" // Stop processing requests for this endpoint RuleTypeFail RuleType = "fail" // Stop processing and return an error )
type Sequence ¶ added in v1.4.14
type Sequence []Call
Sequence is many calls (perfect for async jobs, custom auth)
type SingleRequest ¶
type SingleRequest struct {
Request *RequestState `yaml:"request" json:"request"`
Response *ResponseState `yaml:"response" json:"response"`
Aggregate AggregateState `yaml:"-" json:"-"`
// contains filtered or unexported fields
}
SingleRequest represents a single HTTP request/response cycle
func NewSingleRequest ¶
func NewSingleRequest(iter *Iteration) *SingleRequest
func (*SingleRequest) Debug ¶
func (lrs *SingleRequest) Debug(text string, args ...any)
func (*SingleRequest) Map ¶
func (lrs *SingleRequest) Map() map[string]any
func (*SingleRequest) Records ¶
func (lrs *SingleRequest) Records() []any
func (*SingleRequest) Trace ¶ added in v1.4.6
func (lrs *SingleRequest) Trace(text string, args ...any)
type Spec ¶
type Spec struct {
Name string `yaml:"name" json:"name"`
Description string `yaml:"description" json:"description"`
Queues []string `yaml:"queues" json:"queues"`
Defaults Endpoint `yaml:"defaults" json:"defaults"`
Authentication Authentication `yaml:"authentication" json:"authentication"`
EndpointMap EndpointMap `yaml:"endpoints" json:"endpoints"`
DynamicEndpoints DynamicEndpoints `yaml:"dynamic_endpoints" json:"dynamic_endpoints"`
// contains filtered or unexported fields
}
Spec defines the complete API specification with endpoints and authentication