command

package
v0.0.0-...-33cf76a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2025 License: MIT Imports: 11 Imported by: 0

README

Пакет command: Типобезопасная шина команд

Пакет command предоставляет строго типизированную, расширяемую реализацию паттерна "Шина Команд" (Command Bus) для Go, полностью интегрированную с OpenTelemetry для трассировки и сбора метрик.

🚀 Основные возможности

  • Гарантия типобезопасности: Использование дженериков [C Command[R], R any] на всех уровнях (диспетчер, провайдер, обработчики) исключает ошибки несоответствия типов на этапе компиляции.
  • Централизованный реестр: Registry действует как потокобезопасная фабрика и менеджер экземпляров диспетчеров, гарантируя уникальность диспетчера для каждого именованного типа команды.
  • Принцип единственного обработчика: В отличие от шины событий, каждая команда имеет ровно одного обработчика, что соответствует паттерну CQRS.
  • Расширяемость через провайдеры: Архитектура построена на интерфейсе Provider, что позволяет легко заменять локальную реализацию на распределенные системы, не затрагивая бизнес-логику.
  • Встроенная наблюдаемость (Observability): Мощная система Middleware позволяет добавлять сквозную функциональность. Пакет поставляется с готовыми middleware для:
    • Структурированного логирования с использованием slog.
    • Сбора метрик по стандарту OpenTelemetry (количество и длительность обработки команд).
    • Распределенной трассировки по стандарту OpenTelemetry.
  • Гибкая конфигурация: Применение паттерна "функциональные опции" позволяет детально настраивать диспетчер (логгер, трассировщик, кастомные middleware).

🏛️ Архитектура и компоненты

Система состоит из следующих ключевых компонентов:

  • Command[R]: Интерфейс-маркер для любой команды. R — это тип возвращаемого значения.
  • CommandHandler[C, R]: Строго типизированная функция, которая обрабатывает команду C и возвращает результат R.
  • IDispatcher[C, R]: Основной публичный интерфейс шины. Предоставляет методы Dispatch, Register и Shutdown для корректного завершения работы.
  • Registry: Потокобезопасный контейнер и фабрика для создания и получения экземпляров IDispatcher. Является основной точкой входа для работы с пакетом. Предоставляет метод Shutdown для освобождения ресурсов всех зарегистрированных диспетчеров.
  • NewDispatcher: Фабричная функция для прямого создания экземпляра диспетчера с необходимыми опциями, минуя Registry.
  • Provider[C, R]: Внутренний интерфейс, абстрагирующий механизм выполнения команд. IDispatcher делегирует всю работу этому интерфейсу.
  • localProvider[C, R]: Реализация Provider по умолчанию, обеспечивающая внутрипроцессное выполнение.
  • Middleware[C, R]: Интерфейс для middleware, оборачивающего Provider для реализации сквозной функциональности.
Поток данных

Диаграмма ниже иллюстрирует полный жизненный цикл отправки команды и ее обработки.

sequenceDiagram
    participant Client as Клиентский код
    participant Registry as Реестр
    participant Dispatcher as IDispatcher
    participant Middleware as Middleware (Tracing, Metrics, Logging)
    participant Provider as localProvider
    participant Handler as CommandHandler

    Client->>Registry: Запросить диспетчер для "user.create"
    Registry-->>Client: Вернуть экземпляр IDispatcher[CreateUserCmd, string]

    Client->>Dispatcher: Register(handler)
    Dispatcher->>Provider: Register(handler)
    note right of Provider: Provider оборачивается в Middleware
    Provider->>Handler: Сохранить ссылку на обработчик

    Client->>Dispatcher: Dispatch(ctx, cmd)
    Dispatcher->>Middleware: Dispatch(ctx, cmd)
    note right of Middleware: Start Span, Record Metrics, Log...
    Middleware->>Provider: Dispatch(ctx, cmd)
    Provider->>Handler: handler(ctx, cmd)
    Handler-->>Provider: (result, error)
    Provider-->>Middleware: (result, error)
    note right of Middleware: End Span, Update Metrics, Log...
    Middleware-->>Dispatcher: (result, error)
    Dispatcher-->>Client: (result, error)

📖 Примеры использования

1. Определение команды

Каждая команда — это структура, которая служит для передачи данных. Она неявно реализует интерфейс-маркер command.Command[R] за счет использования в CommandHandler. Для поддержки трассировки она также должна реализовывать command.Metadatable.

package main

import "github.com/x-research-team/dtx-framework/bus/command"

// CreateUserCmd - команда для создания пользователя.
// Возвращаемый тип - string (ID пользователя).
type CreateUserCmd struct {
	ID    string
	Email string
	meta  map[string]string
}

// NewCreateUserCmd - конструктор для команды.
func NewCreateUserCmd(id, email string) CreateUserCmd {
	return CreateUserCmd{
		ID:    id,
		Email: email,
		meta:  make(map[string]string),
	}
}

// Metadata реализует интерфейс command.Metadatable.
func (c *CreateUserCmd) Metadata() map[string]string {
	return c.meta
}
2. Создание обработчика

Обработчик - это функция, соответствующая сигнатуре command.CommandHandler.

package main

import (
	"context"
	"fmt"
)

// handleCreateUser обрабатывает команду CreateUserCmd.
func handleCreateUser(ctx context.Context, cmd CreateUserCmd) (string, error) {
	fmt.Printf("Обработка команды: создание пользователя с Email: %s\n", cmd.Email)
	if cmd.Email == "" {
		return "", fmt.Errorf("email не может быть пустым")
	}
	// ... логика создания пользователя в БД ...
	
	// Возвращаем ID созданного пользователя.
	return cmd.ID, nil
}
3. Инициализация и использование
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/x-research-team/dtx-framework/bus/command"
)

func main() {
	// 1. Создаем новый реестр.
	registry := command.NewRegistry()

	// 2. Получаем строго типизированный диспетчер для нашей команды.
	// Если диспетчер не существует, он будет создан автоматически.
	userDispatcher, err := command.Dispatcher[CreateUserCmd, string](registry, "user.create")
	if err != nil {
		log.Fatalf("Не удалось получить диспетчер: %v", err)
	}

	// 3. Регистрируем обработчик.
	if err := userDispatcher.Register(handleCreateUser); err != nil {
		log.Fatalf("Не удалось зарегистрировать обработчик: %v", err)
	}

	// 4. Создаем и отправляем команду.
	cmd := NewCreateUserCmd("user-123", "test@example.com")
	userID, err := userDispatcher.Dispatch(context.Background(), cmd)
	if err != nil {
		fmt.Printf("Ошибка выполнения команды: %v\n", err)
	} else {
		fmt.Printf("Команда успешно выполнена. ID пользователя: %s\n", userID)
	}
}
4. Использование с полной наблюдаемостью

Пример конфигурации диспетчера с логгером, трассировщиком и сборщиком метрик.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/x-research-team/dtx-framework/bus/command"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/trace"
)

func setupDispatcherWithObservability(registry *command.Registry) (command.IDispatcher[CreateUserCmd, string], error) {
	// --- Настройка компонентов наблюдаемости (пример) ---
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	
	// Провайдер трассировки (например, экспортер в Jaeger или stdout)
	tracerProvider := trace.NewTracerProvider()
	otel.SetTracerProvider(tracerProvider)

	// Провайдер метрик (например, экспортер в Prometheus)
	meterProvider := metric.NewMeterProvider()
	otel.SetMeterProvider(meterProvider)
	// ---

	// Конфигурируем диспетчер с помощью функциональных опций.
	return command.Dispatcher[CreateUserCmd, string](
		registry,
		"user.create.observed",
		command.WithLogger[CreateUserCmd, string](logger),
		command.WithTracerProvider[CreateUserCmd, string](tracerProvider),
		command.WithMeterProvider[CreateUserCmd, string](meterProvider),
	)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewLocalProvider

func NewLocalProvider[C Command[R], R any](cfg *config[C, R]) (*localProvider[C, R], error)

NewLocalProvider создает новый экземпляр локального провайдера.

Types

type Command

type Command[R any] interface{}

Command представляет собой интерфейс-маркер для команды, параметризованный типом возвращаемого значения R. Каждая команда - это уникальный запрос на выполнение операции.

type CommandHandler

type CommandHandler[C Command[R], R any] func(ctx context.Context, cmd C) (R, error)

CommandHandler определяет строго типизированную функцию-обработчик для команды C, которая возвращает результат типа R.

type IDispatcher

type IDispatcher[C Command[R], R any] interface {
	Dispatch(ctx context.Context, cmd C) (R, error)
	Register(handler CommandHandler[C, R]) error
	Shutdown(ctx context.Context) error
}

IDispatcher определяет основной, строго типизированный интерфейс для шины команд.

func Dispatcher

func Dispatcher[C Command[R], R any](r *Registry, commandName string, opts ...Option[C, R]) (IDispatcher[C, R], error)

Dispatcher возвращает строго типизированный экземпляр диспетчера для указанного имени команды.

func NewDispatcher

func NewDispatcher[C Command[R], R any](opts ...Option[C, R]) (IDispatcher[C, R], error)

NewDispatcher создает новый, готовый к использованию экземпляр диспетчера.

type Metadatable

type Metadatable interface {
	Metadata() map[string]string
}

Metadatable - это интерфейс для команд, которые могут переносить метаданные. Это используется для сквозной передачи данных, таких как идентификаторы трассировки.

type Middleware

type Middleware[C Command[R], R any] interface {
	Wrap(next Provider[C, R]) Provider[C, R]
}

Middleware определяет интерфейс для middleware шины команд.

func NewLoggingMiddleware

func NewLoggingMiddleware[C Command[R], R any](logger *slog.Logger) Middleware[C, R]

NewLoggingMiddleware создает новое middleware для логирования.

func NewMetricsMiddleware

func NewMetricsMiddleware[C Command[R], R any](provider metric.MeterProvider) Middleware[C, R]

NewMetricsMiddleware создает новое middleware для сбора метрик.

func NewTracingMiddleware

func NewTracingMiddleware[C Command[R], R any](tp trace.TracerProvider, p propagation.TextMapPropagator) Middleware[C, R]

NewTracingMiddleware создает новое middleware для трассировки.

type MiddlewareFunc

type MiddlewareFunc[C Command[R], R any] func(next Provider[C, R]) Provider[C, R]

MiddlewareFunc является адаптером, позволяющим использовать обычные функции как middleware.

func (MiddlewareFunc[C, R]) Wrap

func (f MiddlewareFunc[C, R]) Wrap(next Provider[C, R]) Provider[C, R]

Wrap реализует интерфейс BusMiddleware.

type Option

type Option[C Command[R], R any] func(*config[C, R])

Option определяет тип для функциональных опций, которые изменяют конфигурацию шины.

func WithLogger

func WithLogger[C Command[R], R any](logger *slog.Logger) Option[C, R]

WithLogger возвращает опцию, которая устанавливает логгер для шины.

func WithMeterProvider

func WithMeterProvider[C Command[R], R any](provider metric.MeterProvider) Option[C, R]

WithMeterProvider возвращает опцию, которая устанавливает провайдер метрик.

func WithMiddleware

func WithMiddleware[C Command[R], R any](mw ...Middleware[C, R]) Option[C, R]

WithMiddleware возвращает опцию, которая добавляет один или несколько middleware в цепочку обработки.

func WithPropagator

func WithPropagator[C Command[R], R any](propagator propagation.TextMapPropagator) Option[C, R]

WithPropagator возвращает опцию, которая устанавливает механизм распространения контекста.

func WithTracerProvider

func WithTracerProvider[C Command[R], R any](provider trace.TracerProvider) Option[C, R]

WithTracerProvider возвращает опцию, которая устанавливает провайдер трассировки.

type Provider

type Provider[C Command[R], R any] interface {
	// Dispatch отправляет команду на выполнение.
	Dispatch(ctx context.Context, cmd C) (R, error)

	// Register регистрирует обработчик для команды.
	Register(handler CommandHandler[C, R]) error

	// Shutdown корректно завершает работу провайдера.
	Shutdown(ctx context.Context) error
}

Provider определяет контракт для сменных механизмов диспетчеризации команд.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry - это потокобезопасный реестр для управления экземплярами диспетчеров.

func NewRegistry

func NewRegistry() *Registry

NewRegistry создает новый экземпляр реестра диспетчеров.

func (*Registry) Shutdown

func (r *Registry) Shutdown(ctx context.Context) error

Shutdown корректно завершает работу всех зарегистрированных диспетчеров.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL