srv

package
v0.0.0-...-3985b18 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 17, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package srv provides a WebSocket hub for managing client connections and broadcasting GitHub webhook events to subscribed clients based on their subscription criteria.

Constants

This section is empty.

Variables

var (
	// ErrInvalidUsername indicates an invalid GitHub username.
	ErrInvalidUsername = errors.New("invalid username")
	// ErrInvalidURL indicates an invalid URL.
	ErrInvalidURL = errors.New("invalid URL")
)
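Because these are sentinel errors, callers can match them with errors.Is even when they arrive wrapped. The sketch below redeclares the two variables locally so that it compiles on its own; checkUsername and classify are hypothetical helpers, not part of this package, and the empty-string rule is only an assumption for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the package's exported sentinel errors.
var (
	ErrInvalidUsername = errors.New("invalid username")
	ErrInvalidURL      = errors.New("invalid URL")
)

// checkUsername is a hypothetical validator; the package's real rules are not
// documented on this page. It wraps the sentinel so callers can match on it.
func checkUsername(name string) error {
	if name == "" {
		return fmt.Errorf("subscription rejected: %w", ErrInvalidUsername)
	}
	return nil
}

// classify maps an error to a short label. errors.Is sees through wrapping.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrInvalidUsername):
		return "bad username"
	case errors.Is(err, ErrInvalidURL):
		return "bad URL"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(checkUsername("")))        // bad username
	fmt.Println(classify(checkUsername("octocat"))) // ok
}
```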

Functions

This section is empty.

Types

type Client

type Client struct {
	ID string
	// contains filtered or unexported fields
}

Client represents a connected WebSocket client together with its subscription preferences.

Connection management follows a simple pattern:

  • ONE goroutine (Run) handles ALL writes to avoid concurrent write issues
  • Server sends pings every pingInterval to detect dead connections
  • Client responds with pongs; read loop resets deadline on any message
  • Read loop (in websocket.go) detects disconnects and closes the connection

Cleanup coordination (CRITICAL FOR THREAD SAFETY):

Multiple goroutines can trigger cleanup concurrently:
  1. Handle() defer in websocket.go calls Hub.Unregister() (async via channel)
  2. Handle() defer in websocket.go calls closeWebSocket() (closes WS connection)
  3. Client.Run() defer calls client.Close() when context is cancelled
  4. Hub.Run() processes unregister message and calls client.Close()
  5. Hub.cleanup() during shutdown calls client.Close() for all clients

Thread safety is ensured by:
  • Close() uses sync.Once to ensure channels are closed exactly once
  • closed atomic flag allows checking if client is closing (safe from any goroutine)
  • Hub checks closed flag before sending to avoid race with channel close
  • closeWebSocket() does NOT send to client channels (would race with Close)

Cleanup flow when a client disconnects:
  1. Handle() read loop exits (EOF, timeout, or error)
  2. defer cancel() signals Client.Run() via context
  3. defer Hub.Unregister(clientID) sends message to hub (returns immediately)
  4. defer closeWebSocket() closes the WebSocket connection only
  5. Client.Run() sees context cancellation, exits, calls defer client.Close()
  6. Hub.Run() processes unregister, calls client.Close() (idempotent via sync.Once)
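The idempotent-close pattern described above can be sketched in isolation. This is a minimal illustration, not the package's implementation: the client type, its field names, and closeConcurrently are assumptions, and atomic.Bool requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// client is a hypothetical stand-in for the package's Client type.
type client struct {
	send      chan string
	closed    atomic.Bool
	closeOnce sync.Once
}

func newClient() *client { return &client{send: make(chan string, 4)} }

// Close marks the client as closing, then closes the channel exactly once.
// Later calls are no-ops, so several cleanup paths may call it safely.
func (c *client) Close() {
	c.closeOnce.Do(func() {
		c.closed.Store(true)
		close(c.send)
	})
}

// IsClosed may be called from any goroutine.
func (c *client) IsClosed() bool { return c.closed.Load() }

// closeConcurrently calls Close from n goroutines and reports whether any
// call panicked (a double close of the channel would panic).
func closeConcurrently(c *client, n int) bool {
	var wg sync.WaitGroup
	var panicked atomic.Bool
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() {
				if recover() != nil {
					panicked.Store(true)
				}
			}()
			c.Close()
		}()
	}
	wg.Wait()
	return panicked.Load()
}

func main() {
	c := newClient()
	fmt.Println(closeConcurrently(c, 5), c.IsClosed()) // false true
}
```

Setting the flag before closing the channel means a goroutine that observes IsClosed() == false has at least not seen a completed close; a sender that checks the flag and then sends still needs its own guard against the remaining window.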

func NewClient

func NewClient(ctx context.Context, id string, sub Subscription, conn *websocket.Conn, hub *Hub, userOrgs []string, tier github.Tier) *Client

NewClient creates a new client with the given ID, subscription, WebSocket connection, owning hub, userOrgs list, and tier.

func NewClientForTest

func NewClientForTest(ctx context.Context, id string, sub Subscription, conn *websocket.Conn, hub *Hub, userOrgs []string) *Client

NewClientForTest creates a new client for testing with the default tier, TierFree. It omits the tier parameter of NewClient, which maintains backward compatibility with existing tests.

func (*Client) CanAccessPrivateRepos

func (c *Client) CanAccessPrivateRepos() bool

CanAccessPrivateRepos returns true if the client's tier allows private repo access. Only Pro and Flock tiers have access to private repository events.

func (*Client) Close

func (c *Client) Close()

Close gracefully closes the client. It is guarded by sync.Once, so it is idempotent and may be called from multiple goroutines.

func (*Client) IsClosed

func (c *Client) IsClosed() bool

IsClosed returns true if the client is closed or closing. Safe to call from any goroutine.

func (*Client) Run

func (c *Client) Run(ctx context.Context, pingInterval, writeTimeout time.Duration)

Run handles sending events to the client and periodic pings. CRITICAL: This is the ONLY goroutine that writes to the WebSocket connection. All writes go through this function to prevent concurrent write issues.

Connection management:

  1. Server sends ping every pingInterval (54s)
  2. Client must respond with pong (read loop resets deadline on any message)
  3. If client doesn't respond, read timeout (90s) will disconnect them
  4. Any write error immediately closes the connection
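A single-writer loop of this kind can be sketched as follows. The writer interface, recorder, runWriter, and demo are hypothetical names introduced for illustration; the sketch writes plain strings instead of WebSocket frames and leaves out the write timeout that the real Run takes as a parameter.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// writer is a hypothetical abstraction over the WebSocket connection.
type writer interface {
	Write(msg string) error
}

// recorder collects everything written to it.
type recorder struct{ msgs []string }

func (r *recorder) Write(msg string) error {
	r.msgs = append(r.msgs, msg)
	return nil
}

// runWriter is the only place that calls w.Write. It forwards queued
// messages, sends a ping on every tick, and returns on the first write
// error, on context cancellation, or when the send channel is closed.
func runWriter(ctx context.Context, w writer, send <-chan string, pingInterval time.Duration) error {
	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-send:
			if !ok {
				return nil // channel closed by Close
			}
			if err := w.Write(msg); err != nil {
				return err
			}
		case <-ticker.C:
			if err := w.Write("ping"); err != nil {
				return err
			}
		}
	}
}

// demo queues two events, closes the channel, and runs the loop to completion.
// The ping interval is long enough that no ping fires during the demo.
func demo() ([]string, error) {
	send := make(chan string, 2)
	send <- "event-1"
	send <- "event-2"
	close(send)
	rec := &recorder{}
	err := runWriter(context.Background(), rec, send, time.Hour)
	return rec.msgs, err
}

func main() {
	msgs, err := demo()
	fmt.Println(msgs, err) // [event-1 event-2] <nil>
}
```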

type CommitCache

type CommitCache struct {
	// contains filtered or unexported fields
}

CommitCache maps commit SHAs to their associated pull requests. This enables reliable PR association for check_run/check_suite events even when GitHub's pull_requests array is empty.

func NewCommitCache

func NewCommitCache() *CommitCache

NewCommitCache creates a new commit→PR cache.

func (*CommitCache) Get

func (c *CommitCache) Get(ctx context.Context, commitSHA string) (PRInfo, bool)

Get retrieves PR info for a commit SHA. Returns the PRInfo and true if found, or zero value and false if not cached.

func (*CommitCache) Len

func (c *CommitCache) Len() int

Len returns the number of cached entries.

func (*CommitCache) Set

func (c *CommitCache) Set(ctx context.Context, commitSHA string, info PRInfo)

Set caches a commit SHA → PR mapping.
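The way such a cache lets a check event be mapped back to its pull request can be sketched with a mutex-guarded map. Everything below is a stand-in written for illustration: the lower-case types, resolveURL, and the sample SHA are assumptions, the context parameters of the real Get and Set are omitted, and any expiry or size limit the real cache may have is not modelled.

```go
package main

import (
	"fmt"
	"sync"
)

// prInfo mirrors the documented PRInfo fields that this sketch needs.
type prInfo struct {
	URL     string
	RepoURL string
	Number  int
}

// commitCache is a hypothetical stand-in for CommitCache.
type commitCache struct {
	mu      sync.RWMutex
	entries map[string]prInfo
}

func newCommitCache() *commitCache {
	return &commitCache{entries: make(map[string]prInfo)}
}

func (c *commitCache) Set(sha string, info prInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[sha] = info
}

func (c *commitCache) Get(sha string) (prInfo, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	info, ok := c.entries[sha]
	return info, ok
}

func (c *commitCache) Len() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.entries)
}

// resolveURL returns the cached PR URL for a commit, or the fallback
// (for example the repository URL) when the commit is not cached.
func resolveURL(c *commitCache, sha, fallback string) string {
	if info, ok := c.Get(sha); ok {
		return info.URL
	}
	return fallback
}

func main() {
	cache := newCommitCache()
	cache.Set("abc123", prInfo{
		URL:     "https://github.com/owner/repo/pull/123",
		RepoURL: "https://github.com/owner/repo",
		Number:  123,
	})
	fmt.Println(resolveURL(cache, "abc123", "https://github.com/owner/repo")) // PR URL
	fmt.Println(resolveURL(cache, "def456", "https://github.com/owner/repo")) // fallback
	fmt.Println(cache.Len())                                                  // 1
}
```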

type Event

type Event struct {
	URL        string    `json:"url"`                   // Pull request URL (or repo URL for check events with race condition)
	Timestamp  time.Time `json:"timestamp"`             // When the event occurred
	Type       string    `json:"type"`                  // GitHub event type (e.g., "pull_request")
	DeliveryID string    `json:"delivery_id,omitempty"` // GitHub webhook delivery ID (unique per webhook)
	CommitSHA  string    `json:"commit_sha,omitempty"`  // Commit SHA for check events (used to look up PR when URL is repo-only)
}

Event represents a GitHub webhook event that will be broadcast to clients. It contains the PR URL, timestamp, event type, and delivery ID from GitHub, plus the commit SHA for check events.
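The struct tags determine what a subscriber sees on the wire. The sketch below copies the Event definition locally so that it is self-contained and encodes one value with encoding/json; the encode helper and the sample values are illustrative. Fields tagged omitempty are left out when empty.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event is a local copy of the documented struct, including its JSON tags.
type Event struct {
	URL        string    `json:"url"`
	Timestamp  time.Time `json:"timestamp"`
	Type       string    `json:"type"`
	DeliveryID string    `json:"delivery_id,omitempty"`
	CommitSHA  string    `json:"commit_sha,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of an event.
func encode(e Event) string {
	b, err := json.Marshal(e)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	e := Event{
		URL:       "https://github.com/owner/repo/pull/123",
		Timestamp: time.Date(2026, 1, 17, 12, 0, 0, 0, time.UTC),
		Type:      "pull_request",
	}
	fmt.Println(encode(e))
	// {"url":"https://github.com/owner/repo/pull/123","timestamp":"2026-01-17T12:00:00Z","type":"pull_request"}
}
```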

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub manages WebSocket clients and event broadcasting. It runs in its own goroutine and handles client registration, unregistration, and event distribution.

Thread safety design:

  • Single-goroutine pattern: Only Run() modifies the clients map
  • All external operations (Register, Unregister, Broadcast) send to buffered channels
  • ClientCount() uses RLock for safe concurrent reads
  • Client snapshot pattern in broadcast minimizes lock time

Unregister coordination:

  • Unregister(clientID) sends message to channel and returns immediately (async)
  • Run() processes unregister messages in order
  • Calls client.Close() which is idempotent (sync.Once)
  • Multiple concurrent unregisters for same client are safe

Broadcast safety:

  • Creates client snapshot with RLock, then releases lock
  • Non-blocking send to client.send channel prevents deadlocks
  • If client disconnects during iteration, send fails gracefully (channel full or closed)
  • Client.Close() is safe to call multiple times during this window
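The snapshot-then-send approach can be sketched as below. The hub and client types here are hypothetical stand-ins, and the sketch models only the full-buffer case. In Go, sending on a closed channel panics even inside a select with a default case, so closed clients need a separate guard (the closed flag described for Client); that guard is not modelled here.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// client and hub are hypothetical stand-ins for the package's types.
type client struct {
	id   string
	send chan string
}

type hub struct {
	mu      sync.RWMutex
	clients map[string]*client
}

// snapshot copies the client list under the read lock and releases the
// lock before any send happens.
func (h *hub) snapshot() []*client {
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make([]*client, 0, len(h.clients))
	for _, c := range h.clients {
		out = append(out, c)
	}
	return out
}

// broadcast attempts a non-blocking send to every client in the snapshot
// and returns the sorted IDs of clients whose buffer was full.
func (h *hub) broadcast(msg string) []string {
	var dropped []string
	for _, c := range h.snapshot() {
		select {
		case c.send <- msg:
		default:
			dropped = append(dropped, c.id) // slow client: drop, never block
		}
	}
	sort.Strings(dropped)
	return dropped
}

// newDemoHub builds a hub with one client that has buffer space and one
// that has none (unbuffered channel with no reader).
func newDemoHub() *hub {
	return &hub{clients: map[string]*client{
		"fast": {id: "fast", send: make(chan string, 1)},
		"slow": {id: "slow", send: make(chan string)},
	}}
}

func main() {
	h := newDemoHub()
	fmt.Println(h.broadcast("evt")) // [slow]
}
```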

func NewHub

func NewHub(enforceTiers bool) *Hub

NewHub creates a new client hub. If enforceTiers is true, the hub enforces tier restrictions; if false, it only logs warnings.

func (*Hub) Broadcast

func (h *Hub) Broadcast(ctx context.Context, event Event, payload map[string]any)

Broadcast sends an event to all matching clients.

func (*Hub) ClientCount

func (h *Hub) ClientCount() int

ClientCount returns the current number of connected clients. Safe to call from any goroutine.

func (*Hub) CommitCache

func (h *Hub) CommitCache() *CommitCache

CommitCache returns the hub's commit→PR cache for populating from webhook events.

func (*Hub) Register

func (h *Hub) Register(client *Client)

Register adds a new client to the hub. The request is sent over a buffered channel and applied by Run.

func (*Hub) Run

func (h *Hub) Run(ctx context.Context)

Run starts the hub's event loop. The context should be passed from main for proper lifecycle management.

func (*Hub) Stop

func (h *Hub) Stop()

Stop signals the hub to stop.

func (*Hub) Unregister

func (h *Hub) Unregister(clientID string)

Unregister removes a client by ID. It sends the request to the hub's channel and returns immediately; Run processes it asynchronously.

func (*Hub) Wait

func (h *Hub) Wait()

Wait blocks until the hub has stopped.
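The Run, Stop, and Wait lifecycle can be sketched with a small stand-in hub. The type, its fields, and demo are assumptions made for illustration. Unlike the real Hub, which is documented as using buffered channels, this sketch uses unbuffered channels so that the demo's result is deterministic.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// hub is a hypothetical stand-in; only Run modifies the clients map.
type hub struct {
	register   chan string
	unregister chan string
	stop       chan struct{}
	done       chan struct{}
	stopOnce   sync.Once
	mu         sync.RWMutex
	clients    map[string]bool
}

func newHub() *hub {
	return &hub{
		register:   make(chan string),
		unregister: make(chan string),
		stop:       make(chan struct{}),
		done:       make(chan struct{}),
		clients:    make(map[string]bool),
	}
}

// Run is the event loop. It exits on Stop or context cancellation.
func (h *hub) Run(ctx context.Context) {
	defer close(h.done)
	for {
		select {
		case <-ctx.Done():
			return
		case <-h.stop:
			return
		case id := <-h.register:
			h.mu.Lock()
			h.clients[id] = true
			h.mu.Unlock()
		case id := <-h.unregister:
			h.mu.Lock()
			delete(h.clients, id)
			h.mu.Unlock()
		}
	}
}

// Stop signals the loop to exit; calling it more than once is safe.
func (h *hub) Stop() { h.stopOnce.Do(func() { close(h.stop) }) }

// Wait blocks until Run has returned.
func (h *hub) Wait() { <-h.done }

func (h *hub) ClientCount() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return len(h.clients)
}

// demo registers two clients, unregisters one, then shuts the hub down.
func demo() int {
	h := newHub()
	go h.Run(context.Background())
	h.register <- "a"
	h.register <- "b"
	h.unregister <- "a"
	h.Stop()
	h.Wait()
	return h.ClientCount()
}

func main() {
	fmt.Println(demo()) // 1
}
```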

type PRInfo

type PRInfo struct {
	URL      string // Full PR URL (e.g., https://github.com/owner/repo/pull/123)
	RepoURL  string // Repository URL (e.g., https://github.com/owner/repo)
	CachedAt time.Time
	Number   int // PR number
}

PRInfo contains cached information about a pull request.

type Subscription

type Subscription struct {
	Organization   string   `json:"organization"`
	Username       string   `json:"-"`
	EventTypes     []string `json:"event_types,omitempty"`
	PullRequests   []string `json:"pull_requests,omitempty"`
	UserEventsOnly bool     `json:"user_events_only,omitempty"`
}

Subscription represents a client's subscription criteria.
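The `json:"-"` tag on Username means the field is never populated from a client's JSON payload, so a client cannot claim a username through the subscription message. The sketch below copies the struct locally to show this; decode and the sample payload are illustrative, and how the server actually fills in Username is not documented on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Subscription is a local copy of the documented struct, including its tags.
type Subscription struct {
	Organization   string   `json:"organization"`
	Username       string   `json:"-"`
	EventTypes     []string `json:"event_types,omitempty"`
	PullRequests   []string `json:"pull_requests,omitempty"`
	UserEventsOnly bool     `json:"user_events_only,omitempty"`
}

// decode is a hypothetical helper that parses a subscription payload.
func decode(data string) (Subscription, error) {
	var s Subscription
	err := json.Unmarshal([]byte(data), &s)
	return s, err
}

func main() {
	s, err := decode(`{
		"organization": "example-org",
		"username": "mallory",
		"event_types": ["pull_request"],
		"user_events_only": true
	}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// The "username" key in the payload is ignored.
	fmt.Printf("%q %q %v %v\n", s.Organization, s.Username, s.EventTypes, s.UserEventsOnly)
	// "example-org" "" [pull_request] true
}
```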

func (*Subscription) Validate

func (s *Subscription) Validate() error

Validate performs security validation on subscription data.

type WebSocketHandler

type WebSocketHandler struct {
	// contains filtered or unexported fields
}

WebSocketHandler handles WebSocket connections.

func NewWebSocketHandler

func NewWebSocketHandler(h *Hub, connLimiter *security.ConnectionLimiter, allowedEvents []string) *WebSocketHandler

NewWebSocketHandler creates a new WebSocket handler.

func NewWebSocketHandlerForTest

func NewWebSocketHandlerForTest(h *Hub, connLimiter *security.ConnectionLimiter, allowedEvents []string) *WebSocketHandler

NewWebSocketHandlerForTest creates a WebSocket handler for testing that skips GitHub auth.

func (*WebSocketHandler) Handle

func (h *WebSocketHandler) Handle(ws *websocket.Conn)

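Handle serves a single WebSocket connection. It runs the read loop and, when the loop exits, unregisters the client from the hub and closes the connection.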

func (*WebSocketHandler) PreValidateAuth

func (h *WebSocketHandler) PreValidateAuth(r *http.Request) bool

PreValidateAuth checks if the request has a valid GitHub token before WebSocket upgrade. This allows us to return proper HTTP status codes before the connection is upgraded.
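A check of this kind is typically wired in front of the upgrade handler. The sketch below shows the general shape with the standard library only. The guard wrapper, the hasBearer check, the /ws path, and the 401 response are all assumptions made for illustration; the real PreValidateAuth validates a GitHub token, which is not attempted here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// guard rejects requests that fail validate with a plain HTTP response,
// before next (the handler that would upgrade the connection) runs.
func guard(validate func(*http.Request) bool, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !validate(r) {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// hasBearer is a hypothetical stand-in for PreValidateAuth. It only checks
// that a non-empty bearer token is present; it does not verify the token.
func hasBearer(r *http.Request) bool {
	auth := r.Header.Get("Authorization")
	if !strings.HasPrefix(auth, "Bearer ") {
		return false
	}
	return strings.TrimPrefix(auth, "Bearer ") != ""
}

// status sends one request through the guarded handler and returns the
// HTTP status code that the client would see.
func status(authHeader string) int {
	upgrade := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK) // placeholder for the WebSocket upgrade
	})
	req := httptest.NewRequest(http.MethodGet, "/ws", nil)
	if authHeader != "" {
		req.Header.Set("Authorization", authHeader)
	}
	rec := httptest.NewRecorder()
	guard(hasBearer, upgrade).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(status(""), status("Bearer example-token")) // 401 200
}
```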
