telecast

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: MIT Imports: 17 Imported by: 0

README

Telecast

A lightweight, embeddable Telegram Bot API broadcast engine for Go.

  • Library-firstNew()Start()Enqueue()Shutdown(); no HTTP server required
  • Priority queues with weighted round-robin (5:3:1) and starvation prevention
  • Rate limiting — global + per-chat token buckets, auto-adapts to Telegram 429
  • Campaigns — stream millions of recipients in batches, crash-safe with idempotent enqueue
  • SQLite by default — WAL mode, single-file, ready for Docker/K8s with a PVC

Intended use: transactional notifications and opt-in messaging. Please respect Telegram Bot API policies.

Install

go get github.com/Seinarukiro2/telecast

Requires Go 1.23+.

Quickstart

package main

import (
    "context"
    "log"
    "os"
    "os/signal"

    "github.com/Seinarukiro2/telecast"
)

func main() {
    eng, err := telecast.New(telecast.Config{
        BotToken: os.Getenv("BOT_TOKEN"),
        StoreDSN: "telecast.db",
    })
    if err != nil { log.Fatal(err) }

    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    eng.Start(ctx)
    defer eng.Shutdown(context.Background())

    id, _ := eng.Enqueue(ctx, telecast.Task{
        ChatID: 123456789,
        Text:   "Hello from Telecast!",
    })
    log.Println("enqueued:", id)
    <-ctx.Done()
}

See example/ for more: basic, campaign, custom renderer, live test.

Public API Surface

// Core lifecycle
telecast.New(Config) (*Engine, error)
eng.Start(ctx)      error
eng.Shutdown(ctx)   error

// Task delivery
eng.Enqueue(ctx, Task) (string, error)
eng.TaskStatus(ctx, id) (*model.Task, error)

// Campaigns
eng.CreateCampaign(ctx, CampaignConfig) (string, error)
eng.AddRecipients(ctx, id, []Recipient) error
eng.StartCampaign(ctx, id)  error
eng.PauseCampaign(ctx, id)  error
eng.CampaignStats(ctx, id)  (*CampaignStats, error)

// DLQ
eng.DLQList(ctx, limit, offset) ([]*model.Task, int, error)
eng.DLQRequeue(ctx, id) error

// Optional HTTP API
eng.Handler() http.Handler

Stability: v0.x releases may include minor breaking changes. v1.0 will freeze the public API.

SQLite in Docker / Kubernetes

SQLite with WAL mode is the default. It requires a single writer — deploy with 1 replica + a persistent volume.

# Kubernetes StatefulSet (abbreviated)
spec:
  replicas: 1
  template:
    spec:
      containers:
        - name: telecast
          env:
            - name: TELECAST_STORE_DSN
              value: /data/telecast.db
          volumeMounts:
            - name: data
              mountPath: /data
  volumeClaimTemplates:
    - metadata: { name: data }
      spec:
        accessModes: [ReadWriteOnce]
        resources: { requests: { storage: 1Gi } }

When to switch to Postgres: when you need multi-replica HA.

Campaigns

Campaigns deliver a template to many recipients in memory-safe batches:

  1. Peek unprocessed recipients (no mutation — crash-safe)
  2. Enqueue tasks with deterministic idempotency keys (campaign:{id}:{chat_id})
  3. Mark recipients as processed after successful enqueue

If the process crashes between steps 2 and 3, the next run re-peeks the same batch. Duplicate enqueues are caught by the UNIQUE index on idempotency_key — no double-sends.

Single-flight per campaign_id prevents concurrent batches from overlapping.

Idempotency

  • Set Task.IdempotencyKey to a stable string (e.g. order:123:confirmation)
  • Scope is global — the same key always means "same message"
  • The key is checked via INSERT INTO idempotency_keys (atomic) + UNIQUE index on tasks.idempotency_key
  • Empty key = no dedup (fire-and-forget)
  • Campaign tasks auto-generate keys: campaign:{campaign_id}:{chat_id}

Metrics

Prometheus metrics are opt-in. Pass PrometheusRegisterer in Config to enable.

Metric Type Labels
telecast_tasks_sent_total counter priority
telecast_tasks_failed_total counter priority, error_class
telecast_tasks_retried_total counter priority
telecast_telegram_429_total counter
telecast_queue_depth gauge
telecast_dlq_depth gauge
telecast_in_flight gauge
telecast_request_latency_seconds histogram method, result

Mount eng.Handler() to expose /metrics via the built-in HTTP API.

Templates

YAML-based with locale fallback (exact → base → en):

welcome:
  en: "Hello, {{.Name}}!"
  ru: "Привет, {{.Name}}!"

Or plug in a custom renderer:

type MyRenderer struct{}
func (r *MyRenderer) Render(key, locale string, vars map[string]any) (string, error) {
    return fmt.Sprintf("Hi %v!", vars["Name"]), nil
}

eng, _ := telecast.New(telecast.Config{
    BotToken:         "...",
    TemplateRenderer: &MyRenderer{},
})

Configuration

All fields have sensible defaults. Only BotToken is required.

Field Default Description
BotToken Telegram Bot API token
StoreDSN telecast.db SQLite database path
TemplatesPath YAML templates file
TemplatesData YAML templates as bytes
TemplateRenderer Custom renderer (overrides YAML)
GlobalRPS 25 Global messages/sec
PerChatRPS 1 Per-chat messages/sec
MaxConcurrency 8 Worker pool size
LeaseTTL 30s Task lease duration
MaxRetries 5 Max retries before DLQ
BaseBackoff 1s Initial retry delay
MaxBackoff 5m Max retry delay
Logger slog.Default() Structured logger
PrometheusRegisterer nil Prometheus registry (nil = disabled)

Optional: CLI

go build -o telecast ./cmd/telecast

# Standalone server (HTTP API + engine)
BOT_TOKEN=... ./telecast serve --store telecast.db --templates templates.yaml

# Enqueue from command line
./telecast enqueue --chat-id 123 --text "Hello!" --priority high

Optional: HTTP API

Mount on your own server — not required for library usage.

http.Handle("/telecast/", http.StripPrefix("/telecast", eng.Handler()))
Method Path Description
POST /v1/tasks Enqueue task
GET /v1/tasks/{id} Task status
POST /v1/campaigns Create campaign
POST /v1/campaigns/{id}/recipients:batchAdd Add recipients
POST /v1/campaigns/{id}/start Start
POST /v1/campaigns/{id}/pause Pause
GET /v1/campaigns/{id}/stats Stats
GET /v1/dlq Dead-letter list
POST /v1/dlq/{task_id}/requeue Requeue

License

MIT — see LICENSE.

Documentation

Overview

Package telecast is a lightweight Telegram Bot API broadcast & notification engine.

Telecast is a library-first engine designed to be embedded in your Go application:

eng, err := telecast.New(telecast.Config{
    BotToken: os.Getenv("BOT_TOKEN"),
    StoreDSN: "/data/telecast.db",
})
if err != nil { log.Fatal(err) }

if err := eng.Start(ctx); err != nil { log.Fatal(err) }
defer eng.Shutdown(context.Background())

id, err := eng.Enqueue(ctx, telecast.Task{
    ChatID: 12345, Text: "Hello!", Priority: telecast.PriorityNormal,
})

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CampaignConfig

type CampaignConfig struct {
	Name           string
	TemplateKey    string
	LocaleStrategy string // "per_user" (default) or "fixed"
	FixedLocale    string
	Vars           map[string]any
	Priority       Priority
}

CampaignConfig describes a broadcast campaign.

type CampaignStats

type CampaignStats = model.CampaignStats

CampaignStats is a snapshot of campaign delivery progress.

type Config

type Config struct {
	// BotToken is the Telegram Bot API token (required).
	BotToken string

	// StoreDSN is the SQLite database path. Default: "telecast.db".
	StoreDSN string

	// TemplatesPath is the path to the YAML templates file.
	// Leave empty if not using file-based templates.
	TemplatesPath string

	// TemplatesData is raw YAML templates data.
	// If set, takes priority over TemplatesPath.
	TemplatesData []byte

	// TemplateRenderer is a custom template renderer.
	// If set, TemplatesPath and TemplatesData are ignored.
	TemplateRenderer TemplateRenderer

	// TelegramBaseURL overrides the Telegram API base URL (for testing).
	TelegramBaseURL string

	// GlobalRPS is the global messages-per-second rate limit. Default: 25.
	GlobalRPS float64

	// PerChatRPS is the per-chat messages-per-second limit. Default: 1.
	PerChatRPS float64

	// MaxConcurrency is the worker pool size. Default: 8.
	MaxConcurrency int

	// LeaseTTL is how long a task is leased to a worker. Default: 30s.
	LeaseTTL time.Duration

	// PollInterval is how often the scheduler polls for tasks. Default: 500ms.
	PollInterval time.Duration

	// MaxRetries before a task is moved to DLQ. Default: 5.
	MaxRetries int

	// BaseBackoff is the initial retry delay. Default: 1s.
	BaseBackoff time.Duration

	// MaxBackoff caps the retry delay. Default: 5m.
	MaxBackoff time.Duration

	// Logger for structured logging. Default: slog.Default().
	Logger *slog.Logger

	// PrometheusRegisterer for metrics. Pass nil to disable. Default: nil (disabled).
	PrometheusRegisterer prometheus.Registerer
}

Config configures the Telecast engine.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the Telecast message delivery engine. It is safe for concurrent use after Start returns.

func New

func New(cfg Config) (*Engine, error)

New creates a Telecast engine. Call Start to begin processing.

func (*Engine) AddRecipients

func (e *Engine) AddRecipients(ctx context.Context, campaignID string, recipients []Recipient) error

AddRecipients adds recipients to a campaign in batch.

func (*Engine) CampaignStats

func (e *Engine) CampaignStats(ctx context.Context, id string) (*CampaignStats, error)

CampaignStats returns current campaign delivery progress.

func (*Engine) CreateCampaign

func (e *Engine) CreateCampaign(ctx context.Context, c CampaignConfig) (string, error)

CreateCampaign creates a new campaign. Returns the campaign ID.

func (*Engine) DLQList

func (e *Engine) DLQList(ctx context.Context, limit, offset int) ([]*model.Task, int, error)

DLQList returns dead-letter tasks with pagination.

func (*Engine) DLQRequeue

func (e *Engine) DLQRequeue(ctx context.Context, taskID string) error

DLQRequeue moves a dead-letter task back to the queue.

func (*Engine) Enqueue

func (e *Engine) Enqueue(ctx context.Context, t Task) (string, error)

Enqueue submits a task for delivery. Returns the task ID.

func (*Engine) Handler

func (e *Engine) Handler() http.Handler

Handler returns an http.Handler for the optional REST API. Mount it on your own server; it is NOT required for core functionality.

func (*Engine) PauseCampaign

func (e *Engine) PauseCampaign(ctx context.Context, id string) error

PauseCampaign transitions a campaign to paused state.

func (*Engine) Shutdown

func (e *Engine) Shutdown(ctx context.Context) error

Shutdown gracefully stops the engine and waits for in-flight work to complete, or until ctx expires.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) error

Start launches background goroutines (scheduler, workers, campaigns). It returns immediately. Call Shutdown to stop.

func (*Engine) StartCampaign

func (e *Engine) StartCampaign(ctx context.Context, id string) error

StartCampaign transitions a campaign to running state.

func (*Engine) TaskStatus

func (e *Engine) TaskStatus(ctx context.Context, taskID string) (*model.Task, error)

TaskStatus retrieves the current state of a task.

type Priority

type Priority = model.Priority

Priority determines scheduling order with weighted fairness (5:3:1).

const (
	PriorityHigh   Priority = "high"
	PriorityNormal Priority = "normal"
	PriorityLow    Priority = "low"
)

type Recipient

type Recipient struct {
	ChatID int64
	Locale string
	Vars   map[string]any
}

Recipient is a single target within a campaign.

type Task

type Task struct {
	ChatID                int64
	Kind                  TaskKind // default: KindSendMessage
	Text                  string   // raw text, mutually exclusive with TemplateKey
	TemplateKey           string
	Locale                string
	Vars                  map[string]any
	ParseMode             string // "HTML", "MarkdownV2", ""
	DisableWebPagePreview bool
	DisableNotification   bool
	ReplyMarkup           json.RawMessage
	Priority              Priority // default: PriorityNormal
	IdempotencyKey        string
	NotBefore             *time.Time
	MessageID             int64 // for KindEditMessage
}

Task is a message to be delivered via Telegram Bot API.

type TaskKind

type TaskKind = model.TaskKind

TaskKind identifies the Telegram Bot API method.

const (
	KindSendMessage TaskKind = "send_message"
	KindEditMessage TaskKind = "edit_message"
)

type TemplateRenderer

type TemplateRenderer interface {
	Render(key, locale string, vars map[string]any) (string, error)
}

TemplateRenderer renders message templates. Implement this interface to supply custom template logic when embedding Telecast.

Directories

Path Synopsis
cmd
telecast command
Telecast — a lightweight Telegram Bot API broadcast & notification engine.
Telecast — a lightweight Telegram Bot API broadcast & notification engine.
example
basic command
Example basic demonstrates the simplest Telecast usage: create an engine, enqueue a message, wait for delivery.
Example basic demonstrates the simplest Telecast usage: create an engine, enqueue a message, wait for delivery.
campaign command
Example campaign demonstrates creating a broadcast campaign that delivers a template to multiple recipients in batches.
Example campaign demonstrates creating a broadcast campaign that delivers a template to multiple recipients in batches.
custom_renderer command
Example custom_renderer shows how to replace the built-in YAML template engine with your own TemplateRenderer implementation.
Example custom_renderer shows how to replace the built-in YAML template engine with your own TemplateRenderer implementation.
internal
api
Package api provides the HTTP REST API for Telecast.
Package api provides the HTTP REST API for Telecast.
config
Package config centralises application configuration.
Package config centralises application configuration.
engine
Package engine implements the task scheduler, worker pool, and campaign processor.
Package engine implements the task scheduler, worker pool, and campaign processor.
metrics
Package metrics exports Prometheus counters, gauges and histograms.
Package metrics exports Prometheus counters, gauges and histograms.
storage
Package storage defines the persistence interface for Telecast.
Package storage defines the persistence interface for Telecast.
storage/postgres
Package postgres provides a scaffold for a PostgreSQL storage driver.
Package postgres provides a scaffold for a PostgreSQL storage driver.
storage/sqlite
Package sqlite implements storage.Store backed by SQLite with WAL mode.
Package sqlite implements storage.Store backed by SQLite with WAL mode.
telegram
Package telegram provides a thin client for the Telegram Bot API.
Package telegram provides a thin client for the Telegram Bot API.
testutil
Package testutil provides test helpers including a mock Telegram Bot API server.
Package testutil provides test helpers including a mock Telegram Bot API server.
tmpl
Package tmpl provides locale-aware template rendering with YAML source.
Package tmpl provides locale-aware template rendering with YAML source.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL