consumers

package
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: Apache-2.0 Imports: 13 Imported by: 11

README

Consumers

Consumers provide an abstraction for various “Magistrala consumers”.

A consumer is a generic plugin‑style service that handles received messages — for example, writing them to a database, sending notifications, or transforming them. Before consuming, messages from Magistrala can be transformed (e.g. to JSON or SenML) to match what a specific consumer expects.

This service (Notifiers) is optional — to use it, core services must be running (e.g. message broker + clients + channels etc.).

Concepts & Consumer Types

Consumer Interfaces

The service supports two main consumer interfaces in code:

  • BlockingConsumer — a synchronous consumer interface. Such a consumer processes the incoming message and returns an error if something goes wrong.
  • AsyncConsumer — an asynchronous consumer interface. The consumer receives messages and processes them asynchronously; errors can be monitored via an error channel returned by Errors().

A consumer implementation may wrap message parsing or transformation logic (e.g. converting to SenML/JSON) before invoking its own consume logic.

Message Flow

When a subscriber receives messages from the message broker:

  1. Messages may be transformed (e.g. via a transformer for SenML or JSON) based on configuration.
  2. The transformed message is passed to a consumer — either synchronously (BlockingConsumer) or asynchronously (AsyncConsumer).
  3. The consumer handles the message (e.g. storing to DB, sending notifications, writing files, etc.).

Consumers are decoupled from core messaging logic, making them flexible and pluggable.

Supported Consumers

The following consumer plugins are supported within the Magistrala repository:

Consumer Type Description Link
SMPP Notifier Sends SMS messages via SMPP smpp consumer
SMTP Notifier Sends email notifications via SMTP smtp consumer
Postgres Writer Stores messages in a PostgreSQL database postgres writer
Timescale Writer Stores messages in TimescaleDB (optimized for time-series) timescale writer

Each consumer has its own README with deployment instructions, configuration, and usage examples.

Notifier API (for Notifications)

The Notifiers service exposes an HTTP API to manage subscriptions and send notifications when messages are consumed. The API supports:

  • Creating subscriptions
  • Listing and filtering subscriptions
  • Viewing a subscription by ID
  • Deleting a subscription
Available Endpoints
Endpoint Method Description
/subscriptions POST Create a new subscription (topic + contact)
/subscriptions GET List subscriptions (with optional filters)
/subscriptions/{id} GET Retrieve a specific subscription by ID
/subscriptions/{id} DELETE Remove a subscription
Example: Create Subscription
curl -X POST http://localhost:9014/subscriptions \
  -H "Authorization: Bearer <user_access_token>" \
  -H "Content-Type: application/json" \
  -d '{
    "topic": "some/topic/subtopic",
    "contact": "user@example.com"
  }'

For more information about service capabilities and its usage, please check out the API documentation.

Best Practices

  • Use async consumers when message processing is long or non-blocking (e.g. writing to external APIs).
  • Always read from error channels in async consumers to avoid silent failures.
  • Use message transformers to adapt data format (SenML or JSON) to consumer needs.
  • Configure each consumer independently, allowing clear isolation and debugging.
  • Monitor subscription usage and prune stale entries regularly.

Versioning & Health Check

If the consumer exposes a /health endpoint, use it for service monitoring.

curl -X GET http://localhost:<port>/health \
  -H "accept: application/health+json"

Example response:

{
  "status": "pass",
  "version": "0.15.1",
  "description": "notifiers service",
  "build_time": "YYYY‑MM‑DDTHH:MM:SSZ"
}

For an in-depth explanation of the usage of consumers, as well as thorough understanding of Magistrala, please check out the official documentation.

Documentation

Overview

Package consumers contain the domain concept definitions needed to support Magistrala consumer services functionality.

Index

Constants

This section is empty.

Variables

View Source
var ErrNotify = errors.New("error sending notification")

ErrNotify wraps sending notification errors.

Functions

func Start

func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer any, configPath string, defaultTopic string, logger *slog.Logger) error

Start method starts consuming messages received from Message broker. This method transforms messages to SenML format before using MessageRepository to store them.

Types

type AsyncConsumer

type AsyncConsumer interface {
	// ConsumeAsync method is used to asynchronously consume received messages.
	ConsumeAsync(ctx context.Context, messages any)

	// Errors method returns a channel for reading errors which occur during async writes.
	// Must be  called before performing any writes for errors to be collected.
	// The channel is buffered(1) so it allows only 1 error without blocking if not drained.
	// The channel may receive nil error to indicate success.
	Errors() <-chan error
}

AsyncConsumer specifies a non-blocking message-consuming API, which can be used for writing data to the DB, publishing messages to broker, sending notifications, or any other asynchronous job.

type BlockingConsumer

type BlockingConsumer interface {
	// ConsumeBlocking method is used to consume received messages synchronously.
	// A non-nil error is returned to indicate operation failure.
	ConsumeBlocking(ctx context.Context, messages any) error
}

BlockingConsumer specifies a blocking message-consuming API, which can be used for writing data to the DB, publishing messages to broker, sending notifications... BlockingConsumer implementations might also support concurrent use, but consult implementation for more details.

type Notifier added in v0.20.0

type Notifier interface {
	// Notify method is used to send notification for the
	// received message to the provided list of receivers.
	Notify(from string, to []string, msg *messaging.Message) error
}

Notifier represents an API for sending notification.

Directories

Path Synopsis
Package notifiers contain the domain concept definitions needed to support Magistrala notifications functionality.
Package notifiers contain the domain concept definitions needed to support Magistrala notifications functionality.
api
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
postgres
Package postgres contains repository implementations using PostgreSQL as the underlying database.
Package postgres contains repository implementations using PostgreSQL as the underlying database.
smpp
Package smpp contains the domain concept definitions needed to support Magistrala SMS notifications.
Package smpp contains the domain concept definitions needed to support Magistrala SMS notifications.
tracing
Package tracing provides tracing instrumentation for Magistrala WebSocket adapter service.
Package tracing provides tracing instrumentation for Magistrala WebSocket adapter service.
Package writers contain the domain concept definitions needed to support Magistrala writer services functionality.
Package writers contain the domain concept definitions needed to support Magistrala writer services functionality.
api
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
Package api contains API-related concerns: endpoint definitions, middlewares and all resource representations.
postgres
Package postgres contains repository implementations using Postgres as the underlying database.
Package postgres contains repository implementations using Postgres as the underlying database.
timescale
Package timescale contains repository implementations using Timescale as the underlying database.
Package timescale contains repository implementations using Timescale as the underlying database.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL