wbf

module
v0.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0

README

wbf banner

Минималистичный фреймворк для работы с базовыми инфраструктурными штуками.


WBF — это готовый набор обёрток для стандартной инфраструктуры. С его помощью можно быстро интегрировать в проект базу данных (PostgreSQL), кэширование (Redis), брокера сообщений (Kafka/RabbitMQ), систему логирования (Zerolog/Logger) и загрузчик конфигураций (Viper/CleanEnv).


Пакеты:

  • dbpg — пакет для работы с PostgreSQL, реализующий архитектуру «мастер-реплика» с балансировкой нагрузки на чтение, пулом соединений и встроенной поддержкой повторных попыток.

  • pgxdriver — пакет-обёртка над pgx/v5 с настраиваемым пулом соединений, встроенным retry-механизмом при подключении, транзакционным менеджером, batch/bulk-операциями и интеграцией с Squirrel.

  • redis — пакет-обёртка над go-redis со встроенной поддержкой повторных попыток, асинхронным батчевым выполнением операций записи и упрощённым API.

  • kafka — пакет для работы с Apache Kafka, предоставляющий готовых продюсера и консьюмера с автоматическими повторами и асинхронной обработкой сообщений.

  • kafkav2 — улучшенный пакет для работы с Apache Kafka, предоставляющий готовый producer, отказоустойчивый consumer с process retry + jitter, возможность работы с DLQ и улучшенное логирование.

  • dlq — компонент Dead Letter Queue для Kafka, предназначенный для надёжного сохранения сообщений, поддерживает сериализацию оригинального тела сообщения в base64, метаданные (топик, попытка, временная метка), а также fallback-механизм при ошибках маршалинга.

  • rabbitmq — пакет для работы с RabbitMQ, предоставляющий готовые клиенты для публикации и обработки сообщений с автоматическим переподключением, настраиваемыми стратегиями повторных попыток и поддержкой многопоточной обработки.

  • zlog — пакет для структурированного логирования на базе zerolog, предоставляющий готовый глобальный логгер с настройкой формата вывода (JSON или консоль), уровнями логирования и автоматическим добавлением временных меток.

  • logger — пакет для логирования с возможность настройки, унифицированными интерфейсами для zap/slog/zerolog/logrus, с поддержкой request_id, ротации через lumberjack и структурированием атрибутов.

  • config — пакет для работы с конфигурацией, реализующий загрузку настроек из различных источников через Viper, включая .env файлы, YAML/JSON конфиги, переменные окружения и командные флаги.

  • config/cleanenvport — порт популярной библиотеки для работы с конфигураций, обеспечивающий строго типизированную загрузку с валидацией через validator и поддержку флага --config / CONFIG_PATH.

  • retry — пакет для реализации повторных попыток выполнения операций, предоставляющий настраиваемые стратегии с экспоненциальным бэк-оффом, поддержкой контекста для graceful shutdown и универсальным интерфейсом для любых функций.

  • ginext — пакет-обёртка для веб-фреймворка Gin с полной поддержкой всех HTTP-методов, middleware и удобной настройкой режимов работы.

  • helpers — пакет для мелких вспомогательных функций общего назначения.


Примеры использования

PostgreSQL

dbpg

Инициализация подключения с настройками пула соединений:

opts := &dbpg.Options{MaxOpenConns: 10, MaxIdleConns: 5} 
db, err := dbpg.New(masterDSN, slaveDSNs, opts)

Запрос с автоматическим повторением при ошибках (через пакет retry):

query := "UPDATE..."
strategy := retry.Strategy{Attempts: 3, Delay: 5 * time.Second, Backoff: 2}

res, err := db.ExecWithRetry(ctx, strategy, query)

Пакетная запись через канал:

ch := make(chan string)
go db.BatchExec(ctx, ch)
ch <- "INSERT ..."
close(ch)

Транзакция с автоматическим rollback/commit:

err := db.WithTx(ctx, func(tx *sql.Tx) error {
    tx.ExecContext(ctx, "INSERT ...")
    tx.ExecContext(ctx, "UPDATE ...")
    return nil
})

pgx-drvier

Подключение с retry и настройкой пула:

pg, err := pgxdriver.New(
    dsn,
    log,
    pgxdriver.MaxPoolSize(50),
    pgxdriver.MaxConnAttempts(5),
    pgxdriver.BaseRetryDelay(100*time.Millisecond),
)
if err != nil {
    log.Fatal("Failed to connect to PostgreSQL:", err)
}
defer pg.Close()

Работа с транзакциями с автоматическим retry:

tm, err := transaction.NewManager(
    pg,
    log,
    transaction.MaxAttempts(5),
    transaction.BaseRetryDelay(10*time.Millisecond),
)
if err != nil {
    return err
}

