refactor: graceful shutdown for server (#482)

This commit is contained in:
Alessandro (Ale) Segala
2025-04-28 18:13:50 +09:00
committed by GitHub
parent ce24372c57
commit 3ec98736cf
10 changed files with 209 additions and 90 deletions

View File

@@ -38,7 +38,6 @@ func initApplicationImages() {
log.Fatalf("Error copying file: %v", err) log.Fatalf("Error copying file: %v", err)
} }
} }
} }
func imageAlreadyExists(fileName string, destinationFiles []os.DirEntry) bool { func imageAlreadyExists(fileName string, destinationFiles []os.DirEntry) bool {
@@ -55,6 +54,11 @@ func imageAlreadyExists(fileName string, destinationFiles []os.DirEntry) bool {
} }
func getImageNameWithoutExtension(fileName string) string { func getImageNameWithoutExtension(fileName string) string {
splitted := strings.Split(fileName, ".") idx := strings.LastIndexByte(fileName, '.')
return strings.Join(splitted[:len(splitted)-1], ".") if idx < 1 {
// No dot found, or fileName starts with a dot
return fileName
}
return fileName[:idx]
} }

View File

@@ -6,10 +6,12 @@ import (
_ "github.com/golang-migrate/migrate/v4/source/file" _ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/pocket-id/pocket-id/backend/internal/service" "github.com/pocket-id/pocket-id/backend/internal/service"
"github.com/pocket-id/pocket-id/backend/internal/utils/signals"
) )
func Bootstrap() { func Bootstrap() {
ctx := context.TODO() // Get a context that is canceled when the application is stopping
ctx := signals.SignalContext(context.Background())
initApplicationImages() initApplicationImages()

View File

@@ -2,8 +2,10 @@ package bootstrap
import ( import (
"context" "context"
"fmt"
"log" "log"
"net" "net"
"net/http"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -21,6 +23,13 @@ import (
var registerTestControllers []func(apiGroup *gin.RouterGroup, db *gorm.DB, appConfigService *service.AppConfigService, jwtService *service.JwtService) var registerTestControllers []func(apiGroup *gin.RouterGroup, db *gorm.DB, appConfigService *service.AppConfigService, jwtService *service.JwtService)
func initRouter(ctx context.Context, db *gorm.DB, appConfigService *service.AppConfigService) { func initRouter(ctx context.Context, db *gorm.DB, appConfigService *service.AppConfigService) {
err := initRouterInternal(ctx, db, appConfigService)
if err != nil {
log.Fatalf("failed to init router: %v", err)
}
}
func initRouterInternal(ctx context.Context, db *gorm.DB, appConfigService *service.AppConfigService) error {
// Set the appropriate Gin mode based on the environment // Set the appropriate Gin mode based on the environment
switch common.EnvConfig.AppEnv { switch common.EnvConfig.AppEnv {
case "production": case "production":
@@ -37,7 +46,7 @@ func initRouter(ctx context.Context, db *gorm.DB, appConfigService *service.AppC
// Initialize services // Initialize services
emailService, err := service.NewEmailService(appConfigService, db) emailService, err := service.NewEmailService(appConfigService, db)
if err != nil { if err != nil {
log.Fatalf("Unable to create email service: %v", err) return fmt.Errorf("unable to create email service: %w", err)
} }
geoLiteService := service.NewGeoLiteService(ctx) geoLiteService := service.NewGeoLiteService(ctx)
@@ -58,10 +67,30 @@ func initRouter(ctx context.Context, db *gorm.DB, appConfigService *service.AppC
r.Use(middleware.NewErrorHandlerMiddleware().Add()) r.Use(middleware.NewErrorHandlerMiddleware().Add())
r.Use(rateLimitMiddleware.Add(rate.Every(time.Second), 60)) r.Use(rateLimitMiddleware.Add(rate.Every(time.Second), 60))
job.RegisterLdapJobs(ctx, ldapService, appConfigService) scheduler, err := job.NewScheduler()
job.RegisterDbCleanupJobs(ctx, db) if err != nil {
job.RegisterFileCleanupJobs(ctx, db) return fmt.Errorf("failed to create job scheduler: %w", err)
job.RegisterApiKeyExpiryJob(ctx, apiKeyService, appConfigService) }
err = scheduler.RegisterLdapJobs(ctx, ldapService, appConfigService)
if err != nil {
return fmt.Errorf("failed to register LDAP jobs in scheduler: %w", err)
}
err = scheduler.RegisterDbCleanupJobs(ctx, db)
if err != nil {
return fmt.Errorf("failed to register DB cleanup jobs in scheduler: %w", err)
}
err = scheduler.RegisterFileCleanupJobs(ctx, db)
if err != nil {
return fmt.Errorf("failed to register file cleanup jobs in scheduler: %w", err)
}
err = scheduler.RegisterApiKeyExpiryJob(ctx, apiKeyService, appConfigService)
if err != nil {
return fmt.Errorf("failed to register API key expiration jobs in scheduler: %w", err)
}
// Run the scheduler in a background goroutine, until the context is canceled
go scheduler.Run(ctx)
// Initialize middleware for specific routes // Initialize middleware for specific routes
authMiddleware := middleware.NewAuthMiddleware(apiKeyService, userService, jwtService) authMiddleware := middleware.NewAuthMiddleware(apiKeyService, userService, jwtService)
@@ -89,20 +118,52 @@ func initRouter(ctx context.Context, db *gorm.DB, appConfigService *service.AppC
baseGroup := r.Group("/") baseGroup := r.Group("/")
controller.NewWellKnownController(baseGroup, jwtService) controller.NewWellKnownController(baseGroup, jwtService)
// Get the listener // Set up the server
l, err := net.Listen("tcp", common.EnvConfig.Host+":"+common.EnvConfig.Port) srv := &http.Server{
if err != nil { Addr: net.JoinHostPort(common.EnvConfig.Host, common.EnvConfig.Port),
log.Fatal(err) MaxHeaderBytes: 1 << 20,
ReadHeaderTimeout: 10 * time.Second,
Handler: r,
} }
// Set up the listener
listener, err := net.Listen("tcp", srv.Addr)
if err != nil {
return fmt.Errorf("failed to create TCP listener: %w", err)
}
log.Printf("Server listening on %s", srv.Addr)
// Notify systemd that we are ready // Notify systemd that we are ready
if err := systemd.SdNotifyReady(); err != nil { err = systemd.SdNotifyReady()
log.Println("Unable to notify systemd that the service is ready: ", err) if err != nil {
log.Printf("[WARN] Unable to notify systemd that the service is ready: %v", err)
// continue to serve anyway since it's not that important // continue to serve anyway since it's not that important
} }
// Serve requests // Start the server in a background goroutine
if err := r.RunListener(l); err != nil { go func() {
log.Fatal(err) defer listener.Close()
// Next call blocks until the server is shut down
srvErr := srv.Serve(listener)
if srvErr != http.ErrServerClosed {
log.Fatalf("Error starting app server: %v", srvErr)
} }
}()
// Block until the context is canceled
<-ctx.Done()
// Handle graceful shutdown
// Note we use the background context here as ctx has been canceled already
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
shutdownErr := srv.Shutdown(shutdownCtx) //nolint:contextcheck
shutdownCancel()
if shutdownErr != nil {
// Log the error only (could be context canceled)
log.Printf("[WARN] App server shutdown error: %v", shutdownErr)
}
return nil
} }

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"log" "log"
"github.com/go-co-op/gocron/v2"
"github.com/pocket-id/pocket-id/backend/internal/service" "github.com/pocket-id/pocket-id/backend/internal/service"
) )
@@ -13,20 +12,13 @@ type ApiKeyEmailJobs struct {
appConfigService *service.AppConfigService appConfigService *service.AppConfigService
} }
func RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService *service.ApiKeyService, appConfigService *service.AppConfigService) { func (s *Scheduler) RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService *service.ApiKeyService, appConfigService *service.AppConfigService) error {
jobs := &ApiKeyEmailJobs{ jobs := &ApiKeyEmailJobs{
apiKeyService: apiKeyService, apiKeyService: apiKeyService,
appConfigService: appConfigService, appConfigService: appConfigService,
} }
scheduler, err := gocron.NewScheduler() return s.registerJob(ctx, "ExpiredApiKeyEmailJob", "0 0 * * *", jobs.checkAndNotifyExpiringApiKeys)
if err != nil {
log.Fatalf("Failed to create a new scheduler: %v", err)
}
registerJob(ctx, scheduler, "ExpiredApiKeyEmailJob", "0 0 * * *", jobs.checkAndNotifyExpiringApiKeys)
scheduler.Start()
} }
func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error { func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error {

View File

@@ -2,30 +2,25 @@ package job
import ( import (
"context" "context"
"log" "errors"
"time" "time"
"github.com/go-co-op/gocron/v2"
"gorm.io/gorm" "gorm.io/gorm"
"github.com/pocket-id/pocket-id/backend/internal/model" "github.com/pocket-id/pocket-id/backend/internal/model"
datatype "github.com/pocket-id/pocket-id/backend/internal/model/types" datatype "github.com/pocket-id/pocket-id/backend/internal/model/types"
) )
func RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) { func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error {
scheduler, err := gocron.NewScheduler()
if err != nil {
log.Fatalf("Failed to create a new scheduler: %s", err)
}
jobs := &DbCleanupJobs{db: db} jobs := &DbCleanupJobs{db: db}
registerJob(ctx, scheduler, "ClearWebauthnSessions", "0 3 * * *", jobs.clearWebauthnSessions) return errors.Join(
registerJob(ctx, scheduler, "ClearOneTimeAccessTokens", "0 3 * * *", jobs.clearOneTimeAccessTokens) s.registerJob(ctx, "ClearWebauthnSessions", "0 3 * * *", jobs.clearWebauthnSessions),
registerJob(ctx, scheduler, "ClearOidcAuthorizationCodes", "0 3 * * *", jobs.clearOidcAuthorizationCodes) s.registerJob(ctx, "ClearOneTimeAccessTokens", "0 3 * * *", jobs.clearOneTimeAccessTokens),
registerJob(ctx, scheduler, "ClearOidcRefreshTokens", "0 3 * * *", jobs.clearOidcRefreshTokens) s.registerJob(ctx, "ClearOidcAuthorizationCodes", "0 3 * * *", jobs.clearOidcAuthorizationCodes),
registerJob(ctx, scheduler, "ClearAuditLogs", "0 3 * * *", jobs.clearAuditLogs) s.registerJob(ctx, "ClearOidcRefreshTokens", "0 3 * * *", jobs.clearOidcRefreshTokens),
scheduler.Start() s.registerJob(ctx, "ClearAuditLogs", "0 3 * * *", jobs.clearAuditLogs),
)
} }
type DbCleanupJobs struct { type DbCleanupJobs struct {

View File

@@ -8,24 +8,16 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/go-co-op/gocron/v2"
"gorm.io/gorm" "gorm.io/gorm"
"github.com/pocket-id/pocket-id/backend/internal/common" "github.com/pocket-id/pocket-id/backend/internal/common"
"github.com/pocket-id/pocket-id/backend/internal/model" "github.com/pocket-id/pocket-id/backend/internal/model"
) )
func RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB) { func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB) error {
scheduler, err := gocron.NewScheduler()
if err != nil {
log.Fatalf("Failed to create a new scheduler: %s", err)
}
jobs := &FileCleanupJobs{db: db} jobs := &FileCleanupJobs{db: db}
registerJob(ctx, scheduler, "ClearUnusedDefaultProfilePictures", "0 2 * * 0", jobs.clearUnusedDefaultProfilePictures) return s.registerJob(ctx, "ClearUnusedDefaultProfilePictures", "0 2 * * 0", jobs.clearUnusedDefaultProfilePictures)
scheduler.Start()
} }
type FileCleanupJobs struct { type FileCleanupJobs struct {

View File

@@ -1,29 +0,0 @@
package job
import (
"context"
"log"
"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"
)
func registerJob(ctx context.Context, scheduler gocron.Scheduler, name string, interval string, job func(ctx context.Context) error) {
_, err := scheduler.NewJob(
gocron.CronJob(interval, false),
gocron.NewTask(job),
gocron.WithContext(ctx),
gocron.WithEventListeners(
gocron.AfterJobRuns(func(jobID uuid.UUID, jobName string) {
log.Printf("Job %q run successfully", name)
}),
gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
log.Printf("Job %q failed with error: %v", name, err)
}),
),
)
if err != nil {
log.Fatalf("Failed to register job %q: %v", name, err)
}
}

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"log" "log"
"github.com/go-co-op/gocron/v2"
"github.com/pocket-id/pocket-id/backend/internal/service" "github.com/pocket-id/pocket-id/backend/internal/service"
) )
@@ -13,24 +12,23 @@ type LdapJobs struct {
appConfigService *service.AppConfigService appConfigService *service.AppConfigService
} }
func RegisterLdapJobs(ctx context.Context, ldapService *service.LdapService, appConfigService *service.AppConfigService) { func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.LdapService, appConfigService *service.AppConfigService) error {
jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService} jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService}
scheduler, err := gocron.NewScheduler()
if err != nil {
log.Fatalf("Failed to create a new scheduler: %v", err)
}
// Register the job to run every hour // Register the job to run every hour
registerJob(ctx, scheduler, "SyncLdap", "0 * * * *", jobs.syncLdap) err := s.registerJob(ctx, "SyncLdap", "0 * * * *", jobs.syncLdap)
if err != nil {
return err
}
// Run the job immediately on startup // Run the job immediately on startup
err = jobs.syncLdap(ctx) err = jobs.syncLdap(ctx)
if err != nil { if err != nil {
// Log the error only, but don't return it
log.Printf("Failed to sync LDAP: %v", err) log.Printf("Failed to sync LDAP: %v", err)
} }
scheduler.Start() return nil
} }
func (j *LdapJobs) syncLdap(ctx context.Context) error { func (j *LdapJobs) syncLdap(ctx context.Context) error {

View File

@@ -0,0 +1,64 @@
package job
import (
"context"
"fmt"
"log"
"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"
)
type Scheduler struct {
scheduler gocron.Scheduler
}
func NewScheduler() (*Scheduler, error) {
scheduler, err := gocron.NewScheduler()
if err != nil {
return nil, fmt.Errorf("failed to create a new scheduler: %w", err)
}
return &Scheduler{
scheduler: scheduler,
}, nil
}
// Run the scheduler.
// This function blocks until the context is canceled.
func (s *Scheduler) Run(ctx context.Context) {
log.Println("Starting job scheduler")
s.scheduler.Start()
// Block until context is canceled
<-ctx.Done()
err := s.scheduler.Shutdown()
if err != nil {
log.Printf("[WARN] Error shutting down job scheduler: %v", err)
} else {
log.Println("Job scheduler shut down")
}
}
func (s *Scheduler) registerJob(ctx context.Context, name string, interval string, job func(ctx context.Context) error) error {
_, err := s.scheduler.NewJob(
gocron.CronJob(interval, false),
gocron.NewTask(job),
gocron.WithContext(ctx),
gocron.WithEventListeners(
gocron.AfterJobRuns(func(jobID uuid.UUID, jobName string) {
log.Printf("Job %q run successfully", name)
}),
gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
log.Printf("Job %q failed with error: %v", name, err)
}),
),
)
if err != nil {
return fmt.Errorf("failed to register job %q: %w", name, err)
}
return nil
}

View File

@@ -0,0 +1,40 @@
package signals
import (
"context"
"log"
"os"
"os/signal"
"syscall"
)
/*
This code is adapted from:
https://github.com/kubernetes-sigs/controller-runtime/blob/8499b67e316a03b260c73f92d0380de8cd2e97a1/pkg/manager/signals/signal.go
Copyright 2017 The Kubernetes Authors.
License: Apache2 (https://github.com/kubernetes-sigs/controller-runtime/blob/8499b67e316a03b260c73f92d0380de8cd2e97a1/LICENSE)
*/
var onlyOneSignalHandler = make(chan struct{})
// SignalContext returns a context that is canceled when the application receives an interrupt signal.
// A second signal forces an immediate shutdown.
func SignalContext(parentCtx context.Context) context.Context {
close(onlyOneSignalHandler) // Panics when called twice
ctx, cancel := context.WithCancel(parentCtx)
sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
log.Println("Received interrupt signal. Shutting down…")
cancel()
<-sigCh
log.Println("Received a second interrupt signal. Forcing an immediate shutdown.")
os.Exit(1)
}()
return ctx
}