ocsqs

package
v0.0.0-...-9ac8be7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2019 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package ocsqs provides a drop-in replacement for your existing SQS client, providing methods for persisting and creating spans from SQS message attributes.

client := ocsqs.New(sqs.New(session))

Raw Message Delivery

SNS allows you to create subscriptions with RawMessageDelivery enabled. If you have connected your SQS queue to an SNS topic with RawMessageDelivery, you must also create the ocsqs.SQS client with RawMessageDelivery enabled, as this affects how span contexts are retrieved from message attributes.

client := ocsqs.New(sqs.New(session), ocsqs.WithRawMessageDelivery())

Remember to set the MessageAttributeNames field on ReceiveMessageInput to All to ensure message attributes are added to the message:

msgs, err := sqs.ReceiveMessage(&sqs.ReceiveMessageInput{
    QueueUrl:              aws.String("your-queue-url"),
    MessageAttributeNames: []*string{aws.String("All")},
})

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultFormatSpanName

func DefaultFormatSpanName(msg *sqs.Message) string

DefaultFormatSpanName formats a span name according to the given SQS message.

func GetMessageAttributes

func GetMessageAttributes(msg *sqs.Message) map[string]*sqs.MessageAttributeValue

GetMessageAttributes returns message attributes from an SQS message

func SendMessageInputWithSpan

func SendMessageInputWithSpan(ctx context.Context, in *sqs.SendMessageInput, opts ...Option) *sqs.SendMessageInput

SendMessageInputWithSpan adds span data to the message input to propagate spans being sent through SQS directly.

func SpanFromContext

func SpanFromContext(ctx context.Context) (trace.SpanContext, bool)

SpanFromContext will return a span context from context

func StartSpan

func StartSpan(ctx context.Context, msg *sqs.Message, opts ...Option) (context.Context, *trace.Span)

StartSpan starts a span from an SQS Message

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
	"go.krak3n.codes/ocaws/ocawstest"
	"go.krak3n.codes/ocaws/ocsqs"
)

// main demonstrates starting a span from an SQS message whose trace
// attributes are carried inside the JSON message body.
//
// NOTE(review): b3 is referenced below but no b3 package appears in the
// import block shown for this example — presumably
// go.krak3n.codes/ocaws/propagation/b3; confirm.
func main() {
	// Encode the trace attributes as a JSON object keyed by the b3 attribute
	// names, each holding its value under "Value". The marshal error is
	// discarded.
	attr, _ := json.Marshal(map[string]map[string]string{
		b3.TraceIDKey: map[string]string{
			"Value": ocawstest.DefaultTraceID.String(),
		},
		b3.SpanIDKey: map[string]string{
			"Value": ocawstest.DefaultSpanID.String(),
		},
		b3.SpanSampledKey: map[string]string{
			"Value": "0",
		},
	})

	// Nest the encoded attributes under the "MessageAttributes" key of the
	// message body. The marshal error is discarded here as well.
	body, _ := json.Marshal(map[string]json.RawMessage{
		"MessageAttributes": attr,
	})

	// The message carries only a body; its MessageAttributes field is unset.
	msg := &sqs.Message{
		Body: aws.String(string(body)),
	}

	// Start a span from the message and end it when main returns.
	ctx := context.Background()
	ctx, span := ocsqs.StartSpan(ctx, msg)
	defer span.End()

	// Print the identifiers and sampling decision of the span's context.
	if span != nil {
		sc := span.SpanContext()
		fmt.Println("TraceID:", sc.TraceID.String())
		fmt.Println("SpanID:", sc.SpanID.String())
		fmt.Println("Span Sampled:", sc.IsSampled())
	}

}
Output:

TraceID: 616263646566676869676b6c6d6e6f71
SpanID: 6162636465666768
Span Sampled: false
Example (With_raw_message_delivery)
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
	"go.krak3n.codes/ocaws/ocawstest"
	"go.krak3n.codes/ocaws/ocsqs"
)

// main demonstrates starting a span from an SQS message whose trace
// attributes are set directly on the message's MessageAttributes field.
//
// NOTE(review): b3 is referenced below but no b3 package appears in the
// import block shown for this example — presumably
// go.krak3n.codes/ocaws/propagation/b3; confirm.
func main() {
	// Build a message carrying the trace attributes as String-typed message
	// attributes, standing in for a message published via SNS or SQS.
	msg := &sqs.Message{
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			b3.TraceIDKey: {
				DataType:    aws.String("String"),
				StringValue: aws.String(ocawstest.DefaultTraceID.String()),
			},
			b3.SpanIDKey: {
				DataType:    aws.String("String"),
				StringValue: aws.String(ocawstest.DefaultSpanID.String()),
			},
			b3.SpanSampledKey: {
				DataType:    aws.String("String"),
				StringValue: aws.String("0"),
			},
		},
	}

	// Start a span from the message and end it when main returns.
	ctx := context.Background()
	ctx, span := ocsqs.StartSpan(ctx, msg)
	defer span.End()

	// Print the identifiers and sampling decision of the span's context.
	if span != nil {
		sc := span.SpanContext()
		fmt.Println("TraceID:", sc.TraceID.String())
		fmt.Println("SpanID:", sc.SpanID.String())
		fmt.Println("Span Sampled:", sc.IsSampled())
	}

}
Output:

TraceID: 616263646566676869676b6c6d6e6f71
SpanID: 6162636465666768
Span Sampled: false

func WithContext

func WithContext(ctx context.Context, msg *sqs.Message, opts ...Option) context.Context

WithContext will create a new span context from a message and place it on the given context. This is useful if you wish to defer the starting of a span.

Example
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
	"go.krak3n.codes/ocaws/ocawstest"
	"go.krak3n.codes/ocaws/ocsqs"
)

// main demonstrates placing a span context derived from an SQS message on a
// context with WithContext and reading it back with SpanFromContext, without
// starting a span.
//
// NOTE(review): b3 is referenced below but no b3 package appears in the
// import block shown for this example — presumably
// go.krak3n.codes/ocaws/propagation/b3; confirm.
func main() {
	// Build a message carrying the trace attributes as String-typed message
	// attributes, standing in for a message published via SNS or SQS.
	msg := &sqs.Message{
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			b3.TraceIDKey: {
				DataType:    aws.String("String"),
				StringValue: aws.String(ocawstest.DefaultTraceID.String()),
			},
			b3.SpanIDKey: {
				DataType:    aws.String("String"),
				StringValue: aws.String(ocawstest.DefaultSpanID.String()),
			},
			b3.SpanSampledKey: {
				DataType:    aws.String("String"),
				StringValue: aws.String("0"),
			},
		},
	}

	// Derive a context from the message, then fetch the span context from it.
	// ok reports whether a span context was found.
	ctx := context.Background()
	sc, ok := ocsqs.SpanFromContext(ocsqs.WithContext(ctx, msg))
	if ok {
		fmt.Println("TraceID:", sc.TraceID.String())
		fmt.Println("SpanID:", sc.SpanID.String())
		fmt.Println("Span Sampled:", sc.IsSampled())
	}

}
Output:

TraceID: 616263646566676869676b6c6d6e6f71
SpanID: 6162636465666768
Span Sampled: false

Types

type FormatSpanNameFunc

type FormatSpanNameFunc func(*sqs.Message) string

A FormatSpanNameFunc formats a span name from the sqs message

type GetStartOptionsFunc

type GetStartOptionsFunc func(*sqs.Message) trace.StartOptions

A GetStartOptionsFunc returns start options on a message-by-message basis.

type Option

type Option func(*Options)

Option overrides default Options configuration

func WithFormatSpanName

func WithFormatSpanName(fn FormatSpanNameFunc) Option

WithFormatSpanName sets the SQS client's format span name func.

func WithGetStartOptions

func WithGetStartOptions(fn GetStartOptionsFunc) Option

WithGetStartOptions sets the SQS client's GetStartOptions func.

func WithPropagator

func WithPropagator(p propagation.Propagator) Option

WithPropagator sets the client's propagator.

func WithStartOptions

func WithStartOptions(s trace.StartOptions) Option

WithStartOptions sets the client's StartOptions.

type Options

// Options holds the configuration that Option functions override.
type Options struct {
	// Propagator defines how traces will be propagated. If not specified,
	// this will be B3.
	Propagator propagation.Propagator

	// StartOptions are applied to the span started by this Handler around each
	// message.
	// StartOptions.SpanKind will always be set to trace.SpanKindServer
	// for spans started by this transport.
	StartOptions trace.StartOptions

	// GetStartOptions allows setting start options per message. If set,
	// StartOptions is ignored.
	GetStartOptions GetStartOptionsFunc

	// FormatSpanName formats the span name based on the given sqs.Message. See
	// DefaultFormatSpanName for the default format.
	FormatSpanName FormatSpanNameFunc
}

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns sane default options

type SQS

// SQS wraps the AWS SDK SQS client so it can be used as a drop-in replacement
// for it.
type SQS struct {
	// Embedded AWS SDK client.
	*sqs.SQS
	// contains filtered or unexported fields
}

SQS provides methods for sending messages with trace attributes and starting spans from messages. It embeds the SQS client allowing this to be used as a drop in replacement.

func New

func New(client *sqs.SQS, opts ...Option) *SQS

New constructs a new SQS client with default configuration values. Use Option functions to customize configuration. By default the propagator used is B3.

func (*SQS) SendMessageWithContext

func (s *SQS) SendMessageWithContext(ctx aws.Context, input *sqs.SendMessageInput, opts ...request.Option) (*sqs.SendMessageOutput, error)

SendMessageWithContext shadows the SQS client's SendMessageWithContext, adding trace span data to the send message input.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"go.krak3n.codes/ocaws"
	"go.krak3n.codes/ocaws/ocsqs"
	"go.krak3n.codes/ocaws/propagation/b3"
	"go.opencensus.io/trace"
)

var sess *session.Session

// main demonstrates sending a message through the ocsqs client from within an
// active span and printing the trace message attributes found on the input
// afterwards.
func main() {
	// Start the span whose context travels with the message.
	ctx, span := trace.StartSpan(context.Background(), "sqs/ExampleSQS_SendMessageWithContext")
	defer span.End()

	// Create the SQS client
	c := ocsqs.New(sqs.New(sess))

	// Create the queue
	q, err := c.CreateQueue(&sqs.CreateQueueInput{
		QueueName: aws.String("foo"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Send a message; no MessageAttributes are set on the input here
	in := &sqs.SendMessageInput{
		QueueUrl:    q.QueueUrl,
		MessageBody: aws.String(`{"foo":"bar"}`),
	}

	if _, err := c.SendMessageWithContext(ctx, in); err != nil {
		log.Fatal(err)
	}

	// Read the trace attributes back from the input after the send.
	fmt.Println("TraceID:", *in.MessageAttributes[b3.TraceIDKey].StringValue)
	fmt.Println("SpanID:", *in.MessageAttributes[b3.SpanIDKey].StringValue)
	fmt.Println("Span Sampled:", *in.MessageAttributes[b3.SpanSampledKey].StringValue)
	fmt.Println("Trace Queue URL:", *in.MessageAttributes[ocaws.TraceQueueURL].StringValue)

}
Output:

TraceID: 616263646566676869676b6c6d6e6f71
SpanID: 6162636465666768
Span Sampled: 0
Trace Queue URL: http://localhost:4576/queue/foo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL