shared

package
v1.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AttrAMQPDeliveryState string = "amqp.delivery_state"
	AttrAMQPStatusCode    string = "amqp.status_code"

	// TODO: I made these up entirely
	AttrMessageCount string = "amqp.message_count"
)
View Source
const (
	MetricStressSuccessfulCancels = "stress.cancels"
)

these metrics are specific to stress tests and wouldn't be in customer code.

Variables

This section is empty.

Functions

func AddAuthFlags

func AddAuthFlags(fs *flag.FlagSet) func() (*azservicebus.Client, *admin.Client, error)

AddAuthFlags adds the flags needed for authenticating to Service Bus. Returns a function that can be called after the flags have been parsed, which will create the an *azservicebus.Client.

func ConstantlyUpdateQueue

func ConstantlyUpdateQueue(ctx context.Context, adminClient *admin.Client, queue string, updateInterval time.Duration) error

ConstantlyUpdateQueue updates queue, changing the MaxDeliveryCount properly between 11 and 10, every `updateInterval` This will cause Service Bus to issue force-detaches to our links, allowing us to exercise our recovery logic.

func ForceQueueDetach added in v0.3.3

func ForceQueueDetach(ctx context.Context, adminClient *admin.Client, queue string) error

func LoadEnvironment

func LoadEnvironment() error

LoadEnvironment loads an .env file. If the env var `ENV_FILE` exists, we assume the value is a path to an .env file Otherwise we fall back to loading from the current directory.

func MustCreateAutoDeletingQueue

func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.QueueProperties) *admin.Client

MustCreateAutoDeletingQueue creates a queue that will auto-delete 10 minutes after activity has ceased.

func MustCreateSubscriptions

func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, options *MustCreateSubscriptionsOptions) func()

func MustGenerateMessages

func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimit int, numExtraBytes int)

func NewCtrlCContext

func NewCtrlCContext() (context.Context, context.CancelFunc)

NewCtrlCContext creates a context that cancels if the user hits ctrl+c.

func TrackDuration added in v1.6.0

func TrackDuration(ctx context.Context, tc appinsights.TelemetryClient, name Metric) func(map[string]string)

TrackDuration tracks durations (as a metric), using the initial call to TrackDuration as the start. The duration is ended when you call the returned function. TrackDuration respects any included baggage in the context.

func TrackError added in v1.6.0

func TrackError(ctx context.Context, tc appinsights.TelemetryClient, err error)

TrackError tracks an error (using the AppInsights exceptions table). TrackError respects any included baggage in the context.

NOTE: this function does not consider context cancellations/deadlines as errors.

func TrackMetric added in v1.6.0

func TrackMetric(ctx context.Context, tc appinsights.TelemetryClient, name Metric, value float64, attrs map[string]string)

TrackMetric tracks metric and respects any included baggage in the context.

func UpdateBaggage added in v1.6.0

func UpdateBaggage(ctx context.Context, baggage map[string]string) map[string]string

func WithBaggage added in v1.6.0

func WithBaggage(ctx context.Context, baggage map[string]string) context.Context

Types

type Metric added in v1.6.0

type Metric string
const (
	MetricConnectionLost Metric = "messaging.servicebus.connectionlost"
	MetricMessagesSent   Metric = "messaging.servicebus.messages.sent"

	// metrics related to Service Bus sessions (NOT amqp sessions)
	MetricSessionAccept    Metric = "messaging.servicebus.session.accept"
	MetricSessionTimeoutMS Metric = "messaging.servicebus.session.timeout"

	MetricSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration"
	MetricReceiveLag                Metric = "messaging.servicebus.receiver.lag"

	MetricAMQPSendDuration Metric = "messaging.az.amqp.producer.send.duration"

	MetricAMQPMgmtRequestDuration Metric = "messaging.az.amqp.management.request.duration"

	MetricAMQPSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration"
	MetricAMQPSettlementSequenceNum     Metric = "messaging.servicebus.settlement.sequence_number"

	// TODO: I've made these up entirely.
	MetricMessageReceived Metric = "messaging.servicebus.messages.received"
	MetricMessagePeeked   Metric = "messaging.servicebus.messages.peeked"
	MetricCloseDuration   Metric = "messaging.servicebus.close.duration"
	MetricLockRenew       Metric = "messaging.servicebus.lockrenew.duration" // TODO: separate for session vs message lock?
)

These names are modeled off of the metrics from Java https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusMeter.java

and from our standard for attributes: https://gist.github.com/lmolkova/e4215c0f44a49ef824983382762e6b92

type MustCreateSubscriptionsOptions added in v1.1.4

type MustCreateSubscriptionsOptions struct {
	Topic        *admin.CreateTopicOptions
	Subscription *admin.CreateSubscriptionOptions
}

