broker

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2018 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package broker implements a low-level interface for communicating with RDSS via message brokers such Amazon Kinesis Stream or RabbitMQ.

The goal of this package is to be reusable by any user. It's not coupled to Archivematica.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrInvalidRepositoryConfig = errors.New("invalid repository configuration")

Functions

This section is empty.

Types

type Broker

type Broker struct {
	Metadata   MetadataService
	Vocabulary VocabularyService
	// contains filtered or unexported fields
}

Broker is a broker client conforming to the RDSS messaging API.

Example
// See the definition of newBroker for more details.
var b, _, _ = newBroker(nil)

// Subscribe to MetadataDelete messages.
b.SubscribeType(message.MessageTypeMetadataDelete, func(msg *message.Message) error {
	fmt.Println("MetadataCreate message received!")
	return nil
})

// A publisher can publish a MetadataDelete request. Make sure that the
// message is valid otherwise it will not be delivered to the subscriber.
_ = b.Metadata.Delete(context.Background(), &message.MetadataDeleteRequest{
	ObjectUuid: message.MustUUID("8468f86b-a936-41b3-a8a7-ef37e3008ba8"),
})
Output:

MetadataCreate message received!

func New

func New(backend backend.Backend, logger log.FieldLogger, config *Config) (*Broker, error)

New returns a new Broker.

func (*Broker) Close

func (b *Broker) Close() error

func (*Broker) Count

func (b *Broker) Count() uint64

Count returns the total number of messages received by this broker since it was executed.

func (*Broker) Request

func (b *Broker) Request(_ context.Context, msg *message.Message) error

Request sends a fire-and-forget request to RDSS.

func (*Broker) RequestResponse

func (b *Broker) RequestResponse(context.Context, *message.Message) (*message.Message, error)

RequestResponse sends a request and waits until a response is received.

func (*Broker) Subscribe

func (b *Broker) Subscribe(cb MessageHandler)

Subscribe creates a new subscription associated to every message received.

func (*Broker) SubscribeType

func (b *Broker) SubscribeType(t message.MessageType, cb MessageHandler)

SubscribeType creates a new subscription associated to a particular message type.

type Config

type Config struct {
	QueueMain        string
	QueueInvalid     string
	QueueError       string
	RepositoryConfig *RepositoryConfig
	Validation       ValidationMode
}

func (*Config) SetValidationMode added in v0.6.0

func (c *Config) SetValidationMode(mode string)

func (*Config) Validate

func (c *Config) Validate() error

type MessageHandler

type MessageHandler func(msg *message.Message) error

MessageHandler is a callback function supplied by subscribers.

type MetadataServiceOp

type MetadataServiceOp struct {
	// contains filtered or unexported fields
}

func (*MetadataServiceOp) Create

Create implements MetadataService

func (*MetadataServiceOp) Delete

Delete implements MetadataService

func (*MetadataServiceOp) Read

Read implements MetadataService

func (*MetadataServiceOp) Update

Update implements MetadataService

type Repository added in v0.2.0

type Repository interface {
	Get(ID string) *RepositoryMessage
	Put(*message.Message) error
}

func MustRepository added in v0.2.0

func MustRepository(r Repository, err error) Repository

MustRepository is a helper that wraps a call to a function returning (Repository, error) and panics if the error is non-nil. It is intended for use in variable initializations such as

var t = template.Must(template.New("name").Parse("text"))

func NewRepository added in v0.2.0

func NewRepository(config *RepositoryConfig) (Repository, error)

type RepositoryBuiltinImpl added in v0.2.0

type RepositoryBuiltinImpl struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RepositoryInMemoryImpl is a memory-based Repository.

func (*RepositoryBuiltinImpl) Get added in v0.2.0

func (*RepositoryBuiltinImpl) Put added in v0.2.0

type RepositoryConfig added in v0.2.0

type RepositoryConfig struct {
	Backend          string
	DynamoDBTLS      bool
	DynamoDBRegion   string
	DynamoDBEndpoint string
	DynamoDBTable    string
}

type RepositoryDynamoDBImpl added in v0.2.0

type RepositoryDynamoDBImpl struct {
	DynamoDB dynamodbiface.DynamoDBAPI
	Table    string
}

RepositoryDynamoDBImpl implements Repository.

func (*RepositoryDynamoDBImpl) Get added in v0.2.0

func (*RepositoryDynamoDBImpl) Put added in v0.2.0

type RepositoryMessage added in v0.2.0

type RepositoryMessage struct {
	MessageId    string                 `dynamodbav:"ID"`
	MessageClass string                 `dynamodbav:"messageClass"`
	MessageType  string                 `dynamodbav:"messageType"`
	Sequence     string                 `dynamodbav:"sequence"`
	Position     int                    `dynamodbav:"position"`
	Status       RepositoryMessageState `dynamodbav:"status"`
}

RepositoryMessage is a minifed version of message.Message meant to be stored in the local data repository as specified in the RDSS API docs.

type RepositoryMessageState added in v0.2.0

type RepositoryMessageState int
const (
	RepositoryMessageStateReceived RepositoryMessageState = iota
	RepositoryMessageStateSent
	RepositoryMessageStateToSend
)

func (RepositoryMessageState) String added in v0.2.0

func (s RepositoryMessageState) String() string

type ValidationMode added in v0.6.0

type ValidationMode int

ValidationMode determines the type of message validation that the client is going to perform when new messages are received.

const (
	// Messages are rejected if invalid, validation issues will be logged.
	ValidationModeStrict ValidationMode = iota

	// Messages will not be rejected but the validation issues will be logged.
	ValidationModeWarnings

	// Message validator is disabled.
	ValidationModeDisabled
)

type VocabularyServiceOp added in v0.2.0

type VocabularyServiceOp struct {
	// contains filtered or unexported fields
}

func (*VocabularyServiceOp) Patch added in v0.2.0

Patch implements VocabularyService

func (*VocabularyServiceOp) Read added in v0.2.0

Read implements VocabularyService

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL