Documentation ¶
Overview ¶
Package broker implements a low-level interface for communicating with RDSS via message brokers such as Amazon Kinesis Streams or RabbitMQ.
The package is meant to be reusable by any client. It is not coupled to Archivematica.
Index ¶
- Variables
- type Broker
- func (b *Broker) Close() error
- func (b *Broker) Count() uint64
- func (b *Broker) Request(_ context.Context, msg *message.Message) error
- func (b *Broker) RequestResponse(context.Context, *message.Message) (*message.Message, error)
- func (b *Broker) Subscribe(cb MessageHandler)
- func (b *Broker) SubscribeType(t message.MessageType, cb MessageHandler)
- type Config
- type MessageHandler
- type MetadataService
- type MetadataServiceOp
- func (s *MetadataServiceOp) Create(ctx context.Context, req *message.MetadataCreateRequest) error
- func (s *MetadataServiceOp) Delete(ctx context.Context, req *message.MetadataDeleteRequest) error
- func (s *MetadataServiceOp) Read(ctx context.Context, req *message.MetadataReadRequest) (*message.MetadataReadResponse, error)
- func (s *MetadataServiceOp) Update(ctx context.Context, req *message.MetadataUpdateRequest) error
- type Repository
- type RepositoryBuiltinImpl
- type RepositoryConfig
- type RepositoryDynamoDBImpl
- type RepositoryMessage
- type RepositoryMessageState
- type ValidationMode
- type VocabularyService
- type VocabularyServiceOp
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidRepositoryConfig = errors.New("invalid repository configuration")
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
	Metadata   MetadataService
	Vocabulary VocabularyService
	// contains filtered or unexported fields
}
Broker is a broker client conforming to the RDSS messaging API.
Example ¶
// See the definition of newBroker for more details.
var b, _, _ = newBroker(nil)

// Subscribe to MetadataDelete messages.
b.SubscribeType(message.MessageTypeMetadataDelete, func(msg *message.Message) error {
	fmt.Println("MetadataDelete message received!")
	return nil
})

// A publisher can publish a MetadataDelete request. Make sure that the
// message is valid, otherwise it will not be delivered to the subscriber.
_ = b.Metadata.Delete(context.Background(), &message.MetadataDeleteRequest{
	ObjectUuid: message.MustUUID("8468f86b-a936-41b3-a8a7-ef37e3008ba8"),
})
Output: MetadataDelete message received!
func (*Broker) Count ¶
Count returns the total number of messages received by this broker since it was started.
func (*Broker) RequestResponse ¶
RequestResponse sends a request and waits until a response is received.
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(cb MessageHandler)
Subscribe creates a new subscription associated with every message received.
func (*Broker) SubscribeType ¶
func (b *Broker) SubscribeType(t message.MessageType, cb MessageHandler)
SubscribeType creates a new subscription associated with a particular message type.
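To illustrate the difference between the two subscription methods, here is a minimal sketch of a handler registry that keeps catch-all handlers and per-type handlers apart. The `dispatcher` type, the `dispatch` method and the simplified `Message` and `MessageType` definitions are assumptions made for this example; the handler signature follows the callback used in the Broker example. The sketch is not safe for concurrent use.

```go
package main

import "fmt"

// MessageType and Message are simplified stand-ins for the types in the
// message package; they are assumptions made for this sketch.
type MessageType int

const (
	MetadataCreate MessageType = iota
	MetadataDelete
)

type Message struct {
	Type MessageType
}

// MessageHandler mirrors the callback shape used in the Broker example.
type MessageHandler func(msg *Message) error

// dispatcher is a hypothetical registry, not the package's implementation.
type dispatcher struct {
	all    []MessageHandler
	byType map[MessageType][]MessageHandler
}

func newDispatcher() *dispatcher {
	return &dispatcher{byType: make(map[MessageType][]MessageHandler)}
}

// Subscribe registers a handler for every message.
func (d *dispatcher) Subscribe(cb MessageHandler) {
	d.all = append(d.all, cb)
}

// SubscribeType registers a handler for messages of type t only.
func (d *dispatcher) SubscribeType(t MessageType, cb MessageHandler) {
	d.byType[t] = append(d.byType[t], cb)
}

// dispatch runs the catch-all handlers first, then the type-specific ones,
// and returns the number of handlers invoked. Handler errors are ignored here.
func (d *dispatcher) dispatch(msg *Message) int {
	n := 0
	for _, cb := range d.all {
		_ = cb(msg)
		n++
	}
	for _, cb := range d.byType[msg.Type] {
		_ = cb(msg)
		n++
	}
	return n
}

func main() {
	d := newDispatcher()
	d.Subscribe(func(*Message) error {
		fmt.Println("any message")
		return nil
	})
	d.SubscribeType(MetadataDelete, func(*Message) error {
		fmt.Println("delete message")
		return nil
	})
	d.dispatch(&Message{Type: MetadataCreate}) // only the catch-all handler runs
	d.dispatch(&Message{Type: MetadataDelete}) // both handlers run
}
```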
type Config ¶
type Config struct {
	QueueMain        string
	QueueInvalid     string
	QueueError       string
	RepositoryConfig *RepositoryConfig
	Validation       ValidationMode
}
func (*Config) SetValidationMode ¶ added in v0.6.0
type MessageHandler ¶
MessageHandler is a callback function supplied by subscribers.
type MetadataService ¶
type MetadataService interface {
	Create(context.Context, *message.MetadataCreateRequest) error
	Read(context.Context, *message.MetadataReadRequest) (*message.MetadataReadResponse, error)
	Update(context.Context, *message.MetadataUpdateRequest) error
	Delete(context.Context, *message.MetadataDeleteRequest) error
}
type MetadataServiceOp ¶
type MetadataServiceOp struct {
// contains filtered or unexported fields
}
func (*MetadataServiceOp) Create ¶
func (s *MetadataServiceOp) Create(ctx context.Context, req *message.MetadataCreateRequest) error
Create implements MetadataService
func (*MetadataServiceOp) Delete ¶
func (s *MetadataServiceOp) Delete(ctx context.Context, req *message.MetadataDeleteRequest) error
Delete implements MetadataService
func (*MetadataServiceOp) Read ¶
func (s *MetadataServiceOp) Read(ctx context.Context, req *message.MetadataReadRequest) (*message.MetadataReadResponse, error)
Read implements MetadataService
func (*MetadataServiceOp) Update ¶
func (s *MetadataServiceOp) Update(ctx context.Context, req *message.MetadataUpdateRequest) error
Update implements MetadataService
type Repository ¶ added in v0.2.0
type Repository interface {
	Get(ID string) *RepositoryMessage
	Put(*message.Message) error
}
func MustRepository ¶ added in v0.2.0
func MustRepository(r Repository, err error) Repository
MustRepository is a helper that wraps a call to a function returning (Repository, error) and panics if the error is non-nil. It is intended for use in variable initializations such as
var r = MustRepository(NewRepository(config))
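The Must pattern described above can be sketched in a self-contained form as follows. The lowercase names (`newRepository`, `mustRepository`, `mapRepository`), the simplified `Repository` interface and the `"builtin"` backend string are hypothetical stand-ins chosen for this example, not the package's own definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// Repository is a simplified stand-in for the package's interface.
type Repository interface {
	Get(ID string) string
}

// mapRepository is a hypothetical map-backed implementation.
type mapRepository map[string]string

func (m mapRepository) Get(ID string) string { return m[ID] }

var errInvalidConfig = errors.New("invalid repository configuration")

// newRepository is a hypothetical constructor that fails on unknown backends.
func newRepository(backend string) (Repository, error) {
	if backend != "builtin" {
		return nil, errInvalidConfig
	}
	return mapRepository{"1": "stored"}, nil
}

// mustRepository panics if err is non-nil and returns r otherwise.
func mustRepository(r Repository, err error) Repository {
	if err != nil {
		panic(err)
	}
	return r
}

// A package-level variable can be initialized in one expression because a
// call returning (Repository, error) can be passed directly to the helper.
var repo = mustRepository(newRepository("builtin"))

func main() {
	fmt.Println(repo.Get("1"))
}
```

Because the helper panics, it suits configuration that is known at program start; code handling runtime configuration would normally check the returned error instead.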
func NewRepository ¶ added in v0.2.0
func NewRepository(config *RepositoryConfig) (Repository, error)
type RepositoryBuiltinImpl ¶ added in v0.2.0
RepositoryBuiltinImpl is a memory-based Repository.
func (*RepositoryBuiltinImpl) Get ¶ added in v0.2.0
func (r *RepositoryBuiltinImpl) Get(ID string) *RepositoryMessage
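As a rough idea of what a memory-based repository can look like, the sketch below guards a map with a read-write mutex. It is not the code of RepositoryBuiltinImpl: the `memoryRepository` type is hypothetical, `Put` takes a trimmed-down `RepositoryMessage` instead of `*message.Message`, and returning nil from `Get` for an unknown ID is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// RepositoryMessage is a trimmed-down stand-in for the package's type.
type RepositoryMessage struct {
	MessageId   string
	MessageType string
}

// memoryRepository is a hypothetical map-backed store safe for concurrent use.
type memoryRepository struct {
	mu    sync.RWMutex
	items map[string]*RepositoryMessage
}

func newMemoryRepository() *memoryRepository {
	return &memoryRepository{items: make(map[string]*RepositoryMessage)}
}

// Get returns the stored message, or nil when the ID is unknown (assumption).
func (r *memoryRepository) Get(ID string) *RepositoryMessage {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.items[ID]
}

// Put stores the message under its ID, replacing any previous entry.
func (r *memoryRepository) Put(m *RepositoryMessage) error {
	if m == nil || m.MessageId == "" {
		return errors.New("message has no ID")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.items[m.MessageId] = m
	return nil
}

func main() {
	repo := newMemoryRepository()
	_ = repo.Put(&RepositoryMessage{MessageId: "abc", MessageType: "MetadataDelete"})
	fmt.Println(repo.Get("abc").MessageType)
	fmt.Println(repo.Get("missing") == nil)
}
```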
type RepositoryConfig ¶ added in v0.2.0
type RepositoryDynamoDBImpl ¶ added in v0.2.0
type RepositoryDynamoDBImpl struct {
	DynamoDB dynamodbiface.DynamoDBAPI
	Table    string
}
RepositoryDynamoDBImpl implements Repository.
func (*RepositoryDynamoDBImpl) Get ¶ added in v0.2.0
func (r *RepositoryDynamoDBImpl) Get(ID string) *RepositoryMessage
type RepositoryMessage ¶ added in v0.2.0
type RepositoryMessage struct {
	MessageId    string                 `dynamodbav:"ID"`
	MessageClass string                 `dynamodbav:"messageClass"`
	MessageType  string                 `dynamodbav:"messageType"`
	Sequence     string                 `dynamodbav:"sequence"`
	Position     int                    `dynamodbav:"position"`
	Status       RepositoryMessageState `dynamodbav:"status"`
}
RepositoryMessage is a minified version of message.Message meant to be stored in the local data repository as specified in the RDSS API docs.
type RepositoryMessageState ¶ added in v0.2.0
type RepositoryMessageState int
const (
	RepositoryMessageStateReceived RepositoryMessageState = iota
	RepositoryMessageStateSent
	RepositoryMessageStateToSend
)
func (RepositoryMessageState) String ¶ added in v0.2.0
func (s RepositoryMessageState) String() string
type ValidationMode ¶ added in v0.6.0
type ValidationMode int
ValidationMode determines the type of message validation that the client is going to perform when new messages are received.
const (
	// Messages are rejected if invalid, validation issues will be logged.
	ValidationModeStrict ValidationMode = iota
	// Messages will not be rejected but the validation issues will be logged.
	ValidationModeWarnings
	// Message validator is disabled.
	ValidationModeDisabled
)
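The three modes described in the comments above can be read as a small decision rule, sketched below. The `accept` function and its `validate` and `logf` parameters are hypothetical helpers written for this example. The package's actual validation logic is not shown on this page.

```go
package main

import "fmt"

type ValidationMode int

const (
	ValidationModeStrict ValidationMode = iota
	ValidationModeWarnings
	ValidationModeDisabled
)

// accept reports whether a message should be processed. validate returns
// the issues found in the message and logf records each one. Both are
// hypothetical hooks supplied by the caller.
func accept(mode ValidationMode, validate func() []string, logf func(string)) bool {
	if mode == ValidationModeDisabled {
		return true // the validator is not run at all
	}
	issues := validate()
	for _, issue := range issues {
		logf(issue)
	}
	// Warnings mode never rejects; strict mode rejects when issues exist.
	return mode == ValidationModeWarnings || len(issues) == 0
}

func main() {
	invalid := func() []string { return []string{"missing objectUuid"} }
	logf := func(issue string) { fmt.Println("validation issue:", issue) }

	fmt.Println(accept(ValidationModeStrict, invalid, logf))
	fmt.Println(accept(ValidationModeWarnings, invalid, logf))
	fmt.Println(accept(ValidationModeDisabled, invalid, logf))
}
```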
type VocabularyService ¶ added in v0.2.0
type VocabularyService interface {
	Read(context.Context, *message.VocabularyReadRequest) (*message.VocabularyReadResponse, error)
	Patch(context.Context, *message.VocabularyPatchRequest) error
}
type VocabularyServiceOp ¶ added in v0.2.0
type VocabularyServiceOp struct {
// contains filtered or unexported fields
}
func (*VocabularyServiceOp) Patch ¶ added in v0.2.0
func (s *VocabularyServiceOp) Patch(ctx context.Context, req *message.VocabularyPatchRequest) error
Patch implements VocabularyService
func (*VocabularyServiceOp) Read ¶ added in v0.2.0
func (s *VocabularyServiceOp) Read(ctx context.Context, req *message.VocabularyReadRequest) (*message.VocabularyReadResponse, error)
Read implements VocabularyService