queue

package
v0.0.0-...-1e59fd8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2026 License: MIT Imports: 6 Imported by: 0

README

pkg/queue

@publish / @subscribe 시퀀스 런타임. Memory 또는 Postgres 백엔드 기반 메시지 큐.

개요

SSaC에서 @publish "topic" {k:v}@subscribe "topic" 시퀀스의 코드젠 타겟 런타임. Init()으로 백엔드를 선택하고("memory" 또는 "postgres"), Subscribe()로 토픽 핸들러를 등록한 뒤 Start()로 처리를 시작한다. Memory 백엔드는 Publish 호출 시 같은 프로세스 핸들러를 동기 호출하고, Postgres 백엔드는 fullend_queue 테이블에 INSERT 후 1초 간격 폴링 루프가 FOR UPDATE SKIP LOCKED로 대기 메시지를 꺼내 핸들러에 디스패치한다. 전역 싱글톤 상태를 사용하므로 테스트는 resetQueue() 헬퍼로 초기화한다.

백엔드 비교

항목 memory postgres
초기화 Init(ctx, "memory", nil) Init(ctx, "postgres", db) (fullend_queue 테이블 auto-create)
Publish 핸들러 동기 호출 fullend_queue INSERT
Delivery 즉시, 프로세스 내부 1초 폴링, 여러 프로세스 가능 (FOR UPDATE SKIP LOCKED)
Delay 무시됨(즉시 실행) deliver_at 컬럼으로 지연
Priority 무시됨 priority 컬럼으로 정렬(high→normal→low)
영속성 없음 DB 저장, status/processed_at 추적

공개 API

Init(ctx context.Context, backend string, db *sql.DB) error

백엔드 초기화. backend"memory" 또는 "postgres". Postgres는 db 필수이며 fullend_queue 테이블과 pending 인덱스를 auto-create 한다.

Subscribe(topic string, handler func(ctx context.Context, msg []byte) error)

토픽 핸들러 등록. 동일 토픽에 여러 핸들러를 순차 호출한다. 핸들러가 error를 반환하면 해당 메시지는 status='failed'로 마킹된다(postgres).

Publish(ctx context.Context, topic string, payload any, opts ...PublishOption) error

JSON marshal 후 백엔드로 발행. 초기화 전 호출 시 ErrNotInitialized 반환. 구독자 없는 토픽에 발행해도 에러 아님.

PublishTx(ctx context.Context, tx *sql.Tx, topic string, payload any, opts ...PublishOption) error

tx-bound 발행 — 비즈니스 트랜잭션과 동일한 *sql.Txfullend_queue INSERT 를 수행한다. tx 가 rollback 되면 이벤트 레코드도 함께 사라지고, commit 되어야만 폴링 worker 에 노출된다. "DB 반영 성공 + 이벤트 발행 실패" inconsistency 를 제거하기 위한 outbox 패턴.

  • postgres 백엔드만 지원. memory 백엔드는 ErrTxUnsupported 반환 (동기 핸들러 호출은 tx 의미를 보장할 수 없음).
  • tx == nil 이면 즉시 에러 반환.
  • 옵션(WithDelay, WithPriority) 은 Publish 와 동일하게 적용.
Start(ctx context.Context) error

메시지 처리 시작. Memory는 ctx 취소까지 블록만 함(실제 디스패치는 Publish 시점). Postgres는 1초 간격 폴링 루프 실행. ctx 취소 시 정상 종료.

Close() error

폴링 루프 중지 후 전역 상태 해제. Start가 반환할 때까지 대기.

Publish Options
함수 설명
WithDelay(seconds int) PublishOption 지연 전달 시간(초). postgres만 적용, memory는 즉시 실행
WithPriority(p string) PublishOption 우선순위 "high" / "normal"(기본) / "low". postgres만 정렬에 반영
에러
이름 조건
ErrNotInitialized Init 이전에 Publish / PublishTx 호출
ErrUnknownBackend 지원하지 않는 백엔드 문자열
ErrTxUnsupported memory 백엔드에서 PublishTx 호출
DB 스키마 (postgres)
CREATE TABLE fullend_queue (
    id           BIGSERIAL PRIMARY KEY,
    topic        TEXT NOT NULL,
    payload      JSONB NOT NULL,
    priority     TEXT NOT NULL DEFAULT 'normal',
    status       TEXT NOT NULL DEFAULT 'pending',  -- pending/done/failed
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    deliver_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ,
    traceparent  TEXT NOT NULL DEFAULT ''          -- W3C TraceContext (Phase009)
);
CREATE INDEX idx_fullend_queue_pending
  ON fullend_queue (topic, status, deliver_at) WHERE status = 'pending';

기존 배포는 ALTER TABLE ... ADD COLUMN IF NOT EXISTS traceparentInit 시점에 자동 적용하므로 마이그레이션 수동 작업은 필요 없다.

Span propagation (W3C TraceContext)

Publish / PublishTx 는 호출 시점 ctx 의 활성 span 에서 W3C traceparent 헤더를 추출해 fullend_queue.traceparent 컬럼에 저장한다. 폴러(pollOnce)는 저장된 값을 다시 SpanContext 로 복원한 뒤 구독자 핸들러에 전달하므로, publish → dispatch 가 같은 분산 trace 에 연결된다.

  • 애플리케이션이 otel.SetTracerProvider 를 호출하지 않아도 패키지는 안전하다 — traceparent 가 빈 문자열로 저장되고, 복원은 no-op 이다.
  • 전파기는 otel.GetTextMapPropagator() (기본값: 글로벌 등록된 propagator). propagation.TraceContext{} 를 등록하면 표준 포맷이 그대로 사용된다. 다른 전파기(Baggage 등)를 쓰더라도 traceparent 키만 컬럼에 저장되므로 포맷 충돌은 없다.
  • 계측 라이브러리(예: otelhttp, otelgin)가 요청 스팬을 이미 연 ctx 를 핸들러에 전달하면 @publish 코드젠이 ctx 를 그대로 전달하므로 추가 코드 없이 연결된다.

사용 예시

SSaC @publish (HTTP 함수 내)
// @publish "user.registered" {UserID: user.ID, Email: user.Email}
// @publish "email.send"      {To: user.Email} {delay: 1800, priority: "high"}

코드젠 결과:

if err := queue.Publish(c.Request.Context(), "user.registered", struct {
    UserID int64
    Email  string
}{UserID: user.ID, Email: user.Email}); err != nil { /* 500 */ }

if err := queue.Publish(c.Request.Context(), "email.send", struct {
    To string
}{To: user.Email}, queue.WithDelay(1800), queue.WithPriority("high")); err != nil { /* 500 */ }
tx-bound 발행 (outbox 패턴)

@publish 가 HTTP 함수의 트랜잭션 블록 내부에 있으면 fullend 코드젠은 queue.PublishTx(ctx, tx, ...) 로 생성한다. commit 전까지 다른 트랜잭션에서 볼 수 없으므로 "DB 반영 + 이벤트" 가 원자적이다.

tx, _ := db.BeginTx(ctx, nil)
defer tx.Rollback()

// ... DB 작업 ...

if err := queue.PublishTx(ctx, tx, "order.created", struct {
    OrderID int64
}{OrderID: order.ID}); err != nil {
    return nil, err // rollback 발동 → 이벤트 유실 없음
}

if err := tx.Commit(); err != nil { return nil, err }
SSaC @subscribe
// Message payload struct
type UserRegistered struct {
    UserID int64
    Email  string
}

// @subscribe "user.registered"
// @call _ = mail.SendEmail({To: message.Email, Subject: "환영"})
func OnUserRegistered(ctx context.Context, message UserRegistered) error { ... }

message 예약 소스로 큐 페이로드 필드 접근, 에러 반환 시 return fmt.Errorf(...), 성공 시 return nil.

Go 직접 사용
import (
    "context"
    "github.com/park-jun-woo/ssac/pkg/queue"
)

ctx := context.Background()
queue.Init(ctx, "postgres", db)
defer queue.Close()

queue.Subscribe("user.registered", func(ctx context.Context, msg []byte) error {
    var ev struct{ UserID int64; Email string }
    json.Unmarshal(msg, &ev)
    return handleRegistered(ev)
})

go queue.Start(ctx)  // postgres 폴링 시작

queue.Publish(ctx, "user.registered",
    map[string]any{"UserID": 42, "Email": "a@b.com"},
    queue.WithDelay(60),
    queue.WithPriority("high"),
)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotInitialized — Publish/Subscribe/Start called before Init.
	ErrNotInitialized = errors.New("queue: not initialized, call Init first")
	// ErrUnknownBackend — unsupported backend name passed to Init.
	ErrUnknownBackend = errors.New("queue: unknown backend")
)
View Source
var ErrTxUnsupported = errors.New("queue: tx-bound publish not supported by current backend")

ErrTxUnsupported is returned by Backend.PublishTx when the active backend does not support transaction-bound publishing (memory backend).

Functions

func Close

func Close() error

Close stops the polling loop (if any) and resets package state.

func Init

func Init(ctx context.Context, backendName string) error

Init initializes the queue with the memory backend. For durable backends (postgres, redis, etc.) callers use SetBackend(b) after yongol-generated code constructs the Backend implementation against the user's sqlc Queries.

Signature accepts backendName for backward-compatible call sites — only "memory" is accepted from inside ssac. Any other name (including "postgres") returns ErrUnknownBackend; the caller must instead call SetBackend(externalImpl) directly.

func Publish

func Publish(ctx context.Context, topic string, payload any, opts ...PublishOption) error

Publish serializes payload to JSON and delegates to the active Backend. Returns ErrNotInitialized if neither Init nor SetBackend has run.

func PublishTx

func PublishTx(ctx context.Context, tx any, topic string, payload any, opts ...PublishOption) error

PublishTx enqueues payload on topic inside the caller's transaction. The tx parameter is driver-neutral (any); the active Backend asserts the expected concrete type — typically pgx.Tx for the postgres backend or *sql.Tx for legacy database/sql. The memory backend returns ErrTxUnsupported.

Atomicity: on Commit the row becomes visible to pollers; on Rollback no trace remains. The caller is responsible for the commit/rollback.

func SetBackend

func SetBackend(b Backend)

SetBackend installs an externally constructed Backend (e.g. a yongol- generated postgres implementation). Use this from main.go after building the backend from the user's sqlc Queries:

q := db.New(pool)
queue.SetBackend(postgresqueue.NewPostgres(q))

Handlers registered via Subscribe before SetBackend survive the swap.

func Start

func Start(ctx context.Context) error

Start begins processing queued messages. It blocks until the context is cancelled. For backends that do not implement Starter this is a no-op that blocks on ctx.Done (memory semantics).

func Subscribe

func Subscribe(topic string, handler func(ctx context.Context, msg []byte) error)

Subscribe registers a handler for the given topic.

Types

type Backend

type Backend interface {
	// Publish enqueues a serialized payload on topic with the supplied
	// delivery config. The memory backend dispatches handlers synchronously;
	// durable backends persist the row and return.
	Publish(ctx context.Context, topic string, data []byte, cfg PublishConfig) error

	// PublishTx enqueues inside the caller's transaction. tx is driver-
	// specific; backends that do not support transactional publishing
	// (e.g. memory) return ErrTxUnsupported.
	PublishTx(ctx context.Context, tx any, topic string, data []byte, cfg PublishConfig) error
}

Backend is the interface implemented by queue backends. The package-level Publish/PublishTx functions delegate to the currently installed Backend.

memory backend ships in ssac for tests and zero-config dev. A postgres (or other durable) backend is provided by yongol codegen from the user's sqlc Queries via pkg/queue/interface.yaml ports (QueuePublish / QueuePoll / QueueAck). ssac itself never imports database/sql or pgx.

PublishTx accepts tx as `any` so both database/sql (*sql.Tx) and jackc/pgx (pgx.Tx) implementations are representable without ssac binding to a specific driver. The concrete Backend asserts the expected type.

type PublishConfig

type PublishConfig struct {
	Delay    int    // seconds
	Priority string // "high", "normal", "low"
}

PublishConfig holds options applied to a single Publish / PublishTx call. Exported because Backend implementations provided outside the queue package (e.g. yongol-generated postgres adapter) must be able to name the config type in their method signatures.

Fields are exported for the same reason — external implementations need read access to Priority / Delay when translating into their own driver calls (e.g. priority ordering, deliver_at offset). Callers still configure via WithPriority / WithDelay options; direct struct literals are reserved for tests and in-package wiring.

type PublishOption

type PublishOption func(*PublishConfig)

PublishOption configures a Publish call.

func WithDelay

func WithDelay(seconds int) PublishOption

WithDelay sets the delivery delay in seconds.

func WithPriority

func WithPriority(p string) PublishOption

WithPriority sets the message priority ("high", "normal", "low").

type Starter

type Starter interface {
	Start(ctx context.Context) error
}

Starter is implemented by Backends that own a polling loop (e.g. the generated postgres backend). memory has no loop — Start blocks until ctx is cancelled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL