messaging

package module
v0.0.0-...-2a96f56 Latest
Published: Oct 21, 2019 License: Apache-2.0 Imports: 13 Imported by: 0

README

Go Messaging library

A library that supports sending and receiving messages over the Apache Kafka messaging system.

Project wiki

See: Main.md

Project status

4S maturity level: prototyping.

Prerequisites

You need to have the following software installed:

Go (golang) (https://golang.org/)
librdkafka (https://github.com/confluentinc/confluent-kafka-go#installing-librdkafka)

Building

This library does not contain a main package, and therefore cannot be built on its own.

Usage

To use this library, include it in a project that contains a main package.

To include this library run the following command in your terminal/shell:

go get "bitbucket.org/4s/go-messaging"

After retrieving the library, import it in the files that need it:

package ...

import (
  ...
  messaging "bitbucket.org/4s/go-messaging"
)

...

message := messaging.NewMessage()
...

Environment variables

The file kafka.env contains the environment variables that you need to set if you include and use this library in your own services.

The environment variables CORRELATION_ID and TRANSACTION_ID denote the JSON keys used for the CorrelationID and TransactionID in log output.

kafka.env environment variables:

KAFKA_BOOTSTRAP_SERVER should be set to the host and port of your Kafka server, i.e. the host/port pair used to establish the initial connection to the Kafka cluster. This corresponds to the native Kafka producer and consumer configuration key bootstrap.servers (see https://kafka.apache.org/documentation/#configuration), except that the messaging library currently supports only one server, not a list of servers as native Kafka does.

All other environment variables starting with KAFKA_ map directly to a corresponding Kafka configuration key: strip off the KAFKA_ prefix, replace each _ with a . and write the result in lowercase. For instance, KAFKA_KEY_DESERIALIZER corresponds to the native Kafka consumer property key.deserializer.
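The naming scheme can be sketched in a few lines of Go. The helper name kafkaConfigKey is hypothetical and not part of this library; the lowercasing step is inferred from the KAFKA_KEY_DESERIALIZER example above.

```go
package main

import (
	"fmt"
	"strings"
)

// kafkaConfigKey is a hypothetical helper (not part of the library). It applies
// the scheme described in the README: strip the KAFKA_ prefix, replace "_" with
// "." and lowercase the result. The second return value is false when the name
// does not start with KAFKA_ or has nothing after the prefix.
func kafkaConfigKey(envName string) (string, bool) {
	const prefix = "KAFKA_"
	if !strings.HasPrefix(envName, prefix) {
		return "", false
	}
	key := strings.TrimPrefix(envName, prefix)
	key = strings.ToLower(strings.ReplaceAll(key, "_", "."))
	return key, key != ""
}

func main() {
	names := []string{"KAFKA_KEY_DESERIALIZER", "KAFKA_SESSION_TIMEOUT_MS", "KAFKA_GROUP_ID"}
	for _, name := range names {
		key, ok := kafkaConfigKey(name)
		fmt.Println(name, "->", key, ok)
	}
}
```

KAFKA_BOOTSTRAP_SERVER is the documented exception: it corresponds to bootstrap.servers, so a generic mapping like this one would not produce the right key for it.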

You should not change the value of KAFKA_ENABLE_AUTO_COMMIT.

Currently only the following Kafka related environment variables are supported:

KAFKA_BOOTSTRAP_SERVER

#KafkaConsumer
KAFKA_ENABLE_AUTO_COMMIT
KAFKA_AUTO_COMMIT_INTERVAL_MS
KAFKA_SESSION_TIMEOUT_MS
KAFKA_GROUP_ID

#KafkaProducer
KAFKA_ACKS
KAFKA_RETRIES

Consumer health

With the following environment variables you can make instances of the KafkaProcessAndConsume class touch a file at specified intervals:

ENABLE_HEALTH=true
HEALTH_FILE_PATH=/tmp/health
HEALTH_INTERVAL_MS=5000

This can be used to check the health of the KafkaProcessAndConsume thread, typically by having a shell script check the timestamp of the health file.
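The same check can also be written in Go instead of a shell script. The following is a minimal sketch: the function name healthFileFresh and the 15 second threshold (three times the example HEALTH_INTERVAL_MS) are assumptions chosen for illustration, not values defined by the library.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// healthFileFresh is a hypothetical helper. It reports whether the file at path
// was modified within the last maxAge. An error is returned when the file
// cannot be inspected, for example because it does not exist yet.
func healthFileFresh(path string, maxAge time.Duration) (bool, error) {
	info, err := os.Stat(path)
	if err != nil {
		return false, err
	}
	return time.Since(info.ModTime()) <= maxAge, nil
}

func main() {
	// Assumed values: the path matches HEALTH_FILE_PATH from the example above,
	// and the threshold allows a few missed 5000 ms intervals.
	fresh, err := healthFileFresh("/tmp/health", 15*time.Second)
	switch {
	case err != nil:
		fmt.Println("unhealthy:", err)
	case !fresh:
		fmt.Println("unhealthy: health file is stale")
	default:
		fmt.Println("healthy")
	}
}
```

Choosing a threshold somewhat larger than the touch interval avoids reporting a healthy consumer as stale because of a single delayed write.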

Test

To run unit tests:

go test ./test

To run integration tests:

For the integration tests to succeed you need access to a running instance of Apache Kafka and KAFKA_BOOTSTRAP_SERVER in kafka.env must point to the Kafka broker of this instance.

Furthermore, you need to ensure that the Kafka instance you are testing against does not already hold data on the topics used by the tests. To ensure this, delete these topics (this assumes your Kafka is running inside a Docker container):

docker exec <docker-container-name> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic InputReceived_FHIR_Observation_MDC29112
docker exec <docker-container-name> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic DataCreated_FHIR_Observation_MDC29112

Now run:

go test -tags integration

Documentation

Index

Constants

This section is empty.

Variables

var TopicSeparator = "_"

TopicSeparator is the string used to separate parts of a topic

var VERSION = NewVersion("3.0.0")

VERSION is the current version of the message format. Change this whenever you make changes to the Message format! Uses semantic versioning.

Functions

func ExtractEtagTypeVersion

func ExtractEtagTypeVersion(etag string) string

ExtractEtagTypeVersion is a method...

func GetEnv

func GetEnv(ctx context.Context, env string, defaultValue string) string

GetEnv returns either the value of an environment variable or the provided default value.
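The lookup-with-fallback idea can be sketched with the standard library alone. The helper envOrDefault below is hypothetical: it omits the ctx parameter of GetEnv, and it assumes the default is used only when the variable is unset (whether GetEnv also falls back for an empty value is not stated here).

```go
package main

import (
	"fmt"
	"os"
)

// envOrDefault is a hypothetical stand-in for the behavior described above.
// It returns the value of the named environment variable, or defaultValue
// when the variable is not set at all.
func envOrDefault(name, defaultValue string) string {
	if value, ok := os.LookupEnv(name); ok {
		return value
	}
	return defaultValue
}

func main() {
	// HEALTH_FILE_PATH is one of the variables mentioned in the README.
	fmt.Println(envOrDefault("HEALTH_FILE_PATH", "/tmp/health"))
}
```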

func VerifyOrCreateID

func VerifyOrCreateID(correlationID string) string

VerifyOrCreateID checks whether a correlationID exists and fits the scheme of correlation IDs used in this library. If it does, the correlationID is returned; if it doesn't, it is either adjusted or replaced by a newly generated one, and that value is returned. Ported from the method used in https://bitbucket.org/4s/messaging/src/master/src/main/java/dk/s4/microservices/messaging/MessagingUtils.java

Types

type BodyCategory

type BodyCategory int

BodyCategory is an enum for the set of valid message bodyCategories.

const (
	FHIR BodyCategory = iota + 1
	CDA
	System
)

The valid bodyCategory values

func BodyCategoryFromString

func BodyCategoryFromString(catString string) (BodyCategory, error)

BodyCategoryFromString parses a string into a BodyCategory

func (BodyCategory) MarshalJSON

func (b BodyCategory) MarshalJSON() ([]byte, error)

MarshalJSON marshals BodyCategory as a quoted json string

func (BodyCategory) String

func (b BodyCategory) String() string

func (*BodyCategory) UnmarshalJSON

func (b *BodyCategory) UnmarshalJSON(byteArray []byte) error

UnmarshalJSON unmarshals a quoted json string to BodyCategory
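The pattern behind BodyCategory (an int enum starting at iota + 1 that travels as a quoted JSON string) can be sketched as follows. The type Category and its helpers are local stand-ins written for this example, not the library's implementation; the string form "FHIR" comes from the message example in this documentation, while "CDA" and "System" are assumed to mirror the constant names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// Category is a local stand-in for an enum like BodyCategory.
type Category int

const (
	FHIR Category = iota + 1 // starting at 1 keeps 0 free to mean "not set"
	CDA
	System
)

// categoryNames holds the assumed string form of each value.
var categoryNames = map[Category]string{FHIR: "FHIR", CDA: "CDA", System: "System"}

// String returns the name of the category, or a numeric placeholder for
// values outside the enum.
func (c Category) String() string {
	if name, ok := categoryNames[c]; ok {
		return name
	}
	return "Category(" + strconv.Itoa(int(c)) + ")"
}

// CategoryFromString parses a name back into a Category.
func CategoryFromString(s string) (Category, error) {
	for c, name := range categoryNames {
		if name == s {
			return c, nil
		}
	}
	return 0, fmt.Errorf("unknown category %q", s)
}

// MarshalJSON encodes the category as a quoted JSON string.
func (c Category) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.String())
}

// UnmarshalJSON decodes a quoted JSON string into the category.
func (c *Category) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	parsed, err := CategoryFromString(s)
	if err != nil {
		return err
	}
	*c = parsed
	return nil
}

func main() {
	encoded, err := json.Marshal(CDA)
	fmt.Println(string(encoded), err)

	var decoded Category
	err = json.Unmarshal([]byte(`"FHIR"`), &decoded)
	fmt.Println(decoded, err)
}
```

Because the enum starts at iota + 1, the zero value is not a valid category, which lets a struct tag with omitempty drop a field that was never set.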

type CodeableConcept

type CodeableConcept struct {
	Coding []Coding `json:"coding,omitempty"`
	Text   string   `json:"text,omitempty"`
}

CodeableConcept represents a value that is usually supplied by providing a reference to one or more terminologies or ontologies but may also be defined by the provision of text.

type Coding

type Coding struct {
	System       string `json:"system,omitempty"`
	Version      string `json:"version,omitempty"`
	Code         string `json:"code,omitempty"`
	Display      string `json:"display,omitempty"`
	UserSelected bool   `json:"userSelected,omitempty"`
}

Coding is a representation of a defined concept using a symbol from a defined "code system"

type ConsumeAndProcess

type ConsumeAndProcess struct {
	// contains filtered or unexported fields
}

ConsumeAndProcess is a struct for consuming, processing and responding to messages over Kafka

func NewConsumeAndProcess

func NewConsumeAndProcess(
	ctx context.Context,
	consumer EventConsumer,
	producer EventProducer,
) (ConsumeAndProcess, error)

NewConsumeAndProcess functions as a constructor that returns a new instance of ConsumeAndProcess

func (*ConsumeAndProcess) GetInterceptors

func (consumeAndProcess *ConsumeAndProcess) GetInterceptors() []EventConsumerInterceptor

GetInterceptors returns a list of all registered server interceptors

func (*ConsumeAndProcess) Run

func (consumeAndProcess *ConsumeAndProcess) Run(
	ctx context.Context,
	sigs chan os.Signal,
	wg *sync.WaitGroup,
)

Run is a method that runs the consumeAndProcess method in a loop until it is stopped. This method should be run in a dedicated goroutine.
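The shape of such a loop, run in its own goroutine and stopped through a context, a signal channel and a WaitGroup, can be sketched without Kafka. Everything below is hypothetical: runLoop and runDemo are illustration names, the step function stands in for one consume-and-process round, and the assumption that the loop itself calls wg.Done is not confirmed by this documentation.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
)

// runLoop calls step repeatedly until ctx is cancelled or a signal arrives on
// sigs. It marks the WaitGroup as done when it returns (an assumption about
// how a Run-style method might cooperate with its caller).
func runLoop(ctx context.Context, sigs <-chan os.Signal, wg *sync.WaitGroup, step func()) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case <-sigs:
			return
		default:
			step()
		}
	}
}

// runDemo starts the loop in a dedicated goroutine, lets it run limit rounds,
// then cancels it and reports how many rounds were executed.
func runDemo(limit int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Forward interrupt signals so the loop can also be stopped with Ctrl-C.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)
	defer signal.Stop(sigs)

	var wg sync.WaitGroup
	wg.Add(1)
	processed := 0
	go runLoop(ctx, sigs, &wg, func() {
		processed++
		if processed >= limit {
			cancel()
		}
	})
	wg.Wait() // the goroutine has finished, so reading processed is safe
	return processed
}

func main() {
	fmt.Println("rounds processed:", runDemo(3))
}
```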

func (*ConsumeAndProcess) SetInterceptors

func (consumeAndProcess *ConsumeAndProcess) SetInterceptors(interceptors ...EventConsumerInterceptor)

SetInterceptors sets (or clears) the list of interceptors

type EventConsumer

type EventConsumer interface {
	SubscribeTopics(topics Topics) error
	SubscribeTopicPattern(regexp *regexp.Regexp) error
	Poll(ctx context.Context) (Message, Topic, error)
	ProcessMessage(ctx context.Context,
		consumedTopic Topic,
		receivedMessage Message,
		messageProcessedTopic *Topic,
		outgoingMessage *Message) error
	Commit() error
	SetPreParseHook(func(topic Topic, message Message) bool)
	SetPreProcessHook(func(topic Topic, message Message) bool)
	SetPostProcessHook(func(topic Topic, message Message) bool)
	Close()
}

EventConsumer is an interface for event consumers

type EventConsumerInterceptor

type EventConsumerInterceptor interface {
	// IncomingMessagePreParsing is called before any other processing takes place for each incoming message. It may e.g. be used
	// to provide alternate processing of incoming messages.
	IncomingMessagePreParsing(consumedTopic Topic, receivedMessage Message) bool
	// IncomingMessagePreProcessMessage is called for each message, immediately after parsing of incoming topic and message strings and
	// before any other processing takes place for each incoming message. It may e.g. be used to provide alternate processing of incoming messages.
	IncomingMessagePreProcessMessage(consumedTopic Topic, receivedMessage Message) bool
	// IncomingMessagePostProcessMessage is called for each message, immediately after processing the incoming message, but before any
	// response is sent out by the EventConsumer. It may e.g. be used to provide alternate responses to incoming messages.
	IncomingMessagePostProcessMessage(consumedTopic Topic, receivedMessage Message) bool
	// HandleError is called when an error occurs within the EventConsumer's processing code.
	// This includes parse errors and message incompatibility errors.
	HandleError(consumedTopic Topic, receivedMessage Message, err error) bool
	// ProcessingCompletedNormally is called after all processing of an incoming message is completed, but only if the
	// request completes normally (i.e. no error occurs).
	ProcessingCompletedNormally(consumedTopic Topic, receivedMessage Message)
}

EventConsumerInterceptor is a ... TODO:

type EventConsumerInterceptorAdaptor

type EventConsumerInterceptorAdaptor struct{}

EventConsumerInterceptorAdaptor is a struct implementing all methods of EventConsumerInterceptor. It provides a no-op implementation of each method, always returning true.
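A no-op adaptor like this is typically embedded so that a custom interceptor only has to override the hooks it cares about. The sketch below shows that pattern with local stand-in types: Topic, Message, Interceptor, NoopInterceptor and skipEmptyBodies are all defined here for illustration and are reduced versions, not the library's types. That returning false stops further processing of a message is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Topic and Message are placeholder types for this sketch only.
type Topic struct{ Name string }
type Message struct{ Body string }

// Interceptor is a reduced, hypothetical version of an interceptor interface.
type Interceptor interface {
	PreProcess(topic Topic, message Message) bool
	HandleError(topic Topic, message Message, err error) bool
}

// NoopInterceptor implements every hook and always returns true.
type NoopInterceptor struct{}

func (NoopInterceptor) PreProcess(Topic, Message) bool         { return true }
func (NoopInterceptor) HandleError(Topic, Message, error) bool { return true }

// skipEmptyBodies embeds the no-op implementation and overrides one hook.
type skipEmptyBodies struct {
	NoopInterceptor
	skipped int
}

// PreProcess rejects messages without a body and counts how many were skipped.
func (s *skipEmptyBodies) PreProcess(_ Topic, message Message) bool {
	if message.Body == "" {
		s.skipped++
		return false
	}
	return true
}

func main() {
	interceptor := &skipEmptyBodies{}
	var hooks Interceptor = interceptor // HandleError comes from the embedded no-op

	topic := Topic{Name: "InputReceived_FHIR_Observation_MDC29112"}
	fmt.Println(hooks.PreProcess(topic, Message{}))
	fmt.Println(hooks.PreProcess(topic, Message{Body: "{}"}))
	fmt.Println(hooks.HandleError(topic, Message{}, errors.New("parse error")))
	fmt.Println("skipped:", interceptor.skipped)
}
```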

func (EventConsumerInterceptorAdaptor) HandleError

func (eventConsumerInterceptorAdaptor EventConsumerInterceptorAdaptor) HandleError(consumedTopic Topic, receivedMessage Message, err error) bool

HandleError is called when an error occurs within the EventConsumer's processing code. This includes parse errors and message incompatibility errors.

func (EventConsumerInterceptorAdaptor) IncomingMessagePostProcessMessage

func (eventConsumerInterceptorAdaptor EventConsumerInterceptorAdaptor) IncomingMessagePostProcessMessage(consumedTopic Topic, receivedMessage Message) bool

IncomingMessagePostProcessMessage is called for each message, immediately after processing the incoming message, but before any response is sent out by the EventConsumer. It may e.g. be used to provide alternate responses to incoming messages.

func (EventConsumerInterceptorAdaptor) IncomingMessagePreParsing

func (eventConsumerInterceptorAdaptor EventConsumerInterceptorAdaptor) IncomingMessagePreParsing(consumedTopic Topic, receivedMessage Message) bool

IncomingMessagePreParsing is called before any other processing takes place for each incoming message. It may e.g. be used to provide alternate processing of incoming messages.

func (EventConsumerInterceptorAdaptor) IncomingMessagePreProcessMessage

func (eventConsumerInterceptorAdaptor EventConsumerInterceptorAdaptor) IncomingMessagePreProcessMessage(consumedTopic Topic, receivedMessage Message) bool

IncomingMessagePreProcessMessage is called for each message, immediately after parsing of incoming topic and message strings and before any other processing takes place for each incoming message. It may e.g. be used to provide alternate processing of incoming messages.

func (EventConsumerInterceptorAdaptor) ProcessingCompletedNormally

func (eventConsumerInterceptorAdaptor EventConsumerInterceptorAdaptor) ProcessingCompletedNormally(consumedTopic Topic, receivedMessage Message)

ProcessingCompletedNormally is called after all processing of an incoming message is completed, but only if the request completes normally (i.e. no error occurs).

type EventProcessor

type EventProcessor interface {
	ProcessMessage(ctx context.Context,
		consumedTopic Topic,
		receivedMessage Message,
		messageProcessedTopic *Topic,
		outgoingMessage *Message) error
}

EventProcessor is an interface for EventProcessors

type EventProducer

type EventProducer interface {
	Produce(topic Topic, message Message) error
	Close()
}

EventProducer is an interface for eventproducers

type Header

type Header struct {
	HeaderVersion  Version      `json:"version,omitempty"`
	Sender         string       `json:"sender,omitempty"`
	BodyCategory   BodyCategory `json:"bodyCategory,omitempty"`
	BodyType       string       `json:"bodyType,omitempty"`
	ContentVersion string       `json:"contentVersion,omitempty"`
	Prefer         Prefer       `json:"prefer,omitempty"`
	Etag           string       `json:"etag,omitempty"`
	IfMatch        string       `json:"ifMatch,omitempty"`
	IfNoneExist    string       `json:"ifNoneExist,omitempty"`
	Location       string       `json:"location,omitempty"`
	CorrelationID  string       `json:"correlationId,omitempty"`
	TransactionID  string       `json:"transactionId,omitempty"`
	Security       string       `json:"security,omitempty"`
	Session        string       `json:"session,omitempty"`
}

Header is the header of the message

type Message

type Message struct {
	Header Header `json:"header,omitempty"`
	Body   string `json:"body,omitempty"`
}

Message is a struct for producing messages of the form:

{
  "header":
  {
    "sender": "obs-input-service",
    "bodyCategory": "FHIR",
    "bodyType": "Observation",
    "contentVersion": "3.3.0",
    "location": "http://fhirtest.uhn.ca/baseDstu3/Observation/15354/_history/1",
    "correlationId": "xez5ZXQcDG6",
    "transactionId": "e6651fe2-fb8e-4a54-8b8e-7343dbdb997c",
    "security": ...
  },
  "body":
  {
    "resourceType": "Observation",
    ...
  }
}
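How struct tags with omitempty shape this kind of envelope can be sketched with encoding/json and a reduced local copy of the types. The types header and message and the function encode are stand-ins written for this example (only a few header fields are included, and bodyCategory is simplified to a plain string). Note that a string body is encoded as a quoted JSON string by encoding/json; whether the library's own serialization embeds the body differently is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// header is a reduced local copy of a message header, for illustration only.
type header struct {
	Sender        string `json:"sender,omitempty"`
	BodyCategory  string `json:"bodyCategory,omitempty"`
	BodyType      string `json:"bodyType,omitempty"`
	CorrelationID string `json:"correlationId,omitempty"`
}

// message pairs a header with a body held as a string.
type message struct {
	Header header `json:"header"`
	Body   string `json:"body,omitempty"`
}

// encode serializes a message; empty header fields are left out because of
// the omitempty option on their tags.
func encode(m message) (string, error) {
	data, err := json.Marshal(m)
	return string(data), err
}

func main() {
	out, err := encode(message{
		Header: header{Sender: "obs-input-service", BodyCategory: "FHIR", BodyType: "Observation"},
		Body:   "{}",
	})
	fmt.Println(out, err)
}
```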

func NewMessage

func NewMessage() Message

NewMessage functions as a constructor for Message that returns a new instance of Message

func (*Message) CloneFields

func (m *Message) CloneFields(message Message) error

CloneFields copies the fields from the provided message, to this message.

func (*Message) GetBody

func (m *Message) GetBody() string

GetBody returns the body of the message as a string

func (*Message) GetBodyCategory

func (m *Message) GetBodyCategory() BodyCategory

GetBodyCategory returns the bodyCategory of the message

func (*Message) GetBodyType

func (m *Message) GetBodyType() string

GetBodyType returns the bodytype of the message

func (*Message) GetContentVersion

func (m *Message) GetContentVersion() string

GetContentVersion returns the content version of the message

func (*Message) GetCorrelationID

func (m *Message) GetCorrelationID() string

GetCorrelationID returns the correlationID of the message

func (*Message) GetEtag

func (m *Message) GetEtag() string

GetEtag returns the etag header of the message

func (*Message) GetEtagVersion

func (m *Message) GetEtagVersion() string

GetEtagVersion returns the version part of an E-tag. E.g. if Etag contains the weak etag 'W/"4232"' this function will return the string 4232
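The weak ETag handling described here can be sketched with plain string operations. The helpers weakEtag and etagVersion are hypothetical names for this example; how the library treats malformed input is not documented here, so the sketch simply strips an optional W/ prefix and the surrounding quotes.

```go
package main

import (
	"fmt"
	"strings"
)

// weakEtag formats a version as a weak ETag, e.g. 4232 becomes W/"4232".
func weakEtag(version string) string {
	return `W/"` + version + `"`
}

// etagVersion extracts the version part of an ETag by removing an optional
// W/ prefix and the surrounding double quotes.
func etagVersion(etag string) string {
	unprefixed := strings.TrimPrefix(etag, "W/")
	return strings.Trim(unprefixed, `"`)
}

func main() {
	tag := weakEtag("4232")
	fmt.Println(tag)
	fmt.Println(etagVersion(tag))
}
```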

func (*Message) GetHeaderVersion

func (m *Message) GetHeaderVersion() Version

GetHeaderVersion returns the headerVersion of the message

func (*Message) GetIfMatch

func (m *Message) GetIfMatch() string

GetIfMatch returns the ifMatch header of the message

func (*Message) GetIfMatchVersion

func (m *Message) GetIfMatchVersion() string

GetIfMatchVersion returns the version part of an If-Match header. E.g. if If-Match contains the weak etag 'W/"4232"' this function will return the string 4232

func (*Message) GetIfNoneExist

func (m *Message) GetIfNoneExist() string

GetIfNoneExist returns the ifNoneExist header of the message

func (*Message) GetLocation

func (m *Message) GetLocation() string

GetLocation returns the location header of the message

func (*Message) GetPrefer

func (m *Message) GetPrefer() Prefer

GetPrefer returns the prefer header of the message

func (*Message) GetSecurity

func (m *Message) GetSecurity() string

GetSecurity returns the security header of the message

func (*Message) GetSender

func (m *Message) GetSender() string

GetSender returns the sender of the message

func (*Message) GetSession

func (m *Message) GetSession() string

GetSession returns the session header of the message

func (*Message) GetTransactionID

func (m *Message) GetTransactionID() string

GetTransactionID returns the transactionID of the message

func (*Message) SetBody

func (m *Message) SetBody(body string) *Message

SetBody sets the body of the message

func (*Message) SetBodyCategory

func (m *Message) SetBodyCategory(bodyCategory BodyCategory) *Message

SetBodyCategory sets the bodyCategory of the message

func (*Message) SetBodyType

func (m *Message) SetBodyType(bodyType string) *Message

SetBodyType sets the bodytype of the message

func (*Message) SetContentVersion

func (m *Message) SetContentVersion(contentVersion string) *Message

SetContentVersion sets the content version of the message

func (*Message) SetCorrelationID

func (m *Message) SetCorrelationID(correlationID string) *Message

SetCorrelationID sets the correlationID of the message

func (*Message) SetEtag

func (m *Message) SetEtag(etag string) *Message

SetEtag sets the etag header of the message

func (*Message) SetHeaderVersion

func (m *Message) SetHeaderVersion(version Version) *Message

SetHeaderVersion sets the headerVersion of the message

func (*Message) SetIfMatch

func (m *Message) SetIfMatch(ifMatch string) *Message

SetIfMatch sets the ifMatch header of the message

func (*Message) SetIfNoneExist

func (m *Message) SetIfNoneExist(ifNoneExist string) *Message

SetIfNoneExist sets the ifNoneExist header of the message

func (*Message) SetLocation

func (m *Message) SetLocation(location string) *Message

SetLocation sets the location header of the message

func (*Message) SetPrefer

func (m *Message) SetPrefer(prefer Prefer) *Message

SetPrefer sets the prefer header of the message

func (*Message) SetSecurity

func (m *Message) SetSecurity(security string) *Message

SetSecurity sets the security header of the message

func (*Message) SetSender

func (m *Message) SetSender(sender string) *Message

SetSender sets the sender of the message

func (*Message) SetSession

func (m *Message) SetSession(session string) *Message

SetSession sets the session header of the message

func (*Message) SetTransactionID

func (m *Message) SetTransactionID(transactionID string) *Message

SetTransactionID sets the transactionID of the message

func (*Message) SetWeakEtagVersion

func (m *Message) SetWeakEtagVersion(etag string) *Message

SetWeakEtagVersion sets the etag version formatted as a weak Etag

func (*Message) SetWeakIfMatchVersion

func (m *Message) SetWeakIfMatchVersion(ifMatch string) *Message

SetWeakIfMatchVersion sets the ifMatch header formatted as a weak Etag

func (*Message) ToByteArray

func (m *Message) ToByteArray() []byte

ToByteArray returns message as ByteArray

type Operation

type Operation int

Operation is an enum for the set of valid topic Operations.

const (
	// Events
	InputReceived Operation = iota + 1
	DataValidated
	DataConverted
	DataCreated
	DataUpdated
	TransactionCompleted
	ProcessingFailed
	Error
	// Commands
	Create
	Update
	Delete
)

The valid Operation values

func OperationFromString

func OperationFromString(operationString string) (Operation, error)

OperationFromString parses a string into an Operation

func (Operation) String

func (o Operation) String() string

type OperationOutcome

type OperationOutcome struct {
	ResourceType string                `json:"resourceType,omitempty"`
	ID           string                `json:"id,omitempty"`
	Text         interface{}           `json:"text,omitempty"`
	Issue        OperationOutcomeIssue `json:"issue,omitempty"`
}

OperationOutcome is a collection of error, warning or information messages that result from a system action.

func NewOperationOutcome

func NewOperationOutcome(severity string, code string, diagnostics string) OperationOutcome

NewOperationOutcome functions as a constructor for OperationOutcome and returns a new instance of OperationOutcome

func (*OperationOutcome) GetCode

func (ou *OperationOutcome) GetCode() string

GetCode returns the Code of the OperationOutcome

func (*OperationOutcome) GetDiagnostics

func (ou *OperationOutcome) GetDiagnostics() string

GetDiagnostics returns the Diagnostics of the OperationOutcome

func (*OperationOutcome) GetSeverity

func (ou *OperationOutcome) GetSeverity() string

GetSeverity returns the Severity of the OperationOutcome

func (*OperationOutcome) SetCode

func (ou *OperationOutcome) SetCode(code string) *OperationOutcome

SetCode sets the Code of the OperationOutcome

func (*OperationOutcome) SetDiagnostics

func (ou *OperationOutcome) SetDiagnostics(diagnostics string) *OperationOutcome

SetDiagnostics sets the Diagnostics of the OperationOutcome

func (*OperationOutcome) SetSeverity

func (ou *OperationOutcome) SetSeverity(severity string) *OperationOutcome

SetSeverity sets the Severity of the OperationOutcome

type OperationOutcomeIssue

type OperationOutcomeIssue struct {
	Severity    string          `json:"severity,omitempty"`
	Code        string          `json:"code,omitempty"`
	Details     CodeableConcept `json:"details,omitempty"`
	Diagnostics string          `json:"diagnostics,omitempty"`
	Location    string          `json:"location,omitempty"`
	Expression  string          `json:"expression,omitempty"`
}

OperationOutcomeIssue is a single issue associated with the action

type Prefer

type Prefer int

Prefer is an enum for the set of valid message prefers.

const (
	REPRESENTATION Prefer = iota + 1
	OPERATIONOUTCOME
	MINIMAL
)

The valid prefer values

func PreferFromString

func PreferFromString(catString string) (Prefer, error)

PreferFromString parses a string into a Prefer

func (Prefer) MarshalJSON

func (p Prefer) MarshalJSON() ([]byte, error)

MarshalJSON marshals Prefer as a quoted json string

func (Prefer) String

func (p Prefer) String() string

func (*Prefer) UnmarshalJSON

func (p *Prefer) UnmarshalJSON(byteArray []byte) error

UnmarshalJSON unmarshals a quoted json string to Prefer

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic is a struct representing a Topic

func NewTopic

func NewTopic() Topic

NewTopic functions as a constructor for Topic that returns a new instance of Topic

func NewTopicFromTopic

func NewTopicFromTopic(topic Topic) Topic

NewTopicFromTopic functions as a constructor for Topic that returns a new instance of Topic based on the provided topic

func TopicFromString

func TopicFromString(topicString string) (Topic, error)

TopicFromString creates a topic from the provided string
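What parsing a topic string might involve can be sketched as below. The layout is an assumption inferred from the topic names used in the integration test instructions (for example InputReceived_FHIR_Observation_MDC29112) and from the Topic getters: operation, data category, data type, then zero or more data codes, joined by TopicSeparator. The type topicParts and the function parseTopic are hypothetical and do not validate the operation or category against the library's enums.

```go
package main

import (
	"fmt"
	"strings"
)

// separator mirrors the documented TopicSeparator value.
const separator = "_"

// topicParts is a hypothetical, simplified view of a parsed topic.
type topicParts struct {
	Operation string
	Category  string
	DataType  string
	DataCodes []string
}

// parseTopic splits a topic string into its assumed parts. At least three
// non-empty parts are required; anything after the third is a data code.
func parseTopic(s string) (topicParts, error) {
	fields := strings.Split(s, separator)
	if len(fields) < 3 {
		return topicParts{}, fmt.Errorf("topic %q has %d parts, want at least 3", s, len(fields))
	}
	for _, field := range fields {
		if field == "" {
			return topicParts{}, fmt.Errorf("topic %q contains an empty part", s)
		}
	}
	return topicParts{
		Operation: fields[0],
		Category:  fields[1],
		DataType:  fields[2],
		DataCodes: fields[3:],
	}, nil
}

// String joins the parts back together with the separator.
func (p topicParts) String() string {
	fields := append([]string{p.Operation, p.Category, p.DataType}, p.DataCodes...)
	return strings.Join(fields, separator)
}

func main() {
	parts, err := parseTopic("InputReceived_FHIR_Observation_MDC29112")
	fmt.Printf("%+v %v\n", parts, err)
	fmt.Println(parts.String())
}
```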

func (*Topic) AddDataCode

func (t *Topic) AddDataCode(dataCode string) *Topic

AddDataCode adds a dataCode to the topics dataCode array

func (*Topic) Equals

func (t *Topic) Equals(other Topic) bool

Equals checks whether the topic upon which this method was called, is equal to the provided other topic

func (*Topic) GetDataCategory

func (t *Topic) GetDataCategory() BodyCategory

GetDataCategory returns the DataCategory of the topic

func (*Topic) GetDataCodes

func (t *Topic) GetDataCodes() []string

GetDataCodes returns the DataCodes of the topic

func (*Topic) GetDataType

func (t *Topic) GetDataType() string

GetDataType returns the DataType of the topic

func (*Topic) GetOperation

func (t *Topic) GetOperation() Operation

GetOperation returns the Operation of the topic

func (*Topic) SetDataCategory

func (t *Topic) SetDataCategory(dataCategory BodyCategory) *Topic

SetDataCategory sets DataCategory of Topic

func (*Topic) SetDataCodes

func (t *Topic) SetDataCodes(dataCodes []string) *Topic

SetDataCodes sets DataCodes of Topic

func (*Topic) SetDataType

func (t *Topic) SetDataType(dataType string) *Topic

SetDataType sets DataType of Topic

func (*Topic) SetOperation

func (t *Topic) SetOperation(operation Operation) *Topic

SetOperation sets the Operation of the Topic

func (*Topic) String

func (t *Topic) String() string

String returns the topic as a human-readable string whose parts are separated by TopicSeparator

type Topics

type Topics []Topic

Topics is a slice of topics

func (Topics) String

func (t Topics) String() string

String returns the topics as a human-readable, comma-separated string

func (Topics) StringArray

func (t Topics) StringArray() []string

StringArray returns the topics as an array of strings

type Version

type Version struct {
	Major int
	Minor int
	Patch int
}

Version is an object that represents a version in semantic versioning

func NewVersion

func NewVersion(versionString string) Version

NewVersion acts as a constructor for Version and returns a new instance of Version, based on the provided version string

func (Version) IsCompatible

func (v Version) IsCompatible(version Version) bool

IsCompatible compares two versions for compatibility
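The exact compatibility rule is not spelled out here. The sketch below assumes the usual semantic versioning convention that two versions are compatible when their major numbers match; the type semver and its functions are local stand-ins, and unlike NewVersion the parser returns an error for malformed input.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// semver is a local stand-in for a semantic version with three components.
type semver struct {
	Major, Minor, Patch int
}

// parseSemver parses a "major.minor.patch" string into a semver.
func parseSemver(s string) (semver, error) {
	parts := strings.Split(s, ".")
	if len(parts) != 3 {
		return semver{}, fmt.Errorf("version %q must have three components", s)
	}
	var numbers [3]int
	for i, part := range parts {
		n, err := strconv.Atoi(part)
		if err != nil || n < 0 {
			return semver{}, fmt.Errorf("version %q has an invalid component %q", s, part)
		}
		numbers[i] = n
	}
	return semver{Major: numbers[0], Minor: numbers[1], Patch: numbers[2]}, nil
}

// compatibleWith reports whether both versions share the same major number
// (an assumed rule, following the semantic versioning convention).
func (v semver) compatibleWith(other semver) bool {
	return v.Major == other.Major
}

func main() {
	current, _ := parseSemver("3.0.0")
	incoming, _ := parseSemver("3.2.1")
	older, _ := parseSemver("2.9.9")
	fmt.Println(current.compatibleWith(incoming))
	fmt.Println(current.compatibleWith(older))
}
```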

func (Version) MarshalJSON

func (v Version) MarshalJSON() ([]byte, error)

MarshalJSON marshals Version as a quoted json string

func (Version) String

func (v Version) String() string

func (*Version) UnmarshalJSON

func (v *Version) UnmarshalJSON(byteArray []byte) error

UnmarshalJSON unmarshals a quoted json string to Version

Directories

Path Synopsis
implementations
