messaging

package
v0.0.0-...-16d7efa Latest
Warning

This package is not in the latest version of its module.

Published: May 7, 2026 License: GPL-3.0 Imports: 7 Imported by: 0

README

Messaging Package

Overview

The messaging package provides an abstraction layer over NATS (a distributed messaging system) for Go-based microservices. It simplifies publishing and subscribing to various events within the Murmurations Network. This package handles the intricacies of NATS client setup, JSON marshaling, and message handling, allowing developers to focus on business logic.

Key Features

  • Simplified Event Publishing: Easily publish events without worrying about the underlying NATS client setup.
  • Event Subscriptions: Subscribe to specific subjects with queue support for load-balanced message handling.
  • Structured Event Data: Define and use structured data for events, enhancing code readability and maintainability.

Usage

Installation

To use the messaging package in your project, import it as follows:

import "github.com/MurmurationsNetwork/MurmurationsServices/messaging"

Publishing Events

You can publish events using either the Publish or the PublishSync function. Publish is asynchronous, while PublishSync waits for an acknowledgment from the NATS server.

Example: Publishing an Event

// eventData is any JSON-serializable value, such as a messaging.NodeCreatedData.
err := messaging.Publish(messaging.NodeCreated, eventData)
if err != nil {
    // handle error
}

Example: Synchronous Publishing

err := messaging.PublishSync(messaging.NodeValidated, eventData)
if err != nil {
    // handle error
}
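The README says the package takes care of JSON marshaling, so it may help to see what a published payload could look like on the wire. The following is a minimal sketch, assuming the package encodes messages with encoding/json and honors the struct tags shown in the Types section. NodeCreatedData is copied locally so the example runs without a NATS server; encodeEvent and the profile URL are hypothetical and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NodeCreatedData is a local copy of the package's struct, so this sketch
// runs without the messaging package or a NATS server.
type NodeCreatedData struct {
	ProfileURL string `json:"profile_url"`
	Version    int32  `json:"version"`
}

// encodeEvent is a hypothetical helper that marshals an event payload to
// JSON. It assumes the package uses encoding/json internally, which the
// README implies but does not spell out.
func encodeEvent(message any) (string, error) {
	b, err := json.Marshal(message)
	if err != nil {
		return "", fmt.Errorf("encode event: %w", err)
	}
	return string(b), nil
}

func main() {
	payload, err := encodeEvent(NodeCreatedData{
		ProfileURL: "https://example.org/profile.json", // hypothetical URL
		Version:    1,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(payload)
	// prints {"profile_url":"https://example.org/profile.json","version":1}
}
```

Because Publish and PublishSync accept any value, using one of the package's event structs keeps the JSON field names consistent between publishers and subscribers.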

Subscribing to Events

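Use the QueueSubscribe function to subscribe to a specific subject. Subscribers that share a queue name form a queue group, and NATS delivers each message to only one member of the group, which balances load across multiple instances of your service.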

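Example: Subscribing to an Event

// The handler receives a *nats.Msg, so the calling file must also import
// "github.com/nats-io/nats.go".
err := messaging.QueueSubscribe("subject", "queue", func(msg *nats.Msg) {
    // handle the message

    // acknowledge the message after successful processing.
    if ackErr := msg.Ack(); ackErr != nil {
        // handle acknowledgment error
    }
})
if err != nil {
    // handle error
}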
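The "handle the message" step usually starts by decoding the message body into one of the event structs. The following is a minimal sketch, assuming the publisher sent JSON that matches the struct tags listed in the Types section. NodeValidatedData is copied locally so the example is self-contained, and decodeNodeValidated is a hypothetical helper; inside a real handler the bytes would come from the Data field of the *nats.Msg.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NodeValidatedData mirrors the package's struct (local copy for the sketch).
type NodeValidatedData struct {
	ProfileURL  string `json:"profile_url"`
	ProfileHash string `json:"profile_hash"`
	ProfileStr  string `json:"profile_str"`
	LastUpdated int64  `json:"last_updated"`
	Version     int32  `json:"version"`
	Expires     *int64 `json:"expires,omitempty"`
}

// decodeNodeValidated is a hypothetical helper that parses a raw message
// body. In a real handler, data would be msg.Data.
func decodeNodeValidated(data []byte) (NodeValidatedData, error) {
	var event NodeValidatedData
	if err := json.Unmarshal(data, &event); err != nil {
		return NodeValidatedData{}, fmt.Errorf("decode NodeValidatedData: %w", err)
	}
	return event, nil
}

func main() {
	// Sample body with made-up values; "expires" is deliberately absent.
	raw := []byte(`{"profile_url":"https://example.org/profile.json","profile_hash":"abc123","last_updated":1700000000,"version":2}`)

	event, err := decodeNodeValidated(raw)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(event.ProfileURL, event.Version, event.Expires == nil)
	// prints https://example.org/profile.json 2 true
}
```

Returning the decode error instead of acknowledging lets the handler decide whether a malformed message should be acknowledged, logged, or left for redelivery.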

Documentation

Constants

const (
	// NodeCreated is the subject for an event where a node has been created.
	NodeCreated = "NODES.created"

	// NodeValidated is the subject for an event where a node has been
	// successfully validated.
	NodeValidated = "NODES.validated"

	// NodeValidationFailed is the subject for an event where a node's validation
	// has failed.
	NodeValidationFailed = "NODES.validation_failed"
)

Variables

This section is empty.

Functions

func Publish

func Publish(subject string, message any) error

Publish checks for an existing Publisher instance or creates one, and then publishes the message to the specified subject.
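The "check for an existing instance or create one" behavior described above is a lazy-initialization pattern. The documentation does not show how the package implements it, so the following is only a sketch of one common approach, using a mutex-guarded package-level variable. The publisher type, getPublisher, newPublisher, and the created counter are all hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for the package's Publisher type,
// whose fields are unexported and not shown in the documentation.
type publisher struct {
	id int
}

var (
	mu       sync.Mutex
	instance *publisher
	created  int // counts constructions, for demonstration only
)

// newPublisher is where a real implementation would connect to NATS.
func newPublisher() (*publisher, error) {
	created++
	return &publisher{id: created}, nil
}

// getPublisher returns the shared publisher, creating it on first use.
// A mutex is used instead of sync.Once so that a failed construction can
// be retried on the next call.
func getPublisher() (*publisher, error) {
	mu.Lock()
	defer mu.Unlock()

	if instance != nil {
		return instance, nil
	}
	p, err := newPublisher()
	if err != nil {
		return nil, err
	}
	instance = p
	return instance, nil
}

func main() {
	a, _ := getPublisher()
	b, _ := getPublisher()
	fmt.Println(a == b, created) // prints true 1
}
```

With this shape, callers never manage a connection themselves: the first publish pays the setup cost and later calls reuse the same instance.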

func PublishSync

func PublishSync(subject string, message any) error

PublishSync publishes the message to the specified subject and, unlike Publish, waits for an acknowledgment from the NATS server before returning.

func QueueSubscribe

func QueueSubscribe(
	subject, queue string,
	handler MessageHandler,
) error

QueueSubscribe checks for an existing Subscriber instance or creates one, and then subscribes to the specified subject as a member of the specified queue group.

Types

type MessageHandler

type MessageHandler func(msg *nats.Msg)

type NodeCreatedData

type NodeCreatedData struct {
	ProfileURL string `json:"profile_url"`
	Version    int32  `json:"version"`
}

type NodeValidatedData

type NodeValidatedData struct {
	// ProfileURL is the URL of the profile associated with the node.
	ProfileURL string `json:"profile_url"`

	// ProfileHash is the hash of the profile data.
	ProfileHash string `json:"profile_hash"`

	// ProfileStr is a string representation of the profile data.
	ProfileStr string `json:"profile_str"`

	// LastUpdated is a Unix timestamp indicating when the node data was last updated.
	LastUpdated int64 `json:"last_updated"`

	// Version is the version vector of the node.
	// https://en.wikipedia.org/wiki/Version_vector
	Version int32 `json:"version"`

	// Expires is an integer representing the expiration date of the node.
	// It's optional and can be nil (and omitted from the JSON) if the node
	// doesn't have an expiration date.
	Expires *int64 `json:"expires,omitempty"`
}

NodeValidatedData represents the validated data of a node.
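The Expires field combines a pointer type with the omitempty tag, which affects how the payload is encoded. The sketch below shows the standard encoding/json behavior for that combination, using a trimmed, hypothetical stand-in struct (validated) rather than the full NodeValidatedData; the URL and marshalValidated helper are also made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// validated is a trimmed, hypothetical stand-in for NodeValidatedData that
// keeps only the fields needed to show how Expires is encoded.
type validated struct {
	ProfileURL string `json:"profile_url"`
	Expires    *int64 `json:"expires,omitempty"`
}

// marshalValidated encodes a validated value with the given expiry pointer.
func marshalValidated(url string, expires *int64) (string, error) {
	b, err := json.Marshal(validated{ProfileURL: url, Expires: expires})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// A nil pointer is omitted from the output entirely.
	none, _ := marshalValidated("https://example.org/p.json", nil)
	fmt.Println(none)
	// prints {"profile_url":"https://example.org/p.json"}

	// A non-nil pointer is always emitted, even when it points to zero.
	zero := int64(0)
	withZero, _ := marshalValidated("https://example.org/p.json", &zero)
	fmt.Println(withZero)
	// prints {"profile_url":"https://example.org/p.json","expires":0}
}
```

Using a pointer lets a subscriber distinguish "no expiration" (nil after decoding) from an explicit value of zero, which a plain int64 with omitempty could not express.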

type NodeValidationFailedData

type NodeValidationFailedData struct {
	ProfileURL     string           `json:"profile_url"`
	FailureReasons *[]jsonapi.Error `json:"failure_reasons"`
	Version        int32            `json:"version"`
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}
