-
Import the package
import (
"github.com/arquivei/gomsgprocessor"
)
-
Define a incoming message struct
type ExampleMessage struct {
ID int `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
City string `json:"city"`
State string `json:"state"`
ChildrenNames []string `json:"childrenNames"`
Namespace string `json:"namespace"`
}
-
Implement the Message interface, witch is the input of ParallelProcessor's MakeDocuments.
func (e *ExampleMessage) GetNamespace() gomsgprocessor.Namespace {
// Namespace is a logical separator that will be used to group messages while processing then.
return gomsgprocessor.Namespace(e.Namespace)
}
func (e *ExampleMessage) GetType() gomsgprocessor.MessageType {
// MessageType is used to decide which DocumentBuilder to use for each Message.
return gomsgprocessor.MessageType("typeExample")
}
func (e *ExampleMessage) UpdateLogWithData(ctx context.Context) {
// Optional logger method
log.Ctx(ctx).UpdateContext(func(zc zerolog.Context) zerolog.Context {
return zc.
Int("msg_id", e.ID).
Str("msg_name", e.Name).
Strs("msg_children_names", e.ChildrenNames).
Int("msg_age", e.Age).
Str("msg_city", e.City).
Str("msg_state", e.State).
Str("msg_type", string(e.GetType())).
Str("msg_namespace", e.Namespace)
})
}
-
Define a outcoming document struct, witch is the result of a DocumentBuilder's Build.
type ExampleDocument struct {
ID string
CreatedAt time.Time
ParentName string
ParentBirthYear int
ChildName string
CityAndState string
Namespace string
}
-
Implement the DocumentBuilder interface, witch transforms a Message into a slice of Documents.
type ExampleBuilder struct{}
// Build transforms a Message into []Document.
func (b *ExampleBuilder) Build(_ context.Context, msg gomsgprocessor.Message) ([]gomsgprocessor.Document, error) {
exampleMsg, ok := msg.(*ExampleMessage)
if !ok {
return nil, errors.New("failed to cast message")
}
// Parallel Processor will ignore this message
if len(exampleMsg.ChildrenNames) == 0 {
return nil, nil
}
documents := make([]gomsgprocessor.Document, 0, len(exampleMsg.ChildrenNames))
for _, childName := range exampleMsg.ChildrenNames {
documents = append(documents, ExampleDocument{
ID: strconv.Itoa(exampleMsg.ID) + "_" + childName,
CreatedAt: time.Now(),
ParentName: exampleMsg.Name,
CityAndState: exampleMsg.City + " - " + exampleMsg.State,
ChildName: childName,
ParentBirthYear: time.Now().Year() - exampleMsg.Age,
Namespace: exampleMsg.Namespace,
})
}
return documents, nil
}
-
Define a (optional) function, used for deduplicate the slice of documents.
func ExampleDeduplicateDocuments(documents []gomsgprocessor.Document) ([]gomsgprocessor.Document, error) {
examplesDocuments := make([]ExampleDocument, 0, len(documents))
for _, document := range documents {
exampleDocument, ok := document.(ExampleDocument)
if !ok {
return nil, errors.New("failed to cast document")
}
examplesDocuments = append(examplesDocuments, exampleDocument)
}
documentsByID := make(map[string]ExampleDocument, len(examplesDocuments))
for _, exampleDocument := range examplesDocuments {
documentsByID[exampleDocument.ID] = exampleDocument
}
deduplicatedDocuments := make([]gomsgprocessor.Document, 0, len(documentsByID))
for _, documentByID := range documentsByID {
deduplicatedDocuments = append(deduplicatedDocuments, documentByID)
}
return deduplicatedDocuments, nil
}
-
And now, it's time!
func main() {
// NewParallelProcessor returns a new ParallelProcessor with a map of
// DocumentBuilder for each MessageType.
//
// A list of Option is also available for this method. See option.go for more
// information.
parallelProcessor := gomsgprocessor.NewParallelProcessor(
map[gomsgprocessor.MessageType]gomsgprocessor.DocumentBuilder{
"typeExample": &ExampleBuilder{},
},
gomsgprocessor.WithDeduplicateDocumentsOption(ExampleDeduplicateDocuments),
)
messages := []gomsgprocessor.Message{
&ExampleMessage{
ID: 1,
Name: "John",
Age: 30,
City: "New York",
State: "NY",
ChildrenNames: []string{"John", "Jane", "Mary"},
Namespace: "namespace1",
},
&ExampleMessage{
ID: 2,
Name: "Poul",
Age: 25,
City: "New Jersey",
State: "NY",
ChildrenNames: []string{},
Namespace: "namespace1",
},
&ExampleMessage{
ID: 3,
Name: "Chris",
Age: 35,
City: "Washington",
State: "DC",
ChildrenNames: []string{"Bob"},
Namespace: "namespace1",
},
&ExampleMessage{
ID: 3,
Name: "Chris",
Age: 35,
City: "Washington",
State: "DC",
ChildrenNames: []string{"Bob"},
Namespace: "namespace2",
},
&ExampleMessage{
ID: 1,
Name: "John",
Age: 30,
City: "New York",
State: "NY",
ChildrenNames: []string{"John", "Jane", "Mary"},
Namespace: "namespace1",
},
}
// MakeDocuments creates in parallel a slice of Document for given []Message
// using the map of DocumentBuilder (see NewParallelProcessor).
//
// This method returns a []Document and a (foundationkit/errors).Error.
// If not nil, this error has a (foundationkit/errors).Code associated with and
// can be a ErrCodeBuildDocuments or a ErrCodeDeduplicateDocuments.
documents, err := parallelProcessor.MakeDocuments(context.Background(), messages)
if err != nil {
panic(err)
}
examplesDocuments := make([]ExampleDocument, 0, len(documents))
for _, document := range documents {
exampleDocument, ok := document.(ExampleDocument)
if !ok {
panic("failed to cast document")
}
examplesDocuments = append(examplesDocuments, exampleDocument)
}
fmt.Println(JSONMarshal(examplesDocuments))
}
// Simple json marshaler with indentation
func JSONMarshal(t interface{}) (string, error) {
buffer := &bytes.Buffer{}
encoder := json.NewEncoder(buffer)
encoder.SetEscapeHTML(false)
encoder.SetIndent("", " ")
err := encoder.Encode(t)
return buffer.String(), err
}