err = tm.ExecuteInTransaction(ctx, "transfer", func(tx pgxdriver.QueryExecuter) error {
    _, err := tx.Exec(ctx, "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, fromID)
    if err != nil {
        return err
    }
    _, err = tx.Exec(ctx, "UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, toID)
    return err
})

Массовая вставка через BulkInsert:

columns := []string{"name", "email"}
data := [][]any{
    {"Alice", "alice@example.com"},
    {"Bob", "bob@example.com"},
}
count, err := pgxdriver.BulkInsert(ctx, pg, "users", columns, data)

Redis

Подключение и чтение с ретраями:

client := redis.New("localhost:6379", "", 0)
strategy := retry.Strategy{Attempts: 3, Delay: 5 * time.Second, Backoff: 2}

val, err := client.GetWithRetry(ctx, strategy, "key")

Подключение с конфигурацией памяти:

options := redis.Options{
    Address:   "localhost:6379",
    Password:  "",                    
    MaxMemory: "100mb",               
    Policy:    "allkeys-lru",        
}

client, err := redis.Connect(options)

Запись с TTL и ретраями:

strategy := retry.Strategy{Attempts: 3, Delay: 2 * time.Second, Backoff: 2}
key := "abobaUUID"
value := "pending"
expiration := time.Hour

if err := client.SetWithExpirationAndRetry(ctx, strategy, key, value, expiration); err != nil {
    return err
}

Пакетная запись через канал:

ch := make(chan [2]string)
go client.BatchWriter(ctx, ch)
ch <- [2]string{"key", "value"}
close(ch)

Kafka

Producer — отправка сообщений с автоматическим повторением при ошибках:

producer := kafka.NewProducer([]string{"localhost:9092"}, "topic")
strategy := retry.Strategy{Attempts: 3, Delay: 5 * time.Second, Backoff: 2}

err := producer.SendWithRetry(ctx, strategy, []byte("key"), []byte("value"))
Kafkav2
producer := kafkav2.NewProducer(brokers, "orders", log)

Consumer — асинхронная обработка сообщений с повторами:

msgCh := make(chan kafka.Message)
consumer := kafka.NewConsumer([]string{"localhost:9092"}, "topic", "group")
strategy := retry.Strategy{Attempts: 3, Delay: 5 * time.Second, Backoff: 2}

consumer.StartConsuming(ctx, msgCh, strategy)

for msg := range msgCh {
    // обработка сообщения
}
Kafkav2
    consumer := kafkav2.NewConsumer(brokers, "orders", "order-processor", log)

Processor управляет жизненным циклом обработки Kafka сообщений, включающий retry/backoff механизмы и DLQ fallback
// Основной consumer
consumer := kafkav2.NewConsumer(brokers, "orders", "order-processor", log)

// DLQ-продюсер
dlqProducer := kafkav2.NewProducer(brokers, "dlq-orders", log)
dlqClient := dlq.New(dlqProducer, log)

// Процессор с retry и jitter
processor, err := kafkav2.NewProcessor(
    consumer,
    dlqClient,
    log,
    kafkav2.WithMaxAttempts(5),
    kafkav2.WithBaseRetryDelay(150*time.Millisecond),
    kafkav2.WithMaxRetryDelay(5*time.Second),
)
if err != nil {
    log.Fatal(err)
}

// Запуск обработки
processor.Start(ctx, func(ctx context.Context, msg kafka.Message) error {
    // Обработка сообщения
    if err := processOrder(msg.Value); err != nil {
        return fmt.Errorf("обработка заказа %d: %w", msg.Offset, err)
    }
    return nil
})

Логирование

zlog
zlog.Init()
zlog.Logger.Info().Msg("Hello")
logger

Инициализация с ротацией и уровнем:

log, err := logger.InitLogger(
    logger.ZapEngine,
    "order-service",
    "prod",
    logger.WithLevel(logger.InfoLevel),
    logger.WithRotation("logs/app.log", 100, 5, 30),
)
if err != nil {
    fmt.Fprintf(os.Stderr, "Ошибка инициализации логгера: %v\n", err)
    os.Exit(1)
}

Логирование с контекстом (request_id):

ctx = logger.SetRequestID(ctx, logger.GenerateRequestID())
log.Ctx(ctx).Info("Начата обработка заказа", "order_id", 123)

Структурированный вывод:

log.LogAttrs(ctx, logger.ErrorLevel, "Ошибка обработки",
    logger.String("error_type", "validation"),
    logger.Int("order_id", 123),
    logger.Any("payload", msg.Value),
)

Конфиги

Viper
cfg := config.New()
_ = cfg.Load("config.yaml")
val := cfg.GetString("some.key")
CleanEnvPort

Загрузка с валидацией через --config или CONFIG_PATH:

type Config struct {
    Server struct {
        Host string `yaml:"host" env:"SERVER_HOST" validate:"required"`
        Port int    `yaml:"port" env:"SERVER_PORT" validate:"required,min=1,max=65535"`
    } `yaml:"server"`
    DB struct {
        DSN string `yaml:"dsn" env:"DATABASE_DSN" validate:"required"`
    } `yaml:"database"`
}

var cfg Config
if err := cleanenvport.Load(&cfg); err != nil {
    log.Fatalf("Ошибка загрузки конфигурации: %v", err)
}

Прямая загрузка из файла:

if err := cleanenvport.LoadPath("./config.yaml", &cfg); err != nil {
    log.Fatalf("Ошибка загрузки конфигурации: %v", err)
}

Повторные попытки (retry)

ctx := context.Background()
strategy := retry.Strategy{Attempts: 3, Delay: 5 * time.Second, Backoff: 2}

err := retry.Do(func() error { return nil }, strategy)
err := retry.DoContext(ctx, strategy, func() error { retrun nil })

rabbitmq

Описание и документация: rabbitmq_doc.md


TODO

  • Написать тесты (like that's ever gonna happen)
  • Добавить больше примеров использования
  • Сделать middleware и метрики

Требования к качеству кода и коммитам

Pre-commit hooks

В проекте используется pre-commit для автоматической проверки кода и сообщений коммитов:

  • conventional commits — все коммиты должны соответствовать conventionalcommits.org
  • golangci-lint — код должен проходить все проверки линтера
Установка и настройка:
  1. Установите pre-commit: pip install pre-commit или brew install pre-commit
  2. Установите хуки: pre-commit install
  3. Для проверки вручную: pre-commit run --all-files

Линтеры

В проекте используется golangci-lint:

  • Конфиг: .golangci.yml
  • Проверяются стиль, ошибки, best practices
  • Перед коммитом и в CI код должен проходить все проверки линтера

Импорт

Для использования импортируйте пакеты так:

import "github.com/wb-go/wbf/dbpg"
import "github.com/wb-go/wbf/redis"
import "github.com/wb-go/wbf/kafka"
// и т.д.

Лицензия

Этот проект распространяется под лицензией Apache License 2.0. См. файл LICENSE.

Directories

Path Synopsis
Package config предоставляет управление конфигурацией с использованием Viper.
Package config предоставляет управление конфигурацией с использованием Viper.
cleanenv-port
Package cleanenvport provides a unified way to load and validate application configuration from a file (YAML/JSON/TOML) using cleanenv and validator.
Package cleanenvport provides a unified way to load and validate application configuration from a file (YAML/JSON/TOML) using cleanenv and validator.
Package dbpg provides PostgreSQL connection management with master-slave support.
Package dbpg provides PostgreSQL connection management with master-slave support.
pgx-driver
Package pgxdriver provides a robust PostgreSQL client built on pgx/v5, featuring connection retries with exponential backoff and jitter, integrated SQL query building via squirrel, and structured logging.
Package pgxdriver provides a robust PostgreSQL client built on pgx/v5, featuring connection retries with exponential backoff and jitter, integrated SQL query building via squirrel, and structured logging.
pgx-driver/transaction
Package transaction provides a transaction manager for PostgreSQL with automatic retry on transient errors such as serialization failures, deadlocks, and connection issues.
Package transaction provides a transaction manager for PostgreSQL with automatic retry on transient errors such as serialization failures, deadlocks, and connection issues.
Package ginext provides extensions for the Gin web framework.
Package ginext provides extensions for the Gin web framework.
Package helpers предоставляет вспомогательные функции общего назначения.
Package helpers предоставляет вспомогательные функции общего назначения.
Package kafka предоставляет клиенты для работы с Apache Kafka.
Package kafka предоставляет клиенты для работы с Apache Kafka.
dlq
Package dlq provides a Dead Letter Queue (DLQ) client for publishing failed Kafka messages to a dedicated error topic with structured metadata and safe serialization.
Package dlq provides a Dead Letter Queue (DLQ) client for publishing failed Kafka messages to a dedicated error topic with structured metadata and safe serialization.
kafka-v2
Package kafkav2 provides a robust Kafka client implementation with structured logging, retry-capable message processing, and Dead Letter Queue (DLQ) integration.
Package kafkav2 provides a robust Kafka client implementation with structured logging, retry-capable message processing, and Dead Letter Queue (DLQ) integration.
Package logger provides a unified, structured logging interface with support for multiple underlying logging engines (Zap, slog, zerolog, logrus).
Package logger provides a unified, structured logging interface with support for multiple underlying logging engines (Zap, slog, zerolog, logrus).
Package rabbitmq это обертка над github.com/rabbitmq/amqp091-go
Package rabbitmq это обертка над github.com/rabbitmq/amqp091-go
Package redis provides a client wrapper for Redis operations.
Package redis provides a client wrapper for Redis operations.
Package retry предоставляет функциональность повторных попыток с настраиваемыми стратегиями.
Package retry предоставляет функциональность повторных попыток с настраиваемыми стратегиями.
Package zlog предоставляет логирование с использованием Zerolog.
Package zlog предоставляет логирование с использованием Zerolog.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL