package main
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"time"

	"github.com/urfave/cli/v3"

	"github.com/mountain-reverie/blue-green-load-balancer/internal/bgctl"
)
// Exit codes returned by bgctl subcommands.
const (
	exitSuccess         = 0 // command completed successfully
	exitUsageError      = 1 // CLI usage or argument error
	exitConnectionError = 2 // could not reach the admin server over Tailscale
	exitAPIError        = 3 // admin server returned an API-level error
)
// main is the bgctl entry point. It assembles the CLI definition —
// global flags shared by every subcommand plus the status/metrics/refresh
// subcommands — and runs it against os.Args, exiting with
// exitUsageError when the command returns an error.
func main() {
	globalFlags := []cli.Flag{
		&cli.StringFlag{
			Name:    "hostname",
			Aliases: []string{"H"},
			Value:   "bluegreen-admin",
			Usage:   "Admin server hostname",
			Sources: cli.EnvVars("BGCTL_HOSTNAME"),
		},
		&cli.StringFlag{
			Name:    "control-url",
			Usage:   "Tailscale/Headscale control server URL",
			Sources: cli.EnvVars("BGCTL_CONTROL_URL"),
		},
		&cli.StringFlag{
			Name:    "auth-key",
			Usage:   "Tailscale auth key",
			Sources: cli.EnvVars("TS_AUTH_KEY"),
		},
		&cli.StringFlag{
			Name:    "state-dir",
			Value:   defaultStateDir(),
			Usage:   "Tailscale state directory",
			Sources: cli.EnvVars("BGCTL_STATE_DIR"),
		},
		&cli.StringFlag{
			Name:    "output",
			Aliases: []string{"o"},
			Value:   "text",
			Usage:   "Output format: json or text",
		},
		&cli.DurationFlag{
			Name:  "timeout",
			Value: 30 * time.Second,
			Usage: "Request timeout",
		},
		&cli.BoolFlag{
			Name:    "verbose",
			Aliases: []string{"v"},
			Usage:   "Enable verbose logging",
		},
	}

	subcommands := []*cli.Command{
		{Name: "status", Usage: "Display current blue/green status", Action: runStatus},
		{Name: "metrics", Usage: "Display current metrics snapshot", Action: runMetrics},
		{Name: "refresh", Usage: "Trigger a git refresh", Action: runRefresh},
	}

	root := &cli.Command{
		Name:     "bgctl",
		Usage:    "Blue/Green Load Balancer control CLI",
		Flags:    globalFlags,
		Commands: subcommands,
	}

	if err := root.Run(context.Background(), os.Args); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(exitUsageError)
	}
}
// defaultStateDir returns the default Tailscale state directory,
// ~/.bgctl/tailscale. When the home directory cannot be determined it
// falls back to the relative path ".bgctl/tailscale".
func defaultStateDir() string {
	if home, err := os.UserHomeDir(); err == nil {
		return filepath.Join(home, ".bgctl", "tailscale")
	}
	// No resolvable home directory: use a path relative to the CWD.
	return ".bgctl/tailscale"
}
// createClient builds a bgctl.Client from the global CLI flags.
// With --verbose, debug-level text logging goes to stderr; otherwise
// only errors are logged.
func createClient(ctx context.Context, cmd *cli.Command) (*bgctl.Client, error) {
	verbose := cmd.Bool("verbose")

	level := slog.LevelError
	if verbose {
		level = slog.LevelDebug
	}
	logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}))

	client, err := bgctl.New(ctx, &bgctl.Config{
		Hostname:   cmd.String("hostname"),
		ControlURL: cmd.String("control-url"),
		AuthKey:    cmd.String("auth-key"),
		StateDir:   cmd.String("state-dir"),
		Timeout:    cmd.Duration("timeout"),
		Verbose:    verbose,
		Logger:     logger,
	})
	if err != nil {
		return nil, fmt.Errorf("connecting to Tailscale: %w", err)
	}
	return client, nil
}
// runStatus handles the "status" subcommand: it connects to the admin
// server over Tailscale, fetches the current blue/green status, and
// prints it in the format selected by --output.
//
// On failure it prints the error to stderr and exits with
// exitConnectionError (transport problems) or exitAPIError (server-side
// API errors). Because os.Exit bypasses deferred calls, the client is
// closed explicitly before exiting on the GetStatus error path.
func runStatus(ctx context.Context, cmd *cli.Command) error {
	client, err := createClient(ctx, cmd)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(exitConnectionError)
		return nil
	}
	defer func() { _ = client.Close() }()
	status, err := client.GetStatus(ctx)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		// os.Exit skips deferred functions, so release the Tailscale
		// client explicitly to shut the tsnet node down cleanly.
		_ = client.Close()
		if isAPIError(err) {
			os.Exit(exitAPIError)
		}
		os.Exit(exitConnectionError)
	}
	formatter := bgctl.NewFormatter(cmd.String("output"))
	fmt.Print(formatter.FormatStatus(status))
	return nil
}
// runMetrics handles the "metrics" subcommand: it fetches the current
// metrics snapshot from the admin server and prints it in the format
// selected by --output.
//
// On failure it prints the error to stderr and exits with
// exitConnectionError or exitAPIError. Because os.Exit bypasses
// deferred calls, the client is closed explicitly before exiting on the
// GetMetrics error path.
func runMetrics(ctx context.Context, cmd *cli.Command) error {
	client, err := createClient(ctx, cmd)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(exitConnectionError)
		return nil
	}
	defer func() { _ = client.Close() }()
	metrics, err := client.GetMetrics(ctx)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		// os.Exit skips deferred functions, so release the Tailscale
		// client explicitly to shut the tsnet node down cleanly.
		_ = client.Close()
		if isAPIError(err) {
			os.Exit(exitAPIError)
		}
		os.Exit(exitConnectionError)
	}
	formatter := bgctl.NewFormatter(cmd.String("output"))
	fmt.Print(formatter.FormatMetrics(metrics))
	return nil
}
// runRefresh handles the "refresh" subcommand: it asks the admin server
// to perform a git refresh (which may trigger a blue/green switch) and
// prints the result in the format selected by --output.
//
// On failure it prints the error to stderr and exits with
// exitConnectionError or exitAPIError. Because os.Exit bypasses
// deferred calls, the client is closed explicitly before exiting on the
// TriggerRefresh error path.
func runRefresh(ctx context.Context, cmd *cli.Command) error {
	client, err := createClient(ctx, cmd)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(exitConnectionError)
		return nil
	}
	defer func() { _ = client.Close() }()
	refresh, err := client.TriggerRefresh(ctx)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		// os.Exit skips deferred functions, so release the Tailscale
		// client explicitly to shut the tsnet node down cleanly.
		_ = client.Close()
		if isAPIError(err) {
			os.Exit(exitAPIError)
		}
		os.Exit(exitConnectionError)
	}
	formatter := bgctl.NewFormatter(cmd.String("output"))
	fmt.Print(formatter.FormatRefresh(refresh))
	return nil
}
// isAPIError reports whether err is (or wraps) a *bgctl.APIError.
// errors.As is used instead of a direct type assertion so that errors
// wrapped with fmt.Errorf("...: %w", err) are still recognized and map
// to the exitAPIError exit code.
func isAPIError(err error) bool {
	var apiErr *bgctl.APIError
	return errors.As(err, &apiErr)
}
package main
import (
"context"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"github.com/prometheus/client_golang/prometheus"
"github.com/mountain-reverie/blue-green-load-balancer/internal/app"
"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
"github.com/mountain-reverie/blue-green-load-balancer/internal/logging"
)
// main is the load balancer entry point.
//
// It parses flags, configures structured logging (journald when running
// under systemd, JSON otherwise), loads the YAML configuration, builds
// and starts the Application, brings up the admin server (locally with
// --local-admin, otherwise on Tailscale), starts the proxy server
// (reusing a systemd socket-activation listener when one is provided),
// notifies systemd of readiness, and then loops handling signals:
// SIGHUP reloads the configuration in place; SIGINT/SIGTERM trigger a
// graceful shutdown via the deferred application.Stop(). The loop also
// exits on a proxy server error or context cancellation.
func main() {
	configPath := flag.String("config", "config/config.yaml", "Path to configuration file")
	localAdmin := flag.String("local-admin", "", "Run admin server locally on this address (e.g., :8081) instead of Tailscale")
	logLevel := flag.String("log-level", "info", "Log level (debug, info, warn, error)")
	flag.Parse()

	// Set up logging
	var level slog.Level
	switch *logLevel {
	case "debug":
		level = slog.LevelDebug
	case "warn":
		level = slog.LevelWarn
	case "error":
		level = slog.LevelError
	default:
		// Unknown values (including "info") fall back to info level.
		level = slog.LevelInfo
	}

	// Use journald handler when running under systemd, otherwise JSON
	var handler slog.Handler
	if logging.IsUnderSystemd() {
		handler = logging.NewJournaldHandler(level)
	} else {
		handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})
	}
	logger := slog.New(handler)

	// Load configuration
	cfg, err := config.Load(*configPath)
	if err != nil {
		logger.Error("failed to load configuration", "error", err)
		os.Exit(1)
	}
	logger.Info("starting blue/green load balancer",
		"blue_url", cfg.Services.Blue.URL,
		"green_url", cfg.Services.Green.URL,
		"listen_addr", cfg.Proxy.ListenAddr,
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create application with all components
	application, err := app.New(cfg,
		app.WithLogger(logger),
		app.WithPrometheusRegistry(prometheus.DefaultRegisterer),
		app.WithConfigPath(*configPath),
	)
	if err != nil {
		logger.Error("failed to create application", "error", err)
		os.Exit(1)
	}

	// Start application components (health checker, SSE updates, git watcher)
	if err := application.Start(ctx); err != nil {
		logger.Error("failed to start application", "error", err)
		os.Exit(1)
	}
	// Runs on every return path below; os.Exit paths above start nothing.
	defer application.Stop()

	// Start admin server
	if *localAdmin != "" {
		// Development/testing mode: plain TCP listener, no Tailscale.
		if err := application.StartAdminServer(ctx, *localAdmin); err != nil {
			logger.Error("failed to start local admin server", "error", err)
			os.Exit(1)
		}
		logger.Info("admin server started locally", "addr", *localAdmin)
	} else {
		if err := application.Admin.Start(ctx); err != nil {
			logger.Error("failed to start admin server", "error", err)
			os.Exit(1)
		}
	}

	// Start proxy server (with socket activation if available)
	var proxyErrCh <-chan error
	if logging.IsSocketActivated() {
		listeners, err := logging.GetSocketActivationListeners()
		if err != nil {
			logger.Error("failed to get socket activation listeners", "error", err)
			os.Exit(1)
		}
		if len(listeners) == 0 {
			logger.Error("socket activation enabled but no listeners received")
			os.Exit(1)
		}
		// Use the first listener for the proxy (typically bluegreen.socket)
		logger.Info("using socket activation", "listeners", len(listeners))
		proxyErrCh = application.StartProxyServerWithListener(listeners[0])
	} else {
		proxyErrCh = application.StartProxyServer()
	}

	// Notify systemd that we're ready. Notification failures are logged
	// at debug level only: they are expected when not under systemd.
	if err := logging.NotifyReady(); err != nil {
		logger.Debug("failed to notify systemd ready", "error", err)
	}
	if err := logging.NotifyStatus("Running"); err != nil {
		logger.Debug("failed to set systemd status", "error", err)
	}

	// Set up signal handling
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
	for {
		select {
		case sig := <-sigCh:
			switch sig {
			case syscall.SIGHUP:
				// Reload configuration
				logger.Info("received SIGHUP, reloading configuration")
				// Notify systemd we're reloading
				if err := logging.NotifyReloading(); err != nil {
					logger.Debug("failed to notify systemd reloading", "error", err)
				}
				if err := logging.NotifyStatus("Reloading configuration..."); err != nil {
					logger.Debug("failed to set systemd status", "error", err)
				}
				// Perform reload. A failed reload keeps the previous
				// configuration and the process keeps running.
				if err := application.Reload(); err != nil {
					logger.Error("failed to reload configuration", "error", err)
				}
				// Notify systemd we're ready again
				if err := logging.NotifyReady(); err != nil {
					logger.Debug("failed to notify systemd ready", "error", err)
				}
				if err := logging.NotifyStatus("Running"); err != nil {
					logger.Debug("failed to set systemd status", "error", err)
				}
			case syscall.SIGINT, syscall.SIGTERM:
				logger.Info("received shutdown signal", "signal", sig)
				// Notify systemd we're stopping
				if err := logging.NotifyStopping(); err != nil {
					logger.Debug("failed to notify systemd stopping", "error", err)
				}
				if err := logging.NotifyStatus("Shutting down..."); err != nil {
					logger.Debug("failed to set systemd status", "error", err)
				}
				// Graceful shutdown (handled by defer application.Stop())
				logger.Info("shutting down...")
				return
			}
		case err := <-proxyErrCh:
			// The proxy goroutine sends nil on clean shutdown, so only a
			// non-nil value is an actual server failure.
			if err != nil {
				logger.Error("proxy server error", "error", err)
			}
			return
		case <-ctx.Done():
			logger.Info("context cancelled")
			return
		}
	}
}
// init prints a version banner before main runs.
//
// NOTE(review): printing from init is a side effect that fires whenever
// this package is linked (including tests); consider moving the banner
// into main and sourcing the version from build info.
func init() {
	// Print version info
	fmt.Println("Blue/Green Load Balancer v0.1.0")
}
package admin
import (
"context"
"time"
"github.com/danielgtaylor/huma/v2"
"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
"github.com/mountain-reverie/blue-green-load-balancer/internal/health"
"github.com/mountain-reverie/blue-green-load-balancer/internal/metrics"
)
// ServiceStatus represents the status of a backend service as exposed
// by the admin API. It combines the configured endpoint (URL and
// health-check path) with the latest health-check result.
type ServiceStatus struct {
	URL        string        `json:"url" example:"http://localhost:3001"`
	HealthPath string        `json:"health_path" example:"/health"`
	Healthy    bool          `json:"healthy" example:"true"`
	LastCheck  time.Time     `json:"last_check"` // time of the most recent health probe
	Latency    time.Duration `json:"latency"`    // duration of the most recent health probe
	Error      string        `json:"error,omitempty"` // last health-check error, empty when healthy
}

// GitStatusResponse represents git repository status: the last fetch
// time, the most recent tag seen, and the commits currently associated
// with the blue, green, and active services.
type GitStatusResponse struct {
	LastFetch    time.Time `json:"last_fetch"`
	LastTag      string    `json:"last_tag,omitempty"`
	BlueCommit   string    `json:"blue_commit,omitempty"`
	GreenCommit  string    `json:"green_commit,omitempty"`
	ActiveCommit string    `json:"active_commit,omitempty"`
	Error        string    `json:"error,omitempty"` // last git operation error, if any
}
// StatusOutput is the response for GET /api/status. The Body wrapper is
// the shape Huma expects for response payloads.
type StatusOutput struct {
	Body struct {
		ActiveService string            `json:"active_service" example:"blue"`
		Blue          ServiceStatus     `json:"blue"`
		Green         ServiceStatus     `json:"green"`
		LastSwitch    time.Time         `json:"last_switch"`
		SwitchCount   int64             `json:"switch_count" example:"5"`
		Git           GitStatusResponse `json:"git"`
	}
}

// MetricsOutput is the response for GET /api/metrics.
type MetricsOutput struct {
	Body struct {
		Requests          RequestMetrics `json:"requests"`
		Latency           LatencyMetrics `json:"latency"`
		Errors            ErrorMetrics   `json:"errors"`
		ActiveConnections int64          `json:"active_connections" example:"42"`
		UptimeSeconds     int64          `json:"uptime_seconds" example:"86400"`
	}
}

// RequestMetrics contains request statistics, split by backend color.
type RequestMetrics struct {
	Total     int64   `json:"total" example:"10000"`
	PerSecond float64 `json:"per_second" example:"15.5"`
	Blue      int64   `json:"blue" example:"5000"`
	Green     int64   `json:"green" example:"5000"`
}

// LatencyMetrics contains latency percentile and average statistics.
// Units are whatever the collector's snapshot reports — presumably
// milliseconds; confirm against the metrics package.
type LatencyMetrics struct {
	P50 float64 `json:"p50" example:"10.5"`
	P90 float64 `json:"p90" example:"25.0"`
	P99 float64 `json:"p99" example:"100.0"`
	Avg float64 `json:"avg" example:"15.2"`
}

// ErrorMetrics contains error statistics.
type ErrorMetrics struct {
	Total     int64   `json:"total" example:"10"`
	Rate      float64 `json:"rate" example:"0.001"`
	LastError string  `json:"last_error,omitempty" example:"connection refused"`
}

// MetricsHistoryOutput is the response for GET /api/metrics/history.
type MetricsHistoryOutput struct {
	Body struct {
		Points     []metrics.DataPoint `json:"points"`
		Resolution time.Duration       `json:"resolution"` // spacing between points
		Duration   time.Duration       `json:"duration"`   // total window covered
	}
}

// HistoryInput is the query params for GET /api/metrics/history.
type HistoryInput struct {
	Duration   string `query:"duration" default:"1h" doc:"Duration of history to return (e.g., 1h, 24h)"`
	Resolution string `query:"resolution" default:"1m" doc:"Resolution of data points (e.g., 1m, 5m)"`
}
// RegisterAPI registers all read-only API operations with Huma:
//
//   - GET /api/status          — blue/green status snapshot
//   - GET /api/metrics         — current metrics snapshot
//   - GET /api/metrics/history — time-series metrics data
func RegisterAPI(api huma.API, s *Server) {
	// GET /api/status
	huma.Get(api, "/api/status", func(ctx context.Context, input *struct{}) (*StatusOutput, error) {
		status := s.switcher.GetStatus()
		blueEndpoint := s.cfg.GetServiceEndpoint(config.ServiceBlue)
		greenEndpoint := s.cfg.GetServiceEndpoint(config.ServiceGreen)
		out := &StatusOutput{}
		out.Body.ActiveService = string(status.ActiveService)
		out.Body.LastSwitch = status.LastSwitch
		out.Body.SwitchCount = status.SwitchCount
		// Merge configured endpoints with the latest health results.
		out.Body.Blue = toServiceStatus(blueEndpoint, status.BlueHealth)
		out.Body.Green = toServiceStatus(greenEndpoint, status.GreenHealth)
		out.Body.Git = GitStatusResponse{
			LastFetch:    status.Git.LastFetch,
			LastTag:      status.Git.LastTag,
			BlueCommit:   status.Git.BlueCommit,
			GreenCommit:  status.Git.GreenCommit,
			ActiveCommit: status.Git.ActiveCommit,
			Error:        status.Git.Error,
		}
		return out, nil
	})
	// GET /api/metrics
	huma.Get(api, "/api/metrics", func(ctx context.Context, input *struct{}) (*MetricsOutput, error) {
		// The collector may be absent; report 500 rather than panicking.
		if s.metrics == nil {
			return nil, huma.Error500InternalServerError("metrics not available")
		}
		snapshot := s.metrics.Snapshot()
		out := &MetricsOutput{}
		out.Body.Requests = RequestMetrics{
			Total:     snapshot.TotalRequests,
			PerSecond: snapshot.RequestsPerSecond,
			Blue:      snapshot.BlueRequests,
			Green:     snapshot.GreenRequests,
		}
		out.Body.Latency = LatencyMetrics{
			P50: snapshot.LatencyP50,
			P90: snapshot.LatencyP90,
			P99: snapshot.LatencyP99,
			Avg: snapshot.LatencyAvg,
		}
		out.Body.Errors = ErrorMetrics{
			Total:     snapshot.TotalErrors,
			Rate:      snapshot.ErrorRate,
			LastError: snapshot.LastError,
		}
		out.Body.ActiveConnections = snapshot.ActiveConnections
		out.Body.UptimeSeconds = snapshot.UptimeSeconds
		return out, nil
	})
	// GET /api/metrics/history
	huma.Get(api, "/api/metrics/history", func(ctx context.Context, input *HistoryInput) (*MetricsHistoryOutput, error) {
		if s.metrics == nil {
			return nil, huma.Error500InternalServerError("metrics not available")
		}
		// NOTE(review): unparsable duration/resolution values silently
		// fall back to 1h/1m instead of returning a 422 — confirm this
		// best-effort behavior is intentional.
		duration, err := time.ParseDuration(input.Duration)
		if err != nil {
			duration = time.Hour
		}
		resolution, err := time.ParseDuration(input.Resolution)
		if err != nil {
			resolution = time.Minute
		}
		points := s.metrics.History(duration, resolution)
		out := &MetricsHistoryOutput{}
		out.Body.Points = points
		out.Body.Duration = duration
		out.Body.Resolution = resolution
		return out, nil
	})
}
// toServiceStatus merges a configured service endpoint with its latest
// health-check result into the API-facing ServiceStatus shape.
func toServiceStatus(endpoint config.ServiceEndpoint, h health.Status) ServiceStatus {
	status := ServiceStatus{
		URL:        endpoint.URL,
		HealthPath: endpoint.HealthPath,
	}
	status.Healthy = h.Healthy
	status.LastCheck = h.LastCheck
	status.Latency = h.Latency
	status.Error = h.Error
	return status
}
package admin
import (
	"context"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/danielgtaylor/huma/v2"
	"github.com/danielgtaylor/huma/v2/adapters/humachi"
	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
	"tailscale.com/tsnet"

	"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
	"github.com/mountain-reverie/blue-green-load-balancer/internal/metrics"
	"github.com/mountain-reverie/blue-green-load-balancer/internal/switcher"
)
// Server is the admin server that runs on the Tailscale network.
// It hosts the chi router with the Huma admin API; LocalServer offers a
// plain-TCP alternative for development.
type Server struct {
	cfg      *config.Config
	logger   *slog.Logger
	switcher *switcher.Switcher
	metrics  *metrics.Collector

	tsServer *tsnet.Server // embedded tailnet node; nil until Start
	router   chi.Router    // HTTP router, built in NewServer
	api      huma.API      // Huma API mounted on router

	// mu guards listener and tsServer across Start/Stop/LocalServer.
	mu       sync.Mutex
	listener net.Listener
}
// NewServer creates a new admin server wired to the given configuration,
// logger, switcher, and metrics collector, with its router and Huma API
// fully initialized and ready for Start or LocalServer.
func NewServer(
	cfg *config.Config,
	logger *slog.Logger,
	sw *switcher.Switcher,
	m *metrics.Collector,
) *Server {
	srv := new(Server)
	srv.cfg = cfg
	srv.logger = logger
	srv.switcher = sw
	srv.metrics = m
	srv.setupRouter()
	return srv
}
// setupRouter configures the HTTP router and Huma API.
// Middleware is registered before the API routes are mounted; this runs
// once from NewServer, before callers can add routes via Router()/API().
func (s *Server) setupRouter() {
	s.router = chi.NewRouter()
	// Middleware
	s.router.Use(middleware.RequestID)
	s.router.Use(middleware.RealIP)
	s.router.Use(middleware.Recoverer)
	s.router.Use(middleware.Logger)
	// Create Huma API with docs and OpenAPI spec endpoints.
	humaConfig := huma.DefaultConfig("Blue/Green Load Balancer Admin", "1.0.0")
	humaConfig.Info.Description = "Admin API for the Blue/Green Load Balancer"
	humaConfig.DocsPath = "/api/docs"
	humaConfig.OpenAPIPath = "/api/openapi"
	s.api = humachi.New(s.router, humaConfig)
	// Register API operations
	RegisterAPI(s.api, s)
	RegisterWebhookAPI(s.api, s, s.cfg.Admin.WebhookKey)
}
// Start starts the admin server on the Tailscale network.
//
// It brings up an embedded tsnet node using the configured hostname,
// state directory, and (optionally) control URL and auth key, listens
// on port 80 inside the tailnet, and serves the admin router from a
// background goroutine. Serve errors are logged rather than returned,
// since they occur after Start has already returned.
func (s *Server) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Initialize tsnet server; its internal logs are routed to Debug.
	s.tsServer = &tsnet.Server{
		Hostname:  s.cfg.Admin.Hostname,
		Dir:       s.cfg.Admin.StateDir,
		Ephemeral: s.cfg.Admin.Ephemeral,
		Logf:      func(format string, args ...any) { s.logger.Debug(fmt.Sprintf(format, args...)) },
	}
	// Set custom control server URL if provided (e.g., Headscale)
	if s.cfg.Admin.ControlURL != "" {
		s.tsServer.ControlURL = s.cfg.Admin.ControlURL
	}
	// Set auth key if provided
	if s.cfg.Admin.AuthKey != "" {
		s.tsServer.AuthKey = s.cfg.Admin.AuthKey
	}
	// Start the Tailscale server
	if err := s.tsServer.Start(); err != nil {
		return fmt.Errorf("starting tsnet server: %w", err)
	}
	// Get listener on port 80; tear the node down on failure.
	ln, err := s.tsServer.Listen("tcp", ":80")
	if err != nil {
		_ = s.tsServer.Close()
		return fmt.Errorf("listening on tsnet: %w", err)
	}
	s.listener = ln
	// Start HTTP server
	go func() {
		server := &http.Server{
			Handler: s.router,
			// Bound how long a client may take to send request headers
			// (mitigates slow-client resource exhaustion). Full read/
			// write timeouts are deliberately left unset because the
			// router may serve long-lived streaming responses.
			ReadHeaderTimeout: 10 * time.Second,
		}
		if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
			s.logger.Error("admin server error", "error", err)
		}
	}()
	s.logger.Info("admin server started",
		"hostname", s.cfg.Admin.Hostname,
	)
	return nil
}
// Stop shuts down the admin server: it closes the active HTTP listener
// first and then the embedded tsnet node. Safe to call even when Start
// was never invoked. Always returns nil.
func (s *Server) Stop() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ln := s.listener; ln != nil {
		_ = ln.Close()
	}
	if ts := s.tsServer; ts != nil {
		_ = ts.Close()
	}
	return nil
}
// Router returns the HTTP router for adding additional routes.
// The router is built in NewServer, so it is never nil.
func (s *Server) Router() chi.Router {
	return s.router
}

// API returns the Huma API for adding additional operations.
func (s *Server) API() huma.API {
	return s.api
}
// LocalServer starts a local HTTP server for development/testing.
// This bypasses Tailscale and listens on a local address.
//
// The server runs in a background goroutine; Serve errors are logged.
// The listener is stored on the Server so Stop will close it.
func (s *Server) LocalServer(ctx context.Context, addr string) error {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return fmt.Errorf("listening on %s: %w", addr, err)
	}
	s.mu.Lock()
	s.listener = ln
	s.mu.Unlock()
	go func() {
		server := &http.Server{
			Handler: s.router,
			// Bound how long a client may take to send request headers
			// (mitigates slow-client resource exhaustion). Full read/
			// write timeouts are deliberately left unset because the
			// router may serve long-lived streaming responses.
			ReadHeaderTimeout: 10 * time.Second,
		}
		if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
			s.logger.Error("local admin server error", "error", err)
		}
	}()
	s.logger.Info("local admin server started", "addr", addr)
	return nil
}
package admin
import (
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/danielgtaylor/huma/v2"

	"github.com/mountain-reverie/blue-green-load-balancer/internal/switcher"
)
// WebhookConfig holds webhook authentication settings.
type WebhookConfig struct {
	Secret string // HMAC secret for webhook verification
}

// WebhookRefreshInput is the request body for POST /api/webhook/refresh.
// RawBody is expected to receive the unparsed request bytes (Huma's
// raw-body convention — confirm against the huma docs) so HMAC
// signatures can be verified over exactly what the sender signed.
type WebhookRefreshInput struct {
	RawBody []byte
	Body    struct {
		Signature string `json:"signature,omitempty" doc:"HMAC signature for verification"`
	}
	Headers struct {
		XHubSignature256 string `header:"X-Hub-Signature-256" doc:"GitHub-style HMAC signature"`
		XWebhookSecret   string `header:"X-Webhook-Secret" doc:"Simple secret header"`
	}
}

// WebhookRefreshOutput is the response for POST /api/webhook/refresh.
type WebhookRefreshOutput struct {
	Body switcher.RefreshGitResult
}
// RegisterWebhookAPI registers webhook API operations.
// When webhookSecret is empty, requests are accepted without any
// authentication (a warning is logged once at registration time).
func RegisterWebhookAPI(api huma.API, s *Server, webhookSecret string) {
	if webhookSecret == "" {
		s.logger.Warn("webhook authentication disabled - no secret configured")
	}
	// POST /api/webhook/refresh - triggers a git refresh which may cause a switch
	huma.Post(api, "/api/webhook/refresh", func(ctx context.Context, input *WebhookRefreshInput) (*WebhookRefreshOutput, error) {
		// Verify webhook signature if secret is configured
		if webhookSecret != "" {
			if !verifyWebhookRefresh(input, webhookSecret) {
				return nil, huma.Error401Unauthorized("invalid webhook signature")
			}
		}
		result := s.switcher.RefreshGit(ctx)
		// Return 500 only on complete failure. Partial success (Refreshed=true with Error)
		// returns 200 with error details in the response body.
		if result.Error != "" && !result.Refreshed {
			return nil, huma.Error500InternalServerError(result.Error)
		}
		return &WebhookRefreshOutput{Body: result}, nil
	})
}
// verifyWebhookRefresh checks webhook authentication for refresh
// requests. Exactly one credential is consulted, in priority order: the
// GitHub-style X-Hub-Signature-256 HMAC header, then the plain
// X-Webhook-Secret header, then a signature field in the JSON body.
// Plain-secret comparisons are constant-time. Returns false when no
// credential is present.
func verifyWebhookRefresh(input *WebhookRefreshInput, secret string) bool {
	switch {
	case input.Headers.XHubSignature256 != "":
		return verifyHubSignature(input.RawBody, input.Headers.XHubSignature256, secret)
	case input.Headers.XWebhookSecret != "":
		return subtle.ConstantTimeCompare([]byte(input.Headers.XWebhookSecret), []byte(secret)) == 1
	case input.Body.Signature != "":
		return subtle.ConstantTimeCompare([]byte(input.Body.Signature), []byte(secret)) == 1
	default:
		return false
	}
}
// verifyHubSignature checks a GitHub-style "sha256=<hex>" HMAC-SHA256
// signature against the given payload and shared secret. It returns
// false for a missing prefix, malformed hex, or a MAC mismatch. The
// final comparison uses hmac.Equal, which is constant-time.
func verifyHubSignature(payload []byte, signature string, secret string) bool {
	sigHex, found := strings.CutPrefix(signature, "sha256=")
	if !found {
		return false
	}
	got, err := hex.DecodeString(sigHex)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, []byte(secret))
	_, _ = mac.Write(payload) // hash.Hash.Write never returns an error
	return hmac.Equal(got, mac.Sum(nil))
}
// WebhookHandler provides a standard http.Handler for webhooks.
// This can be used outside of Huma if needed.
type WebhookHandler struct {
	switcher      *switcher.Switcher
	webhookSecret string // empty disables authentication in ServeHTTP
}

// NewWebhookHandler creates a new webhook handler.
// An empty secret disables signature verification.
func NewWebhookHandler(sw *switcher.Switcher, secret string) *WebhookHandler {
	return &WebhookHandler{
		switcher:      sw,
		webhookSecret: secret,
	}
}

// maxWebhookBodySize is the maximum size of a webhook request body (1MB).
const maxWebhookBodySize = 1 << 20
// ServeHTTP implements http.Handler for git refresh webhooks.
//
// Only POST is accepted. The request body is capped at
// maxWebhookBodySize. When a webhook secret is configured the request
// must authenticate via either a GitHub-style X-Hub-Signature-256 HMAC
// over the raw body or a plain X-Webhook-Secret header (constant-time
// compared). On success the git refresh result is encoded as JSON;
// partial success (Refreshed=true with a non-empty Error) still yields
// 200 with details in the body.
func (h *WebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	// Limit request body size to prevent DoS
	r.Body = http.MaxBytesReader(w, r.Body, maxWebhookBodySize)
	body, err := io.ReadAll(r.Body)
	if err != nil {
		// Distinguish an oversized body (413) from other read failures (400).
		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "failed to read body", http.StatusBadRequest)
		return
	}
	// Verify signature when secret is configured
	if h.webhookSecret != "" {
		signature := r.Header.Get("X-Hub-Signature-256")
		secretHeader := r.Header.Get("X-Webhook-Secret")
		// Require at least one authentication method
		if signature == "" && secretHeader == "" {
			http.Error(w, "unauthorized: missing signature", http.StatusUnauthorized)
			return
		}
		// Verify HMAC signature if provided; it takes priority over the
		// plain secret header.
		if signature != "" {
			if !verifyHubSignature(body, signature, h.webhookSecret) {
				http.Error(w, "unauthorized: invalid signature", http.StatusUnauthorized)
				return
			}
		} else if secretHeader != "" {
			// Verify simple secret header (constant-time comparison).
			if subtle.ConstantTimeCompare([]byte(secretHeader), []byte(h.webhookSecret)) != 1 {
				http.Error(w, "unauthorized: invalid secret", http.StatusUnauthorized)
				return
			}
		}
	}
	// Bound the refresh so a stuck git operation cannot pin the request.
	ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
	defer cancel()
	// Trigger git refresh - any tag changes will cause switches via the callback
	result := h.switcher.RefreshGit(ctx)
	if result.Error != "" && !result.Refreshed {
		http.Error(w, result.Error, http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	if err := json.NewEncoder(w).Encode(result); err != nil {
		// Response already started, can only log
		// Error will be logged by middleware or handled upstream
		return
	}
}
// Package app provides the application server that can be used both in production and tests.
package app
import (
"context"
"errors"
"log/slog"
"net"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/mountain-reverie/blue-green-load-balancer/internal/admin"
"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
"github.com/mountain-reverie/blue-green-load-balancer/internal/health"
"github.com/mountain-reverie/blue-green-load-balancer/internal/metrics"
"github.com/mountain-reverie/blue-green-load-balancer/internal/proxy"
"github.com/mountain-reverie/blue-green-load-balancer/internal/switcher"
"github.com/mountain-reverie/blue-green-load-balancer/internal/ui"
)
// Application encapsulates all server components and provides a unified interface
// for running the blue/green load balancer. It can be used in production (main.go)
// or in integration tests.
type Application struct {
	Config   *config.Config
	Logger   *slog.Logger
	Proxy    *proxy.Proxy
	Health   *health.Checker
	Switcher *switcher.Switcher
	Metrics  *metrics.Collector
	Admin    *admin.Server
	UI       *ui.Handlers

	// prometheusReg is stored to apply after Metrics is initialized
	prometheusReg prometheus.Registerer
	// configPath stores the config file path for reload
	configPath string

	// mu guards the mutable lifecycle state below (gitWatcher,
	// proxyServer, cancel, started) across Start/Stop/Reload.
	mu          sync.Mutex
	gitWatcher  *switcher.GitWatcher
	proxyServer *http.Server
	ctx         context.Context
	cancel      context.CancelFunc
	started     bool
}
// Option configures the Application at construction time (see New).
type Option func(*Application)

// WithLogger sets the logger for the application.
// When omitted, New falls back to slog.Default().
func WithLogger(logger *slog.Logger) Option {
	return func(a *Application) {
		a.Logger = logger
	}
}

// WithPrometheusRegistry registers metrics with a Prometheus registry.
// The registration happens after all components are initialized.
func WithPrometheusRegistry(reg prometheus.Registerer) Option {
	return func(a *Application) {
		a.prometheusReg = reg
	}
}

// WithConfigPath stores the config file path for reload support.
// Without it, Reload returns an error.
func WithConfigPath(path string) Option {
	return func(a *Application) {
		a.configPath = path
	}
}
// New creates a new Application with all components initialized.
//
// Wiring order matters: options run first (so the logger is available),
// then metrics, proxy (with request/error callbacks feeding metrics),
// health checker, switcher (with a switch callback feeding metrics),
// admin server, and UI routes. A Prometheus registration failure is
// logged as a warning, not returned as an error.
func New(cfg *config.Config, opts ...Option) (*Application, error) {
	app := &Application{
		Config: cfg,
		Logger: slog.Default(),
	}
	// Apply options first to get logger
	for _, opt := range opts {
		opt(app)
	}
	var err error
	// Initialize metrics collector
	app.Metrics = metrics.NewCollector(cfg)
	// Initialize reverse proxy with callbacks that record per-request
	// outcomes into the metrics collector.
	app.Proxy, err = proxy.New(cfg, app.Logger,
		proxy.WithRequestCallback(func(target config.ServiceTarget, statusCode int, duration time.Duration) {
			app.Metrics.RecordRequest(target, statusCode, duration)
		}),
		proxy.WithErrorCallback(func(target config.ServiceTarget, err error) {
			app.Metrics.RecordError(target, err)
		}),
	)
	if err != nil {
		return nil, err
	}
	// Initialize health checker
	app.Health = health.NewChecker(cfg, app.Logger)
	// Initialize switcher; each blue/green switch is recorded as a metric.
	app.Switcher = switcher.NewSwitcher(cfg, app.Logger, app.Proxy, app.Health,
		switcher.WithSwitchCallback(func(event switcher.SwitchEvent) {
			app.Metrics.RecordSwitch(event.From, event.To, string(event.Trigger))
		}),
	)
	// Initialize admin server (includes Huma API)
	app.Admin = admin.NewServer(cfg, app.Logger, app.Switcher, app.Metrics)
	// Initialize UI handlers and register routes
	app.UI = ui.NewHandlers(app.Switcher, app.Metrics)
	app.UI.RegisterRoutes(app.Admin.Router())
	// Add Prometheus metrics endpoint
	app.Admin.Router().Handle("/metrics", promhttp.Handler())
	// Register metrics with Prometheus if configured
	if app.prometheusReg != nil {
		if err := app.Metrics.Register(app.prometheusReg); err != nil {
			app.Logger.Warn("failed to register prometheus metrics", "error", err)
		}
	}
	return app, nil
}
// Start starts all application components.
// This method is not safe to call multiple times without calling Stop() first.
//
// It starts the health checker, the UI SSE update loop, and — only when
// git.repo_url is configured — the git tag watcher wired to the
// switcher. The proxy and admin servers are started separately (see
// StartProxyServer / StartAdminServer / Admin.Start).
func (a *Application) Start(ctx context.Context) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.started {
		return errors.New("application already started")
	}
	// Derive a cancellable context so Stop can halt all components.
	a.ctx, a.cancel = context.WithCancel(ctx)
	a.started = true
	// Start health checker
	a.Health.Start(a.ctx)
	// Start UI updates (SSE)
	a.UI.StartUpdates()
	// Initialize git watcher if configured
	if a.Config.Git.RepoURL != "" {
		a.gitWatcher = switcher.NewGitWatcher(a.Config, a.Logger,
			switcher.WithTagChangeCallback(a.Switcher.HandleTagChange),
		)
		a.Switcher.SetGitWatcher(a.gitWatcher)
		a.gitWatcher.Start(a.ctx)
	}
	return nil
}
// Stop stops all application components gracefully.
//
// Shutdown order: cancel the run context, drain the proxy server (30s
// budget), then stop the git watcher, UI updates, health checker, and
// finally the admin server. Safe to call even if Start never ran, and
// idempotent for the proxy/watcher (fields are nil'ed after use).
func (a *Application) Stop() {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.cancel != nil {
		a.cancel()
		a.cancel = nil
	}
	if a.proxyServer != nil {
		// Give in-flight proxied requests up to 30s to complete.
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		_ = a.proxyServer.Shutdown(ctx)
		cancel()
		a.proxyServer = nil
	}
	if a.gitWatcher != nil {
		a.gitWatcher.Stop()
		a.gitWatcher = nil
	}
	if a.UI != nil {
		a.UI.StopUpdates()
	}
	if a.Health != nil {
		a.Health.Stop()
	}
	if a.Admin != nil {
		_ = a.Admin.Stop()
	}
	a.started = false
}
// Reload reloads the configuration from disk and applies changes.
// Not all configuration changes can be applied without restart:
// - proxy.listen_addr: requires restart
// - admin.*: requires restart (Tailscale settings)
// Hot-reloadable settings:
// - health.interval, health.timeout
// - git.poll_interval
// - services.*.health_path
//
// NOTE(review): the hot-reload writes below mutate fields of the shared
// *config.Config while other components may be reading it concurrently;
// confirm those readers tolerate this (or take their own locks).
func (a *Application) Reload() error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.configPath == "" {
		return errors.New("config path not set, cannot reload")
	}
	newCfg, err := config.Load(a.configPath)
	if err != nil {
		// Keep running with the previous configuration on load failure.
		return err
	}
	// Log changes that require restart
	if newCfg.Proxy.ListenAddr != a.Config.Proxy.ListenAddr {
		a.Logger.Warn("proxy.listen_addr changed, requires restart",
			"old", a.Config.Proxy.ListenAddr,
			"new", newCfg.Proxy.ListenAddr)
	}
	if newCfg.Admin.Hostname != a.Config.Admin.Hostname {
		a.Logger.Warn("admin.hostname changed, requires restart",
			"old", a.Config.Admin.Hostname,
			"new", newCfg.Admin.Hostname)
	}
	// Apply hot-reloadable changes
	a.Config.Health.Interval = newCfg.Health.Interval
	a.Config.Health.Timeout = newCfg.Health.Timeout
	a.Config.Git.PollInterval = newCfg.Git.PollInterval
	a.Config.Services.Blue.HealthPath = newCfg.Services.Blue.HealthPath
	a.Config.Services.Green.HealthPath = newCfg.Services.Green.HealthPath
	a.Config.Proxy.DrainTimeout = newCfg.Proxy.DrainTimeout
	// Update health checker (it reads from config on each tick)
	// The health checker already uses the config pointer, so changes apply automatically
	// Log successful reload
	a.Logger.Info("configuration reloaded",
		"health_interval", a.Config.Health.Interval,
		"health_timeout", a.Config.Health.Timeout,
		"git_poll_interval", a.Config.Git.PollInterval)
	return nil
}
// Handler returns the HTTP handler for the admin/API server.
// Useful for mounting the admin routes in tests without a listener.
func (a *Application) Handler() http.Handler {
	return a.Admin.Router()
}

// ProxyHandler returns the HTTP handler for the reverse proxy.
func (a *Application) ProxyHandler() http.Handler {
	return a.Proxy
}

// StartAdminServer starts the admin server on a local address.
// For Tailscale, use Admin.Start() directly.
func (a *Application) StartAdminServer(ctx context.Context, addr string) error {
	return a.Admin.LocalServer(ctx, addr)
}
// StartProxyServer starts the proxy server on the configured address.
// Returns a channel that receives any error from ListenAndServe (or nil on clean shutdown).
func (a *Application) StartProxyServer() <-chan error {
	// A nil listener tells the implementation to create its own.
	return a.StartProxyServerWithListener(nil)
}
// StartProxyServerWithListener starts the proxy server using the provided listener.
// If listener is nil, it creates its own listener on the configured address.
// This supports systemd socket activation by accepting pre-created listeners.
// Returns a buffered channel that receives any error from Serve (or nil on
// clean shutdown) and is then closed.
func (a *Application) StartProxyServerWithListener(ln net.Listener) <-chan error {
	errCh := make(chan error, 1)
	srv := &http.Server{
		Addr:         a.Config.Proxy.ListenAddr,
		Handler:      a.Proxy,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		IdleTimeout:  120 * time.Second,
	}
	a.mu.Lock()
	a.proxyServer = srv
	a.mu.Unlock()
	go func() {
		// Use the local srv reference, not a.proxyServer: Stop() may set
		// the field to nil concurrently, so reading it here without the
		// mutex would be a data race (and a potential nil dereference).
		var err error
		if ln != nil {
			// Use provided listener (socket activation)
			a.Logger.Info("proxy server starting with socket activation", "addr", ln.Addr().String())
			err = srv.Serve(ln)
		} else {
			// Create our own listener
			a.Logger.Info("proxy server starting", "addr", a.Config.Proxy.ListenAddr)
			err = srv.ListenAndServe()
		}
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			errCh <- err
		} else {
			// Report clean shutdown explicitly so callers can select on it.
			errCh <- nil
		}
		close(errCh)
	}()
	return errCh
}
package bgctl
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
"tailscale.com/client/local"
"tailscale.com/tsnet"
)
// Config holds the client configuration.
type Config struct {
	Hostname   string        // admin server hostname to target for API calls
	ControlURL string        // optional Tailscale/Headscale control server URL
	AuthKey    string        // Tailscale auth key used to join the tailnet
	StateDir   string        // tsnet state directory; required (New rejects "")
	Timeout    time.Duration // per-request HTTP timeout; 0 means the 30s default
	Verbose    bool          // when false, tsnet's internal logging is silenced
	Logger     *slog.Logger  // optional; New falls back to slog.Default()
}
// Client connects to the admin server via Tailscale network.
type Client struct {
	tsServer   *tsnet.Server // embedded tailnet node; owns the network identity
	httpClient *http.Client  // dials through tsServer (see New's DialContext)
	hostname   string        // admin server hostname used to build request URLs
	logger     *slog.Logger
}
// New creates a new client connected to the Tailscale network.
// It starts an ephemeral tsnet node in cfg.StateDir, waits for an IP to be
// assigned, and returns a Client whose HTTP transport dials over the tailnet.
// A zero cfg.Timeout defaults to 30 seconds; a nil cfg.Logger defaults to
// slog.Default(). On any failure after the node starts, the node is closed.
func New(ctx context.Context, cfg *Config) (*Client, error) {
	logger := cfg.Logger
	if logger == nil {
		logger = slog.Default()
	}
	if cfg.StateDir == "" {
		return nil, fmt.Errorf("state directory is required")
	}
	srv := &tsnet.Server{
		Hostname:   "bgctl",
		Dir:        cfg.StateDir,
		ControlURL: cfg.ControlURL,
		AuthKey:    cfg.AuthKey,
		Ephemeral:  true,
	}
	if !cfg.Verbose {
		// Suppress tsnet's chatty internal logging unless verbose mode is on.
		srv.Logf = func(string, ...any) {}
	}
	logger.Debug("starting tsnet server",
		"state_dir", cfg.StateDir,
		"control_url", cfg.ControlURL,
	)
	if err := srv.Start(); err != nil {
		return nil, fmt.Errorf("starting tsnet server: %w", err)
	}
	// Once Start has succeeded, every failure path must tear the node down.
	fail := func(step string, err error) (*Client, error) {
		_ = srv.Close()
		return nil, fmt.Errorf("%s: %w", step, err)
	}
	lc, err := srv.LocalClient()
	if err != nil {
		return fail("getting local client", err)
	}
	logger.Debug("waiting for Tailscale IP assignment")
	if err := waitForTailscaleIP(ctx, lc); err != nil {
		return fail("waiting for Tailscale IP", err)
	}
	timeout := cfg.Timeout
	if timeout == 0 {
		timeout = 30 * time.Second
	}
	return &Client{
		tsServer: srv,
		httpClient: &http.Client{
			Transport: &http.Transport{DialContext: srv.Dial},
			Timeout:   timeout,
		},
		hostname: cfg.Hostname,
		logger:   logger,
	}, nil
}
// waitForTailscaleIP waits until the tsnet server has a Tailscale IP assigned.
// It probes the local client immediately and then every 500ms, returning nil
// once an IP appears or ctx.Err() if the context is cancelled first. The
// original version waited a full tick before the first probe, adding a
// mandatory 500ms to every startup even when the node was already up.
func waitForTailscaleIP(ctx context.Context, lc *local.Client) error {
	hasIP := func() bool {
		status, err := lc.Status(ctx)
		if err != nil {
			// Status errors are treated as transient; keep polling.
			return false
		}
		return status.Self != nil && len(status.Self.TailscaleIPs) > 0
	}
	if hasIP() {
		return nil
	}
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if hasIP() {
				return nil
			}
		}
	}
}
// Close closes the client and its tsnet server.
func (c *Client) Close() error {
	// Release pooled connections first so the transport holds no sockets
	// when the tailnet node goes away.
	transport, ok := c.httpClient.Transport.(*http.Transport)
	if ok {
		transport.CloseIdleConnections()
	}
	return c.tsServer.Close()
}
// GetStatus retrieves the current blue/green status.
// Non-200 responses are converted to an *APIError.
func (c *Client) GetStatus(ctx context.Context) (*StatusResponse, error) {
	endpoint := fmt.Sprintf("http://%s/api/status", c.hostname)
	c.logger.Debug("fetching status", "url", endpoint)
	resp, err := c.get(ctx, endpoint)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode != http.StatusOK {
		return nil, c.parseError(resp)
	}
	out := &StatusResponse{}
	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
		return nil, fmt.Errorf("decoding status response: %w", err)
	}
	return out, nil
}
// GetMetrics retrieves the current metrics snapshot.
// Non-200 responses are converted to an *APIError.
func (c *Client) GetMetrics(ctx context.Context) (*MetricsResponse, error) {
	endpoint := fmt.Sprintf("http://%s/api/metrics", c.hostname)
	c.logger.Debug("fetching metrics", "url", endpoint)
	resp, err := c.get(ctx, endpoint)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode != http.StatusOK {
		return nil, c.parseError(resp)
	}
	out := &MetricsResponse{}
	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
		return nil, fmt.Errorf("decoding metrics response: %w", err)
	}
	return out, nil
}
// TriggerRefresh triggers a git refresh on the admin server.
// It POSTs an empty JSON object to the webhook endpoint; non-200 responses
// are converted to an *APIError.
func (c *Client) TriggerRefresh(ctx context.Context) (*RefreshResponse, error) {
	endpoint := fmt.Sprintf("http://%s/api/webhook/refresh", c.hostname)
	c.logger.Debug("triggering refresh", "url", endpoint)
	resp, err := c.post(ctx, endpoint, "application/json", strings.NewReader("{}"))
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode != http.StatusOK {
		return nil, c.parseError(resp)
	}
	out := &RefreshResponse{}
	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
		return nil, fmt.Errorf("decoding refresh response: %w", err)
	}
	return out, nil
}
// get performs an HTTP GET request via the Tailscale network.
// The request is bound to ctx; the transport dials over the tsnet server.
func (c *Client) get(ctx context.Context, url string) (*http.Response, error) {
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("creating request: %w", err)
	}
	return c.httpClient.Do(request)
}
// post performs an HTTP POST request via the Tailscale network.
// contentType, when non-empty, is sent as the Content-Type header.
func (c *Client) post(ctx context.Context, url, contentType string, body io.Reader) (*http.Response, error) {
	request, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
	if err != nil {
		return nil, fmt.Errorf("creating request: %w", err)
	}
	if contentType != "" {
		request.Header.Set("Content-Type", contentType)
	}
	return c.httpClient.Do(request)
}
// parseError parses an error response from the admin API.
// The body is read with a 64 KiB cap so a misbehaving or hostile server
// cannot make the client buffer an arbitrarily large error payload; read
// errors are ignored and simply yield a shorter message.
func (c *Client) parseError(resp *http.Response) error {
	const maxErrBody = 64 << 10
	body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrBody))
	return &APIError{
		StatusCode: resp.StatusCode,
		Message:    string(body),
	}
}
// APIError represents an error response from the admin API.
type APIError struct {
	StatusCode int    // HTTP status code returned by the server
	Message    string // raw response body, if any
}

// Error implements the error interface, including the response body when present.
func (e *APIError) Error() string {
	if e.Message == "" {
		return fmt.Sprintf("API error: status %d", e.StatusCode)
	}
	return fmt.Sprintf("API error (status %d): %s", e.StatusCode, e.Message)
}
package bgctl
import (
"encoding/json"
"fmt"
"strings"
"time"
)
// Formatter formats API responses for output.
// Two implementations exist: textFormatter (human-readable) and
// jsonFormatter (indented JSON); NewFormatter selects between them.
type Formatter interface {
	FormatStatus(*StatusResponse) string
	FormatMetrics(*MetricsResponse) string
	FormatRefresh(*RefreshResponse) string
}
// NewFormatter creates a new formatter based on the format string.
// Valid formats are "json" and "text"; anything other than "json"
// (including the empty string) yields the text formatter.
func NewFormatter(format string) Formatter {
	switch format {
	case "json":
		return &jsonFormatter{}
	default:
		return &textFormatter{}
	}
}
// textFormatter formats responses as human-readable text.
// It is stateless; the zero value is ready to use.
type textFormatter struct{}
// FormatStatus renders a StatusResponse as a multi-line text report:
// active service, switch count, optional last-switch time, one line per
// backend (blue/green), and a Git section that appears only when at least
// one commit hash is known.
func (f *textFormatter) FormatStatus(s *StatusResponse) string {
	var b strings.Builder
	b.WriteString(fmt.Sprintf("Active Service: %s\n", s.ActiveService))
	b.WriteString(fmt.Sprintf("Switch Count: %d\n", s.SwitchCount))
	// A zero time means no switch has happened yet; omit the line entirely.
	if !s.LastSwitch.IsZero() {
		b.WriteString(fmt.Sprintf("Last Switch: %s\n", s.LastSwitch.Format(time.RFC3339)))
	}
	b.WriteString("\n")
	// Blue service
	b.WriteString(formatServiceLine("Blue", &s.Blue))
	b.WriteString("\n")
	// Green service
	b.WriteString(formatServiceLine("Green", &s.Green))
	b.WriteString("\n")
	// Git status: each field is optional and printed only when populated;
	// commit hashes are abbreviated to 7 characters.
	if s.Git.BlueCommit != "" || s.Git.GreenCommit != "" {
		b.WriteString("\nGit:\n")
		if s.Git.BlueCommit != "" {
			b.WriteString(fmt.Sprintf(" Blue Commit: %s\n", truncateCommit(s.Git.BlueCommit)))
		}
		if s.Git.GreenCommit != "" {
			b.WriteString(fmt.Sprintf(" Green Commit: %s\n", truncateCommit(s.Git.GreenCommit)))
		}
		if s.Git.ActiveCommit != "" {
			b.WriteString(fmt.Sprintf(" Active Commit: %s\n", truncateCommit(s.Git.ActiveCommit)))
		}
		if s.Git.LastTag != "" {
			b.WriteString(fmt.Sprintf(" Last Tag: %s\n", s.Git.LastTag))
		}
		if !s.Git.LastFetch.IsZero() {
			b.WriteString(fmt.Sprintf(" Last Fetch: %s\n", s.Git.LastFetch.Format(time.RFC3339)))
		}
		if s.Git.Error != "" {
			b.WriteString(fmt.Sprintf(" Error: %s\n", s.Git.Error))
		}
	}
	return b.String()
}
// formatServiceLine renders one backend's health as a single aligned line,
// e.g. "Blue : healthy   http://... (12ms)", appending the error text when
// the last check recorded one. No trailing newline is added.
func formatServiceLine(name string, s *ServiceStatus) string {
	state := "unhealthy"
	if s.Healthy {
		state = "healthy"
	}
	line := fmt.Sprintf("%-5s: %-9s %s (%s)", name, state, s.URL, formatDuration(s.Latency))
	if s.Error == "" {
		return line
	}
	return line + fmt.Sprintf(" error: %s", s.Error)
}
// formatDuration renders a latency in the most natural unit:
// microseconds below 1ms, whole milliseconds below 1s, otherwise
// seconds with two decimal places.
func formatDuration(d time.Duration) string {
	switch {
	case d < time.Millisecond:
		return fmt.Sprintf("%dμs", d.Microseconds())
	case d < time.Second:
		return fmt.Sprintf("%dms", d.Milliseconds())
	default:
		return fmt.Sprintf("%.2fs", d.Seconds())
	}
}
// truncateCommit abbreviates a commit hash to its 7-character short form,
// returning shorter strings unchanged. Byte slicing is safe here because
// git hashes are ASCII hex.
func truncateCommit(commit string) string {
	const short = 7
	if len(commit) <= short {
		return commit
	}
	return commit[:short]
}
// FormatMetrics renders a MetricsResponse as a multi-section text report:
// request counters, latency percentiles (milliseconds), error totals, and
// the current connection count plus human-readable uptime.
func (f *textFormatter) FormatMetrics(m *MetricsResponse) string {
	var b strings.Builder
	b.WriteString("Requests:\n")
	b.WriteString(fmt.Sprintf(" Total: %d\n", m.Requests.Total))
	b.WriteString(fmt.Sprintf(" Per Second: %.2f\n", m.Requests.PerSecond))
	b.WriteString(fmt.Sprintf(" Blue: %d\n", m.Requests.Blue))
	b.WriteString(fmt.Sprintf(" Green: %d\n", m.Requests.Green))
	b.WriteString("\n")
	b.WriteString("Latency:\n")
	b.WriteString(fmt.Sprintf(" P50: %.2fms\n", m.Latency.P50))
	b.WriteString(fmt.Sprintf(" P90: %.2fms\n", m.Latency.P90))
	b.WriteString(fmt.Sprintf(" P99: %.2fms\n", m.Latency.P99))
	b.WriteString(fmt.Sprintf(" Avg: %.2fms\n", m.Latency.Avg))
	b.WriteString("\n")
	b.WriteString("Errors:\n")
	b.WriteString(fmt.Sprintf(" Total: %d\n", m.Errors.Total))
	b.WriteString(fmt.Sprintf(" Rate: %.4f\n", m.Errors.Rate))
	// The last error line is omitted when no error has been recorded.
	if m.Errors.LastError != "" {
		b.WriteString(fmt.Sprintf(" Last: %s\n", m.Errors.LastError))
	}
	b.WriteString("\n")
	b.WriteString(fmt.Sprintf("Active Connections: %d\n", m.ActiveConnections))
	b.WriteString(fmt.Sprintf("Uptime: %s\n", formatUptime(m.UptimeSeconds)))
	return b.String()
}
// formatUptime renders an uptime in seconds as "Nd Nh Nm", dropping the
// day and hour components when they are zero. Sub-minute uptimes render
// as "0m".
func formatUptime(seconds int64) string {
	totalMinutes := seconds / 60
	minutes := totalMinutes % 60
	totalHours := totalMinutes / 60
	hours := totalHours % 24
	days := totalHours / 24
	switch {
	case days > 0:
		return fmt.Sprintf("%dd %dh %dm", days, hours, minutes)
	case hours > 0:
		return fmt.Sprintf("%dh %dm", hours, minutes)
	default:
		return fmt.Sprintf("%dm", minutes)
	}
}
// FormatRefresh renders a RefreshResponse as text: a success/failure line
// followed by optional error, abbreviated commit hashes, and last-fetch
// timestamp — each printed only when populated.
func (f *textFormatter) FormatRefresh(r *RefreshResponse) string {
	var b strings.Builder
	if r.Refreshed {
		b.WriteString("Refresh: success\n")
	} else {
		b.WriteString("Refresh: failed\n")
	}
	if r.Error != "" {
		b.WriteString(fmt.Sprintf("Error: %s\n", r.Error))
	}
	if r.BlueCommit != "" {
		b.WriteString(fmt.Sprintf("Blue Commit: %s\n", truncateCommit(r.BlueCommit)))
	}
	if r.GreenCommit != "" {
		b.WriteString(fmt.Sprintf("Green Commit: %s\n", truncateCommit(r.GreenCommit)))
	}
	if r.ActiveCommit != "" {
		b.WriteString(fmt.Sprintf("Active Commit: %s\n", truncateCommit(r.ActiveCommit)))
	}
	if !r.LastFetch.IsZero() {
		b.WriteString(fmt.Sprintf("Last Fetch: %s\n", r.LastFetch.Format(time.RFC3339)))
	}
	return b.String()
}
// jsonFormatter formats responses as JSON.
// It is stateless; all three methods delegate to toJSON for indented output.
type jsonFormatter struct{}

// FormatStatus renders a StatusResponse as indented JSON.
func (f *jsonFormatter) FormatStatus(s *StatusResponse) string {
	return toJSON(s)
}

// FormatMetrics renders a MetricsResponse as indented JSON.
func (f *jsonFormatter) FormatMetrics(m *MetricsResponse) string {
	return toJSON(m)
}

// FormatRefresh renders a RefreshResponse as indented JSON.
func (f *jsonFormatter) FormatRefresh(r *RefreshResponse) string {
	return toJSON(r)
}
// toJSON renders v as indented JSON followed by a newline. If marshalling
// fails, it returns a small JSON error object instead. The fallback is built
// by marshalling the message itself — the original interpolated err.Error()
// into a JSON literal with Sprintf, which produced invalid JSON whenever the
// error text contained quotes or backslashes.
func toJSON(v any) string {
	data, err := json.MarshalIndent(v, "", " ")
	if err != nil {
		fallback, _ := json.Marshal(map[string]string{
			"error": "failed to marshal JSON: " + err.Error(),
		})
		return string(fallback)
	}
	return string(data) + "\n"
}
package config
import (
"fmt"
"os"
"time"
"gopkg.in/yaml.v3"
)
// Config represents the application configuration.
// It is loaded from YAML by Load, which also applies defaults
// (setDefaults) and sanity checks (validate).
type Config struct {
	Services ServiceConfig `yaml:"services"` // blue/green backend endpoints (required URLs)
	Proxy    ProxyConfig   `yaml:"proxy"`    // reverse-proxy listen address and drain timeout
	Git      GitConfig     `yaml:"git"`      // deployment-watching repository settings
	Deploy   DeployConfig  `yaml:"deploy"`   // git tag names that drive deployments
	Admin    AdminConfig   `yaml:"admin"`    // Tailscale admin interface settings
	Health   HealthConfig  `yaml:"health"`   // backend health-check cadence
	Metrics  MetricsConfig `yaml:"metrics"`  // metrics history retention
}
// ServiceConfig defines the blue and green service endpoints.
type ServiceConfig struct {
	Blue  ServiceEndpoint `yaml:"blue"`
	Green ServiceEndpoint `yaml:"green"`
}

// ServiceEndpoint represents a single backend service.
type ServiceEndpoint struct {
	URL        string `yaml:"url"`         // base URL of the backend; required (see validate)
	HealthPath string `yaml:"health_path"` // appended to URL for health checks; defaults to "/health"
}

// ProxyConfig defines the proxy server settings.
type ProxyConfig struct {
	ListenAddr   string        `yaml:"listen_addr"`   // defaults to ":8080"
	DrainTimeout time.Duration `yaml:"drain_timeout"` // defaults to 30s; must be non-negative
}

// GitConfig defines the git repository settings for deployment watching.
type GitConfig struct {
	RepoURL      string        `yaml:"repo_url"`
	PollInterval time.Duration `yaml:"poll_interval"` // defaults to 30s; minimum 1s
	Branch       string        `yaml:"branch"`        // defaults to "main"
	AuthToken    string        `yaml:"auth_token"`    // may be supplied via ${VAR} expansion in the YAML
}

// DeployConfig defines the git tag patterns for blue/green deployments.
type DeployConfig struct {
	BlueTag   string `yaml:"blue_tag"`   // defaults to "deploy/blue"
	GreenTag  string `yaml:"green_tag"`  // defaults to "deploy/green"
	ActiveTag string `yaml:"active_tag"` // defaults to "deploy/active"
}

// AdminConfig defines the Tailscale admin interface settings.
type AdminConfig struct {
	Hostname   string `yaml:"hostname"`    // defaults to "bluegreen-admin"
	StateDir   string `yaml:"state_dir"`   // defaults to "/var/lib/bluegreen/tailscale"
	AuthKey    string `yaml:"auth_key"`
	ControlURL string `yaml:"control_url"` // Custom control server (e.g., Headscale)
	Ephemeral  bool   `yaml:"ephemeral"`   // Node is removed when it goes offline
	WebhookKey string `yaml:"webhook_key"` // Secret for webhook signature verification
}

// HealthConfig defines the health check settings.
type HealthConfig struct {
	Interval time.Duration `yaml:"interval"` // defaults to 10s; minimum 1s
	Timeout  time.Duration `yaml:"timeout"`  // defaults to 5s; must be >=100ms and < Interval
}

// MetricsConfig defines the metrics collection settings.
type MetricsConfig struct {
	HistoryDuration   time.Duration `yaml:"history_duration"`   // defaults to 24h
	HistoryResolution time.Duration `yaml:"history_resolution"` // defaults to 1m
}
// Load reads the configuration from a YAML file.
// Environment variable references ($VAR / ${VAR}) in the file are expanded
// before parsing; defaults are applied and the result validated before it
// is returned.
func Load(path string) (*Config, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("reading config file: %w", err)
	}
	// Substitute environment variables so secrets need not live in the file.
	expanded := os.ExpandEnv(string(raw))
	cfg := &Config{}
	if err := yaml.Unmarshal([]byte(expanded), cfg); err != nil {
		return nil, fmt.Errorf("parsing config file: %w", err)
	}
	cfg.setDefaults()
	if err := cfg.validate(); err != nil {
		return nil, fmt.Errorf("validating config: %w", err)
	}
	return cfg, nil
}
// setDefaults applies default values to the configuration.
// Only zero values are replaced; anything the user set is left untouched.
func (c *Config) setDefaults() {
	// Helpers: fill in a default only when the field is still at its zero value.
	str := func(field *string, def string) {
		if *field == "" {
			*field = def
		}
	}
	dur := func(field *time.Duration, def time.Duration) {
		if *field == 0 {
			*field = def
		}
	}
	str(&c.Proxy.ListenAddr, ":8080")
	dur(&c.Proxy.DrainTimeout, 30*time.Second)
	dur(&c.Git.PollInterval, 30*time.Second)
	str(&c.Git.Branch, "main")
	str(&c.Deploy.BlueTag, "deploy/blue")
	str(&c.Deploy.GreenTag, "deploy/green")
	str(&c.Deploy.ActiveTag, "deploy/active")
	str(&c.Admin.Hostname, "bluegreen-admin")
	str(&c.Admin.StateDir, "/var/lib/bluegreen/tailscale")
	dur(&c.Health.Interval, 10*time.Second)
	dur(&c.Health.Timeout, 5*time.Second)
	dur(&c.Metrics.HistoryDuration, 24*time.Hour)
	dur(&c.Metrics.HistoryResolution, 1*time.Minute)
	str(&c.Services.Blue.HealthPath, "/health")
	str(&c.Services.Green.HealthPath, "/health")
}
// validate checks the configuration for required fields and valid values.
// It returns the first violation found, or nil when everything is sane.
// Call after setDefaults so zero values have already been filled in.
func (c *Config) validate() error {
	switch {
	case c.Services.Blue.URL == "":
		return fmt.Errorf("services.blue.url is required")
	case c.Services.Green.URL == "":
		return fmt.Errorf("services.green.url is required")
	case c.Proxy.DrainTimeout < 0:
		return fmt.Errorf("proxy.drain_timeout must be non-negative")
	case c.Git.PollInterval < time.Second:
		return fmt.Errorf("git.poll_interval must be at least 1 second")
	case c.Health.Interval < time.Second:
		return fmt.Errorf("health.interval must be at least 1 second")
	case c.Health.Timeout < 100*time.Millisecond:
		return fmt.Errorf("health.timeout must be at least 100ms")
	case c.Health.Timeout >= c.Health.Interval:
		return fmt.Errorf("health.timeout must be less than health.interval")
	}
	return nil
}
// ServiceTarget represents which backend service is targeted.
// Its string value doubles as a metrics label and a JSON value.
type ServiceTarget string

// The two valid targets; any other value is treated as unknown
// (see GetServiceEndpoint).
const (
	ServiceBlue  ServiceTarget = "blue"
	ServiceGreen ServiceTarget = "green"
)
// GetServiceEndpoint returns the endpoint configuration for the given target.
// Unknown targets yield the zero ServiceEndpoint.
func (c *Config) GetServiceEndpoint(target ServiceTarget) ServiceEndpoint {
	if target == ServiceBlue {
		return c.Services.Blue
	}
	if target == ServiceGreen {
		return c.Services.Green
	}
	return ServiceEndpoint{}
}
package health
import (
	"context"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
)
// Status represents the health status of a service.
type Status struct {
	Healthy     bool          `json:"healthy"`                // result of the most recent check
	LastCheck   time.Time     `json:"last_check"`             // when the most recent check completed
	LastHealthy time.Time     `json:"last_healthy,omitempty"` // last time a check succeeded; zero if never
	Latency     time.Duration `json:"latency"`                // duration of the most recent check
	Error       string        `json:"error,omitempty"`        // failure description when unhealthy
}
// Checker performs periodic health checks on backend services.
// Start launches one check loop per service; Stop cancels them and waits.
type Checker struct {
	cfg    *config.Config
	logger *slog.Logger
	client *http.Client // uses cfg.Health.Timeout; does not follow redirects
	mu     sync.RWMutex // guards status
	status map[config.ServiceTarget]*Status
	// onStatusChange, when set, is invoked after every check (not only on
	// transitions — see updateStatus).
	onStatusChange func(target config.ServiceTarget, status Status)
	cancel         context.CancelFunc // cancels the loops started by Start
	wg             sync.WaitGroup     // tracks the two per-service loops
}
// CheckerOption configures the health checker.
type CheckerOption func(*Checker)

// WithStatusChangeCallback sets a callback for status changes.
// The callback is invoked after every check from the check goroutines;
// it should therefore be fast and safe for concurrent use.
func WithStatusChangeCallback(fn func(target config.ServiceTarget, status Status)) CheckerOption {
	return func(c *Checker) {
		c.onStatusChange = fn
	}
}
// NewChecker creates a new health checker.
// The internal HTTP client uses cfg.Health.Timeout and treats redirects as
// terminal responses, so a redirecting health endpoint is judged by the
// redirect status itself. Both services start with an empty (unchecked)
// status.
func NewChecker(cfg *config.Config, logger *slog.Logger, opts ...CheckerOption) *Checker {
	httpClient := &http.Client{
		Timeout: cfg.Health.Timeout,
		CheckRedirect: func(*http.Request, []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	checker := &Checker{
		cfg:    cfg,
		logger: logger,
		client: httpClient,
		status: map[config.ServiceTarget]*Status{
			config.ServiceBlue:  {},
			config.ServiceGreen: {},
		},
	}
	for _, apply := range opts {
		apply(checker)
	}
	return checker
}
// Start begins the health check loop, one goroutine per service.
// The loops run until the supplied context is cancelled or Stop is called.
func (c *Checker) Start(ctx context.Context) {
	loopCtx, cancel := context.WithCancel(ctx)
	c.cancel = cancel
	targets := []config.ServiceTarget{config.ServiceBlue, config.ServiceGreen}
	c.wg.Add(len(targets))
	for _, target := range targets {
		go c.checkLoop(loopCtx, target)
	}
}
// Stop stops the health check loop and blocks until both loops have exited.
// Safe to call even if Start was never invoked.
func (c *Checker) Stop() {
	if cancel := c.cancel; cancel != nil {
		cancel()
	}
	c.wg.Wait()
}
// checkLoop runs the health check loop for a single service until ctx is
// cancelled. The first probe fires immediately so callers do not wait a full
// interval for initial status.
func (c *Checker) checkLoop(ctx context.Context, target config.ServiceTarget) {
	defer c.wg.Done()
	ticker := time.NewTicker(c.cfg.Health.Interval)
	defer ticker.Stop()
	c.check(ctx, target)
	for {
		select {
		case <-ticker.C:
			c.check(ctx, target)
		case <-ctx.Done():
			return
		}
	}
}
// check performs a single health check against target's URL+HealthPath.
// Any 2xx status counts as healthy; everything else records the status text
// as the error. Request-construction and transport errors mark the service
// unhealthy with the error message.
func (c *Checker) check(ctx context.Context, target config.ServiceTarget) {
	endpoint := c.cfg.GetServiceEndpoint(target)
	url := endpoint.URL + endpoint.HealthPath
	start := time.Now()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		c.updateStatus(target, Status{
			Healthy:   false,
			LastCheck: time.Now(),
			Latency:   time.Since(start),
			Error:     err.Error(),
		})
		return
	}
	resp, err := c.client.Do(req)
	latency := time.Since(start)
	if err != nil {
		c.updateStatus(target, Status{
			Healthy:   false,
			LastCheck: time.Now(),
			Latency:   latency,
			Error:     err.Error(),
		})
		return
	}
	defer func() {
		// Drain a bounded amount of the body before closing so the transport
		// can reuse the keep-alive connection; closing an unread body forces
		// a new TCP connection on every tick.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 4<<10))
		_ = resp.Body.Close()
	}()
	healthy := resp.StatusCode >= 200 && resp.StatusCode < 300
	status := Status{
		Healthy:   healthy,
		LastCheck: time.Now(),
		Latency:   latency,
	}
	if healthy {
		status.LastHealthy = time.Now()
	} else {
		status.Error = http.StatusText(resp.StatusCode)
	}
	c.updateStatus(target, status)
}
// updateStatus stores the new status for a service, logs health transitions,
// and invokes the optional status-change callback (which fires on every
// update, not only transitions).
//
// Fix: the carry-forward condition was inverted. The original copied
// prev.LastHealthy only when the NEW status already had a LastHealthy (the
// healthy case, where it was then overwritten anyway) and dropped it exactly
// when the service turned unhealthy — losing the "last seen healthy"
// timestamp. We now carry it forward when the new sample has none.
func (c *Checker) updateStatus(target config.ServiceTarget, status Status) {
	c.mu.Lock()
	prev := c.status[target]
	wasHealthy := prev != nil && prev.Healthy
	// Preserve the previous LastHealthy while the service is unhealthy.
	if prev != nil && status.LastHealthy.IsZero() {
		status.LastHealthy = prev.LastHealthy
	}
	// A healthy sample always refreshes LastHealthy to the check time.
	if status.Healthy {
		status.LastHealthy = status.LastCheck
	}
	c.status[target] = &status
	c.mu.Unlock()
	// Log transitions only, so steady state stays quiet.
	if wasHealthy != status.Healthy {
		if status.Healthy {
			c.logger.Info("service became healthy",
				"target", target,
				"latency", status.Latency,
			)
		} else {
			c.logger.Warn("service became unhealthy",
				"target", target,
				"error", status.Error,
			)
		}
	}
	if c.onStatusChange != nil {
		c.onStatusChange(target, status)
	}
}
// GetStatus returns a copy of the current health status for a service.
// An unknown target yields the zero Status.
func (c *Checker) GetStatus(target config.ServiceTarget) Status {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s := c.status[target]
	if s == nil {
		return Status{}
	}
	return *s
}
// GetAllStatus returns a snapshot of health status for all services.
// The returned map holds copies, so callers may mutate it freely.
func (c *Checker) GetAllStatus() map[config.ServiceTarget]Status {
	c.mu.RLock()
	defer c.mu.RUnlock()
	snapshot := make(map[config.ServiceTarget]Status, len(c.status))
	for target, s := range c.status {
		if s == nil {
			continue
		}
		snapshot[target] = *s
	}
	return snapshot
}
// IsHealthy returns whether a specific service is healthy.
func (c *Checker) IsHealthy(target config.ServiceTarget) bool {
	status := c.GetStatus(target)
	return status.Healthy
}
// CheckNow performs an immediate, synchronous health check for all services.
func (c *Checker) CheckNow(ctx context.Context) {
	for _, target := range []config.ServiceTarget{config.ServiceBlue, config.ServiceGreen} {
		c.check(ctx, target)
	}
}
// Package logging provides logging utilities including native journald integration.
package logging
import (
"log/slog"
"os"
"strings"
slogjournal "github.com/systemd/slog-journal"
)
// NewJournaldHandler creates a handler that writes to journald if available,
// otherwise falls back to JSON output on stdout.
//
// When running under systemd, this uses the native journald protocol via the
// official slog-journal handler, with attribute and group keys rewritten to
// journald's uppercase/underscore format (see sanitizeKey). When not running
// under systemd — or if creating the journald handler fails — a JSON handler
// on stdout is returned instead.
func NewJournaldHandler(level slog.Leveler) slog.Handler {
	jsonFallback := func() slog.Handler {
		return slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})
	}
	if !IsUnderSystemd() {
		return jsonFallback()
	}
	handler, err := slogjournal.NewHandler(&slogjournal.Options{
		Level: level,
		// Rewrite every attribute key to journald's naming rules.
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			a.Key = sanitizeKey(a.Key)
			return a
		},
		ReplaceGroup: sanitizeKey,
	})
	if err != nil {
		return jsonFallback()
	}
	return handler
}
// sanitizeKey converts a key to journald-compatible format: uppercase
// letters, digits, and underscores only. Lowercase letters are uppercased,
// a leading digit is prefixed with an underscore, and every other rune
// (including '_', '-', '.') becomes an underscore. An empty key maps to
// "FIELD".
func sanitizeKey(key string) string {
	if key == "" {
		return "FIELD"
	}
	var sb strings.Builder
	sb.Grow(len(key) + 1)
	for _, r := range key {
		if r >= '0' && r <= '9' {
			if sb.Len() == 0 {
				// Journald names may not begin with a digit.
				sb.WriteByte('_')
			}
			sb.WriteRune(r)
			continue
		}
		if r >= 'a' && r <= 'z' {
			r -= 'a' - 'A' // uppercase
		}
		if r < 'A' || r > 'Z' {
			r = '_' // anything non-alphanumeric collapses to underscore
		}
		sb.WriteRune(r)
	}
	return sb.String()
}
package logging
import (
"net"
"os"
"github.com/coreos/go-systemd/v22/activation"
"github.com/coreos/go-systemd/v22/daemon"
)
// NotifyReady sends the READY=1 notification to systemd.
// This should be called when the service is fully initialized and ready to serve.
// NOTE(review): SdNotify's boolean result (whether a notification was actually
// sent) is discarded in all of these helpers — presumably so non-systemd
// environments silently no-op; confirm callers never need the distinction.
func NotifyReady() error {
	_, err := daemon.SdNotify(false, daemon.SdNotifyReady)
	return err
}

// NotifyReloading sends the RELOADING=1 notification to systemd.
// This should be called when the service starts reloading configuration.
func NotifyReloading() error {
	_, err := daemon.SdNotify(false, daemon.SdNotifyReloading)
	return err
}

// NotifyStopping sends the STOPPING=1 notification to systemd.
// This should be called when the service begins graceful shutdown.
func NotifyStopping() error {
	_, err := daemon.SdNotify(false, daemon.SdNotifyStopping)
	return err
}

// NotifyStatus sends a STATUS= notification to systemd with the given message.
// This updates the status shown by "systemctl status".
func NotifyStatus(status string) error {
	_, err := daemon.SdNotify(false, "STATUS="+status)
	return err
}

// NotifyWatchdog sends the WATCHDOG=1 notification to systemd.
// This should be called periodically if WatchdogSec is configured.
func NotifyWatchdog() error {
	_, err := daemon.SdNotify(false, daemon.SdNotifyWatchdog)
	return err
}
// IsUnderSystemd returns true if the process is running under systemd,
// detected by the presence of either NOTIFY_SOCKET (sd_notify channel)
// or INVOCATION_ID (set by systemd for all services).
func IsUnderSystemd() bool {
	for _, name := range []string{"NOTIFY_SOCKET", "INVOCATION_ID"} {
		if os.Getenv(name) != "" {
			return true
		}
	}
	return false
}
// IsSocketActivated returns true if the process was socket-activated by
// systemd, i.e. LISTEN_FDS is set to a non-empty value.
func IsSocketActivated() bool {
	return len(os.Getenv("LISTEN_FDS")) > 0
}
// GetSocketActivationListeners returns listeners passed by systemd socket activation.
// Returns nil if not socket-activated or if there are no listeners.
// The returned listeners are ready to use with http.Server.Serve().
func GetSocketActivationListeners() ([]net.Listener, error) {
	return activation.Listeners()
}

// GetSocketActivationListenersByName returns listeners grouped by their socket
// unit name. This is useful when multiple sockets are configured.
// The name should match the socket unit filename without the .socket extension.
func GetSocketActivationListenersByName() (map[string][]net.Listener, error) {
	return activation.ListenersWithNames()
}
package metrics
import (
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
)
// Collector collects and manages metrics for the load balancer.
// It maintains lock-free counters for hot-path recording, a Store for
// historical data, and mirrors everything into Prometheus metrics.
type Collector struct {
	startTime time.Time // used to derive uptime and requests/sec
	store     *Store    // ring buffer of historical data points
	// Counters (atomic: updated from request goroutines without locking)
	totalRequests atomic.Int64
	blueRequests  atomic.Int64
	greenRequests atomic.Int64
	totalErrors   atomic.Int64
	// Current values
	activeConnections atomic.Int64
	// Last error (guarded by mu)
	mu        sync.RWMutex
	lastError string
	// Prometheus metrics (registered via Register)
	requestsTotal   *prometheus.CounterVec
	requestDuration *prometheus.HistogramVec
	errorsTotal     *prometheus.CounterVec
	activeConns     prometheus.Gauge
	switchTotal     *prometheus.CounterVec
}
// NewCollector creates a new metrics collector.
// The history store is sized from cfg.Metrics; Prometheus metrics are
// created immediately but must still be attached via Register.
func NewCollector(cfg *config.Config) *Collector {
	collector := &Collector{
		startTime: time.Now(),
		store:     NewStore(cfg.Metrics.HistoryDuration, cfg.Metrics.HistoryResolution),
	}
	collector.initPrometheusMetrics()
	return collector
}
// initPrometheusMetrics initializes all Prometheus metric vectors.
// Nothing is registered here; Register attaches them to a Registerer.
func (c *Collector) initPrometheusMetrics() {
	c.requestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "bluegreen_requests_total",
		Help: "Total number of requests processed",
	}, []string{"backend", "status"})
	c.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "bluegreen_request_duration_seconds",
		Help:    "Request duration in seconds",
		Buckets: prometheus.DefBuckets,
	}, []string{"backend"})
	c.errorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "bluegreen_errors_total",
		Help: "Total number of errors",
	}, []string{"backend", "type"})
	c.activeConns = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "bluegreen_active_connections",
		Help: "Number of active connections",
	})
	c.switchTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "bluegreen_switches_total",
		Help: "Total number of backend switches",
	}, []string{"from", "to", "trigger"})
}
// Register registers all Prometheus metrics with the given registry,
// stopping at (and returning) the first registration error.
func (c *Collector) Register(reg prometheus.Registerer) error {
	for _, col := range []prometheus.Collector{
		c.requestsTotal,
		c.requestDuration,
		c.errorsTotal,
		c.activeConns,
		c.switchTotal,
	} {
		if err := reg.Register(col); err != nil {
			return err
		}
	}
	return nil
}
// RecordRequest records a completed request: per-backend counters,
// Prometheus request/duration metrics ("error" label for 4xx/5xx), and the
// latency sample for the history store.
func (c *Collector) RecordRequest(target config.ServiceTarget, statusCode int, duration time.Duration) {
	c.totalRequests.Add(1)
	if target == config.ServiceBlue {
		c.blueRequests.Add(1)
	} else if target == config.ServiceGreen {
		c.greenRequests.Add(1)
	}
	outcome := "success"
	if statusCode >= 400 {
		outcome = "error"
	}
	c.requestsTotal.WithLabelValues(string(target), outcome).Inc()
	c.requestDuration.WithLabelValues(string(target)).Observe(duration.Seconds())
	c.store.RecordRequest(duration)
}
// RecordError records an error: bumps the atomic total, remembers the
// message as the last error, and mirrors into Prometheus and the store.
func (c *Collector) RecordError(target config.ServiceTarget, err error) {
	c.totalErrors.Add(1)
	message := err.Error()
	c.mu.Lock()
	c.lastError = message
	c.mu.Unlock()
	c.errorsTotal.WithLabelValues(string(target), "proxy").Inc()
	c.store.RecordError()
}
// RecordSwitch records a backend switch in the Prometheus switch counter,
// labeled with the source and destination backends and what triggered it.
func (c *Collector) RecordSwitch(from, to config.ServiceTarget, trigger string) {
	c.switchTotal.WithLabelValues(string(from), string(to), trigger).Inc()
}
// SetActiveConnections updates the active connection count, keeping the
// atomic counter and the Prometheus gauge in sync.
func (c *Collector) SetActiveConnections(count int64) {
	c.activeConnections.Store(count)
	c.activeConns.Set(float64(count))
}

// IncrementConnections increments the active connection count.
func (c *Collector) IncrementConnections() {
	c.activeConnections.Add(1)
	c.activeConns.Inc()
}

// DecrementConnections decrements the active connection count.
func (c *Collector) DecrementConnections() {
	c.activeConnections.Add(-1)
	c.activeConns.Dec()
}
// Snapshot contains a point-in-time view of all metrics.
type Snapshot struct {
	TotalRequests     int64
	BlueRequests      int64
	GreenRequests     int64
	RequestsPerSecond float64 // averaged over uptime; raw count during the first second
	TotalErrors       int64
	ErrorRate         float64 // errors / total requests; 0 when no requests yet
	ActiveConnections int64
	UptimeSeconds     int64
	LatencyP50        float64 // percentiles from the store's recent samples
	LatencyP90        float64
	LatencyP99        float64
	LatencyAvg        float64
	LastError         string // most recent error message, if any
}
// Snapshot returns a point-in-time snapshot of all metrics.
// Counters are read atomically; only lastError requires the lock.
func (c *Collector) Snapshot() Snapshot {
	c.mu.RLock()
	lastErr := c.lastError
	c.mu.RUnlock()
	totalReqs := c.totalRequests.Load()
	totalErrs := c.totalErrors.Load()
	uptime := time.Since(c.startTime).Seconds()
	// During the first second of uptime report the raw count to avoid a
	// misleadingly inflated requests/sec figure.
	rate := float64(totalReqs) / uptime
	if uptime < 1 {
		rate = float64(totalReqs)
	}
	var errRate float64
	if totalReqs > 0 {
		errRate = float64(totalErrs) / float64(totalReqs)
	}
	pct := c.store.GetLatencyPercentiles()
	return Snapshot{
		TotalRequests:     totalReqs,
		BlueRequests:      c.blueRequests.Load(),
		GreenRequests:     c.greenRequests.Load(),
		RequestsPerSecond: rate,
		TotalErrors:       totalErrs,
		ErrorRate:         errRate,
		ActiveConnections: c.activeConnections.Load(),
		UptimeSeconds:     int64(uptime),
		LatencyP50:        pct.P50,
		LatencyP90:        pct.P90,
		LatencyP99:        pct.P99,
		LatencyAvg:        pct.Avg,
		LastError:         lastErr,
	}
}
// History returns historical data points covering the requested duration at
// the requested resolution, delegating to the underlying store.
func (c *Collector) History(duration, resolution time.Duration) []DataPoint {
	return c.store.GetHistory(duration, resolution)
}

// Store returns the underlying metrics store.
func (c *Collector) Store() *Store {
	return c.store
}
package metrics
import (
"sort"
"sync"
"time"
)
// DataPoint represents a single point in time for metrics history.
type DataPoint struct {
	Timestamp   time.Time `json:"timestamp"`
	Requests    int64     `json:"requests"`
	Errors      int64     `json:"errors"`
	AvgLatency  float64   `json:"avg_latency_ms"`
	P50Latency  float64   `json:"p50_latency_ms"`
	P90Latency  float64   `json:"p90_latency_ms"`
	P99Latency  float64   `json:"p99_latency_ms"`
	Connections int64     `json:"connections"`
}

