pipeline

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package pipeline provides a client for the Pipeline API. The client allows you both to send messages to Pipeline and to consume a stream of messages from it. Moreover, when consuming messages, it allows you to manually synchronize the last read position in a stream of messages.

Example (Authentication)
package main

import (
	"context"
	"fmt"
	"github.com/adobe/ims-go/ims"
	"github.com/adobe/pipeline-go/pipeline"
	"log"
	"os"
)

// main shows how to build a Pipeline client whose bearer token is obtained
// from IMS. All configuration is read from environment variables.
func main() {
	var (
		imsURL          = os.Getenv("IMS_URL")
		imsCode         = os.Getenv("IMS_CODE")
		imsClientID     = os.Getenv("IMS_CLIENT_ID")
		imsClientSecret = os.Getenv("IMS_CLIENT_SECRET")
		pipelineURL     = os.Getenv("PIPELINE_URL")
		// The consumer group has its own variable; it must not be read from
		// PIPELINE_URL.
		pipelineGroup = os.Getenv("PIPELINE_GROUP")
	)

	// Create an IMS client.

	imsClient, err := ims.NewClient(&ims.ClientConfig{
		URL: imsURL,
	})
	if err != nil {
		log.Fatalf("error: create IMS client: %v", err)
	}

	// Create a TokenGetter based on the IMS client.

	tokenGetter := pipeline.TokenGetterFunc(func(ctx context.Context) (string, error) {
		res, err := imsClient.Token(&ims.TokenRequest{
			Code:         imsCode,
			ClientID:     imsClientID,
			ClientSecret: imsClientSecret,
		})
		if err != nil {
			// Wrap with %w so callers can still inspect the IMS error with
			// errors.Is / errors.As.
			return "", fmt.Errorf("read token: %w", err)
		}
		return res.AccessToken, nil
	})

	// Create a Pipeline client.

	_, err = pipeline.NewClient(&pipeline.ClientConfig{
		PipelineURL: pipelineURL,
		Group:       pipelineGroup,
		TokenGetter: tokenGetter,
	})
	if err != nil {
		log.Fatalf("error: create Pipeline client: %v", err)
	}
}
Output:

Example (Receive)
package main

import (
	"context"
	"github.com/adobe/pipeline-go/pipeline"
	"log"
	"os"
	"time"
)

// main connects to Pipeline with a static token taken from the environment
// and logs the type of every envelope received on the configured topic.
func main() {
	// Configuration comes from the environment.
	url := os.Getenv("PIPELINE_URL")
	group := os.Getenv("PIPELINE_GROUP")
	token := os.Getenv("PIPELINE_TOKEN")
	topic := os.Getenv("PIPELINE_TOPIC")
	org := os.Getenv("RECEIVE_ORGANIZATION")
	source := os.Getenv("RECEIVE_SOURCE")

	// The TokenGetter always hands back the token read above.
	staticToken := func(context.Context) (string, error) {
		return token, nil
	}

	// Build the Pipeline client.
	cfg := &pipeline.ClientConfig{
		PipelineURL: url,
		Group:       group,
		TokenGetter: pipeline.TokenGetterFunc(staticToken),
	}
	client, err := pipeline.NewClient(cfg)
	if err != nil {
		log.Fatalf("error: create client: %v", err)
	}

	// Describe what to receive, then consume the resulting stream.
	req := &pipeline.ReceiveRequest{
		Organizations:     []string{org},
		Sources:           []string{source},
		ReconnectionDelay: 1 * time.Minute,
		PingTimeout:       5 * time.Minute,
	}

	for item := range client.Receive(context.Background(), topic, req) {
		if item.Err != nil {
			log.Println("error:", item.Err)
			continue
		}
		log.Println("message received:", item.Envelope.Type)
	}
}
Output:

Example (Send)
package main

import (
	"context"
	"github.com/adobe/pipeline-go/pipeline"
	"log"
	"os"
)

// main publishes a single test message to the configured topic, using a
// static token taken from the environment.
func main() {
	// Configuration comes from the environment.
	url := os.Getenv("PIPELINE_URL")
	group := os.Getenv("PIPELINE_GROUP")
	token := os.Getenv("PIPELINE_TOKEN")
	topic := os.Getenv("PIPELINE_TOPIC")

	// The TokenGetter always hands back the token read above.
	staticToken := func(context.Context) (string, error) {
		return token, nil
	}

	// Build the Pipeline client.
	cfg := &pipeline.ClientConfig{
		PipelineURL: url,
		Group:       group,
		TokenGetter: pipeline.TokenGetterFunc(staticToken),
	}
	client, err := pipeline.NewClient(cfg)
	if err != nil {
		log.Fatalf("error: create client: %v", err)
	}

	// One message, addressed to the VA6 and VA7 locations.
	msg := pipeline.Message{
		Value:     []byte(`"this is a test message"`),
		Locations: []string{"VA6", "VA7"},
	}
	req := &pipeline.SendRequest{Messages: []pipeline.Message{msg}}

	if err := client.Send(context.Background(), topic, req); err != nil {
		log.Fatalf("error: send message: %v", err)
	}
}
Output:

Example (Sync)
package main

import (
	"context"
	"github.com/adobe/pipeline-go/pipeline"
	"log"
	"os"
	"time"
)

// main consumes envelopes from Pipeline and, for every SYNC envelope, sends
// the contained sync marker back through Client.Sync.
func main() {
	// Configuration comes from the environment.
	url := os.Getenv("PIPELINE_URL")
	group := os.Getenv("PIPELINE_GROUP")
	token := os.Getenv("PIPELINE_TOKEN")
	topic := os.Getenv("PIPELINE_TOPIC")
	org := os.Getenv("RECEIVE_ORGANIZATION")
	source := os.Getenv("RECEIVE_SOURCE")

	// The TokenGetter always hands back the token read above.
	staticToken := func(context.Context) (string, error) {
		return token, nil
	}

	// Build the Pipeline client.
	cfg := &pipeline.ClientConfig{
		PipelineURL: url,
		Group:       group,
		TokenGetter: pipeline.TokenGetterFunc(staticToken),
	}
	client, err := pipeline.NewClient(cfg)
	if err != nil {
		log.Fatalf("error: create client: %v", err)
	}

	// Setting SyncInterval in the ReceiveRequest instructs the Pipeline API
	// to send SYNC envelopes with a sync marker in it.
	req := &pipeline.ReceiveRequest{
		Organizations:     []string{org},
		Sources:           []string{source},
		ReconnectionDelay: 1 * time.Minute,
		PingTimeout:       5 * time.Minute,
		SyncInterval:      10 * time.Second,
	}

	ctx := context.Background()
	stream := client.Receive(ctx, topic, req)

	// Errors are logged, SYNC markers are sent back to the Pipeline API, and
	// every other envelope is just reported by type.
	for item := range stream {
		if item.Err != nil {
			log.Println("error:", item.Err)
			continue
		}
		if item.Envelope.Type != "SYNC" {
			log.Println("message received:", item.Envelope.Type)
			continue
		}
		if syncErr := client.Sync(ctx, item.Envelope.SyncMarker); syncErr != nil {
			log.Println("sync error:", syncErr)
		}
	}
}
Output:

Index

Examples

Constants

