Documentation ¶
Overview ¶
Package pipeline provides a client for the Pipeline API. The client allows you to both send messages to Pipeline and consume a stream of messages from it. Moreover, when consuming messages, it allows you to manually synchronize the last read position in a stream of messages.
Example (Authentication) ¶
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/adobe/ims-go/ims"
	"github.com/adobe/pipeline-go/pipeline"
)

// main shows how to authenticate a Pipeline client through IMS: it builds an
// IMS client, wraps a token request in a pipeline.TokenGetterFunc, and passes
// that TokenGetter to pipeline.NewClient. All settings come from environment
// variables.
func main() {
	var (
		imsURL          = os.Getenv("IMS_URL")
		imsCode         = os.Getenv("IMS_CODE")
		imsClientID     = os.Getenv("IMS_CLIENT_ID")
		imsClientSecret = os.Getenv("IMS_CLIENT_SECRET")
		pipelineURL     = os.Getenv("PIPELINE_URL")
		// The consumer group has its own variable; reading PIPELINE_URL here
		// would configure the client with the endpoint URL as its group.
		pipelineGroup = os.Getenv("PIPELINE_GROUP")
	)

	// Create an IMS client.
	imsClient, err := ims.NewClient(&ims.ClientConfig{
		URL: imsURL,
	})
	if err != nil {
		log.Fatalf("error: create IMS client: %v", err)
	}

	// Create a TokenGetter based on the IMS client. Each invocation requests
	// a token from IMS and returns the access token from the response.
	tokenGetter := pipeline.TokenGetterFunc(func(ctx context.Context) (string, error) {
		res, err := imsClient.Token(&ims.TokenRequest{
			Code:         imsCode,
			ClientID:     imsClientID,
			ClientSecret: imsClientSecret,
		})
		if err != nil {
			// Wrap with %w so callers can inspect the underlying IMS error.
			return "", fmt.Errorf("read token: %w", err)
		}
		return res.AccessToken, nil
	})

	// Create a Pipeline client. The client itself is discarded because this
	// example only demonstrates the authentication setup.
	_, err = pipeline.NewClient(&pipeline.ClientConfig{
		PipelineURL: pipelineURL,
		Group:       pipelineGroup,
		TokenGetter: tokenGetter,
	})
	if err != nil {
		log.Fatalf("error: create Pipeline client: %v", err)
	}
}
Output:
Example (Receive) ¶
package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/adobe/pipeline-go/pipeline"
)

// main shows how to consume a stream of envelopes from Pipeline. It reads its
// settings from the environment, builds a client backed by a static token,
// and logs every envelope or error delivered on the channel returned by
// Receive.
func main() {
	// Gather the configuration from the environment.
	var (
		url    = os.Getenv("PIPELINE_URL")
		group  = os.Getenv("PIPELINE_GROUP")
		token  = os.Getenv("PIPELINE_TOKEN")
		topic  = os.Getenv("PIPELINE_TOPIC")
		org    = os.Getenv("RECEIVE_ORGANIZATION")
		source = os.Getenv("RECEIVE_SOURCE")
	)

	// Build the client configuration; the TokenGetter always hands back the
	// token taken from the environment.
	cfg := &pipeline.ClientConfig{
		PipelineURL: url,
		Group:       group,
		TokenGetter: pipeline.TokenGetterFunc(func(_ context.Context) (string, error) {
			return token, nil
		}),
	}

	client, err := pipeline.NewClient(cfg)
	if err != nil {
		log.Fatalf("error: create client: %v", err)
	}

	// Describe what to receive: a single organization and source, with
	// explicit reconnection and ping timeouts.
	req := &pipeline.ReceiveRequest{
		Organizations:     []string{org},
		Sources:           []string{source},
		ReconnectionDelay: 1 * time.Minute,
		PingTimeout:       5 * time.Minute,
	}

	// Consume messages from Pipeline, logging errors and envelope types.
	for msg := range client.Receive(context.Background(), topic, req) {
		if msg.Err != nil {
			log.Println("error:", msg.Err)
			continue
		}
		log.Println("message received:", msg.Envelope.Type)
	}
}
Output:
Example (Send) ¶
package main

import (
	"context"
	"log"
	"os"

	"github.com/adobe/pipeline-go/pipeline"
)

// main shows how to publish a message to Pipeline. It reads its settings from
// the environment, builds a client backed by a static token, and sends one
// JSON message addressed to two locations.
func main() {
	// Gather the configuration from the environment.
	var (
		url   = os.Getenv("PIPELINE_URL")
		group = os.Getenv("PIPELINE_GROUP")
		token = os.Getenv("PIPELINE_TOKEN")
		topic = os.Getenv("PIPELINE_TOPIC")
	)

	// Build the client configuration; the TokenGetter always hands back the
	// token taken from the environment.
	cfg := &pipeline.ClientConfig{
		PipelineURL: url,
		Group:       group,
		TokenGetter: pipeline.TokenGetterFunc(func(_ context.Context) (string, error) {
			return token, nil
		}),
	}

	client, err := pipeline.NewClient(cfg)
	if err != nil {
		log.Fatalf("error: create client: %v", err)
	}

	// A single test message routed to the VA6 and VA7 locations.
	testMessage := pipeline.Message{
		Value:     []byte(`"this is a test message"`),
		Locations: []string{"VA6", "VA7"},
	}
	req := &pipeline.SendRequest{
		Messages: []pipeline.Message{testMessage},
	}

	// Send the message over the Pipeline.
	if err := client.Send(context.Background(), topic, req); err != nil {
		log.Fatalf("error: send message: %v", err)
	}
}
Output:
Example (Sync) ¶
package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/adobe/pipeline-go/pipeline"
)

// main shows how to consume envelopes from Pipeline while manually
// synchronizing the read position: whenever a SYNC envelope arrives, its sync
// marker is sent back to the Pipeline API through Client.Sync.
func main() {
	// Gather the configuration from the environment.
	var (
		url    = os.Getenv("PIPELINE_URL")
		group  = os.Getenv("PIPELINE_GROUP")
		token  = os.Getenv("PIPELINE_TOKEN")
		topic  = os.Getenv("PIPELINE_TOPIC")
		org    = os.Getenv("RECEIVE_ORGANIZATION")
		source = os.Getenv("RECEIVE_SOURCE")
	)

	// Build the client configuration; the TokenGetter always hands back the
	// token taken from the environment.
	cfg := &pipeline.ClientConfig{
		PipelineURL: url,
		Group:       group,
		TokenGetter: pipeline.TokenGetterFunc(func(_ context.Context) (string, error) {
			return token, nil
		}),
	}

	client, err := pipeline.NewClient(cfg)
	if err != nil {
		log.Fatalf("error: create client: %v", err)
	}

	// Setting SyncInterval in the ReceiveRequest instructs the Pipeline API
	// to send SYNC envelopes, each carrying a sync marker.
	req := &pipeline.ReceiveRequest{
		Organizations:     []string{org},
		Sources:           []string{source},
		ReconnectionDelay: 1 * time.Minute,
		PingTimeout:       5 * time.Minute,
		SyncInterval:      10 * time.Second,
	}

	ctx := context.Background()
	stream := client.Receive(ctx, topic, req)

	// Process envelopes; sync markers found in SYNC envelopes are sent back
	// to the Pipeline API, everything else is just logged.
	for msg := range stream {
		if msg.Err != nil {
			log.Println("error:", msg.Err)
			continue
		}
		if msg.Envelope.Type != "SYNC" {
			log.Println("message received:", msg.Envelope.Type)
			continue
		}
		if syncErr := client.Sync(ctx, msg.Envelope.SyncMarker); syncErr != nil {
			log.Println("sync error:", syncErr)
		}
	}
}
Output:
Index ¶
Examples ¶
Constants ¶
// Reset positions. The values start at 1, so the zero value matches neither
// constant.
const (
	// ResetEarliest reads from the earliest marked position still available
	// to the pipeline.
	ResetEarliest = iota + 1
	// ResetLatest reads from the latest marked position still available to
	// the pipeline.
	ResetLatest
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
// Client is a client for Adobe Pipeline. Instances are created with
// NewClient.
type Client struct {
	// contains filtered or unexported fields
}
Client is a client for Adobe Pipeline.
func NewClient ¶
func NewClient(cfg *ClientConfig) (*Client, error)
NewClient creates a Client given a ClientConfig.
func (*Client) Receive ¶
func (c *Client) Receive(ctx context.Context, topic string, r *ReceiveRequest) <-chan EnvelopeOrError
Receive opens a connection to Adobe Pipeline and consumes messages sent to the client. This function automatically handles connection failures and reconnects to the Adobe Pipeline.
type ClientConfig ¶
// ClientConfig is the configuration for a Client. PipelineURL, Group and
// TokenGetter are mandatory; Client is optional.
type ClientConfig struct {
	// HTTP client. If not provided it defaults to the default HTTP client.
	Client *http.Client
	// The URL of the Adobe Pipeline endpoint. Mandatory.
	PipelineURL string
	// The consumer group for this client. Mandatory.
	Group string
	// The strategy for getting an authorization token. Mandatory.
	TokenGetter TokenGetter
}
ClientConfig is the configuration for a Client.
type Envelope ¶
// Envelope is the envelope sent from the pipeline. Which fields are
// meaningful depends on the envelope type, as noted on each field.
type Envelope struct {
	// The type of the envelope. Can be DATA, SYNC, PING, or END_OF_STREAM.
	Type string `json:"envelopeType"`
	// The Kafka partition from which the message came. Only relevant for
	// envelopes of type DATA.
	Partition int `json:"partition"`
	// An optional message key that was used for partition assignment.
	Key string `json:"key"`
	// The Kafka offset of the message. Only relevant for envelopes of type
	// DATA.
	Offset int `json:"offset"`
	// The Kafka topic of the message. Only relevant for envelopes of type DATA.
	Topic string `json:"topic"`
	// The time (UTC) the message was placed onto the consumer's stream. This
	// can be used to track how far behind the connection is running.
	CreateTime uint64 `json:"createTime"`
	// For envelopes of type DATA, the actual message.
	Message Message `json:"pipelineMessage"`
	// The sync marker. Only populated for envelopes of type SYNC.
	SyncMarker string `json:"syncMarker"`
}
Envelope is the envelope sent from the pipeline.
type EnvelopeOrError ¶
// EnvelopeOrError is one item delivered to the client when reading from the
// pipeline: either an envelope or an error.
type EnvelopeOrError struct {
	// The envelope read from the pipeline.
	Envelope *Envelope
	// An error occurred while reading from the pipeline. In case of error
	// (i.e. when this field is non-nil) no special care needs to be taken. If
	// necessary, the client will automatically reinitialize the connection to
	// the pipeline.
	Err error
}
EnvelopeOrError is one message sent to the client when reading from the pipeline. Only one of this struct's fields will be non-nil at any given time.
type Error ¶
// Error is an error message whose information is gathered from an error
// response returned by Adobe Pipeline.
type Error struct {
	// The HTTP status code of the response. This field has no JSON tag,
	// unlike the fields below.
	StatusCode int
	// The status of the error response.
	Status int `json:"status"`
	// A human-readable message for the error.
	Title string `json:"title"`
	// A more detailed report of individual errors.
	Report Report `json:"report"`
}
Error is an error message whose information is gathered from an error response returned by Adobe Pipeline.
type Message ¶
// Message is a message published by a client or received through the
// pipeline. Every field except Value is omitted from the JSON encoding when
// empty.
type Message struct {
	// Usually it's the imsOrg for the customer that owns the data in the
	// message. Only required if publishing to a routed topic.
	ImsOrg string `json:"imsOrg,omitempty"`
	// The message key is used for partitioning/ordering.
	Key string `json:"key,omitempty"`
	// The pipeline instance where this message should be routed or where this
	// message came from. Only valid for routed topics.
	Locations []string `json:"locations,omitempty"`
	// Identifies the service that generated the message.
	Source string `json:"source,omitempty"`
	// This is the actual JSON message.
	Value json.RawMessage `json:"value"`
}
Message is a message published by a client or received through the pipeline.
type ReceiveRequest ¶
// ReceiveRequest is the information sent for setting up the reception of
// messages via Adobe Pipeline. It is the request argument of Client.Receive.
type ReceiveRequest struct {
	// The interval at which the server should send SYNC envelopes. If not
	// specified, SYNC envelopes are not sent. If specified, it has to be
	// greater than or equal to 5s.
	SyncInterval time.Duration
	// The SYNC envelopes can also be sent every "N" messages. If only the
	// SyncMessages parameter is present and if at any point the number of
	// messages consumed is less than the number specified and there are no
	// new messages to be consumed from the topic for 5 seconds, a SYNC
	// envelope will be sent after 5 seconds.
	SyncMessages int
	// If specified, send only messages for these IMS organizations.
	Organizations []string
	// If specified, send only messages from these sources.
	Sources []string
	// Instructs where to read messages from when connecting to the pipeline.
	Reset Reset
	// If the implementation experiences a failure, it will reconnect to the
	// Adobe Pipeline API. If specified, this field controls how long to wait
	// between reconnects. If not specified, it defaults to 10s.
	ReconnectionDelay time.Duration
	// This timeout specifies the timeout between two PING envelopes. If this
	// timeout expires the library will automatically reconnect to Adobe
	// Pipeline. If not specified, it defaults to 90s.
	PingTimeout time.Duration
}
ReceiveRequest is the information sent for setting up the reception of messages via Adobe Pipeline.
type Report ¶
// Report is a collection of Adobe Pipeline errors. It is the type of the
// Report field of Error.
type Report struct {
	// Errors is a collection of detailed errors.
	Errors []ReportError `json:"errors"`
}
Report is a collection of Adobe Pipeline errors.
type ReportError ¶
// ReportError is a detailed error returned by Adobe Pipeline. It is the
// element type of Report.Errors.
type ReportError struct {
	// The ID for this error.
	ID string `json:"id"`
	// A code associated to this error.
	Code string `json:"code"`
	// A message associated to this error.
	Message string `json:"message"`
}
ReportError is a detailed error returned by Adobe Pipeline.
type Reset ¶
// Reset indicates where to read messages from when connecting to the
// pipeline. It is the type of the ReceiveRequest.Reset field.
type Reset int
Reset indicates where to read messages from when connecting to the pipeline.
type SendRequest ¶
// SendRequest is the request passed to Client.Send. It carries the messages
// to publish.
type SendRequest struct {
	// Messages is the list of messages to send, encoded under the "messages"
	// JSON key.
	Messages []Message `json:"messages"`
}
type TokenGetter ¶
TokenGetter is the user-provided logic for obtaining a Bearer token.