// LatencyPercentiles contains latency percentile values.
// NOTE(review): units appear to be milliseconds, matching DataPoint's
// *_latency_ms fields — confirm in GetLatencyPercentiles.
type LatencyPercentiles struct {
	P50 float64
	P90 float64
	P99 float64
	Avg float64
}
// bucket holds metrics for a single time bucket.
type bucket struct {
timestamp time.Time
requests int64
errors int64
latencies []time.Duration
totalLatency time.Duration
}
// Store is a ring buffer for storing historical metrics data.
type Store struct {
mu sync.RWMutex
buckets []*bucket
resolution time.Duration
duration time.Duration
maxBuckets int
current int
// For overall latency tracking
allLatencies []time.Duration
maxLatencies int
}
// NewStore creates a new metrics store covering duration at the given
// bucket resolution.
func NewStore(duration, resolution time.Duration) *Store {
	// Guard against a non-positive resolution: it would panic with an
	// integer divide-by-zero below and again in getCurrentBucket's
	// rotation math.
	if resolution <= 0 {
		resolution = time.Minute
	}
	maxBuckets := int(duration / resolution)
	if maxBuckets < 1 {
		maxBuckets = 1440 // Default to 24 hours at 1 minute resolution
	}
	s := &Store{
		buckets:      make([]*bucket, maxBuckets),
		resolution:   resolution,
		duration:     duration,
		maxBuckets:   maxBuckets,
		maxLatencies: 10000, // Keep last 10000 latencies for percentile calculation
	}
	// Pre-populate the ring so the newest bucket sits at the last index.
	now := time.Now().Truncate(resolution)
	for i := range s.buckets {
		s.buckets[i] = &bucket{
			timestamp: now.Add(-time.Duration(maxBuckets-i-1) * resolution),
			latencies: make([]time.Duration, 0, 100),
		}
	}
	s.current = maxBuckets - 1
	return s
}
// getCurrentBucket returns the bucket covering the current time,
// rotating the ring forward if time has advanced past the newest
// bucket. Callers must hold s.mu.
func (s *Store) getCurrentBucket() *bucket {
	now := time.Now().Truncate(s.resolution)
	currentBucket := s.buckets[s.current]
	if !now.After(currentBucket.timestamp) {
		return currentBucket
	}
	// Calculate how many buckets to advance.
	elapsed := now.Sub(currentBucket.timestamp)
	skip := int(elapsed / s.resolution)
	if skip >= s.maxBuckets {
		// After an idle period longer than the whole window every slot
		// would be overwritten anyway. Reinitialize the ring in one pass
		// instead of looping skip times — the old loop's work was
		// unbounded (an idle day at 1s resolution meant ~86k iterations).
		for i := range s.buckets {
			s.buckets[i] = &bucket{
				timestamp: now.Add(-time.Duration(s.maxBuckets-i-1) * s.resolution),
				latencies: make([]time.Duration, 0, 100),
			}
		}
		s.current = s.maxBuckets - 1
		return s.buckets[s.current]
	}
	// Advance slot by slot, dropping the oldest buckets as we go.
	for i := 0; i < skip; i++ {
		s.current = (s.current + 1) % s.maxBuckets
		s.buckets[s.current] = &bucket{
			timestamp: currentBucket.timestamp.Add(time.Duration(i+1) * s.resolution),
			latencies: make([]time.Duration, 0, 100),
		}
	}
	return s.buckets[s.current]
}
// RecordRequest records a request with its latency, both in the current
// time bucket and in the rolling window used for overall percentiles.
func (s *Store) RecordRequest(latency time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()

	b := s.getCurrentBucket()
	b.requests++
	b.latencies = append(b.latencies, latency)
	b.totalLatency += latency

	// Keep only the most recent maxLatencies samples.
	s.allLatencies = append(s.allLatencies, latency)
	if overflow := len(s.allLatencies) - s.maxLatencies; overflow > 0 {
		s.allLatencies = s.allLatencies[overflow:]
	}
}
// RecordError records an error in the current time bucket.
func (s *Store) RecordError() {
	s.mu.Lock()
	defer s.mu.Unlock()
	bucket := s.getCurrentBucket()
	bucket.errors++
}
// GetLatencyPercentiles returns latency percentiles (in milliseconds)
// computed over the rolling window of recent samples.
func (s *Store) GetLatencyPercentiles() LatencyPercentiles {
	s.mu.RLock()
	defer s.mu.RUnlock()

	n := len(s.allLatencies)
	if n == 0 {
		return LatencyPercentiles{}
	}

	// Sort a copy so the stored sample order is left untouched.
	sorted := append([]time.Duration(nil), s.allLatencies...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	var sum time.Duration
	for _, d := range sorted {
		sum += d
	}

	return LatencyPercentiles{
		P50: float64(sorted[n*50/100].Milliseconds()),
		P90: float64(sorted[n*90/100].Milliseconds()),
		P99: float64(sorted[n*99/100].Milliseconds()),
		Avg: float64(sum.Milliseconds()) / float64(n),
	}
}
// GetHistory returns historical data points for the specified duration,
// sorted by timestamp and optionally aggregated to a coarser resolution.
func (s *Store) GetHistory(duration, resolution time.Duration) []DataPoint {
	s.mu.RLock()
	defer s.mu.RUnlock()
	now := time.Now()
	startTime := now.Add(-duration)
	// Collect buckets that fall inside the requested window.
	var points []DataPoint
	for _, b := range s.buckets {
		if b == nil || b.timestamp.Before(startTime) {
			continue
		}
		point := DataPoint{
			Timestamp: b.timestamp,
			Requests:  b.requests,
			Errors:    b.errors,
		}
		if len(b.latencies) > 0 {
			point.AvgLatency = float64(b.totalLatency.Milliseconds()) / float64(len(b.latencies))
			// Calculate percentiles for this bucket from a sorted copy so
			// the bucket's own sample order is left untouched.
			sorted := make([]time.Duration, len(b.latencies))
			copy(sorted, b.latencies)
			sort.Slice(sorted, func(i, j int) bool {
				return sorted[i] < sorted[j]
			})
			n := len(sorted)
			if n > 0 {
				point.P50Latency = float64(sorted[n*50/100].Milliseconds())
				point.P90Latency = float64(sorted[n*90/100].Milliseconds())
				point.P99Latency = float64(sorted[n*99/100].Milliseconds())
			}
		}
		points = append(points, point)
	}
	// The ring is not ordered by time once it wraps; sort by timestamp.
	sort.Slice(points, func(i, j int) bool {
		return points[i].Timestamp.Before(points[j].Timestamp)
	})
	// Aggregate if the requested resolution is coarser than storage resolution.
	if resolution > s.resolution {
		points = aggregatePoints(points, resolution)
	}
	return points
}
// aggregatePoints combines data points into larger time buckets of the
// given resolution. Requests and errors are summed, AvgLatency is the
// request-weighted mean, and percentiles take the max across the group
// (a conservative upper bound). Points are assumed sorted by timestamp,
// as GetHistory guarantees.
func aggregatePoints(points []DataPoint, resolution time.Duration) []DataPoint {
	if len(points) == 0 {
		return points
	}
	var result []DataPoint
	var current *DataPoint
	var currentEnd time.Time
	var latencySum float64
	var latencyCount int
	// flush finalizes the in-progress aggregate bucket. Buckets without
	// latency samples (e.g. error-only buckets) are still emitted with a
	// zero AvgLatency — the old code dropped them entirely.
	flush := func() {
		if current == nil {
			return
		}
		if latencyCount > 0 {
			current.AvgLatency = latencySum / float64(latencyCount)
		}
		result = append(result, *current)
	}
	for _, p := range points {
		bucketStart := p.Timestamp.Truncate(resolution)
		// A point belongs to a new group when its window start is at or
		// past the current group's end. (Using strict After here merged
		// the adjacent window, whose start equals currentEnd exactly.)
		if current == nil || !bucketStart.Before(currentEnd) {
			flush()
			current = &DataPoint{Timestamp: bucketStart}
			currentEnd = bucketStart.Add(resolution)
			latencySum = 0
			latencyCount = 0
		}
		current.Requests += p.Requests
		current.Errors += p.Errors
		if p.AvgLatency > 0 {
			latencySum += p.AvgLatency * float64(p.Requests)
			latencyCount += int(p.Requests)
		}
		// Use max for percentiles when aggregating.
		if p.P50Latency > current.P50Latency {
			current.P50Latency = p.P50Latency
		}
		if p.P90Latency > current.P90Latency {
			current.P90Latency = p.P90Latency
		}
		if p.P99Latency > current.P99Latency {
			current.P99Latency = p.P99Latency
		}
	}
	flush() // don't forget the last bucket
	return result
}
// Reset clears all stored metrics and re-seeds the ring so its newest
// bucket covers the current time.
func (s *Store) Reset() {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now().Truncate(s.resolution)
	for i := 0; i < s.maxBuckets; i++ {
		s.buckets[i] = &bucket{
			timestamp: now.Add(-time.Duration(s.maxBuckets-i-1) * s.resolution),
			latencies: make([]time.Duration, 0, 100),
		}
	}
	s.current = s.maxBuckets - 1
	s.allLatencies = nil
}
package proxy
import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"
)
// Drainer tracks active connections and provides graceful drain functionality.
type Drainer struct {
	activeConnections atomic.Int64   // current in-flight request count
	wg                sync.WaitGroup // one Add per tracked request
	draining          atomic.Bool    // true while a drain is in progress
	drainCh           chan struct{}  // closed when draining starts; replaced by Reset under mu
	mu                sync.Mutex     // serializes draining checks against wg.Add
	drainMu           sync.Mutex     // Serializes Drain() calls to prevent WaitGroup reuse
	prevDone          chan struct{}  // Tracks previous drain goroutine completion
}

// NewDrainer creates a new connection drainer.
func NewDrainer() *Drainer {
	return &Drainer{
		drainCh: make(chan struct{}),
	}
}
// TrackRequest wraps an HTTP handler to track active requests. While the
// drainer is draining, requests are rejected with 503.
func (d *Drainer) TrackRequest(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Delegate to Track(), which atomically checks the draining state
		// and adds to the WaitGroup under mutex protection. This keeps a
		// single copy of the race-sensitive check-and-add logic instead of
		// duplicating it here.
		release := d.Track()
		if release == nil {
			http.Error(w, "Service is draining", http.StatusServiceUnavailable)
			return
		}
		defer release()
		next.ServeHTTP(w, r)
	})
}
// ActiveConnections returns the current number of active (tracked) connections.
func (d *Drainer) ActiveConnections() int64 {
	return d.activeConnections.Load()
}

// IsDraining returns whether the drainer is currently in drain mode.
func (d *Drainer) IsDraining() bool {
	return d.draining.Load()
}
// Track enters a tracked section. Returns a release function, or nil if draining.
// This method atomically checks the draining state and adds to the WaitGroup
// under mutex protection, preventing races with Drain().
func (d *Drainer) Track() func() {
	d.mu.Lock()
	if d.draining.Load() {
		d.mu.Unlock()
		return nil
	}
	d.activeConnections.Add(1)
	d.wg.Add(1)
	d.mu.Unlock()
	// The release closure undoes both counters; callers must invoke it
	// exactly once when the tracked work finishes.
	return func() {
		d.activeConnections.Add(-1)
		d.wg.Done()
	}
}
// Drain starts the drain process and waits for all active connections to complete.
// It returns when all connections are drained or the context is cancelled.
// Concurrent calls to Drain() are serialized to prevent WaitGroup reuse panics.
func (d *Drainer) Drain(ctx context.Context, timeout time.Duration) error {
	// Serialize drain operations to prevent WaitGroup reuse while Wait() is running.
	d.drainMu.Lock()
	defer d.drainMu.Unlock()
	// Wait for any previous drain goroutine to complete before starting a new one.
	// This prevents multiple wg.Wait() goroutines from running concurrently,
	// which would cause a panic if the WaitGroup is reused after Reset().
	if d.prevDone != nil {
		select {
		case <-d.prevDone:
			// Previous drain completed
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// Flip to draining under mu so no new Track() can sneak a wg.Add in
	// after the flag is set. If a drain is already in progress, report
	// success immediately.
	d.mu.Lock()
	if d.draining.Load() {
		d.mu.Unlock()
		return nil
	}
	d.draining.Store(true)
	close(d.drainCh)
	d.mu.Unlock()
	// Wait for in-flight requests in a goroutine so we can also honor the
	// timeout/cancellation below; prevDone lets the next Drain/Reset know
	// whether this waiter has finished.
	done := make(chan struct{})
	d.prevDone = done
	go func() {
		d.wg.Wait()
		close(done)
	}()
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Reset resets the drainer state for a new drain cycle.
// This method is non-blocking to prevent deadlocks when the previous drain's
// wg.Wait() goroutine hasn't completed (e.g. due to stuck in-flight requests).
// If the previous drain is still running, draining remains true to prevent
// WaitGroup reuse panics - the next Drain() call will handle cleanup.
func (d *Drainer) Reset() {
	d.drainMu.Lock()
	defer d.drainMu.Unlock()
	// Check if the previous drain goroutine has completed (non-blocking).
	prevDrainDone := true
	if d.prevDone != nil {
		select {
		case <-d.prevDone:
			d.prevDone = nil
		default:
			// Previous drain still running - don't fully reset to prevent WaitGroup reuse.
			prevDrainDone = false
		}
	}
	d.mu.Lock()
	// Only allow new requests if the previous drain has completed.
	// This prevents WaitGroup reuse while wg.Wait() is still running.
	if prevDrainDone {
		d.draining.Store(false)
	}
	// Always create a new drainCh for the next drain cycle.
	d.drainCh = make(chan struct{})
	d.mu.Unlock()
}
// DrainCh returns a channel that is closed when draining starts.
// The field is read under d.mu because Reset() replaces drainCh while
// holding the same mutex; an unguarded read here would be a data race.
func (d *Drainer) DrainCh() <-chan struct{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.drainCh
}
// ConnectionTracker provides a way to track connection state on top of a
// Drainer, with an optional callback invoked on each track/release.
type ConnectionTracker struct {
	drainer *Drainer
	onTrack func(delta int64) // called with +1 on track, -1 on release; may be nil
}

// NewConnectionTracker creates a tracker with optional callbacks.
func NewConnectionTracker(d *Drainer, onTrack func(delta int64)) *ConnectionTracker {
	return &ConnectionTracker{
		drainer: d,
		onTrack: onTrack,
	}
}
// Track increments the connection count and returns a release function.
// Returns nil if the drainer is currently draining.
func (ct *ConnectionTracker) Track() func() {
	// Delegate to Drainer.Track(), which performs the draining check and
	// WaitGroup add atomically under the drainer's mutex. Previously this
	// logic was duplicated here verbatim.
	release := ct.drainer.Track()
	if release == nil {
		return nil
	}
	if ct.onTrack != nil {
		ct.onTrack(1)
	}
	return func() {
		// Same effect order as before: counters are decremented first,
		// then the callback observes the release.
		release()
		if ct.onTrack != nil {
			ct.onTrack(-1)
		}
	}
}
package proxy
import (
"bufio"
"context"
"fmt"
"log/slog"
"net"
"net/http"
"net/http/httputil"
"net/url"
"sync/atomic"
"time"
"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
)
// Proxy is a reverse proxy that can switch between blue and green backends.
type Proxy struct {
	cfg     *config.Config
	logger  *slog.Logger
	drainer *Drainer // tracks in-flight requests for graceful switching
	// Current active backend (set to blue by New).
	activeTarget atomic.Pointer[config.ServiceTarget]
	// Reverse proxies for each backend, built once in New.
	blueProxy  *httputil.ReverseProxy
	greenProxy *httputil.ReverseProxy
	// Metrics callbacks (optional; set via ProxyOption).
	onRequest func(target config.ServiceTarget, statusCode int, duration time.Duration)
	onError   func(target config.ServiceTarget, err error)
}

// ProxyOption configures the proxy.
type ProxyOption func(*Proxy)

// WithRequestCallback sets a callback for completed requests.
func WithRequestCallback(fn func(target config.ServiceTarget, statusCode int, duration time.Duration)) ProxyOption {
	return func(p *Proxy) {
		p.onRequest = fn
	}
}

// WithErrorCallback sets a callback for proxy errors.
func WithErrorCallback(fn func(target config.ServiceTarget, err error)) ProxyOption {
	return func(p *Proxy) {
		p.onError = fn
	}
}
// New creates a new proxy instance. Both backend URLs must parse; the
// initial active target is blue.
func New(cfg *config.Config, logger *slog.Logger, opts ...ProxyOption) (*Proxy, error) {
	blueURL, err := url.Parse(cfg.Services.Blue.URL)
	if err != nil {
		return nil, fmt.Errorf("parsing blue URL: %w", err)
	}
	greenURL, err := url.Parse(cfg.Services.Green.URL)
	if err != nil {
		return nil, fmt.Errorf("parsing green URL: %w", err)
	}

	p := &Proxy{
		cfg:     cfg,
		logger:  logger,
		drainer: NewDrainer(),
	}
	// Apply functional options (metrics callbacks etc.) before the
	// reverse proxies are built so their handlers see the callbacks.
	for _, apply := range opts {
		apply(p)
	}

	// One reverse proxy per backend.
	p.blueProxy = p.createReverseProxy(blueURL, config.ServiceBlue)
	p.greenProxy = p.createReverseProxy(greenURL, config.ServiceGreen)

	// Blue is the initial active target.
	initial := config.ServiceBlue
	p.activeTarget.Store(&initial)
	return p, nil
}
// createReverseProxy creates a configured reverse proxy for a backend,
// tagging requests and responses with the backend identity.
func (p *Proxy) createReverseProxy(target *url.URL, service config.ServiceTarget) *httputil.ReverseProxy {
	proxy := httputil.NewSingleHostReverseProxy(target)
	// Wrap the stock director to add forwarding headers on each request.
	originalDirector := proxy.Director
	proxy.Director = func(req *http.Request) {
		originalDirector(req)
		req.Header.Set("X-Forwarded-Host", req.Host)
		req.Header.Set("X-Backend", string(service))
	}
	// Error handler: log, notify the optional callback, reply 502.
	proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
		p.logger.Error("proxy error",
			"target", service,
			"error", err,
			"path", r.URL.Path,
		)
		if p.onError != nil {
			p.onError(service, err)
		}
		http.Error(w, "Bad Gateway", http.StatusBadGateway)
	}
	// Stamp responses with the backend that served them.
	proxy.ModifyResponse = func(resp *http.Response) error {
		resp.Header.Set("X-Backend", string(service))
		return nil
	}
	return proxy
}
// ServeHTTP implements http.Handler: it tracks the request with the
// drainer, routes it to the active backend, and reports the outcome to
// the optional metrics callback.
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	// Atomically check draining and track the connection.
	// This prevents the race between checking IsDraining() and adding to the WaitGroup.
	release := p.drainer.Track()
	if release == nil {
		http.Error(w, "Service is draining", http.StatusServiceUnavailable)
		return
	}
	defer release()
	// Snapshot the current target for the whole request.
	target := p.ActiveTarget()
	// Wrap the response writer to capture the status code for metrics.
	rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
	// Route to the appropriate backend.
	switch target {
	case config.ServiceBlue:
		p.blueProxy.ServeHTTP(rw, r)
	case config.ServiceGreen:
		p.greenProxy.ServeHTTP(rw, r)
	default:
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	// Report the completed request to the metrics callback, if set.
	if p.onRequest != nil {
		p.onRequest(target, rw.statusCode, time.Since(start))
	}
}
// ActiveTarget returns the current active backend target, defaulting to
// blue when nothing has been stored yet.
func (p *Proxy) ActiveTarget() config.ServiceTarget {
	if t := p.activeTarget.Load(); t != nil {
		return *t
	}
	return config.ServiceBlue
}
// Switch changes the active backend to the specified target.
// It gracefully drains existing connections before switching.
// A drain timeout is logged but does not abort the switch.
func (p *Proxy) Switch(ctx context.Context, target config.ServiceTarget) error {
	current := p.ActiveTarget()
	if current == target {
		p.logger.Info("already on target", "target", target)
		return nil
	}
	p.logger.Info("starting switch",
		"from", current,
		"to", target,
	)
	// Drain in-flight requests; on timeout we proceed anyway and note the
	// remaining connection count.
	if err := p.drainer.Drain(ctx, p.cfg.Proxy.DrainTimeout); err != nil {
		p.logger.Warn("drain timeout reached, switching anyway",
			"error", err,
			"active_connections", p.drainer.ActiveConnections(),
		)
	}
	// Flip the active target; new requests route to it immediately.
	p.activeTarget.Store(&target)
	// Re-arm the drainer so the next switch can drain again.
	p.drainer.Reset()
	p.logger.Info("switch complete",
		"from", current,
		"to", target,
	)
	return nil
}
// Drainer returns the connection drainer.
func (p *Proxy) Drainer() *Drainer {
	return p.drainer
}

// ActiveConnections returns the current number of active connections.
func (p *Proxy) ActiveConnections() int64 {
	return p.drainer.ActiveConnections()
}
// responseWriter wraps http.ResponseWriter to capture the status code.
type responseWriter struct {
	http.ResponseWriter
	statusCode int  // first status written (200 on implicit write)
	written    bool // whether a status has been recorded yet
}

// WriteHeader records the first status code and forwards every call to
// the wrapped writer.
func (rw *responseWriter) WriteHeader(code int) {
	if !rw.written {
		rw.statusCode = code
		rw.written = true
	}
	rw.ResponseWriter.WriteHeader(code)
}
// Write records the implicit 200 status on the first body write, then
// delegates to the wrapped writer.
func (rw *responseWriter) Write(b []byte) (int, error) {
	if !rw.written {
		rw.written = true
		rw.statusCode = http.StatusOK
	}
	return rw.ResponseWriter.Write(b)
}
// Flush implements http.Flusher by forwarding to the wrapped writer when
// it supports flushing; otherwise it is a no-op.
func (rw *responseWriter) Flush() {
	f, ok := rw.ResponseWriter.(http.Flusher)
	if !ok {
		return
	}
	f.Flush()
}

// Hijack implements http.Hijacker for websocket support, forwarding to
// the wrapped writer when it supports hijacking.
func (rw *responseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
	h, ok := rw.ResponseWriter.(http.Hijacker)
	if !ok {
		return nil, nil, fmt.Errorf("response writer does not support hijacking")
	}
	return h.Hijack()
}
package switcher
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/config"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/transport/http"
"github.com/go-git/go-git/v5/storage/memory"
appconfig "github.com/mountain-reverie/blue-green-load-balancer/internal/config"
)
// GitStatus represents the current git repository state as observed by
// the watcher. Error carries the last fetch error message, if any.
type GitStatus struct {
	LastFetch    time.Time `json:"last_fetch"`
	LastTag      string    `json:"last_tag"`
	BlueCommit   string    `json:"blue_commit,omitempty"`
	GreenCommit  string    `json:"green_commit,omitempty"`
	ActiveCommit string    `json:"active_commit,omitempty"`
	Error        string    `json:"error,omitempty"`
}

// TagChange represents a detected tag change.
type TagChange struct {
	Tag       string                  // tag name that moved
	Commit    string                  // new commit hash the tag points at
	Target    appconfig.ServiceTarget // backend associated with the tag
	Timestamp time.Time               // when the change was detected
}

// GitWatcher polls a git repository for tag changes.
// status, lastBlue and lastGreen are guarded by mu.
type GitWatcher struct {
	cfg         *appconfig.Config
	logger      *slog.Logger
	mu          sync.RWMutex
	status      GitStatus
	lastBlue    string // last seen commit for the blue tag
	lastGreen   string // last seen commit for the green tag
	onTagChange func(change TagChange)
	cancel      context.CancelFunc
	wg          sync.WaitGroup
}

// GitWatcherOption configures the git watcher.
type GitWatcherOption func(*GitWatcher)

// WithTagChangeCallback sets a callback for tag changes.
func WithTagChangeCallback(fn func(change TagChange)) GitWatcherOption {
	return func(w *GitWatcher) {
		w.onTagChange = fn
	}
}
// NewGitWatcher creates a new git watcher with the given options applied.
func NewGitWatcher(cfg *appconfig.Config, logger *slog.Logger, opts ...GitWatcherOption) *GitWatcher {
	watcher := &GitWatcher{
		cfg:    cfg,
		logger: logger,
	}
	for _, apply := range opts {
		apply(watcher)
	}
	return watcher
}
// Start begins the git polling loop in a background goroutine.
// NOTE(review): calling Start more than once overwrites w.cancel and
// leaks the earlier loop's cancel func — assumed to be called once at
// startup; confirm against callers.
func (w *GitWatcher) Start(ctx context.Context) {
	ctx, w.cancel = context.WithCancel(ctx)
	w.wg.Add(1)
	go w.pollLoop(ctx)
}

// Stop stops the git polling loop and waits for it to exit.
func (w *GitWatcher) Stop() {
	if w.cancel != nil {
		w.cancel()
	}
	w.wg.Wait()
}
// pollLoop runs the git polling loop until ctx is cancelled: one fetch
// immediately, then one per configured poll interval.
func (w *GitWatcher) pollLoop(ctx context.Context) {
	defer w.wg.Done()
	ticker := time.NewTicker(w.cfg.Git.PollInterval)
	defer ticker.Stop()
	// Initial fetch so status is populated before the first tick.
	w.fetchAndCheck(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			w.fetchAndCheck(ctx)
		}
	}
}
// fetchAndCheck fetches the remote refs, updates status under the lock,
// and fires the tag-change callback when the blue/green tags move.
// The mutex is released around each notifyTagChange call so the callback
// (which may trigger a switch) never runs while holding w.mu.
// NOTE(review): those unlock/relock windows mean a concurrent
// FetchNow/pollLoop interleaving could observe partially-updated state
// or notify twice for the same commit — assumed acceptable at the poll
// rates used; confirm.
func (w *GitWatcher) fetchAndCheck(ctx context.Context) {
	refs, err := w.fetchRefs(ctx)
	if err != nil {
		w.logger.Error("failed to fetch git refs", "error", err)
		w.mu.Lock()
		w.status.Error = err.Error()
		w.status.LastFetch = time.Now()
		w.mu.Unlock()
		return
	}
	w.mu.Lock()
	w.status.LastFetch = time.Now()
	w.status.Error = ""
	// Check blue tag: notify only when a previously-seen commit changed.
	blueRef := "refs/tags/" + w.cfg.Deploy.BlueTag
	if commit, ok := refs[blueRef]; ok {
		w.status.BlueCommit = commit
		if w.lastBlue != "" && w.lastBlue != commit {
			w.mu.Unlock()
			w.notifyTagChange(TagChange{
				Tag:       w.cfg.Deploy.BlueTag,
				Commit:    commit,
				Target:    appconfig.ServiceBlue,
				Timestamp: time.Now(),
			})
			w.mu.Lock()
		}
		w.lastBlue = commit
	}
	// Check green tag (same first-sighting rule as blue).
	greenRef := "refs/tags/" + w.cfg.Deploy.GreenTag
	if commit, ok := refs[greenRef]; ok {
		w.status.GreenCommit = commit
		if w.lastGreen != "" && w.lastGreen != commit {
			w.mu.Unlock()
			w.notifyTagChange(TagChange{
				Tag:       w.cfg.Deploy.GreenTag,
				Commit:    commit,
				Target:    appconfig.ServiceGreen,
				Timestamp: time.Now(),
			})
			w.mu.Lock()
		}
		w.lastGreen = commit
	}
	// Check active tag: recorded in status only, no callback.
	activeRef := "refs/tags/" + w.cfg.Deploy.ActiveTag
	if commit, ok := refs[activeRef]; ok {
		w.status.ActiveCommit = commit
		w.status.LastTag = w.cfg.Deploy.ActiveTag
	}
	w.mu.Unlock()
}
// fetchRefs lists references on the remote repository and returns a map
// of fully-qualified ref name -> commit hash.
func (w *GitWatcher) fetchRefs(ctx context.Context) (map[string]string, error) {
	if w.cfg.Git.RepoURL == "" {
		return nil, errors.New("git repo URL not configured")
	}
	// An in-memory remote suffices: we only need ls-remote-style output.
	remote := git.NewRemote(memory.NewStorage(), &config.RemoteConfig{
		Name: "origin",
		URLs: []string{w.cfg.Git.RepoURL},
	})
	listOpts := &git.ListOptions{}
	// Only set Auth when a token is configured. Assigning a nil
	// *http.BasicAuth into the AuthMethod interface field would make the
	// interface non-nil while holding a typed nil (the classic Go
	// nil-interface trap), bypassing go-git's "no auth" handling.
	if w.cfg.Git.AuthToken != "" {
		listOpts.Auth = &http.BasicAuth{
			Username: "git", // username is arbitrary for token auth
			Password: w.cfg.Git.AuthToken,
		}
	}
	refs, err := remote.ListContext(ctx, listOpts)
	if err != nil {
		return nil, fmt.Errorf("listing remote refs: %w", err)
	}
	result := make(map[string]string, len(refs))
	for _, ref := range refs {
		result[ref.Name().String()] = ref.Hash().String()
	}
	return result, nil
}
// notifyTagChange logs a detected tag change and invokes the registered
// callback, if any.
func (w *GitWatcher) notifyTagChange(change TagChange) {
	w.logger.Info("tag change detected",
		"tag", change.Tag,
		"commit", change.Commit,
		"target", change.Target,
	)
	if w.onTagChange != nil {
		w.onTagChange(change)
	}
}

// GetStatus returns a copy of the current git status.
func (w *GitWatcher) GetStatus() GitStatus {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.status
}
// FetchNow performs an immediate fetch and returns an error if the fetch
// left an error recorded in status.
func (w *GitWatcher) FetchNow(ctx context.Context) error {
	w.fetchAndCheck(ctx)
	w.mu.RLock()
	defer w.mu.RUnlock()
	if w.status.Error != "" {
		return errors.New(w.status.Error)
	}
	return nil
}
// CloneRepo clones the git repository to a local directory.
// This can be used for more complex git operations. The clone is shallow
// (depth 1), single-branch, and includes all tags.
func CloneRepo(ctx context.Context, cfg *appconfig.Config, destDir string) (*git.Repository, error) {
	if cfg.Git.RepoURL == "" {
		return nil, errors.New("git repo URL not configured")
	}
	// Ensure the PARENT of destDir exists; go-git creates destDir itself.
	if err := os.MkdirAll(filepath.Dir(destDir), 0750); err != nil {
		return nil, fmt.Errorf("creating destination directory: %w", err)
	}
	cloneOpts := &git.CloneOptions{
		URL:           cfg.Git.RepoURL,
		ReferenceName: plumbing.NewBranchReferenceName(cfg.Git.Branch),
		SingleBranch:  true,
		Depth:         1,
		Tags:          git.AllTags,
	}
	// Only set Auth when a token is configured: storing a nil
	// *http.BasicAuth in the AuthMethod interface would read as "auth
	// present" (typed-nil interface trap).
	if cfg.Git.AuthToken != "" {
		cloneOpts.Auth = &http.BasicAuth{
			Username: "git",
			Password: cfg.Git.AuthToken,
		}
	}
	repo, err := git.PlainCloneContext(ctx, destDir, false, cloneOpts)
	if err != nil {
		return nil, fmt.Errorf("cloning repository: %w", err)
	}
	return repo, nil
}
package switcher
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
"github.com/mountain-reverie/blue-green-load-balancer/internal/health"
"github.com/mountain-reverie/blue-green-load-balancer/internal/proxy"
)
// SwitchEvent represents a single switch operation and its outcome.
type SwitchEvent struct {
	From      config.ServiceTarget `json:"from"`
	To        config.ServiceTarget `json:"to"`
	Trigger   SwitchTrigger        `json:"trigger"`
	Timestamp time.Time            `json:"timestamp"` // when the switch started
	Success   bool                 `json:"success"`
	Error     string               `json:"error,omitempty"`
	Duration  time.Duration        `json:"duration"`
}

// SwitchTrigger indicates what triggered a switch.
type SwitchTrigger string

const (
	TriggerWebhook SwitchTrigger = "webhook"
	TriggerGitTag  SwitchTrigger = "git_tag"
)

// Switcher orchestrates switching between blue and green backends.
// lastSwitch, switchCount and history are guarded by mu.
type Switcher struct {
	cfg         *config.Config
	logger      *slog.Logger
	proxy       *proxy.Proxy
	health      *health.Checker
	watcher     *GitWatcher // optional; set via SetGitWatcher
	mu          sync.RWMutex
	lastSwitch  time.Time
	switchCount int64
	history     []SwitchEvent // last 100 switch events
	onSwitch    func(event SwitchEvent)
}

// SwitcherOption configures the switcher.
type SwitcherOption func(*Switcher)

// WithSwitchCallback sets a callback for switch events.
func WithSwitchCallback(fn func(event SwitchEvent)) SwitcherOption {
	return func(s *Switcher) {
		s.onSwitch = fn
	}
}
// NewSwitcher creates a new switch orchestrator with the given options
// applied.
func NewSwitcher(
	cfg *config.Config,
	logger *slog.Logger,
	p *proxy.Proxy,
	h *health.Checker,
	opts ...SwitcherOption,
) *Switcher {
	sw := &Switcher{
		cfg:     cfg,
		logger:  logger,
		proxy:   p,
		health:  h,
		history: make([]SwitchEvent, 0, 100),
	}
	for _, apply := range opts {
		apply(sw)
	}
	return sw
}
// SetGitWatcher sets the git watcher for automatic switching.
// NOTE(review): the watcher field is written without holding mu while
// GetGitStatus/RefreshGit read it; assumed to be called once during
// startup before any concurrent access — confirm against callers.
func (s *Switcher) SetGitWatcher(w *GitWatcher) {
	s.watcher = w
}
// Switch performs a switch to the specified target.
// It refuses when the target is already active or not healthy, records
// the attempt in history either way, and notifies the optional callback.
func (s *Switcher) Switch(ctx context.Context, target config.ServiceTarget, trigger SwitchTrigger) error {
	s.mu.Lock()
	from := s.proxy.ActiveTarget()
	if from == target {
		s.mu.Unlock()
		s.logger.Info("already on target, no switch needed", "target", target)
		return nil
	}
	// Refuse to switch onto an unhealthy backend.
	if !s.health.IsHealthy(target) {
		s.mu.Unlock()
		return fmt.Errorf("target %s is not healthy", target)
	}
	start := time.Now()
	// Release the lock around the (potentially slow) drain-and-switch.
	s.mu.Unlock()
	s.logger.Info("starting switch",
		"from", from,
		"to", target,
		"trigger", trigger,
	)
	// Perform the switch (drains connections, flips the proxy target).
	err := s.proxy.Switch(ctx, target)
	duration := time.Since(start)
	event := SwitchEvent{
		From:      from,
		To:        target,
		Trigger:   trigger,
		Timestamp: start,
		Duration:  duration,
		Success:   err == nil,
	}
	if err != nil {
		event.Error = err.Error()
	}
	s.mu.Lock()
	if err == nil {
		s.lastSwitch = time.Now()
		s.switchCount++
	}
	// Add to history (keep last 100 events).
	s.history = append(s.history, event)
	if len(s.history) > 100 {
		s.history = s.history[1:]
	}
	s.mu.Unlock()
	// Notify listeners outside the lock.
	if s.onSwitch != nil {
		s.onSwitch(event)
	}
	if err != nil {
		s.logger.Error("switch failed",
			"from", from,
			"to", target,
			"error", err,
			"duration", duration,
		)
		return err
	}
	s.logger.Info("switch completed",
		"from", from,
		"to", target,
		"duration", duration,
	)
	return nil
}
// SwitchToBlue switches to the blue backend.
func (s *Switcher) SwitchToBlue(ctx context.Context, trigger SwitchTrigger) error {
	return s.Switch(ctx, config.ServiceBlue, trigger)
}

// SwitchToGreen switches to the green backend.
func (s *Switcher) SwitchToGreen(ctx context.Context, trigger SwitchTrigger) error {
	return s.Switch(ctx, config.ServiceGreen, trigger)
}

// ActiveTarget returns the currently active target, defaulting to blue
// when no proxy is attached.
func (s *Switcher) ActiveTarget() config.ServiceTarget {
	if s.proxy == nil {
		return config.ServiceBlue
	}
	return s.proxy.ActiveTarget()
}
// LastSwitch returns the time of the last successful switch.
func (s *Switcher) LastSwitch() time.Time {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.lastSwitch
}

// SwitchCount returns the total number of successful switches.
func (s *Switcher) SwitchCount() int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.switchCount
}

// History returns a copy of the switch history so callers cannot mutate
// the internal slice.
func (s *Switcher) History() []SwitchEvent {
	s.mu.RLock()
	defer s.mu.RUnlock()
	result := make([]SwitchEvent, len(s.history))
	copy(result, s.history)
	return result
}
// GetGitStatus returns the current git status, or the zero value when no
// watcher is configured.
func (s *Switcher) GetGitStatus() GitStatus {
	if s.watcher != nil {
		return s.watcher.GetStatus()
	}
	return GitStatus{}
}

// RefreshGitResult contains the result of a git refresh operation.
type RefreshGitResult struct {
	Refreshed    bool      `json:"refreshed"`
	Error        string    `json:"error,omitempty"`
	BlueCommit   string    `json:"blue_commit,omitempty"`
	GreenCommit  string    `json:"green_commit,omitempty"`
	ActiveCommit string    `json:"active_commit,omitempty"`
	LastFetch    time.Time `json:"last_fetch"`
}
// RefreshGit triggers an immediate git fetch and returns the result.
// Any tag changes detected will trigger switches via the normal callback mechanism.
func (s *Switcher) RefreshGit(ctx context.Context) RefreshGitResult {
	if s.watcher == nil {
		return RefreshGitResult{
			Refreshed: false,
			Error:     "git watcher not configured",
		}
	}
	err := s.watcher.FetchNow(ctx)
	// Report the post-fetch status even when the fetch itself failed.
	status := s.watcher.GetStatus()
	result := RefreshGitResult{
		Refreshed:    err == nil,
		BlueCommit:   status.BlueCommit,
		GreenCommit:  status.GreenCommit,
		ActiveCommit: status.ActiveCommit,
		LastFetch:    status.LastFetch,
	}
	if err != nil {
		result.Error = err.Error()
	}
	return result
}
// HandleTagChange handles a git tag change event by switching to the
// tag's target, bounded by a 30-second timeout. Failures are logged but
// not propagated (callers are fire-and-forget watchers).
func (s *Switcher) HandleTagChange(change TagChange) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	s.logger.Info("handling git tag change",
		"tag", change.Tag,
		"target", change.Target,
	)
	if err := s.Switch(ctx, change.Target, TriggerGitTag); err != nil {
		s.logger.Error("failed to switch on tag change",
			"error", err,
			"tag", change.Tag,
			"target", change.Target,
		)
	}
}
// Status represents the current switcher status: active backend, switch
// stats, backend health, and git watcher state.
type Status struct {
	ActiveService config.ServiceTarget `json:"active_service"`
	LastSwitch    time.Time            `json:"last_switch"`
	SwitchCount   int64                `json:"switch_count"`
	BlueHealth    health.Status        `json:"blue_health"`
	GreenHealth   health.Status        `json:"green_health"`
	Git           GitStatus            `json:"git"`
}

// GetStatus returns the current switcher status assembled from the
// individual accessors.
func (s *Switcher) GetStatus() Status {
	return Status{
		ActiveService: s.ActiveTarget(),
		LastSwitch:    s.LastSwitch(),
		SwitchCount:   s.SwitchCount(),
		BlueHealth:    s.health.GetStatus(config.ServiceBlue),
		GreenHealth:   s.health.GetStatus(config.ServiceGreen),
		Git:           s.GetGitStatus(),
	}
}
package ui
import (
"io/fs"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/mountain-reverie/blue-green-load-balancer/internal/metrics"
"github.com/mountain-reverie/blue-green-load-balancer/internal/switcher"
"github.com/mountain-reverie/blue-green-load-balancer/internal/ui/templates"
"github.com/mountain-reverie/blue-green-load-balancer/static"
)
// Handlers provides HTTP handlers for the UI.
type Handlers struct {
	switcher *switcher.Switcher
	metrics  *metrics.Collector // optional; nil-checked before use
	sse      *SSEBroker
}

// NewHandlers creates new UI handlers with a fresh SSE broker.
func NewHandlers(sw *switcher.Switcher, m *metrics.Collector) *Handlers {
	h := &Handlers{
		switcher: sw,
		metrics:  m,
		sse:      NewSSEBroker(),
	}
	return h
}
// RegisterRoutes registers UI routes on the given router.
func (h *Handlers) RegisterRoutes(r chi.Router) {
	r.Get("/", h.Dashboard)
	r.Get("/dashboard", h.Dashboard)
	r.Get("/events", h.SSEHandler)
	// Serve embedded static files. fs.Sub only fails when the sub-path is
	// invalid inside the embedded FS — a build-time mistake — so fail fast
	// at startup instead of ignoring the error and letting a nil FS panic
	// on the first static request.
	staticFS, err := fs.Sub(static.FS, "css")
	if err != nil {
		panic("ui: embedded static assets: " + err.Error())
	}
	r.Handle("/static/css/*", http.StripPrefix("/static/css/", http.FileServer(http.FS(staticFS))))
}
// Dashboard renders the main dashboard page with the current status,
// a metrics snapshot (zero-valued when no collector is set), and the
// switch history.
func (h *Handlers) Dashboard(w http.ResponseWriter, r *http.Request) {
	status := h.switcher.GetStatus()
	var snapshot metrics.Snapshot
	if h.metrics != nil {
		snapshot = h.metrics.Snapshot()
	}
	data := templates.DashboardData{
		Status:  status,
		Metrics: snapshot,
		History: h.switcher.History(),
	}
	component := templates.DashboardPage(data)
	if err := component.Render(r.Context(), w); err != nil {
		http.Error(w, "failed to render dashboard", http.StatusInternalServerError)
	}
}
// SSEHandler handles Server-Sent Events connections by delegating to the broker.
func (h *Handlers) SSEHandler(w http.ResponseWriter, r *http.Request) {
	h.sse.ServeHTTP(w, r)
}

// SSEBroker returns the SSE broker for sending updates.
func (h *Handlers) SSEBroker() *SSEBroker {
	return h.sse
}

// StartUpdates starts the broker loop that delivers updates to SSE clients.
func (h *Handlers) StartUpdates() {
	h.sse.Start()
}