View Source
// Positions from which reading can start.
// NOTE(review): these constants are untyped; presumably they are meant to be
// assigned to ReceiveRequest.Reset (type Reset) — confirm against the
// implementation.
const (
	// Read from the earliest marked position still available to the pipeline.
	ResetEarliest = 1
	// Read from the latest marked position still available to the pipeline.
	ResetLatest = 2
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a client for Adobe Pipeline.

func NewClient

func NewClient(cfg *ClientConfig) (*Client, error)

NewClient creates a Client given a ClientConfig.

func (*Client) Receive

func (c *Client) Receive(ctx context.Context, topic string, r *ReceiveRequest) <-chan EnvelopeOrError

Receive opens a connection to Adobe Pipeline and consumes messages sent to the client. This function automatically handles connection failures and reconnects to the Adobe Pipeline.

func (*Client) Send

func (c *Client) Send(ctx context.Context, topic string, sendRequest *SendRequest) error

func (*Client) Sync

func (c *Client) Sync(ctx context.Context, marker string) error

Sync tracks the consuming application's last read position for a given topic and consumer group.

type ClientConfig

type ClientConfig struct {
	// HTTP client. If not provided it defaults to the default HTTP client.
	Client *http.Client
	// The URL of the Adobe Pipeline endpoint. Mandatory.
	PipelineURL string
	// The consumer group for this client. Mandatory.
	Group string
	// The strategy for getting an authorization token. Mandatory.
	TokenGetter TokenGetter
}

ClientConfig is the configuration for a Client.

type Envelope

// Envelope is the envelope sent from the pipeline.
type Envelope struct {
	// The type of the envelope. Can be DATA, SYNC, PING, or END_OF_STREAM.
	Type string `json:"envelopeType"`
	// The Kafka partition from which the message came. Only relevant for
	// envelopes of type DATA.
	Partition int `json:"partition"`
	// An optional message key that was used for partition assignment.
	Key string `json:"key"`
	// The Kafka offset of the message. Only relevant for envelopes of type
	// DATA.
	Offset int `json:"offset"`
	// The Kafka topic of the message. Only relevant for envelopes of type DATA.
	Topic string `json:"topic"`
	// The time (UTC) the message was placed onto the consumer's stream. This
	// can be used to track how far behind the connection is running.
	CreateTime uint64 `json:"createTime"`
	// For envelopes of type DATA, the actual message.
	Message Message `json:"pipelineMessage"`
	// Only populated for envelopes of type SYNC.
	SyncMarker string `json:"syncMarker"`
}

Envelope is the envelope sent from the pipeline.

type EnvelopeOrError

type EnvelopeOrError struct {
	// The envelope read from the pipeline.
	Envelope *Envelope
	// An error occurred while reading from the pipeline. In case of error
	// (i.e. when this field is non-nil) no special care needs to be taken. If
	// necessary, the client will automatically reinitialize the connection to
	// the pipeline.
	Err error
}

EnvelopeOrError is one message sent to the client when reading from the pipeline. Only one of this struct's fields will be non-nil at any given time.

type Error

type Error struct {
	// The HTTP status code of the response.
	StatusCode int
	// The status of the error response.
	Status int `json:"status"`
	// A human-readable message for the error.
	Title string `json:"title"`
	// A more detailed report of individual errors.
	Report Report `json:"report"`
}

Error is an error message whose information is gathered from an error response returned by Adobe Pipeline.

func (*Error) Error

func (e *Error) Error() string

type Message

// Message is a message published by a client or received through the
// pipeline.
type Message struct {
	// Usually it's the imsOrg for the customer that owns the data in the
	// message. Only required if publishing to a routed topic.
	ImsOrg string `json:"imsOrg,omitempty"`
	// The message key is used for partitioning/ordering.
	Key string `json:"key,omitempty"`
	// The pipeline instance where this message should be routed or where this
	// message came from. Only valid for routed topics.
	Locations []string `json:"locations,omitempty"`
	// Identifies the service that generated the message.
	Source string `json:"source,omitempty"`
	// This is the actual JSON message.
	Value json.RawMessage `json:"value"`
}

Message is a message published by a client or received through the pipeline.

type ReceiveRequest

type ReceiveRequest struct {
	// The interval at which the server should send SYNC envelopes. If not
	// specified, SYNC envelopes are not sent. If specified, it has to be
	// greater than or equal to 5s.
	SyncInterval time.Duration
	// The SYNC envelopes can also be sent every "N" messages. If only the
	// SyncMessages parameter is present and if at any point the number of
	// messages consumed is less than the number specified and there are no
	// new messages to be consumed from the topic for 5 seconds, a SYNC
	// envelope will be sent after 5 seconds.
	SyncMessages int
	// If specified, send only messages for these IMS organizations.
	Organizations []string
	// If specified, send only messages from these sources.
	Sources []string
	// Instructs where to read messages from when connecting to the pipeline.
	Reset Reset
	// If the implementation experiences a failure, it will reconnect to the
	// Adobe Pipeline API. If specified, this field controls how long to wait
	// between reconnects. If not specified, it defaults to 10s.
	ReconnectionDelay time.Duration
	// This timeout specifies the timeout between two PING envelopes. If this
	// timeout expires the library will automatically reconnect to Adobe
	// Pipeline. If not specified, it defaults to 90s.
	PingTimeout time.Duration
}

ReceiveRequest is the information sent for setting up the reception of messages via Adobe Pipeline.

type Report

type Report struct {
	// Errors is a collection of detailed errors.
	Errors []ReportError `json:"errors"`
}

Report is a collection of Adobe Pipeline errors.

type ReportError

type ReportError struct {
	// The ID for this error.
	ID string `json:"id"`
	// A code associated to this error.
	Code string `json:"code"`
	// A message associated to this error.
	Message string `json:"message"`
}

ReportError is a detailed error returned by Adobe Pipeline.

type Reset

type Reset int

Reset indicates where to read messages from when connecting to the pipeline.

type SendRequest

// SendRequest is the payload passed to Client.Send.
type SendRequest struct {
	// The messages to send; serialized under the "messages" JSON key.
	Messages []Message `json:"messages"`
}

type TokenGetter

type TokenGetter interface {
	Token(ctx context.Context) (string, error)
}

TokenGetter is the user-provided logic for obtaining a Bearer token.

type TokenGetterFunc

type TokenGetterFunc func(ctx context.Context) (string, error)

TokenGetterFunc implements a TokenGetter backed by a function.

func (TokenGetterFunc) Token

func (f TokenGetterFunc) Token(ctx context.Context) (string, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL