From 7ee22fee5160bf90ca0d79e6052b984b38824d92 Mon Sep 17 00:00:00 2001 From: LordMathis Date: Tue, 28 Oct 2025 23:53:11 +0100 Subject: [PATCH 1/7] Implement shutting down status --- pkg/instance/status.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/instance/status.go b/pkg/instance/status.go index c05d460..9230c90 100644 --- a/pkg/instance/status.go +++ b/pkg/instance/status.go @@ -14,20 +14,23 @@ const ( Running Failed Restarting + ShuttingDown ) var nameToStatus = map[string]Status{ - "stopped": Stopped, - "running": Running, - "failed": Failed, - "restarting": Restarting, + "stopped": Stopped, + "running": Running, + "failed": Failed, + "restarting": Restarting, + "shutting_down": ShuttingDown, } var statusToName = map[Status]string{ - Stopped: "stopped", - Running: "running", - Failed: "failed", - Restarting: "restarting", + Stopped: "stopped", + Running: "running", + Failed: "failed", + Restarting: "restarting", + ShuttingDown: "shutting_down", } // Status enum JSON marshaling methods From 2e5644db53a153d62d0e5e96fb040039d77d74a2 Mon Sep 17 00:00:00 2001 From: LordMathis Date: Tue, 28 Oct 2025 23:59:02 +0100 Subject: [PATCH 2/7] Implement inflight request tracking --- pkg/instance/proxy.go | 47 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/pkg/instance/proxy.go b/pkg/instance/proxy.go index d261ddf..a61dfb6 100644 --- a/pkg/instance/proxy.go +++ b/pkg/instance/proxy.go @@ -37,8 +37,9 @@ type proxy struct { proxyOnce sync.Once proxyErr error - lastRequestTime atomic.Int64 - timeProvider TimeProvider + lastRequestTime atomic.Int64 + inflightRequests atomic.Int32 + timeProvider TimeProvider } // newProxy creates a new Proxy for the given instance @@ -153,6 +154,31 @@ func (p *proxy) build() (*httputil.ReverseProxy, error) { return proxy, nil } +// serveHTTP handles HTTP requests with inflight tracking and shutting down state checks +func (p *proxy) serveHTTP(w http.ResponseWriter, r *http.Request) error { + // Check if instance is shutting down + status := p.instance.GetStatus() + if status == ShuttingDown { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte("Instance is shutting down")) + return fmt.Errorf("instance is shutting down") + } + + // Get the reverse proxy + reverseProxy, err := p.get() + if err != nil { + return err + } + + // Track inflight requests + p.incInflightRequests() + defer p.decInflightRequests() + + // Serve the request + reverseProxy.ServeHTTP(w, r) + return nil +} + // clear resets the proxy, allowing it to be recreated when options change. func (p *proxy) clear() { p.mu.Lock() @@ -160,7 +186,7 @@ func (p *proxy) clear() { p.proxy = nil p.proxyErr = nil - p.proxyOnce = sync.Once{} // Reset Once for next GetProxy call + p.proxyOnce = sync.Once{} } // updateLastRequestTime updates the last request access time for the instance @@ -199,3 +225,18 @@ func (p *proxy) shouldTimeout() bool { func (p *proxy) setTimeProvider(tp TimeProvider) { p.timeProvider = tp } + +// incInflightRequests increments the inflight request counter +func (p *proxy) incInflightRequests() { + p.inflightRequests.Add(1) +} + +// decInflightRequests decrements the inflight request counter +func (p *proxy) decInflightRequests() { + p.inflightRequests.Add(-1) +} + +// getInflightRequests returns the current number of inflight requests +func (p *proxy) getInflightRequests() int32 { + return p.inflightRequests.Load() +} From 2b94244c8a71ea6c4dc519ce27fa3715927aa19a Mon Sep 17 00:00:00 2001 From: LordMathis Date: Wed, 29 Oct 2025 00:00:02 +0100 Subject: [PATCH 3/7] Replace GetProxy with ServeHttp in instance --- pkg/instance/instance.go | 27 +++++++++------ pkg/instance/instance_test.go | 63 ----------------------------------- 2 files changed, 17 insertions(+), 73 deletions(-) diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 7366a03..5e5dc27 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -5,7 +5,7 @@ import ( "fmt" "llamactl/pkg/config" "log" - "net/http/httputil" + "net/http" "time" ) @@ -182,15 +182,6 @@ func (i *Instance) GetPort() int { return i.options.GetPort() } -// GetProxy returns the reverse proxy for this instance -func (i *Instance) GetProxy() (*httputil.ReverseProxy, error) { - if i.proxy == nil { - return nil, fmt.Errorf("instance %s has no proxy component", i.Name) - } - - return i.proxy.get() -} - func (i *Instance) IsRemote() bool { opts := i.GetOptions() if opts == nil { @@ -242,6 +233,22 @@ func (i *Instance) ShouldTimeout() bool { return i.proxy.shouldTimeout() } +// GetInflightRequests returns the current number of inflight requests +func (i *Instance) GetInflightRequests() int32 { + if i.proxy == nil { + return 0 + } + return i.proxy.getInflightRequests() +} + +// ServeHTTP serves HTTP requests through the proxy with request tracking and shutdown handling +func (i *Instance) ServeHTTP(w http.ResponseWriter, r *http.Request) error { + if i.proxy == nil { + return fmt.Errorf("instance %s has no proxy component", i.Name) + } + return i.proxy.serveHTTP(w, r) +} + func (i *Instance) getCommand() string { opts := i.GetOptions() if opts == nil { diff --git a/pkg/instance/instance_test.go b/pkg/instance/instance_test.go index a843845..122cc96 100644 --- a/pkg/instance/instance_test.go +++ b/pkg/instance/instance_test.go @@ -171,64 +171,6 @@ func TestSetOptions(t *testing.T) { } } -func TestGetProxy(t *testing.T) { - globalConfig := &config.AppConfig{ - Backends: config.BackendConfig{ - LlamaCpp: config.BackendSettings{ - Command: "llama-server", - Args: []string{}, - }, - MLX: config.BackendSettings{ - Command: "mlx_lm.server", - Args: []string{}, - }, - VLLM: config.BackendSettings{ - Command: "vllm", - Args: []string{"serve"}, - }, - }, - Instances: config.InstancesConfig{ - LogsDir: "/tmp/test", - }, - Nodes: map[string]config.NodeConfig{}, - LocalNode: "main", - } - - options := &instance.Options{ - Nodes: map[string]struct{}{"main": {}}, - BackendOptions: backends.Options{ - BackendType: backends.BackendTypeLlamaCpp, - LlamaServerOptions: &backends.LlamaServerOptions{ - Host: "localhost", - Port: 8080, - }, - }, - } - - // Mock onStatusChange function - mockOnStatusChange := func(oldStatus, newStatus instance.Status) {} - - inst := instance.New("test-instance", globalConfig, options, mockOnStatusChange) - - // Get proxy for the first time - proxy1, err := inst.GetProxy() - if err != nil { - t.Fatalf("GetProxy failed: %v", err) - } - if proxy1 == nil { - t.Error("Expected proxy to be created") - } - - // Get proxy again - should return cached version - proxy2, err := inst.GetProxy() - if err != nil { - t.Fatalf("GetProxy failed: %v", err) - } - if proxy1 != proxy2 { - t.Error("Expected cached proxy to be returned") - } -} - func TestMarshalJSON(t *testing.T) { globalConfig := &config.AppConfig{ Backends: config.BackendConfig{ @@ -613,11 +555,6 @@ func TestRemoteInstanceOperations(t *testing.T) { t.Error("Expected error when restarting remote instance") } - // GetProxy should fail for remote instance - if _, err := inst.GetProxy(); err != nil { - t.Error("Expected no error when getting proxy for remote instance") - } - // GetLogs should fail for remote instance if _, err := inst.GetLogs(10); err == nil { t.Error("Expected error when getting logs for remote instance") From d65c5ab717c658426d6b179503d486f03e0d330d Mon Sep 17 00:00:00 2001 From: LordMathis Date: Wed, 29 Oct 2025 00:00:33 +0100 Subject: [PATCH 4/7] Wait for inflight requests before stopping --- pkg/instance/process.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/instance/process.go b/pkg/instance/process.go index 94e057f..408e45c 100644 --- a/pkg/instance/process.go +++ b/pkg/instance/process.go @@ -132,14 +132,28 @@ func (p *process) stop() error { p.restartCancel = nil } - // Set status to stopped first to signal intentional stop - p.instance.SetStatus(Stopped) + // Set status to ShuttingDown first to reject new requests + p.instance.SetStatus(ShuttingDown) // Get the monitor done channel before releasing the lock monitorDone := p.monitorDone p.mu.Unlock() + // Wait for inflight requests to complete (max 30 seconds) + log.Printf("Instance %s shutting down, waiting for inflight requests to complete...", p.instance.Name) + deadline := time.Now().Add(30 * time.Second) + for time.Now().Before(deadline) { + inflight := p.instance.GetInflightRequests() + if inflight == 0 { + break + } + time.Sleep(100 * time.Millisecond) + } + + // Now set status to stopped to signal intentional stop + p.instance.SetStatus(Stopped) + // Stop the process with SIGINT if cmd exists if p.cmd != nil && p.cmd.Process != nil { if err := p.cmd.Process.Signal(syscall.SIGINT); err != nil { @@ -156,6 +170,7 @@ func (p *process) stop() error { select { case <-monitorDone: // Process exited normally + log.Printf("Instance %s shut down gracefully", p.instance.Name) case <-time.After(30 * time.Second): // Force kill if it doesn't exit within 30 seconds if p.cmd != nil && p.cmd.Process != nil { From 77c0e22fd09968bc646f77673ebd5a0867e04d1e Mon Sep 17 00:00:00 2001 From: LordMathis Date: Wed, 29 Oct 2025 00:01:29 +0100 Subject: [PATCH 5/7] Use instance's ServeHTTP in handlers --- pkg/server/handlers_backends.go | 26 ++++++++++++-------------- pkg/server/handlers_instances.go | 13 ++++++------- pkg/server/handlers_openai.go | 13 ++++++------- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/pkg/server/handlers_backends.go b/pkg/server/handlers_backends.go index 390ecb0..a60c6ce 100644 --- a/pkg/server/handlers_backends.go +++ b/pkg/server/handlers_backends.go @@ -66,17 +66,16 @@ func (h *Handler) LlamaCppUIProxy() http.HandlerFunc { return } - proxy, err := inst.GetProxy() - if err != nil { - writeError(w, http.StatusInternalServerError, "failed to get proxy", err.Error()) - return - } - if !inst.IsRemote() { h.stripLlamaCppPrefix(r, inst.Name) } - proxy.ServeHTTP(w, r) + // Use instance's ServeHTTP which tracks inflight requests and handles shutting down state + err = inst.ServeHTTP(w, r) + if err != nil { + // Error is already handled in ServeHTTP (response written) + return + } } } @@ -118,17 +117,16 @@ func (h *Handler) LlamaCppProxy() http.HandlerFunc { } } - proxy, err := inst.GetProxy() - if err != nil { - writeError(w, http.StatusInternalServerError, "failed to get proxy", err.Error()) - return - } - if !inst.IsRemote() { h.stripLlamaCppPrefix(r, inst.Name) } - proxy.ServeHTTP(w, r) + // Use instance's ServeHTTP which tracks inflight requests and handles shutting down state + err = inst.ServeHTTP(w, r) + if err != nil { + // Error is already handled in ServeHTTP (response written) + return + } } } diff --git a/pkg/server/handlers_instances.go b/pkg/server/handlers_instances.go index 0480f22..43bed3e 100644 --- a/pkg/server/handlers_instances.go +++ b/pkg/server/handlers_instances.go @@ -332,12 +332,6 @@ func (h *Handler) InstanceProxy() http.HandlerFunc { return } - proxy, err := inst.GetProxy() - if err != nil { - writeError(w, http.StatusInternalServerError, "proxy_failed", "Failed to get proxy: "+err.Error()) - return - } - if !inst.IsRemote() { // Strip the "/api/v1/instances//proxy" prefix from the request URL prefix := fmt.Sprintf("/api/v1/instances/%s/proxy", inst.Name) @@ -348,6 +342,11 @@ func (h *Handler) InstanceProxy() http.HandlerFunc { r.Header.Set("X-Forwarded-Host", r.Header.Get("Host")) r.Header.Set("X-Forwarded-Proto", "http") - proxy.ServeHTTP(w, r) + // Use instance's ServeHTTP which tracks inflight requests and handles shutting down state + err = inst.ServeHTTP(w, r) + if err != nil { + // Error is already handled in ServeHTTP (response written) + return + } } } diff --git a/pkg/server/handlers_openai.go b/pkg/server/handlers_openai.go index d221200..0937e6a 100644 --- a/pkg/server/handlers_openai.go +++ b/pkg/server/handlers_openai.go @@ -114,16 +114,15 @@ func (h *Handler) OpenAIProxy() http.HandlerFunc { } } - proxy, err := inst.GetProxy() - if err != nil { - writeError(w, http.StatusInternalServerError, "proxy_failed", err.Error()) - return - } - // Recreate the request body from the bytes we read r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) r.ContentLength = int64(len(bodyBytes)) - proxy.ServeHTTP(w, r) + // Use instance's ServeHTTP which tracks inflight requests and handles shutting down state + err = inst.ServeHTTP(w, r) + if err != nil { + // Error is already handled in ServeHTTP (response written) + return + } } } From c340439306c94b0bc6c924166633ad4fecca9fd5 Mon Sep 17 00:00:00 2001 From: LordMathis Date: Wed, 29 Oct 2025 00:09:18 +0100 Subject: [PATCH 6/7] Add support for 'shutting_down' state in HealthBadge and health service --- webui/src/components/HealthBadge.tsx | 6 ++++++ webui/src/hooks/useInstanceHealth.ts | 2 +- webui/src/lib/healthService.ts | 12 +++++++----- webui/src/types/instance.ts | 4 ++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/webui/src/components/HealthBadge.tsx b/webui/src/components/HealthBadge.tsx index 13eda9f..6cfa02a 100644 --- a/webui/src/components/HealthBadge.tsx +++ b/webui/src/components/HealthBadge.tsx @@ -21,6 +21,8 @@ const HealthBadge: React.FC = ({ health }) => { return ; case "restarting": return ; + case "shutting_down": + return ; case "stopped": return ; case "failed": @@ -36,6 +38,8 @@ const HealthBadge: React.FC = ({ health }) => { return "outline"; case "restarting": return "outline"; + case "shutting_down": + return "outline"; case "stopped": return "secondary"; case "failed": @@ -51,6 +55,8 @@ const HealthBadge: React.FC = ({ health }) => { return "Starting"; case "restarting": return "Restarting"; + case "shutting_down": + return "Shutting Down"; case "stopped": return "Stopped"; case "failed": diff --git a/webui/src/hooks/useInstanceHealth.ts b/webui/src/hooks/useInstanceHealth.ts index 87d818c..2cea936 100644 --- a/webui/src/hooks/useInstanceHealth.ts +++ b/webui/src/hooks/useInstanceHealth.ts @@ -18,7 +18,7 @@ export function useInstanceHealth(instanceName: string, instanceStatus: Instance // Trigger health check when instance status changes to active states useEffect(() => { - if (instanceStatus === 'running' || instanceStatus === 'restarting') { + if (instanceStatus === 'running' || instanceStatus === 'restarting' || instanceStatus === 'shutting_down') { healthService.refreshHealth(instanceName).catch(error => { console.error(`Failed to refresh health for ${instanceName}:`, error) }) diff --git a/webui/src/lib/healthService.ts b/webui/src/lib/healthService.ts index aabea0c..7d47486 100644 --- a/webui/src/lib/healthService.ts +++ b/webui/src/lib/healthService.ts @@ -5,11 +5,12 @@ type HealthCallback = (health: HealthStatus) => void // Polling intervals based on health state (in milliseconds) const POLLING_INTERVALS: Record = { - 'starting': 5000, // 5 seconds - frequent during startup - 'restarting': 5000, // 5 seconds - restart in progress - 'ready': 60000, // 60 seconds - stable state - 'stopped': 0, // No polling - 'failed': 0, // No polling + 'starting': 5000, // 5 seconds - frequent during startup + 'restarting': 5000, // 5 seconds - restart in progress + 'shutting_down': 3000, // 3 seconds - monitor shutdown progress + 'ready': 60000, // 60 seconds - stable state + 'stopped': 0, // No polling + 'failed': 0, // No polling } class HealthService { @@ -96,6 +97,7 @@ class HealthService { case 'running': return 'starting' // Should not happen as we check HTTP for running case 'failed': return 'failed' case 'restarting': return 'restarting' + case 'shutting_down': return 'shutting_down' } } diff --git a/webui/src/types/instance.ts b/webui/src/types/instance.ts index 1c42547..e243b72 100644 --- a/webui/src/types/instance.ts +++ b/webui/src/types/instance.ts @@ -11,9 +11,9 @@ export const BackendType = { export type BackendTypeValue = typeof BackendType[keyof typeof BackendType] -export type InstanceStatus = 'running' | 'stopped' | 'failed' | 'restarting' +export type InstanceStatus = 'running' | 'stopped' | 'failed' | 'restarting' | 'shutting_down' -export type HealthState = 'stopped' | 'starting' | 'ready' | 'failed' | 'restarting' +export type HealthState = 'stopped' | 'starting' | 'ready' | 'failed' | 'restarting' | 'shutting_down' export interface HealthStatus { state: HealthState From 560850f86d73c1a6e8096322fddf5b76eddd9a92 Mon Sep 17 00:00:00 2001 From: LordMathis Date: Thu, 30 Oct 2025 18:00:59 +0100 Subject: [PATCH 7/7] Add shutdown state checks in HTTP handlers --- pkg/instance/proxy.go | 10 +--------- pkg/server/handlers_backends.go | 6 ++++++ pkg/server/handlers_openai.go | 7 +++++++ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/instance/proxy.go b/pkg/instance/proxy.go index a61dfb6..df86744 100644 --- a/pkg/instance/proxy.go +++ b/pkg/instance/proxy.go @@ -154,16 +154,8 @@ func (p *proxy) build() (*httputil.ReverseProxy, error) { return proxy, nil } -// serveHTTP handles HTTP requests with inflight tracking and shutting down state checks +// serveHTTP handles HTTP requests with inflight tracking func (p *proxy) serveHTTP(w http.ResponseWriter, r *http.Request) error { - // Check if instance is shutting down - status := p.instance.GetStatus() - if status == ShuttingDown { - w.WriteHeader(http.StatusServiceUnavailable) - w.Write([]byte("Instance is shutting down")) - return fmt.Errorf("instance is shutting down") - } - // Get the reverse proxy reverseProxy, err := p.get() if err != nil { diff --git a/pkg/server/handlers_backends.go b/pkg/server/handlers_backends.go index a60c6ce..1e249f9 100644 --- a/pkg/server/handlers_backends.go +++ b/pkg/server/handlers_backends.go @@ -109,6 +109,12 @@ func (h *Handler) LlamaCppProxy() http.HandlerFunc { return } + // Check if instance is shutting down before autostart logic + if inst.GetStatus() == instance.ShuttingDown { + writeError(w, http.StatusServiceUnavailable, "instance_shutting_down", "Instance is shutting down") + return + } + if !inst.IsRemote() && !inst.IsRunning() { err := h.ensureInstanceRunning(inst) if err != nil { diff --git a/pkg/server/handlers_openai.go b/pkg/server/handlers_openai.go index 0937e6a..81aa9e7 100644 --- a/pkg/server/handlers_openai.go +++ b/pkg/server/handlers_openai.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "io" + "llamactl/pkg/instance" "llamactl/pkg/validation" "net/http" ) @@ -106,6 +107,12 @@ func (h *Handler) OpenAIProxy() http.HandlerFunc { return } + // Check if instance is shutting down before autostart logic + if inst.GetStatus() == instance.ShuttingDown { + writeError(w, http.StatusServiceUnavailable, "instance_shutting_down", "Instance is shutting down") + return + } + if !inst.IsRemote() && !inst.IsRunning() { err := h.ensureInstanceRunning(inst) if err != nil {