type StreamingMessageBatch

type StreamingMessageBatch struct {
	// contains filtered or unexported fields
}

func NewStreamingMessageBatch

func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error)

func (*StreamingMessageBatch) Add

Add appends to the current batch. If it's full it'll send it, allocate a new one.

func (*StreamingMessageBatch) Close

func (sb *StreamingMessageBatch) Close(ctx context.Context) error

Close sends any messages currently held in our batch.

type StressContext

type StressContext struct {
	TC      appinsights.TelemetryClient
	Context context.Context

	// TestRunID represents the test run and can be used to tie into other container metrics generated within the test cluster.
	TestRunID string

	// Nano is the nanoseconds start time for the stress test run
	Nano string

	// ConnectionString represents the value of the environment variable SERVICEBUS_CONNECTION_STRING.
	ConnectionString string
	// contains filtered or unexported fields
}

StressContext holds onto some common useful state for stress tests, including some simple stats tracking, a telemetry client and a context that represents the lifetime of the test itself (and will be cancelled if the user quits out of the stress)

func MustCreateStressContext

func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext

func (*StressContext) Assert added in v0.3.4

func (tracker *StressContext) Assert(condition bool, message string)

func (*StressContext) End

func (sc *StressContext) End()

func (*StressContext) Equal added in v1.1.4

func (tracker *StressContext) Equal(val1 any, val2 any)

func (*StressContext) Failf added in v1.1.0

func (tracker *StressContext) Failf(format string, args ...any)

func (*StressContext) LogIfFailed

func (sc *StressContext) LogIfFailed(message string, err error)

func (*StressContext) Nil added in v1.1.4

func (tracker *StressContext) Nil(val1 any)

func (*StressContext) NoError added in v1.1.4

func (tracker *StressContext) NoError(err error)

func (*StressContext) NoErrorf added in v1.1.4

func (tracker *StressContext) NoErrorf(err error, format string, args ...any)

func (*StressContext) PanicOnError

func (tracker *StressContext) PanicOnError(message string, err error)

PanicOnError logs, sends telemetry and then closes on error

func (*StressContext) Start

func (sc *StressContext) Start(entityName string, attributes map[string]string)

type StressContextOptions added in v1.3.0

type StressContextOptions struct {
	// Duration is the amount of time the stress test should run before
	// the StressContext.Context expires.
	Duration time.Duration

	// CommonBaggage will be added as part of the telemetry client, and will be included in each
	// metric/event/error that's reported.
	CommonBaggage map[string]string

	// EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry.
	EmitStartEvent bool
}

type TestContext

type TestContext struct {
	*StressContext
	Client *azservicebus.Client
}

type TrackingReceiver added in v1.6.0

type TrackingReceiver struct {
	// contains filtered or unexported fields
}

TrackingReceiver reports metrics and errors automatically for its methods.

func NewTrackingReceiverForQueue added in v1.6.0

func NewTrackingReceiverForQueue(tc appinsights.TelemetryClient, client *azservicebus.Client, queueName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)

func NewTrackingReceiverForSubscription added in v1.6.0

func NewTrackingReceiverForSubscription(tc appinsights.TelemetryClient, client *azservicebus.Client, topicName string, subscriptionName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)

func (*TrackingReceiver) AbandonMessage added in v1.6.0

func (*TrackingReceiver) Close added in v1.6.0

func (tr *TrackingReceiver) Close(ctx context.Context) error

func (*TrackingReceiver) CompleteMessage added in v1.6.0

func (*TrackingReceiver) PeekMessages added in v1.6.0

func (tr *TrackingReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *azservicebus.PeekMessagesOptions) ([]*azservicebus.ReceivedMessage, error)

func (*TrackingReceiver) ReceiveMessages added in v1.6.0

func (tr *TrackingReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)

func (*TrackingReceiver) RenewMessageLock added in v1.6.0

type TrackingSender added in v1.6.0

type TrackingSender struct {
	// contains filtered or unexported fields
}

TrackingSender reports metrics and errors automatically for its methods.

func NewTrackingSender added in v1.6.0

func NewTrackingSender(tc appinsights.TelemetryClient, client *azservicebus.Client, queueOrTopic string, options *azservicebus.NewSenderOptions) (*TrackingSender, error)

func (*TrackingSender) Close added in v1.6.0

func (ts *TrackingSender) Close(ctx context.Context) error

func (*TrackingSender) NewMessageBatch added in v1.6.0

func (*TrackingSender) SendMessage added in v1.6.0

func (ts *TrackingSender) SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error

func (*TrackingSender) SendMessageBatch added in v1.6.0

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL