plugin

package
v0.5.1
Note: this package is not in the latest version of its module.
Published: Nov 17, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package plugin provides the gRPC-based plugin interface for ZMS observers.

This file defines the go-plugin wrapper for the gRPC observer service. Plugins run as separate processes and communicate via gRPC.

Package plugin provides interfaces and types for creating ZMS observer plugins.

This package defines the plugin system for ZMS (Zabbix Metric Shipper), enabling developers to create custom observer implementations that can process Zabbix export data and send it to various destinations.

The plugin system supports dynamic loading of shared libraries (.so files) that implement the Observer interface. Plugins have access to core ZMS functionality through the BaseObserver interface, including filtering, metrics, and buffering.

Creating a Plugin:

1. Implement the Observer interface
2. Embed BaseObserverImpl for core functionality
3. Export PluginInfo variable and NewObserver() function
4. Compile as shared library: go build -buildmode=plugin

Example plugin structure:

package main

import (
    "zms.szuro.net/pkg/plugin"
    "zms.szuro.net/pkg/zbx" // assumed import path for the zbx types used below
)

var PluginInfo = plugin.PluginInfo{
    Name: "my-plugin",
    Version: "1.0.0",
    Description: "Example plugin",
    Author: "Developer",
}

type MyPlugin struct {
    plugin.BaseObserverImpl
    // Custom fields
}

func NewObserver() plugin.Observer {
    return &MyPlugin{}
}

func (p *MyPlugin) Initialize(connection string, options map[string]string) error {
    // Plugin initialization logic
    return nil
}

func (p *MyPlugin) SaveHistory(h []zbx.History) bool {
    // Process history data
    return true
}

Index

Constants

This section is empty.

Variables

var Handshake = plugin.HandshakeConfig{
	ProtocolVersion:  1,
	MagicCookieKey:   "ZMS_PLUGIN",
	MagicCookieValue: "zabbix_metric_shipper",
}

Handshake is the shared configuration between ZMS and plugins. This must match exactly between the main application and all plugins to ensure compatibility.
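
The idea behind the magic cookie can be sketched with the standard library alone. In go-plugin the host hands the cookie to the plugin process through an environment variable, and the plugin side declines to serve when the value is missing or different. The handshakeConfig type and the cookieMatches helper below are hypothetical stand-ins written for this illustration; they are not part of this package or of go-plugin.

```go
package main

import (
	"fmt"
	"os"
)

// handshakeConfig is a local stand-in that mirrors the three fields of the
// Handshake variable above. It is NOT the go-plugin type.
type handshakeConfig struct {
	ProtocolVersion  uint
	MagicCookieKey   string
	MagicCookieValue string
}

// cookieMatches reports whether the environment visible to a plugin process
// carries the expected cookie. getenv is injected so the check can be
// exercised without touching the real environment.
func cookieMatches(cfg handshakeConfig, getenv func(string) string) bool {
	return cfg.MagicCookieKey != "" && getenv(cfg.MagicCookieKey) == cfg.MagicCookieValue
}

func main() {
	cfg := handshakeConfig{
		ProtocolVersion:  1,
		MagicCookieKey:   "ZMS_PLUGIN",
		MagicCookieValue: "zabbix_metric_shipper",
	}
	if !cookieMatches(cfg, os.Getenv) {
		fmt.Println("not launched by the host: magic cookie missing or wrong")
		return
	}
	fmt.Println("magic cookie ok, protocol version", cfg.ProtocolVersion)
}
```

Running the binary directly from a shell takes the first branch, which is the point of the cookie: it is a sanity check that the process was started by the host, not a security mechanism.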

Functions

func StringToExportType

func StringToExportType(s string) proto.ExportType

StringToExportType converts export type string to proto.ExportType.
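
A conversion of this kind is usually a small switch. The sketch below is illustrative only: exportType, its constants, and the accepted strings ("history", "trends", "events") are assumptions made for the example and may not match the values defined in the proto package.

```go
package main

import (
	"fmt"
	"strings"
)

// exportType is a hypothetical stand-in for proto.ExportType.
type exportType int

const (
	exportUnknown exportType = iota
	exportHistory
	exportTrends
	exportEvents
)

// stringToExportType maps a name to the stand-in enum. Matching is
// case-insensitive and ignores surrounding whitespace; unrecognised
// names map to exportUnknown. These rules are assumptions.
func stringToExportType(s string) exportType {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "history":
		return exportHistory
	case "trends":
		return exportTrends
	case "events":
		return exportEvents
	default:
		return exportUnknown
	}
}

func main() {
	for _, s := range []string{"history", "Trends", "bogus"} {
		fmt.Printf("%q -> %d\n", s, stringToExportType(s))
	}
}
```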

func ZbxEventToProto

func ZbxEventToProto(e *zbx.Event) *proto.Event

ZbxEventToProto converts a single zbx.Event to proto.Event.

func ZbxEventsToProto

func ZbxEventsToProto(zbxEvents []zbx.Event) []*proto.Event

ZbxEventsToProto converts zbx.Event slice to proto.Event slice.

func ZbxHistorySliceToProto

func ZbxHistorySliceToProto(zbxHistory []zbx.History) []*proto.History

ZbxHistorySliceToProto converts zbx.History slice to proto.History slice.

func ZbxHistoryToProto

func ZbxHistoryToProto(h *zbx.History) *proto.History

ZbxHistoryToProto converts a single zbx.History to proto.History.

func ZbxTrendToProto

func ZbxTrendToProto(t *zbx.Trend) *proto.Trend

ZbxTrendToProto converts a single zbx.Trend to proto.Trend.

func ZbxTrendsToProto

func ZbxTrendsToProto(zbxTrends []zbx.Trend) []*proto.Trend

ZbxTrendsToProto converts zbx.Trend slice to proto.Trend slice.
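
The single-record and slice converters above follow a common pairing: one function maps a record, the other loops and collects pointers. A minimal sketch of that pattern follows. The history and protoHistory types and their fields are hypothetical stand-ins, since the real zbx and proto field sets are not shown on this page.

```go
package main

import "fmt"

// history is a hypothetical stand-in for zbx.History.
type history struct {
	ItemID int
	Clock  int64
	Value  float64
}

// protoHistory is a hypothetical stand-in for proto.History.
type protoHistory struct {
	ItemId int64
	Clock  int64
	Value  float64
}

// historyToProto converts one record; a nil input yields a nil output.
func historyToProto(h *history) *protoHistory {
	if h == nil {
		return nil
	}
	return &protoHistory{ItemId: int64(h.ItemID), Clock: h.Clock, Value: h.Value}
}

// historySliceToProto converts a slice of values into a slice of pointers.
func historySliceToProto(hs []history) []*protoHistory {
	out := make([]*protoHistory, 0, len(hs))
	for i := range hs {
		// Take the address of the slice element rather than of a loop copy.
		out = append(out, historyToProto(&hs[i]))
	}
	return out
}

func main() {
	in := []history{{ItemID: 1, Clock: 100, Value: 0.5}, {ItemID: 2, Clock: 101, Value: 1.5}}
	for _, p := range historySliceToProto(in) {
		fmt.Println(p.ItemId, p.Clock, p.Value)
	}
}
```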

Types

type BaseObserverGRPC

type BaseObserverGRPC struct {
	// Name is the configured name of this observer instance
	Name string

	// PluginName is the plugin type identifier
	PluginName string

	// Filter handles tag-based filtering for this observer
	Filter filter.Filter

	// Logger provides structured logging
	Logger *slog.Logger
	// contains filtered or unexported fields
}

BaseObserverGRPC provides core functionality for gRPC-based observer plugins. Plugins should embed this struct to get access to filtering, metrics, and logging.

Unlike the old plugin system, this base observer only handles:

- Filtering (initialized during Initialize RPC)
- Metrics (initialized during Initialize RPC)
- Logging

Buffer management is optional and can be implemented by plugins using the buffer package.

func NewBaseObserverGRPC

func NewBaseObserverGRPC() *BaseObserverGRPC

NewBaseObserverGRPC creates a new BaseObserverGRPC instance. Plugins should call this in their constructor.
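
The embed-and-construct pattern described here can be sketched without the real package. In the example below, baseObserver and newBaseObserver are hypothetical stand-ins for BaseObserverGRPC and NewBaseObserverGRPC, and printObserver is an invented plugin type. Only the shape of the code is meant to carry over.

```go
package main

import (
	"fmt"
	"log/slog"
	"os"
)

// baseObserver is a hypothetical stand-in for BaseObserverGRPC, reduced to
// a few of the exported fields listed above.
type baseObserver struct {
	Name       string
	PluginName string
	Logger     *slog.Logger
}

// newBaseObserver plays the role of NewBaseObserverGRPC in this sketch.
func newBaseObserver() *baseObserver {
	return &baseObserver{Logger: slog.New(slog.NewTextHandler(os.Stderr, nil))}
}

// printObserver is an invented plugin type that embeds the base.
type printObserver struct {
	*baseObserver
	prefix string // plugin-specific state
}

// newPrintObserver is the plugin constructor: it creates the base first so
// that promoted fields such as Logger are usable immediately.
func newPrintObserver(prefix string) *printObserver {
	return &printObserver{baseObserver: newBaseObserver(), prefix: prefix}
}

func main() {
	o := newPrintObserver("demo")
	o.Name = "printer-1" // promoted field from the embedded base
	o.Logger.Info("observer created", "name", o.Name)
	fmt.Println(o.prefix, o.Name)
}
```

Embedding a pointer is one choice; embedding the struct by value works as well, as long as the base is initialized before its fields are used.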

func (*BaseObserverGRPC) FilterEvents

func (b *BaseObserverGRPC) FilterEvents(events []*proto.Event) []zbx.Event

FilterEvents applies the configured filter to event data. Returns only the event records that pass the filter.

func (*BaseObserverGRPC) FilterHistory

func (b *BaseObserverGRPC) FilterHistory(history []*proto.History) []zbx.History

FilterHistory applies the configured filter to history data. Returns only the history records that pass the filter.

func (*BaseObserverGRPC) FilterTrends

func (b *BaseObserverGRPC) FilterTrends(trends []*proto.Trend) []zbx.Trend

FilterTrends applies the configured filter to trend data. Returns only the trend records that pass the filter.
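
All three Filter methods take proto records and return zbx records, so conversion and filtering can happen in one pass. The sketch below shows that shape with invented types. The tag, protoRecord, record, and tagFilter definitions, and the rule that a record passes when it carries at least one accepted tag, are assumptions for illustration; the real filter.Filter semantics are not documented on this page.

```go
package main

import "fmt"

type tag struct{ Name, Value string }

// protoRecord and record are hypothetical stand-ins for the proto and zbx types.
type protoRecord struct {
	Host string
	Tags []tag
}

type record struct {
	Host string
	Tags []tag
}

// tagFilter accepts a record that carries at least one accepted tag.
// An empty accepted list accepts everything (assumed semantics).
type tagFilter struct{ accepted []tag }

func (f tagFilter) accept(r record) bool {
	if len(f.accepted) == 0 {
		return true
	}
	for _, want := range f.accepted {
		for _, have := range r.Tags {
			if want == have {
				return true
			}
		}
	}
	return false
}

// filterRecords converts each non-nil input and keeps those that pass the filter.
func filterRecords(f tagFilter, in []*protoRecord) []record {
	out := make([]record, 0, len(in))
	for _, p := range in {
		if p == nil {
			continue
		}
		r := record{Host: p.Host, Tags: p.Tags}
		if f.accept(r) {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	f := tagFilter{accepted: []tag{{Name: "env", Value: "prod"}}}
	in := []*protoRecord{
		{Host: "web-1", Tags: []tag{{Name: "env", Value: "prod"}}},
		{Host: "web-2", Tags: []tag{{Name: "env", Value: "dev"}}},
	}
	for _, r := range filterRecords(f, in) {
		fmt.Println("kept:", r.Host)
	}
}
```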

func (*BaseObserverGRPC) Initialize

Initialize handles common initialization tasks for all plugins. Plugins should call this method in their Initialize implementation before doing plugin-specific initialization.

This method:

- Stores the observer name and configuration
- Sets up filtering based on the provided filter config
- Initializes Prometheus metrics
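
The ordering recommended here (base initialization first, plugin-specific work second) can be sketched as follows. The signature of Initialize is not shown on this page, so the sketch does not try to reproduce it: baseObserver, its initialize method, fileObserver, and every parameter below are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// baseObserver is a hypothetical stand-in for BaseObserverGRPC.
type baseObserver struct {
	Name        string
	initialized bool
}

// initialize stands in for the common setup (name, filter, metrics).
func (b *baseObserver) initialize(name string) error {
	if name == "" {
		return errors.New("observer name must not be empty")
	}
	b.Name = name
	b.initialized = true
	return nil
}

// fileObserver is an invented plugin that needs a destination path.
type fileObserver struct {
	baseObserver
	path string
}

// Initialize runs the base setup first and only then touches plugin state,
// so a failure in the common part leaves the plugin unconfigured.
func (o *fileObserver) Initialize(name, connection string) error {
	if err := o.baseObserver.initialize(name); err != nil {
		return fmt.Errorf("base initialization: %w", err)
	}
	if connection == "" {
		return errors.New("connection must not be empty")
	}
	o.path = connection
	return nil
}

func main() {
	o := &fileObserver{}
	if err := o.Initialize("file-out", "/tmp/zms.out"); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	fmt.Println("initialized", o.Name, "writing to", o.path)
}
```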

type ObserverPlugin

type ObserverPlugin struct {
	plugin.Plugin
	// Impl is the concrete implementation of the observer
	Impl proto.ObserverServiceServer
}

ObserverPlugin is the implementation of the plugin.Plugin interface for HashiCorp go-plugin. This handles the gRPC server/client setup.

func (*ObserverPlugin) GRPCClient

func (p *ObserverPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error)

GRPCClient creates a client that communicates with the plugin. This is called by the main ZMS application when connecting to a plugin.

func (*ObserverPlugin) GRPCServer

func (p *ObserverPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error

GRPCServer registers the observer implementation with the gRPC server. This is called by go-plugin when starting the plugin process.

type PluginInfo

type PluginInfo struct {
	// Name is the human-readable name of the plugin.
	Name string

	// Version is the semantic version of the plugin (e.g., "1.0.0").
	Version string

	// Description provides a brief description of what the plugin does.
	Description string

	// Author identifies who created or maintains the plugin.
	Author string
}

PluginInfo contains metadata about a plugin. Plugins should export a variable of this type named "PluginInfo" to provide information about the plugin to ZMS and users.

Example:

var PluginInfo = plugin.PluginInfo{
    Name:        "my-plugin",
    Version:     "1.0.0",
    Description: "Example plugin for processing data",
    Author:      "Plugin Developer",
}

type ZMSBuffer

type ZMSBuffer interface {

	// BufferHistory stores history records in the offline buffer.
	// Used when the target destination is unavailable.
	BufferHistory(history []zbx.History) (err error)

	// FetchHistory retrieves a specified number of history records from the buffer.
	// Returns up to 'number' records, or fewer if not enough are available.
	FetchHistory(number int) (history []zbx.History, err error)

	// DeleteHistory removes the specified history records from the buffer.
	// Used to clean up successfully processed records.
	DeleteHistory(history []zbx.History) (err error)

	// BufferTrends stores trend records in the offline buffer.
	// Used when the target destination is unavailable.
	BufferTrends(trends []zbx.Trend) (err error)

	// FetchTrends retrieves a specified number of trend records from the buffer.
	// Returns up to 'number' records, or fewer if not enough are available.
	FetchTrends(number int) (trends []zbx.Trend, err error)

	// DeleteTrends removes the specified trend records from the buffer.
	// Used to clean up successfully processed records.
	DeleteTrends(trends []zbx.Trend) (err error)

	// BufferEvents stores event records in the offline buffer.
	// Used when the target destination is unavailable.
	BufferEvents(events []zbx.Event) (err error)

	// FetchEvents retrieves a specified number of event records from the buffer.
	// Returns up to 'number' records, or fewer if not enough are available.
	FetchEvents(number int) (events []zbx.Event, err error)

	// DeleteEvents removes the specified event records from the buffer.
	// Used to clean up successfully processed records.
	DeleteEvents(events []zbx.Event) (err error)

	// InitBuffer initializes the buffer with the specified path and TTL.
	// bufferPath: directory where buffer data will be stored
	// t: time-to-live in hours for buffered records
	InitBuffer(bufferPath string, t int64)

	// Cleanup releases buffer resources and closes the database connection.
	Cleanup()
}

ZMSBuffer defines the interface for offline data buffering. It provides persistent storage for Zabbix export data while the target destination is temporarily unavailable, so that buffered records can be delivered once the destination is reachable again.

The buffer uses BadgerDB for local persistence and supports TTL-based expiration to prevent unbounded storage growth. Data is serialized using gob encoding.
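
The intended use of the Buffer/Fetch/Delete triple is a retry loop: buffer on send failure, later fetch a batch, resend, and delete only what was delivered. The sketch below illustrates that flow for history records with an in-memory stand-in. The historyBuffer interface, memBuffer, deliver, and flush are all hypothetical names; the real implementation persists to BadgerDB and is not reproduced here.

```go
package main

import (
	"errors"
	"fmt"
)

// history is a hypothetical stand-in for zbx.History.
type history struct {
	ItemID int
	Value  float64
}

// historyBuffer is the history-only subset of the interface, for illustration.
type historyBuffer interface {
	BufferHistory(h []history) error
	FetchHistory(n int) ([]history, error)
	DeleteHistory(h []history) error
}

// memBuffer is an in-memory stand-in; nothing is persisted.
type memBuffer struct{ items []history }

func (m *memBuffer) BufferHistory(h []history) error {
	m.items = append(m.items, h...)
	return nil
}

func (m *memBuffer) FetchHistory(n int) ([]history, error) {
	if n < 0 {
		n = 0
	}
	if n > len(m.items) {
		n = len(m.items)
	}
	return append([]history(nil), m.items[:n]...), nil
}

func (m *memBuffer) DeleteHistory(h []history) error {
	drop := make(map[history]int, len(h))
	for _, r := range h {
		drop[r]++
	}
	kept := m.items[:0]
	for _, r := range m.items {
		if drop[r] > 0 {
			drop[r]--
			continue
		}
		kept = append(kept, r)
	}
	m.items = kept
	return nil
}

var errUnavailable = errors.New("destination unavailable")

// deliver tries to send and falls back to the buffer when sending fails.
func deliver(buf historyBuffer, send func([]history) error, h []history) error {
	if err := send(h); err != nil {
		return buf.BufferHistory(h)
	}
	return nil
}

// flush resends up to n buffered records and deletes them only on success.
func flush(buf historyBuffer, send func([]history) error, n int) (int, error) {
	pending, err := buf.FetchHistory(n)
	if err != nil || len(pending) == 0 {
		return 0, err
	}
	if err := send(pending); err != nil {
		return 0, err
	}
	return len(pending), buf.DeleteHistory(pending)
}

func main() {
	buf := &memBuffer{}
	down := func([]history) error { return errUnavailable }
	up := func(h []history) error { fmt.Println("sent", len(h), "records"); return nil }

	_ = deliver(buf, down, []history{{ItemID: 1, Value: 0.5}, {ItemID: 2, Value: 1.5}})
	fmt.Println("buffered:", len(buf.items))

	n, err := flush(buf, up, 10)
	fmt.Println("flushed:", n, "err:", err, "remaining:", len(buf.items))
}
```

Deleting after a confirmed send, rather than on fetch, means a crash between the two steps can cause a duplicate delivery but not a lost record.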

type ZMSDefaultBuffer

type ZMSDefaultBuffer struct {
	// contains filtered or unexported fields
}

ZMSDefaultBuffer is the default implementation of ZMSBuffer using BadgerDB. It provides persistent, transactional storage with configurable TTL for automatic cleanup. Data is stored using gob encoding and indexed by hash keys generated from export records.

func (ZMSDefaultBuffer) BufferEvents

func (b ZMSDefaultBuffer) BufferEvents(events []zbx.Event) (err error)

func (ZMSDefaultBuffer) BufferHistory

func (b ZMSDefaultBuffer) BufferHistory(history []zbx.History) (err error)

func (ZMSDefaultBuffer) BufferTrends

func (b ZMSDefaultBuffer) BufferTrends(trends []zbx.Trend) (err error)

func (ZMSDefaultBuffer) Cleanup

func (b ZMSDefaultBuffer) Cleanup()

Cleanup releases resources held by the buffer. If the buffer TTL (offlineBufferTTL) is greater than zero, it closes the underlying buffer to free associated resources.

func (ZMSDefaultBuffer) DeleteEvents

func (b ZMSDefaultBuffer) DeleteEvents(events []zbx.Event) (err error)

func (ZMSDefaultBuffer) DeleteHistory

func (b ZMSDefaultBuffer) DeleteHistory(history []zbx.History) (err error)

func (ZMSDefaultBuffer) DeleteTrends

func (b ZMSDefaultBuffer) DeleteTrends(trends []zbx.Trend) (err error)

func (ZMSDefaultBuffer) FetchEvents

func (b ZMSDefaultBuffer) FetchEvents(number int) (events []zbx.Event, err error)

func (ZMSDefaultBuffer) FetchHistory

func (b ZMSDefaultBuffer) FetchHistory(number int) (history []zbx.History, err error)

func (ZMSDefaultBuffer) FetchTrends

func (b ZMSDefaultBuffer) FetchTrends(number int) (trends []zbx.Trend, err error)

func (ZMSDefaultBuffer) InitBuffer

func (b ZMSDefaultBuffer) InitBuffer(bufferPath string, ttl int64)

InitBuffer initializes the BadgerDB buffer with the specified path and TTL. If TTL is 0, buffering is disabled. Otherwise, a BadgerDB instance is created at the specified path with automatic TTL-based cleanup.
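
The TTL rule can be shown in isolation. The interface comment on InitBuffer gives the unit as hours, and a value of 0 disables buffering. The helper ttlFromHours below is hypothetical and only illustrates that rule; treating negative values as disabled is an additional assumption.

```go
package main

import (
	"fmt"
	"time"
)

// ttlFromHours converts a TTL given in hours into a time.Duration.
// enabled is false when buffering should be switched off: a TTL of 0 per
// the documentation, and negative values by assumption in this sketch.
func ttlFromHours(hours int64) (d time.Duration, enabled bool) {
	if hours <= 0 {
		return 0, false
	}
	return time.Duration(hours) * time.Hour, true
}

func main() {
	for _, h := range []int64{0, 24} {
		if d, ok := ttlFromHours(h); ok {
			fmt.Printf("ttl=%dh: buffering enabled, entries expire after %s\n", h, d)
		} else {
			fmt.Printf("ttl=%dh: buffering disabled\n", h)
		}
	}
}
```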
