eventhub

package
Version: v0.0.0-...-d7d6881
Warning

This package is not in the latest version of its module.

Published: Mar 22, 2018 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package eventhub connects to an Azure Event Hub over AMQP 1.0 and then sends and receives messages.

Constants

const LatestOffset = "@latest"

LatestOffset can be used by the receiver as the value for each partition in ReceiverOpts.PartitionOffsets. With it, the receiver skips the messages already stored in the Event Hub and receives only those published after the AMQP connection/link has been established. For more background on filters, see http://azure.github.io/amqpnetlite/articles/azure_eventhubs.html#filter
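As a minimal sketch of how PartitionOffsets might be filled, the snippet below builds one LatestOffset entry per partition. The constant is redeclared locally so the example compiles on its own; `latestOffsets` is a hypothetical helper, not part of the package, and the partition count of 4 is an assumption.

```go
package main

import "fmt"

// LatestOffset mirrors the constant documented above.
const LatestOffset = "@latest"

// latestOffsets is a hypothetical helper (not part of the package).
// It returns a slice holding LatestOffset once per partition, in the
// shape expected by ReceiverOpts.PartitionOffsets ([]string).
func latestOffsets(partitions int) []string {
	offsets := make([]string, partitions)
	for i := range offsets {
		offsets[i] = LatestOffset
	}
	return offsets
}

func main() {
	// Assumes an Event Hub with 4 partitions.
	fmt.Println(latestOffsets(4)) // prints [@latest @latest @latest @latest]
}
```

The resulting slice would then be assigned to the PartitionOffsets field of a ReceiverOpts value before calling NewReceiver.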

Variables

This section is empty.

Functions

This section is empty.

Types

type EhMessage

type EhMessage struct {
	Body                  string                 `json:"body"`
	SequenceNumber        int64                  `json:"sequence_number"`
	PartitionKey          string                 `json:"partiton_key"`
	Offset                string                 `json:"offset"`
	EnqueuedTime          time.Time              `json:"enqueued_time"`
	EhEndpoint            string                 `json:"eh_endpoint"`
	PartitionID           int                    `json:"partition_id"`
	ApplicationProperties map[string]interface{} `json:"application_properties"`
}

EhMessage holds the information that the Azure Event Hub provides for each message. Body is the main field and contains the message body. The other fields carry the partition offset, the time the message was enqueued, and further details about the Event Hub partition.

The application properties set by the sender are also exposed, so that the user can parse any custom property.
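Because ApplicationProperties is a `map[string]interface{}`, reading a custom property requires a type assertion. The sketch below shows one way to do that; the struct is a local copy trimmed to the two fields used, and `stringProperty` and the property names are hypothetical.

```go
package main

import "fmt"

// EhMessage is a local copy of the documented struct, trimmed to the
// fields this sketch needs so that it compiles on its own.
type EhMessage struct {
	Body                  string                 `json:"body"`
	ApplicationProperties map[string]interface{} `json:"application_properties"`
}

// stringProperty is a hypothetical helper (not part of the package).
// It returns the property stored under key if it exists and is a string.
func stringProperty(msg EhMessage, key string) (string, bool) {
	v, ok := msg.ApplicationProperties[key] // reading a nil map is safe
	if !ok {
		return "", false
	}
	s, ok := v.(string)
	return s, ok
}

func main() {
	msg := EhMessage{
		Body: "hello",
		// "tenant" and "retries" are made-up property names.
		ApplicationProperties: map[string]interface{}{"tenant": "acme", "retries": 3},
	}
	tenant, ok := stringProperty(msg, "tenant")
	fmt.Println(tenant, ok) // prints acme true
}
```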

func (EhMessage) String

func (msg EhMessage) String() string

String returns the string (JSON) representation of an EhMessage.
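A consumer of that JSON can decode it back with the same struct tags. The sketch below assumes that String produces standard `encoding/json` output driven by the tags shown in the struct definition; `parseEhMessage` is a hypothetical helper and the struct is a trimmed local copy. Note that the tag on PartitionKey is spelled `partiton_key` in the definition, so under that assumption it is the key a JSON consumer would see.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EhMessage is a local copy trimmed to three fields. The tags are
// copied verbatim from the documented struct, including "partiton_key".
type EhMessage struct {
	Body         string `json:"body"`
	PartitionKey string `json:"partiton_key"`
	PartitionID  int    `json:"partition_id"`
}

// parseEhMessage is a hypothetical helper (not part of the package).
// It decodes a JSON document into an EhMessage.
func parseEhMessage(s string) (EhMessage, error) {
	var msg EhMessage
	err := json.Unmarshal([]byte(s), &msg)
	return msg, err
}

func main() {
	msg, err := parseEhMessage(`{"body":"hello","partiton_key":"device-1","partition_id":2}`)
	fmt.Println(msg.Body, msg.PartitionKey, msg.PartitionID, err) // prints hello device-1 2 <nil>
}
```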

type RawMessage

type RawMessage struct {
	AmqpMsg  amqp.Message `json:"amqp_msg"`
	Endpoint string       `json:"endpoint"`
}

RawMessage contains the raw fields coming from the AMQP connection. No information has been extracted from them.

func (RawMessage) ExtractPartitionID

func (rawMsg RawMessage) ExtractPartitionID() int

ExtractPartitionID returns the partition number of the RawMessage, derived from its Endpoint field.
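The documentation does not state how the endpoint is parsed. As an illustration only, the sketch below assumes the endpoint ends with a `/Partitions/<n>` segment, which is the usual shape of an Event Hub receiver address. `partitionIDFromEndpoint` is a hypothetical function, and unlike ExtractPartitionID it also returns an error.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partitionIDFromEndpoint is a hypothetical function (not the package's
// implementation). It assumes the endpoint ends with "/Partitions/<n>"
// and returns <n> as an int.
func partitionIDFromEndpoint(endpoint string) (int, error) {
	const marker = "/Partitions/"
	i := strings.LastIndex(endpoint, marker)
	if i < 0 {
		return 0, fmt.Errorf("no %q segment in endpoint %q", marker, endpoint)
	}
	// Everything after the marker must be the decimal partition number.
	return strconv.Atoi(endpoint[i+len(marker):])
}

func main() {
	// "myhub" and "$Default" are example names.
	id, err := partitionIDFromEndpoint("myhub/ConsumerGroups/$Default/Partitions/3")
	fmt.Println(id, err) // prints 3 <nil>
}
```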

func (RawMessage) String

func (rawMsg RawMessage) String() string

String returns the string (JSON) representation of a RawMessage.

func (RawMessage) ToEhMessage

func (rawMsg RawMessage) ToEhMessage() EhMessage

ToEhMessage converts a RawMessage to an EhMessage.

type Receiver

type Receiver interface {
	// Close closes the AMQP connection to the Event Hub
	Close()
	// AsyncFetch starts the infinite loop that fetches messages from the Azure Event Hub
	AsyncFetch()
	// ReceiveChan returns the channel from which to consume the messages coming from the Event Hub via AMQP 1.0
	ReceiveChan() chan EhMessage
	// ErrorChan returns the channel from which to consume all the errors coming from the AMQP connection
	ErrorChan() chan error
}

Receiver consumes messages from the Azure Event Hub.

func NewReceiver

func NewReceiver(recOpts ReceiverOpts) (Receiver, error)

NewReceiver returns a new receiver connected to the Azure Event Hub specified in the input ReceiverOpts struct
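A typical way to drain a Receiver is a `select` loop over ReceiveChan and ErrorChan. The sketch below exercises that loop against `fakeReceiver`, a hypothetical in-memory stand-in, so that it runs without an Event Hub. The interface and a trimmed EhMessage are redeclared locally. The documentation does not say whether AsyncFetch blocks, so the sketch starts it in its own goroutine, which is harmless either way.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies so the sketch compiles on its own (EhMessage is trimmed).
type EhMessage struct{ Body string }

type Receiver interface {
	Close()
	AsyncFetch()
	ReceiveChan() chan EhMessage
	ErrorChan() chan error
}

// fakeReceiver is a hypothetical stand-in that emits two messages
// followed by one error.
type fakeReceiver struct {
	msgs chan EhMessage
	errs chan error
}

func newFakeReceiver() *fakeReceiver {
	return &fakeReceiver{msgs: make(chan EhMessage), errs: make(chan error)}
}

func (f *fakeReceiver) Close() {}
func (f *fakeReceiver) AsyncFetch() {
	f.msgs <- EhMessage{Body: "a"}
	f.msgs <- EhMessage{Body: "b"}
	f.errs <- errors.New("link detached")
}
func (f *fakeReceiver) ReceiveChan() chan EhMessage { return f.msgs }
func (f *fakeReceiver) ErrorChan() chan error       { return f.errs }

// consume collects message bodies until the first error arrives.
func consume(r Receiver) ([]string, error) {
	defer r.Close()
	go r.AsyncFetch() // assumption: safe to run in a separate goroutine
	var bodies []string
	for {
		select {
		case msg := <-r.ReceiveChan():
			bodies = append(bodies, msg.Body)
		case err := <-r.ErrorChan():
			return bodies, err
		}
	}
}

func main() {
	bodies, err := consume(newFakeReceiver())
	fmt.Println(bodies, err) // prints [a b] link detached
}
```

Stopping at the first error is a choice made for this sketch; a long-running consumer might instead log the error and keep reading.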

type ReceiverOpts

type ReceiverOpts struct {
	EventHubNamespace    string
	EventHubName         string
	SasPolicyName        string
	SasPolicyKey         string
	ConsumerGroupName    string
	LinkCapacity         int
	PartitionOffsets     []string
	PartitionOffsetsPath string
	TimeFilterUTC        *time.Time
	OffsetsFlushInterval time.Duration
	TokenExpiryInterval  time.Duration
	Debug                bool
}

ReceiverOpts configures the receiver when the instance is created.

type Sender

type Sender interface {
	// Close can be used to close the AMQP 1.0 connection to the Event Hub
	Close()
	// Send synchronously sends a message returning either an error or the ACK value
	Send(message string, appProperties map[string]interface{}) (int32, error)
	// SendAsync returns straight away
	SendAsync(message string, appProperties map[string]interface{}) int32
	// SendAsyncTimeout is asynchronous but it uses a timeout
	SendAsyncTimeout(message string, appProperties map[string]interface{}, timeout time.Duration) int32
	// ErrorChan returns the channel from which to consume all the errors coming from the AMQP connection
	ErrorChan() chan error
}

Sender allows to send messages to the Azure Event Hub.

func NewSender

func NewSender(sendOpts SenderOpts) (Sender, error)

NewSender creates a new AMQP sender to send messages to the Azure Event Hub
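The sketch below shows how the synchronous Send method might be used to publish a batch and stop at the first failure. `fakeSender` is a hypothetical in-memory implementation of the interface, redeclared locally, so the example runs without an Event Hub. The documentation calls the `int32` result "the ACK value" without further detail, so this sketch ignores it.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Sender is a local copy of the documented interface.
type Sender interface {
	Close()
	Send(message string, appProperties map[string]interface{}) (int32, error)
	SendAsync(message string, appProperties map[string]interface{}) int32
	SendAsyncTimeout(message string, appProperties map[string]interface{}, timeout time.Duration) int32
	ErrorChan() chan error
}

// fakeSender is a hypothetical stand-in that records messages in memory
// and rejects empty ones.
type fakeSender struct {
	sent []string
	errs chan error
}

func (f *fakeSender) Close() {}
func (f *fakeSender) Send(message string, _ map[string]interface{}) (int32, error) {
	if message == "" {
		return 0, errors.New("empty message")
	}
	f.sent = append(f.sent, message)
	return int32(len(f.sent)), nil
}
func (f *fakeSender) SendAsync(message string, p map[string]interface{}) int32 {
	id, _ := f.Send(message, p)
	return id
}
func (f *fakeSender) SendAsyncTimeout(message string, p map[string]interface{}, _ time.Duration) int32 {
	return f.SendAsync(message, p)
}
func (f *fakeSender) ErrorChan() chan error { return f.errs }

// sendAll sends each body in order and returns how many were sent
// before the first error, if any.
func sendAll(s Sender, bodies []string, props map[string]interface{}) (int, error) {
	for i, b := range bodies {
		if _, err := s.Send(b, props); err != nil {
			return i, fmt.Errorf("sending message %d: %w", i, err)
		}
	}
	return len(bodies), nil
}

func main() {
	s := &fakeSender{errs: make(chan error, 1)}
	defer s.Close()
	// "source" is a made-up application property.
	n, err := sendAll(s, []string{"hello", "world"}, map[string]interface{}{"source": "example"})
	fmt.Println(n, err) // prints 2 <nil>
}
```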

type SenderOpts

type SenderOpts struct {
	EventHubNamespace   string
	EventHubName        string
	SasPolicyName       string
	SasPolicyKey        string
	TokenExpiryInterval time.Duration
	Debug               bool
}

SenderOpts configures the sender when the instance is created.

type StdLogger

type StdLogger interface {
	Printf(format string, v ...interface{})
}

StdLogger follows the approach of the Sarama package (a Go Kafka client) for plugging in the logger from the standard library.

var Logger StdLogger = log.New(ioutil.Discard, "[Event Hub]", log.LstdFlags)

Logger discards all output by default. It can be used simultaneously from multiple goroutines, because the standard library logger serializes access to the Writer. See https://golang.org/pkg/log/#Logger
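Since Logger is an exported variable of interface type, logging can presumably be switched on by assigning any value with a Printf method to it. The sketch below redeclares StdLogger and Logger locally to stay self-contained and writes to a buffer so the result is deterministic. `logTo` and `capturedLine` are hypothetical helpers, and `io.Discard` stands in for the older `ioutil.Discard`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// StdLogger and Logger are local copies of the documented declarations.
type StdLogger interface {
	Printf(format string, v ...interface{})
}

// Silent by default, as in the package.
var Logger StdLogger = log.New(io.Discard, "[Event Hub]", log.LstdFlags)

// logTo is a hypothetical helper that redirects Logger to w.
// Flags are 0 here so the output carries no timestamp.
func logTo(w io.Writer) {
	Logger = log.New(w, "[Event Hub] ", 0)
}

// capturedLine logs one line into a buffer and returns what was written.
func capturedLine() string {
	var buf bytes.Buffer
	logTo(&buf)
	Logger.Printf("connected to partition %d", 3)
	return buf.String()
}

func main() {
	fmt.Print(capturedLine()) // prints [Event Hub] connected to partition 3
}
```

In a real program the same assignment would target the package variable and would more likely write to `os.Stderr` with `log.LstdFlags`.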
