Documentation ¶
Overview ¶
Package subscriber provides JungleBus subscription management for overlay services.
The subscriber package handles the connection to JungleBus, receives blockchain transactions in real time, and queues them for processing. It is the entry point for transaction data in overlay services.
JungleBus Integration:
JungleBus is a Bitcoin transaction subscription service that provides:
- Real-time transaction streaming
- Topic-based filtering
- Resumable subscriptions with block/page tracking
- Both full and lite modes for different use cases
Key Components:
Subscriber:
- Manages JungleBus client connection
- Handles transaction callbacks
- Queues transactions in Redis for processing
- Tracks subscription progress
- Provides graceful shutdown
Configuration:
- TopicID: JungleBus topic to subscribe to
- QueueName: Redis queue for storing transactions
- FromBlock/FromPage: Resume point for subscriptions
- QueueSize: Buffer size for incoming transactions
- LiteMode: Whether to receive lite transactions
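Before passing a configuration to NewSubscriber, it can help to check the fields listed above. The following is a minimal sketch under stated assumptions: subscriberConfig is a local copy of the documented SubscriberConfig fields so the example compiles on its own, and validateConfig is a hypothetical helper, not something the package is documented to provide. Treating a zero QueueSize as invalid is also an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// subscriberConfig mirrors the documented SubscriberConfig fields.
// It is a local stand-in, not the package type.
type subscriberConfig struct {
	TopicID   string
	QueueName string
	FromBlock uint64
	FromPage  uint64
	QueueSize uint64
	LiteMode  bool
}

// validateConfig is a hypothetical helper that rejects obviously
// incomplete configurations before a subscriber is created.
func validateConfig(cfg *subscriberConfig) error {
	if cfg == nil {
		return errors.New("config is nil")
	}
	if cfg.TopicID == "" {
		return errors.New("TopicID must not be empty")
	}
	if cfg.QueueName == "" {
		return errors.New("QueueName must not be empty")
	}
	if cfg.QueueSize == 0 {
		// Assumption: a zero-sized buffer is never intended.
		return errors.New("QueueSize must be greater than zero")
	}
	return nil
}

func main() {
	cfg := &subscriberConfig{
		TopicID:   "your-topic-id",
		QueueName: "tx-queue",
		FromBlock: 850000,
		QueueSize: 1000,
	}
	if err := validateConfig(cfg); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("config ok")
}
```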
Example Usage:
	// Create subscriber configuration
	config := &SubscriberConfig{
		TopicID:   "your-topic-id",
		QueueName: "tx-queue",
		FromBlock: 850000,
		FromPage:  0,
		QueueSize: 1000,
		LiteMode:  false,
	}

	// Create Redis client
	redisClient := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// jbClient is a *junglebus.Client created beforehand with the JungleBus client library.
	// Create and start subscriber (argument order follows the NewSubscriber signature below)
	subscriber := NewSubscriber(config, redisClient, jbClient)
	ctx := context.Background()
	if err := subscriber.Start(ctx); err != nil {
		log.Fatal(err)
	}

	// Subscriber will run until context is cancelled
	// or shutdown signal is received
Transaction Flow:
1. JungleBus sends transactions matching the topic
2. Subscriber receives transaction callback
3. Transaction ID is extracted and queued in Redis
4. Progress (block/page) is saved for resumption
5. Processor picks up transactions from queue
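The flow above can be sketched without any external services. This is an illustration only: tx and memoryStore are hypothetical stand-ins for a JungleBus transaction and for Redis, and the real subscriber's internals may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// tx is a simplified stand-in for a transaction delivered by JungleBus.
type tx struct {
	ID    string
	Block uint64
	Page  uint64
}

// memoryStore is an in-memory stand-in for Redis: a FIFO queue of
// transaction IDs plus the last recorded block/page.
type memoryStore struct {
	mu    sync.Mutex
	queue []string
	block uint64
	page  uint64
}

// onTransaction mirrors steps 2 to 4: handle the callback, queue the
// transaction ID, then record the block/page for later resumption.
func (s *memoryStore) onTransaction(t tx) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queue = append(s.queue, t.ID)
	s.block, s.page = t.Block, t.Page
}

// pop mirrors step 5: a processor takes the oldest queued ID.
func (s *memoryStore) pop() (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.queue) == 0 {
		return "", false
	}
	id := s.queue[0]
	s.queue = s.queue[1:]
	return id, true
}

func main() {
	store := &memoryStore{}
	store.onTransaction(tx{ID: "txid-1", Block: 850000, Page: 0})
	store.onTransaction(tx{ID: "txid-2", Block: 850000, Page: 1})

	for {
		id, ok := store.pop()
		if !ok {
			break
		}
		fmt.Println("processing", id)
	}
	fmt.Println("resume point:", store.block, store.page)
}
```

Keeping only the transaction ID in the queue, as step 3 describes, leaves it to the processor to fetch whatever transaction data it needs.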
Progress Tracking:
The subscriber saves progress to Redis, enabling:
- Resumption after restarts
- No duplicate processing
- Efficient catch-up after downtime
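One way to combine a configured FromBlock/FromPage with saved progress is to start from whichever is later. The sketch below assumes that policy; the position type and resumeFrom function are hypothetical, and the package may resolve the two values differently.

```go
package main

import "fmt"

// position is a hypothetical block/page pair.
type position struct {
	Block uint64
	Page  uint64
}

// after reports whether p comes strictly later than q,
// comparing blocks first and pages second.
func (p position) after(q position) bool {
	if p.Block != q.Block {
		return p.Block > q.Block
	}
	return p.Page > q.Page
}

// resumeFrom returns the saved position when one exists and is later
// than the configured start; otherwise it returns the configured start.
func resumeFrom(configured position, saved *position) position {
	if saved != nil && saved.after(configured) {
		return *saved
	}
	return configured
}

func main() {
	configured := position{Block: 850000, Page: 0}
	saved := &position{Block: 850120, Page: 3}

	fmt.Println("with saved progress:", resumeFrom(configured, saved))
	fmt.Println("first run:", resumeFrom(configured, nil))
}
```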
Error Handling:
The subscriber handles:
- JungleBus connection failures with reconnection
- Redis queue failures with retries
- Graceful shutdown on signals (SIGINT, SIGTERM)
Monitoring:
Progress and errors are logged for monitoring:
- Current block/page being processed
- Queue status
- Connection health
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber manages JungleBus subscriptions with Redis queue integration.
func NewSubscriber ¶
func NewSubscriber(cfg *SubscriberConfig, redisClient *redis.Client, jbClient *junglebus.Client) *Subscriber
NewSubscriber creates a new subscriber from the given configuration, Redis client, and JungleBus client.
func (*Subscriber) GetProgress ¶
func (s *Subscriber) GetProgress(ctx context.Context) (uint64, error)
GetProgress returns the progress currently recorded for the topic, or an error.
type SubscriberConfig ¶
type SubscriberConfig struct {
	// Topic ID to subscribe to
	TopicID string

	// Queue name in Redis where transactions will be stored
	QueueName string

	// Starting block and page for subscription
	FromBlock uint64
	FromPage  uint64

	// JungleBus subscription options
	QueueSize uint64
	LiteMode  bool
}
SubscriberConfig holds configuration for a JungleBus subscriber.
func DefaultSubscriberConfig ¶
func DefaultSubscriberConfig() *SubscriberConfig
DefaultSubscriberConfig returns a default configuration.