// StopUpdates stops the broker and disconnects SSE clients.
func (h *Handlers) StopUpdates() {
	h.sse.Stop()
}
// SendStatusUpdate sends a status update to all SSE clients.
func (h *Handlers) SendStatusUpdate() {
status := h.switcher.GetStatus()
var snapshot metrics.Snapshot
if h.metrics != nil {
snapshot = h.metrics.Snapshot()
}
h.sse.SendEvent("status", StatusUpdate{
Status: status,
Metrics: snapshot,
})
}
// SendSwitchEvent sends a switch event to all SSE clients under the
// "switch" event type.
func (h *Handlers) SendSwitchEvent(event switcher.SwitchEvent) {
	h.sse.SendEvent("switch", event)
}
// StatusUpdate is sent via SSE when status changes. It is JSON-encoded by
// the broker; the "status"/"metrics" keys are consumed by the dashboard's
// inline update script.
type StatusUpdate struct {
	Status  switcher.Status  `json:"status"`
	Metrics metrics.Snapshot `json:"metrics"`
}
package ui
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// SSEBroker manages Server-Sent Events connections.
//
// A single run goroutine (started by Start) owns mutation of the clients
// map; register/unregister/broadcast requests reach it via channels. mu
// additionally guards the map so ClientCount can read it concurrently.
type SSEBroker struct {
	clients    map[chan []byte]bool // connected client channels
	register   chan chan []byte     // incoming client registrations (unbuffered)
	unregister chan chan []byte     // departing clients (unbuffered)
	broadcast  chan []byte          // outbound messages; buffered, overflow is dropped
	mu         sync.RWMutex         // guards clients
	cancel     context.CancelFunc   // stops run; set by Start, nil until then
	wg         sync.WaitGroup       // lets Stop wait for run to exit
}
// NewSSEBroker creates a new SSE broker. Start must be called before the
// broker delivers any messages.
func NewSSEBroker() *SSEBroker {
	b := &SSEBroker{
		clients:    map[chan []byte]bool{},
		register:   make(chan chan []byte),
		unregister: make(chan chan []byte),
		broadcast:  make(chan []byte, 100),
	}
	return b
}
// Start begins the broker's event loop.
//
// NOTE(review): calling Start twice without an intervening Stop overwrites
// cancel and leaks the first run goroutine — confirm callers invoke it once.
func (b *SSEBroker) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	b.cancel = cancel
	b.wg.Add(1)
	go b.run(ctx)
}
// Stop stops the broker and blocks until the event loop has exited.
// Safe to call even when Start was never invoked.
func (b *SSEBroker) Stop() {
	if cancel := b.cancel; cancel != nil {
		cancel()
	}
	b.wg.Wait()
}
// run is the main event loop. It is the only goroutine that mutates the
// clients map; mu is held around every access so ClientCount can read the
// map concurrently from other goroutines.
func (b *SSEBroker) run(ctx context.Context) {
	defer b.wg.Done()
	for {
		select {
		case <-ctx.Done():
			// Shutdown: close every client channel so each ServeHTTP
			// loop observes the close and returns.
			b.mu.Lock()
			for client := range b.clients {
				close(client)
				delete(b.clients, client)
			}
			b.mu.Unlock()
			return
		case client := <-b.register:
			b.mu.Lock()
			b.clients[client] = true
			b.mu.Unlock()
		case client := <-b.unregister:
			b.mu.Lock()
			// Guard against double-unregister: only close once.
			if _, ok := b.clients[client]; ok {
				close(client)
				delete(b.clients, client)
			}
			b.mu.Unlock()
		case msg := <-b.broadcast:
			// Fan out without blocking: a client whose buffer is full
			// simply misses this message.
			b.mu.RLock()
			for client := range b.clients {
				select {
				case client <- msg:
				default:
					// Client buffer full, skip
				}
			}
			b.mu.RUnlock()
		}
	}
}
// ServeHTTP implements http.Handler for SSE connections.
//
// It registers the request as a broker client, streams broadcast messages
// until the request context is cancelled or the broker closes the client
// channel, and emits an SSE comment every 30 seconds so idle connections
// are not timed out by intermediaries.
func (b *SSEBroker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "SSE not supported", http.StatusInternalServerError)
		return
	}
	// Set SSE headers. X-Accel-Buffering disables response buffering in
	// intermediaries that honor it.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	// Create client channel; the small buffer absorbs bursts, and the
	// broker drops messages on overflow.
	client := make(chan []byte, 10)
	// Register client. The register channel is unbuffered, so if the
	// broker has been stopped an unconditional send would block forever
	// and leak this handler goroutine; bound the wait by the request
	// context instead.
	select {
	case b.register <- client:
	case <-r.Context().Done():
		return
	}
	// Ensure cleanup (non-blocking in case broker is stopped)
	defer func() {
		select {
		case b.unregister <- client:
		default:
			// Broker already stopped, channel is no longer being received
		}
	}()
	// Send initial connection message
	_, _ = fmt.Fprintf(w, "event: connected\ndata: {\"connected\": true}\n\n")
	flusher.Flush()
	// Keep-alive ticker
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-r.Context().Done():
			return
		case msg, ok := <-client:
			if !ok {
				// Broker closed our channel (shutdown or unregister).
				return
			}
			_, _ = w.Write(msg)
			flusher.Flush()
		case <-ticker.C:
			// Send keep-alive (an SSE comment line clients ignore).
			_, _ = fmt.Fprintf(w, ": keep-alive\n\n")
			flusher.Flush()
		}
	}
}
// SendEvent sends an event to all connected clients. The payload is
// JSON-encoded; events that fail to marshal are silently dropped, as are
// events when the broadcast buffer is full.
func (b *SSEBroker) SendEvent(eventType string, data any) {
	payload, err := json.Marshal(data)
	if err != nil {
		return
	}
	msg := fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, payload)
	select {
	case b.broadcast <- []byte(msg):
	default:
		// Broadcast channel full, drop message
	}
}
// SendRaw sends a raw, pre-formatted SSE message to all connected clients.
// Non-blocking: if the broadcast buffer is full the message is dropped.
func (b *SSEBroker) SendRaw(msg string) {
	payload := []byte(msg)
	select {
	case b.broadcast <- payload:
	default:
		// Buffer full; drop rather than block the caller.
	}
}
// ClientCount returns the number of currently connected clients.
func (b *SSEBroker) ClientCount() int {
	b.mu.RLock()
	n := len(b.clients)
	b.mu.RUnlock()
	return n
}
// Code generated by templ - DO NOT EDIT.
// templ: version: v0.3.977
package templates
//lint:file-ignore SA4006 This context is only used if a nested component is present.
import "github.com/a-h/templ"
import templruntime "github.com/a-h/templ/runtime"
import (
"fmt"
"time"
"github.com/mountain-reverie/blue-green-load-balancer/internal/config"
"github.com/mountain-reverie/blue-green-load-balancer/internal/metrics"
"github.com/mountain-reverie/blue-green-load-balancer/internal/switcher"
)
// DashboardData carries everything the dashboard page renders: the current
// blue/green status, a metrics snapshot, and the switch history.
type DashboardData struct {
	Status  switcher.Status
	Metrics metrics.Snapshot
	History []switcher.SwitchEvent
}
// DashboardPage renders the full dashboard HTML document, composing the
// status, switch-control, metrics, and history sections; the inline script
// applies SSE "status" events pushed via htmx. (templ-generated code.)
func DashboardPage(data DashboardData) templ.Component {
	return templruntime.GeneratedTemplate(func(templ_7745c5c3_Input templruntime.GeneratedComponentInput) (templ_7745c5c3_Err error) {
		templ_7745c5c3_W, ctx := templ_7745c5c3_Input.Writer, templ_7745c5c3_Input.Context
		if templ_7745c5c3_CtxErr := ctx.Err(); templ_7745c5c3_CtxErr != nil {
			return templ_7745c5c3_CtxErr
		}
		templ_7745c5c3_Buffer, templ_7745c5c3_IsBuffer := templruntime.GetBuffer(templ_7745c5c3_W)
		if !templ_7745c5c3_IsBuffer {
			defer func() {
				templ_7745c5c3_BufErr := templruntime.ReleaseBuffer(templ_7745c5c3_Buffer)
				if templ_7745c5c3_Err == nil {
					templ_7745c5c3_Err = templ_7745c5c3_BufErr
				}
			}()
		}
		ctx = templ.InitializeContext(ctx)
		templ_7745c5c3_Var1 := templ.GetChildren(ctx)
		if templ_7745c5c3_Var1 == nil {
			templ_7745c5c3_Var1 = templ.NopComponent
		}
		ctx = templ.ClearChildren(ctx)
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 1, "<!doctype html><html lang=\"en\"><head><meta charset=\"UTF-8\"><meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\"><title>Blue/Green Load Balancer</title><link href=\"/static/css/output.css\" rel=\"stylesheet\"><script src=\"https://unpkg.com/htmx.org@1.9.10\"></script><script src=\"https://unpkg.com/htmx.org/dist/ext/sse.js\"></script></head><body class=\"bg-gray-900 text-gray-100 min-h-screen\"><div class=\"container mx-auto px-4 py-8\"><header class=\"mb-8\"><h1 class=\"text-3xl font-bold text-white\">Blue/Green Load Balancer</h1><p class=\"text-gray-400 mt-2\">Admin Dashboard</p></header><main hx-ext=\"sse\" sse-connect=\"/events\" sse-swap=\"status\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = StatusSection(data.Status).Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = SwitchControls().Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricsSection(data.Metrics).Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = HistorySection(data.History).Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "</main></div><script>\n\t\t\t\t// Handle SSE status updates\n\t\t\t\tdocument.body.addEventListener('sse:status', function(e) {\n\t\t\t\t\tconst data = JSON.parse(e.detail.data);\n\t\t\t\t\tupdateStatus(data.status);\n\t\t\t\t\tupdateMetrics(data.metrics);\n\t\t\t\t});\n\n\t\t\t\tfunction updateStatus(status) {\n\t\t\t\t\t// Update active service indicator\n\t\t\t\t\tconst activeEl = document.getElementById('active-service');\n\t\t\t\t\tif (activeEl) {\n\t\t\t\t\t\tactiveEl.textContent = status.active_service;\n\t\t\t\t\t\tactiveEl.className = status.active_service === 'blue'\n\t\t\t\t\t\t\t? 'text-blue-400 font-bold'\n\t\t\t\t\t\t\t: 'text-green-400 font-bold';\n\t\t\t\t\t}\n\t\t\t\t}\n\n\t\t\t\tfunction updateMetrics(metrics) {\n\t\t\t\t\t// Update metrics values\n\t\t\t\t\tdocument.getElementById('total-requests').textContent = metrics.total_requests;\n\t\t\t\t\tdocument.getElementById('rps').textContent = metrics.requests_per_second.toFixed(2);\n\t\t\t\t\tdocument.getElementById('active-connections').textContent = metrics.active_connections;\n\t\t\t\t\tdocument.getElementById('error-rate').textContent = (metrics.error_rate * 100).toFixed(2) + '%';\n\t\t\t\t}\n\t\t\t</script></body></html>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		return nil
	})
}
// StatusSection renders the "Service Status" section: the active-service
// card (color-coded blue/green, with last-switch time when set) plus one
// ServiceCard for each of the blue and green backends. (templ-generated code.)
func StatusSection(status switcher.Status) templ.Component {
	return templruntime.GeneratedTemplate(func(templ_7745c5c3_Input templruntime.GeneratedComponentInput) (templ_7745c5c3_Err error) {
		templ_7745c5c3_W, ctx := templ_7745c5c3_Input.Writer, templ_7745c5c3_Input.Context
		if templ_7745c5c3_CtxErr := ctx.Err(); templ_7745c5c3_CtxErr != nil {
			return templ_7745c5c3_CtxErr
		}
		templ_7745c5c3_Buffer, templ_7745c5c3_IsBuffer := templruntime.GetBuffer(templ_7745c5c3_W)
		if !templ_7745c5c3_IsBuffer {
			defer func() {
				templ_7745c5c3_BufErr := templruntime.ReleaseBuffer(templ_7745c5c3_Buffer)
				if templ_7745c5c3_Err == nil {
					templ_7745c5c3_Err = templ_7745c5c3_BufErr
				}
			}()
		}
		ctx = templ.InitializeContext(ctx)
		templ_7745c5c3_Var2 := templ.GetChildren(ctx)
		if templ_7745c5c3_Var2 == nil {
			templ_7745c5c3_Var2 = templ.NopComponent
		}
		ctx = templ.ClearChildren(ctx)
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 3, "<section class=\"mb-8\"><h2 class=\"text-xl font-semibold mb-4\">Service Status</h2><div class=\"grid grid-cols-1 md:grid-cols-3 gap-4\"><!-- Active Service Card --><div class=\"bg-gray-800 rounded-lg p-6\"><h3 class=\"text-gray-400 text-sm uppercase mb-2\">Active Service</h3>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var3 = []any{templ.KV("text-blue-400", status.ActiveService == config.ServiceBlue), templ.KV("text-green-400", status.ActiveService == config.ServiceGreen), "text-2xl font-bold"}
		templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var3...)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 4, "<p id=\"active-service\" class=\"")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var4 string
		templ_7745c5c3_Var4, templ_7745c5c3_Err = templ.JoinStringErrs(templ.CSSClasses(templ_7745c5c3_Var3).String())
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 1, Col: 0}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var4))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 5, "\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var5 string
		templ_7745c5c3_Var5, templ_7745c5c3_Err = templ.JoinStringErrs(string(status.ActiveService))
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 81, Col: 35}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var5))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 6, "</p>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		if !status.LastSwitch.IsZero() {
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 7, "<p class=\"text-gray-500 text-sm mt-2\">Last switch: ")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var6 string
			templ_7745c5c3_Var6, templ_7745c5c3_Err = templ.JoinStringErrs(status.LastSwitch.Format(time.RFC3339))
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 85, Col: 59}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var6))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 8, "</p>")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 9, "</div><!-- Blue Service Card -->")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = ServiceCard("Blue", status.BlueHealth.Healthy, status.BlueHealth.Latency, status.BlueHealth.Error, "blue").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 10, "<!-- Green Service Card -->")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = ServiceCard("Green", status.GreenHealth.Healthy, status.GreenHealth.Latency, status.GreenHealth.Error, "green").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 11, "</div></section>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		return nil
	})
}
// ServiceCard renders one backend's card: a color-coded border/heading,
// a Healthy/Unhealthy badge, the health-check latency in milliseconds,
// and the health-check error message when present. (templ-generated code.)
func ServiceCard(name string, healthy bool, latency time.Duration, errMsg string, color string) templ.Component {
	return templruntime.GeneratedTemplate(func(templ_7745c5c3_Input templruntime.GeneratedComponentInput) (templ_7745c5c3_Err error) {
		templ_7745c5c3_W, ctx := templ_7745c5c3_Input.Writer, templ_7745c5c3_Input.Context
		if templ_7745c5c3_CtxErr := ctx.Err(); templ_7745c5c3_CtxErr != nil {
			return templ_7745c5c3_CtxErr
		}
		templ_7745c5c3_Buffer, templ_7745c5c3_IsBuffer := templruntime.GetBuffer(templ_7745c5c3_W)
		if !templ_7745c5c3_IsBuffer {
			defer func() {
				templ_7745c5c3_BufErr := templruntime.ReleaseBuffer(templ_7745c5c3_Buffer)
				if templ_7745c5c3_Err == nil {
					templ_7745c5c3_Err = templ_7745c5c3_BufErr
				}
			}()
		}
		ctx = templ.InitializeContext(ctx)
		templ_7745c5c3_Var7 := templ.GetChildren(ctx)
		if templ_7745c5c3_Var7 == nil {
			templ_7745c5c3_Var7 = templ.NopComponent
		}
		ctx = templ.ClearChildren(ctx)
		var templ_7745c5c3_Var8 = []any{"bg-gray-800 rounded-lg p-6 border-l-4", templ.KV("border-blue-500", color == "blue"), templ.KV("border-green-500", color == "green")}
		templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var8...)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 12, "<div class=\"")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var9 string
		templ_7745c5c3_Var9, templ_7745c5c3_Err = templ.JoinStringErrs(templ.CSSClasses(templ_7745c5c3_Var8).String())
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 1, Col: 0}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var9))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 13, "\"><div class=\"flex items-center justify-between mb-2\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var10 = []any{"text-lg font-semibold", templ.KV("text-blue-400", color == "blue"), templ.KV("text-green-400", color == "green")}
		templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var10...)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 14, "<h3 class=\"")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var11 string
		templ_7745c5c3_Var11, templ_7745c5c3_Err = templ.JoinStringErrs(templ.CSSClasses(templ_7745c5c3_Var10).String())
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 1, Col: 0}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var11))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 15, "\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var12 string
		templ_7745c5c3_Var12, templ_7745c5c3_Err = templ.JoinStringErrs(name)
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 101, Col: 10}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var12))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 16, "</h3>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var13 = []any{"px-2 py-1 rounded text-xs font-medium", templ.KV("bg-green-900 text-green-300", healthy), templ.KV("bg-red-900 text-red-300", !healthy)}
		templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var13...)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 17, "<span class=\"")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var14 string
		templ_7745c5c3_Var14, templ_7745c5c3_Err = templ.JoinStringErrs(templ.CSSClasses(templ_7745c5c3_Var13).String())
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 1, Col: 0}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var14))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 18, "\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		if healthy {
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 19, "Healthy")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
		} else {
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 20, "Unhealthy")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 21, "</span></div><p class=\"text-gray-400 text-sm\">Latency: ")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var15 string
		templ_7745c5c3_Var15, templ_7745c5c3_Err = templ.JoinStringErrs(fmt.Sprintf("%.2fms", float64(latency.Microseconds())/1000))
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 112, Col: 73}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var15))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 22, "</p>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		if errMsg != "" {
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 23, "<p class=\"text-red-400 text-sm mt-1\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var16 string
			templ_7745c5c3_Var16, templ_7745c5c3_Err = templ.JoinStringErrs(errMsg)
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 115, Col: 48}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var16))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 24, "</p>")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 25, "</div>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		return nil
	})
}
// MetricsSection renders the "Metrics" section as two rows of MetricCards:
// traffic counters (total, rps, connections, error rate) and latency
// percentiles plus uptime. (templ-generated code.)
func MetricsSection(m metrics.Snapshot) templ.Component {
	return templruntime.GeneratedTemplate(func(templ_7745c5c3_Input templruntime.GeneratedComponentInput) (templ_7745c5c3_Err error) {
		templ_7745c5c3_W, ctx := templ_7745c5c3_Input.Writer, templ_7745c5c3_Input.Context
		if templ_7745c5c3_CtxErr := ctx.Err(); templ_7745c5c3_CtxErr != nil {
			return templ_7745c5c3_CtxErr
		}
		templ_7745c5c3_Buffer, templ_7745c5c3_IsBuffer := templruntime.GetBuffer(templ_7745c5c3_W)
		if !templ_7745c5c3_IsBuffer {
			defer func() {
				templ_7745c5c3_BufErr := templruntime.ReleaseBuffer(templ_7745c5c3_Buffer)
				if templ_7745c5c3_Err == nil {
					templ_7745c5c3_Err = templ_7745c5c3_BufErr
				}
			}()
		}
		ctx = templ.InitializeContext(ctx)
		templ_7745c5c3_Var17 := templ.GetChildren(ctx)
		if templ_7745c5c3_Var17 == nil {
			templ_7745c5c3_Var17 = templ.NopComponent
		}
		ctx = templ.ClearChildren(ctx)
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 26, "<section class=\"mb-8\"><h2 class=\"text-xl font-semibold mb-4\">Metrics</h2><div class=\"grid grid-cols-2 md:grid-cols-4 gap-4\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("Total Requests", fmt.Sprintf("%d", m.TotalRequests), "total-requests").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("Requests/sec", fmt.Sprintf("%.2f", m.RequestsPerSecond), "rps").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("Active Connections", fmt.Sprintf("%d", m.ActiveConnections), "active-connections").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("Error Rate", fmt.Sprintf("%.2f%%", m.ErrorRate*100), "error-rate").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 27, "</div><div class=\"grid grid-cols-2 md:grid-cols-4 gap-4 mt-4\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("P50 Latency", fmt.Sprintf("%.2fms", m.LatencyP50), "p50").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("P90 Latency", fmt.Sprintf("%.2fms", m.LatencyP90), "p90").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("P99 Latency", fmt.Sprintf("%.2fms", m.LatencyP99), "p99").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = MetricCard("Uptime", formatDuration(time.Duration(m.UptimeSeconds)*time.Second), "uptime").Render(ctx, templ_7745c5c3_Buffer)
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 28, "</div></section>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		return nil
	})
}
// MetricCard renders one labeled metric tile; the value element carries the
// given id so the dashboard's SSE script can update it in place.
// (templ-generated code.)
func MetricCard(label string, value string, id string) templ.Component {
	return templruntime.GeneratedTemplate(func(templ_7745c5c3_Input templruntime.GeneratedComponentInput) (templ_7745c5c3_Err error) {
		templ_7745c5c3_W, ctx := templ_7745c5c3_Input.Writer, templ_7745c5c3_Input.Context
		if templ_7745c5c3_CtxErr := ctx.Err(); templ_7745c5c3_CtxErr != nil {
			return templ_7745c5c3_CtxErr
		}
		templ_7745c5c3_Buffer, templ_7745c5c3_IsBuffer := templruntime.GetBuffer(templ_7745c5c3_W)
		if !templ_7745c5c3_IsBuffer {
			defer func() {
				templ_7745c5c3_BufErr := templruntime.ReleaseBuffer(templ_7745c5c3_Buffer)
				if templ_7745c5c3_Err == nil {
					templ_7745c5c3_Err = templ_7745c5c3_BufErr
				}
			}()
		}
		ctx = templ.InitializeContext(ctx)
		templ_7745c5c3_Var18 := templ.GetChildren(ctx)
		if templ_7745c5c3_Var18 == nil {
			templ_7745c5c3_Var18 = templ.NopComponent
		}
		ctx = templ.ClearChildren(ctx)
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 29, "<div class=\"bg-gray-800 rounded-lg p-4\"><p class=\"text-gray-400 text-sm\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var19 string
		templ_7745c5c3_Var19, templ_7745c5c3_Err = templ.JoinStringErrs(label)
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 140, Col: 42}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var19))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 30, "</p><p id=\"")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var20 string
		templ_7745c5c3_Var20, templ_7745c5c3_Err = templ.JoinStringErrs(id)
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 141, Col: 12}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var20))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 31, "\" class=\"text-2xl font-bold text-white mt-1\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		var templ_7745c5c3_Var21 string
		templ_7745c5c3_Var21, templ_7745c5c3_Err = templ.JoinStringErrs(value)
		if templ_7745c5c3_Err != nil {
			return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 141, Col: 65}
		}
		_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var21))
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 32, "</p></div>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		return nil
	})
}
// HistorySection renders the "Switch History" table: one row per switch
// event (time, from/to services color-coded, trigger, duration, and a
// success/failed badge carrying the error as a tooltip), or a placeholder
// row when there are no events. (templ-generated code.)
func HistorySection(events []switcher.SwitchEvent) templ.Component {
	return templruntime.GeneratedTemplate(func(templ_7745c5c3_Input templruntime.GeneratedComponentInput) (templ_7745c5c3_Err error) {
		templ_7745c5c3_W, ctx := templ_7745c5c3_Input.Writer, templ_7745c5c3_Input.Context
		if templ_7745c5c3_CtxErr := ctx.Err(); templ_7745c5c3_CtxErr != nil {
			return templ_7745c5c3_CtxErr
		}
		templ_7745c5c3_Buffer, templ_7745c5c3_IsBuffer := templruntime.GetBuffer(templ_7745c5c3_W)
		if !templ_7745c5c3_IsBuffer {
			defer func() {
				templ_7745c5c3_BufErr := templruntime.ReleaseBuffer(templ_7745c5c3_Buffer)
				if templ_7745c5c3_Err == nil {
					templ_7745c5c3_Err = templ_7745c5c3_BufErr
				}
			}()
		}
		ctx = templ.InitializeContext(ctx)
		templ_7745c5c3_Var22 := templ.GetChildren(ctx)
		if templ_7745c5c3_Var22 == nil {
			templ_7745c5c3_Var22 = templ.NopComponent
		}
		ctx = templ.ClearChildren(ctx)
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 33, "<section class=\"mb-8\"><h2 class=\"text-xl font-semibold mb-4\">Switch History</h2><div class=\"bg-gray-800 rounded-lg overflow-hidden\"><table class=\"w-full\"><thead class=\"bg-gray-700\"><tr><th class=\"px-4 py-3 text-left text-sm font-medium text-gray-300\">Time</th><th class=\"px-4 py-3 text-left text-sm font-medium text-gray-300\">From</th><th class=\"px-4 py-3 text-left text-sm font-medium text-gray-300\">To</th><th class=\"px-4 py-3 text-left text-sm font-medium text-gray-300\">Trigger</th><th class=\"px-4 py-3 text-left text-sm font-medium text-gray-300\">Duration</th><th class=\"px-4 py-3 text-left text-sm font-medium text-gray-300\">Status</th></tr></thead> <tbody class=\"divide-y divide-gray-700\">")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		if len(events) == 0 {
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 34, "<tr><td colspan=\"6\" class=\"px-4 py-8 text-center text-gray-500\">No switch events yet</td></tr>")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
		}
		for _, event := range events {
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 35, "<tr class=\"hover:bg-gray-750\"><td class=\"px-4 py-3 text-sm text-gray-300\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var23 string
			templ_7745c5c3_Var23, templ_7745c5c3_Err = templ.JoinStringErrs(event.Timestamp.Format("2006-01-02 15:04:05"))
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 171, Col: 55}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var23))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 36, "</td><td class=\"px-4 py-3 text-sm\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var24 = []any{templ.KV("text-blue-400", event.From == config.ServiceBlue), templ.KV("text-green-400", event.From == config.ServiceGreen)}
			templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var24...)
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 37, "<span class=\"")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var25 string
			templ_7745c5c3_Var25, templ_7745c5c3_Err = templ.JoinStringErrs(templ.CSSClasses(templ_7745c5c3_Var24).String())
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 1, Col: 0}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var25))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 38, "\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var26 string
			templ_7745c5c3_Var26, templ_7745c5c3_Err = templ.JoinStringErrs(string(event.From))
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 175, Col: 29}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var26))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 39, "</span></td><td class=\"px-4 py-3 text-sm\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var27 = []any{templ.KV("text-blue-400", event.To == config.ServiceBlue), templ.KV("text-green-400", event.To == config.ServiceGreen)}
			templ_7745c5c3_Err = templ.RenderCSSItems(ctx, templ_7745c5c3_Buffer, templ_7745c5c3_Var27...)
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 40, "<span class=\"")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var28 string
			templ_7745c5c3_Var28, templ_7745c5c3_Err = templ.JoinStringErrs(templ.CSSClasses(templ_7745c5c3_Var27).String())
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 1, Col: 0}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var28))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 41, "\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var29 string
			templ_7745c5c3_Var29, templ_7745c5c3_Err = templ.JoinStringErrs(string(event.To))
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 180, Col: 27}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var29))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 42, "</span></td><td class=\"px-4 py-3 text-sm text-gray-400\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var30 string
			templ_7745c5c3_Var30, templ_7745c5c3_Err = templ.JoinStringErrs(string(event.Trigger))
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 184, Col: 31}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var30))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 43, "</td><td class=\"px-4 py-3 text-sm text-gray-400\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			var templ_7745c5c3_Var31 string
			templ_7745c5c3_Var31, templ_7745c5c3_Err = templ.JoinStringErrs(event.Duration.String())
			if templ_7745c5c3_Err != nil {
				return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 187, Col: 33}
			}
			_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var31))
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 44, "</td><td class=\"px-4 py-3 text-sm\">")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
			if event.Success {
				templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 45, "<span class=\"text-green-400\">Success</span>")
				if templ_7745c5c3_Err != nil {
					return templ_7745c5c3_Err
				}
			} else {
				templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 46, "<span class=\"text-red-400\" title=\"")
				if templ_7745c5c3_Err != nil {
					return templ_7745c5c3_Err
				}
				var templ_7745c5c3_Var32 string
				templ_7745c5c3_Var32, templ_7745c5c3_Err = templ.JoinStringErrs(event.Error)
				if templ_7745c5c3_Err != nil {
					return templ.Error{Err: templ_7745c5c3_Err, FileName: `internal/ui/templates/dashboard.templ`, Line: 193, Col: 55}
				}
				_, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var32))
				if templ_7745c5c3_Err != nil {
					return templ_7745c5c3_Err
				}
				templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 47, "\">Failed</span>")
				if templ_7745c5c3_Err != nil {
					return templ_7745c5c3_Err
				}
			}
			templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 48, "</td></tr>")
			if templ_7745c5c3_Err != nil {
				return templ_7745c5c3_Err
			}
		}
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 49, "</tbody></table></div></section>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		return nil
	})
}
// SwitchControls renders the "Git Controls" dashboard section: a static
// panel explaining that blue/green switching is driven by git tags, plus a
// button that POSTs to /api/webhook/refresh via htmx (hx-post, hx-swap=none).
// This function is templ-generated boilerplate; edit the .templ source, not
// this file.
func SwitchControls() templ.Component {
	return templruntime.GeneratedTemplate(func(templ_7745c5c3_Input templruntime.GeneratedComponentInput) (templ_7745c5c3_Err error) {
		templ_7745c5c3_W, ctx := templ_7745c5c3_Input.Writer, templ_7745c5c3_Input.Context
		// Abort immediately if the caller's context is already cancelled.
		if templ_7745c5c3_CtxErr := ctx.Err(); templ_7745c5c3_CtxErr != nil {
			return templ_7745c5c3_CtxErr
		}
		// Wrap the writer in a pooled buffer unless it already is one; the
		// deferred release surfaces any flush error as the return value.
		templ_7745c5c3_Buffer, templ_7745c5c3_IsBuffer := templruntime.GetBuffer(templ_7745c5c3_W)
		if !templ_7745c5c3_IsBuffer {
			defer func() {
				templ_7745c5c3_BufErr := templruntime.ReleaseBuffer(templ_7745c5c3_Buffer)
				if templ_7745c5c3_Err == nil {
					templ_7745c5c3_Err = templ_7745c5c3_BufErr
				}
			}()
		}
		ctx = templ.InitializeContext(ctx)
		templ_7745c5c3_Var33 := templ.GetChildren(ctx)
		if templ_7745c5c3_Var33 == nil {
			templ_7745c5c3_Var33 = templ.NopComponent
		}
		ctx = templ.ClearChildren(ctx)
		// The whole section is one static HTML fragment — no dynamic values.
		templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 50, "<section class=\"mb-8\"><h2 class=\"text-xl font-semibold mb-4\">Git Controls</h2><div class=\"bg-gray-800 rounded-lg p-6\"><p class=\"text-gray-400 mb-4\">Switching is controlled by git tags. Update the deploy/blue or deploy/green tag to trigger a switch.</p><div class=\"flex gap-4\"><button hx-post=\"/api/webhook/refresh\" hx-swap=\"none\" class=\"px-6 py-3 bg-indigo-600 hover:bg-indigo-700 text-white rounded-lg font-medium transition-colors\">Refresh Git Tags</button></div></div></section>")
		if templ_7745c5c3_Err != nil {
			return templ_7745c5c3_Err
		}
		return nil
	})
}
func formatDuration(d time.Duration) string {
if d < time.Minute {
return fmt.Sprintf("%ds", int(d.Seconds()))
}
if d < time.Hour {
return fmt.Sprintf("%dm", int(d.Minutes()))
}
if d < 24*time.Hour {
return fmt.Sprintf("%dh %dm", int(d.Hours()), int(d.Minutes())%60)
}
days := int(d.Hours()) / 24
hours := int(d.Hours()) % 24
return fmt.Sprintf("%dd %dh", days, hours)
}
var _ = templruntime.GeneratedTemplate
package headscale
import (
"context"
"fmt"
"io"
"net/http"
"time"
"tailscale.com/client/local"
"tailscale.com/tsnet"
)
// TestClient is an HTTP client that connects via the Tailscale network.
// It owns an embedded tsnet node and an http.Client whose transport dials
// through that node; Close must be called to release both.
type TestClient struct {
	tsServer *tsnet.Server // embedded tailnet node; provides Dial and LocalClient
	httpClient *http.Client // transport dials via tsServer; 30s request timeout
}
// NewTestClient creates a new test client connected to a Headscale server.
// It starts an ephemeral tsnet node registered at controlURL with authKey,
// blocks until the node is assigned a Tailscale IP (or ctx expires), and
// returns a client whose HTTP transport dials through the tailnet. The tsnet
// node is closed on any failure after startup.
func NewTestClient(ctx context.Context, hostname, stateDir, controlURL, authKey string) (*TestClient, error) {
	node := &tsnet.Server{
		Hostname:   hostname,
		Dir:        stateDir,
		ControlURL: controlURL,
		AuthKey:    authKey,
		Ephemeral:  true,
	}
	if err := node.Start(); err != nil {
		return nil, fmt.Errorf("starting tsnet server: %w", err)
	}
	lc, err := node.LocalClient()
	if err != nil {
		_ = node.Close()
		return nil, fmt.Errorf("getting local client: %w", err)
	}
	// Block until the control server has handed this node a tailnet address;
	// requests made before that would fail to dial.
	if err := waitForTailscaleIP(ctx, lc); err != nil {
		_ = node.Close()
		return nil, fmt.Errorf("waiting for Tailscale IP: %w", err)
	}
	httpClient := &http.Client{
		Transport: &http.Transport{DialContext: node.Dial},
		Timeout:   30 * time.Second,
	}
	return &TestClient{tsServer: node, httpClient: httpClient}, nil
}
// waitForTailscaleIP waits until the tsnet server has a Tailscale IP assigned.
// It polls the local client's status every 500ms and returns ctx.Err() if the
// context is cancelled first.
func waitForTailscaleIP(ctx context.Context, lc *local.Client) error {
	poll := time.NewTicker(500 * time.Millisecond)
	defer poll.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-poll.C:
		}
		st, err := lc.Status(ctx)
		if err != nil {
			// Transient errors are expected while the node registers; retry.
			continue
		}
		if st.Self != nil && len(st.Self.TailscaleIPs) > 0 {
			return nil
		}
	}
}
// Get performs an HTTP GET request via the Tailscale network.
// The caller is responsible for closing the response body.
func (c *TestClient) Get(ctx context.Context, url string) (*http.Response, error) {
	r, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	return c.httpClient.Do(r)
}
// Post performs an HTTP POST request via the Tailscale network, sending body
// with the given Content-Type. The caller must close the response body.
func (c *TestClient) Post(ctx context.Context, url, contentType string, body io.Reader) (*http.Response, error) {
	r, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
	if err != nil {
		return nil, err
	}
	r.Header.Set("Content-Type", contentType)
	return c.httpClient.Do(r)
}
// Close closes the test client and its tsnet server.
func (c *TestClient) Close() error {
	// Drop idle HTTP connections first so nothing lingers on the tailnet
	// transport after the tsnet node shuts down.
	if t, ok := c.httpClient.Transport.(*http.Transport); ok {
		t.CloseIdleConnections()
	}
	return c.tsServer.Close()
}
// TailscaleIP returns the Tailscale IP of this client. When multiple
// addresses are assigned, the first one reported by the local client is
// returned.
func (c *TestClient) TailscaleIP(ctx context.Context) (string, error) {
	lc, err := c.tsServer.LocalClient()
	if err != nil {
		return "", fmt.Errorf("getting local client: %w", err)
	}
	st, err := lc.Status(ctx)
	if err != nil {
		return "", fmt.Errorf("getting status: %w", err)
	}
	if st.Self == nil || len(st.Self.TailscaleIPs) == 0 {
		return "", fmt.Errorf("no Tailscale IPs assigned")
	}
	return st.Self.TailscaleIPs[0].String(), nil
}
// WaitForPeer waits until a peer with the given hostname is visible.
// It polls the node's netmap every 500ms, comparing the hostname exactly
// against each peer's HostName, and returns ctx.Err() on cancellation.
func (c *TestClient) WaitForPeer(ctx context.Context, hostname string) error {
	lc, err := c.tsServer.LocalClient()
	if err != nil {
		return fmt.Errorf("getting local client: %w", err)
	}
	poll := time.NewTicker(500 * time.Millisecond)
	defer poll.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-poll.C:
		}
		st, err := lc.Status(ctx)
		if err != nil {
			// Status can fail transiently; keep polling until ctx expires.
			continue
		}
		for _, peer := range st.Peer {
			if peer.HostName == hostname {
				return nil
			}
		}
	}
}
package headscale
import "fmt"
// generateHeadscaleConfig returns a minimal Headscale configuration for testing.
// serverURL should be the externally accessible URL (e.g., http://localhost:32768).
//
// The config uses a SQLite database, the embedded DERP server (region 999),
// MagicDNS under test.headscale.net, and an ACL policy file at
// /etc/headscale/acl.json (written into the container alongside this config).
// NOTE(review): the nesting of the YAML literal below must match Headscale's
// expected schema — confirm the literal's indentation parses as intended.
func generateHeadscaleConfig(serverURL string) string {
	return fmt.Sprintf(`
server_url: %s
listen_addr: 0.0.0.0:8080
metrics_listen_addr: 0.0.0.0:9090
grpc_listen_addr: 0.0.0.0:50443
grpc_allow_insecure: true
database:
type: sqlite
sqlite:
path: /var/lib/headscale/db.sqlite
noise:
private_key_path: /var/lib/headscale/noise_private.key
prefixes:
v4: 100.64.0.0/10
v6: fd7a:115c:a1e0::/48
derp:
server:
enabled: true
region_id: 999
region_code: "headscale"
region_name: "Headscale Embedded"
stun_listen_addr: 0.0.0.0:3478
private_key_path: /var/lib/headscale/derp_server_private.key
urls:
- https://controlplane.tailscale.com/derpmap/default
dns:
magic_dns: true
base_domain: test.headscale.net
policy:
path: /etc/headscale/acl.json
`, serverURL)
}
// generateHeadscaleACL returns a permissive ACL policy for testing.
// The policy accepts traffic from any source to any destination/port,
// which is what the integration tests need to talk freely across nodes.
func generateHeadscaleACL() string {
	return `{
"acls": [
{
"action": "accept",
"src": ["*"],
"dst": ["*:*"]
}
]
}`
}
package headscale
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
// Container wraps a Headscale testcontainer, embedding the underlying
// testcontainers.Container so lifecycle methods (Exec, Terminate, ...) are
// available directly.
type Container struct {
	testcontainers.Container
	URL string // externally reachable server URL, e.g. http://localhost:19080
}
// headscaleTestPort is the fixed host port used for Headscale in tests.
// Using a fixed port simplifies config generation: server_url must be known
// before the container starts, so a dynamically mapped port would not work.
const headscaleTestPort = "19080"
// StartHeadscale starts a Headscale container for testing.
// It writes a generated config and a permissive ACL policy into the
// container, binds the fixed test port to container port 8080, and waits
// (up to 60s) for Headscale to both log readiness and listen on 8080.
func StartHeadscale(ctx context.Context) (*Container, error) {
	// The config embeds its own externally reachable URL, so the host port
	// must be fixed and the URL constructed up front.
	serverURL := fmt.Sprintf("http://localhost:%s", headscaleTestPort)
	files := []testcontainers.ContainerFile{
		{
			Reader:            strings.NewReader(generateHeadscaleConfig(serverURL)),
			ContainerFilePath: "/etc/headscale/config.yaml",
			FileMode:          0644,
		},
		{
			Reader:            strings.NewReader(generateHeadscaleACL()),
			ContainerFilePath: "/etc/headscale/acl.json",
			FileMode:          0644,
		},
	}
	req := testcontainers.ContainerRequest{
		Image:        "headscale/headscale:0.23",
		ExposedPorts: []string{headscaleTestPort + ":8080/tcp"},
		Cmd:          []string{"serve"},
		Files:        files,
		WaitingFor: wait.ForAll(
			wait.ForLog("listening and serving HTTP on"),
			wait.ForListeningPort("8080/tcp"),
		).WithDeadline(60 * time.Second),
	}
	ctr, err := testcontainers.GenericContainer(ctx,
		testcontainers.GenericContainerRequest{
			ContainerRequest: req,
			Started:          true,
		})
	if err != nil {
		return nil, fmt.Errorf("starting Headscale container: %w", err)
	}
	return &Container{Container: ctr, URL: serverURL}, nil
}
// CreateUser creates a new user in Headscale by running the `headscale
// users create` CLI inside the container. A non-zero exit code is reported
// as an error.
func (h *Container) CreateUser(ctx context.Context, username string) error {
	code, _, err := h.Exec(ctx, []string{"headscale", "users", "create", username})
	switch {
	case err != nil:
		return fmt.Errorf("executing user create: %w", err)
	case code != 0:
		return fmt.Errorf("user create exited with code %d", code)
	default:
		return nil
	}
}
// preauthKeyResponse represents the JSON response from headscale preauthkeys create.
// Only the key itself is needed; other fields in the response are ignored.
type preauthKeyResponse struct {
	Key string `json:"key"` // the generated preauth key value
}
// CreatePreauthKey creates a reusable, ephemeral preauth key for a user.
// It runs the `headscale preauthkeys create` CLI inside the container with
// a 1h expiration and JSON output, strips any Docker exec stream headers
// from the output, and returns the parsed key.
func (h *Container) CreatePreauthKey(ctx context.Context, user string) (string, error) {
	cmd := []string{
		"headscale", "preauthkeys", "create",
		"--user", user,
		"--reusable",
		"--ephemeral",
		"--expiration", "1h",
		"-o", "json",
	}
	code, out, err := h.Exec(ctx, cmd)
	if err != nil {
		return "", fmt.Errorf("executing preauthkey create: %w", err)
	}
	if code != 0 {
		return "", fmt.Errorf("preauthkey create exited with code %d", code)
	}
	var raw bytes.Buffer
	if _, err := raw.ReadFrom(out); err != nil {
		return "", fmt.Errorf("reading exec output: %w", err)
	}
	// Docker exec output may be prefixed with multiplexed stream headers;
	// skip ahead to the first byte of JSON.
	data := raw.Bytes()
	start := bytes.IndexByte(data, '{')
	if start < 0 {
		return "", fmt.Errorf("no JSON found in output: %s", raw.String())
	}
	var resp preauthKeyResponse
	if err := json.Unmarshal(data[start:], &resp); err != nil {
		return "", fmt.Errorf("parsing JSON response: %w (output: %s)", err, string(data[start:]))
	}
	if resp.Key == "" {
		return "", fmt.Errorf("empty key in response: %s", raw.String())
	}
	return resp.Key, nil
}
// headscaleNode represents a node in the Headscale nodes list JSON output.
// Only the fields consumed by GetTailscaleIP are declared here.
type headscaleNode struct {
	ID int `json:"id"` // numeric node identifier
	Name string `json:"name"` // node name as registered
	GivenName string `json:"givenName"` // display name, may differ from Name
	IPAddresses []string `json:"ipAddresses"` // assigned tailnet addresses
	Online bool `json:"online"` // whether the node is currently connected
}
// GetTailscaleIP retrieves the Tailscale IP for a given hostname.
// It only returns IPs for online nodes and matches the hostname exactly
// against both the name and givenName fields of `headscale nodes list`
// JSON-line output. Docker exec stream headers and malformed lines are
// skipped.
func (h *Container) GetTailscaleIP(ctx context.Context, hostname string) (string, error) {
	code, out, err := h.Exec(ctx, []string{
		"headscale", "nodes", "list", "-o", "json-line",
	})
	if err != nil {
		return "", fmt.Errorf("executing nodes list: %w", err)
	}
	if code != 0 {
		return "", fmt.Errorf("nodes list exited with code %d", code)
	}
	var raw bytes.Buffer
	if _, err := raw.ReadFrom(out); err != nil {
		return "", fmt.Errorf("reading exec output: %w", err)
	}
	listing := raw.String()
	if strings.TrimSpace(listing) == "" {
		return "", fmt.Errorf("no nodes found for hostname %s", hostname)
	}
	for _, line := range strings.Split(listing, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		// Docker exec output may contain multiplexed stream headers before
		// the JSON; keep only the JSON portion of the line.
		jsonStart := strings.Index(line, "{")
		if jsonStart < 0 {
			continue
		}
		var node headscaleNode
		if err := json.Unmarshal([]byte(line[jsonStart:]), &node); err != nil {
			// Skip malformed lines rather than failing the whole lookup.
			continue
		}
		if node.Name != hostname && node.GivenName != hostname {
			continue
		}
		if !node.Online {
			continue
		}
		// First address is returned (typically the IPv4 address).
		if len(node.IPAddresses) == 0 {
			return "", fmt.Errorf("node %s has no IP addresses", hostname)
		}
		return node.IPAddresses[0], nil
	}
	return "", fmt.Errorf("no online node found for hostname %s", hostname)
}