Merge pull request #78 from lordmathis/feat/inflight-requests

feat: Wait for inflight requests to finish before shutting down an instance
This commit is contained in:
2025-10-30 18:08:55 +01:00
committed by GitHub
12 changed files with 134 additions and 122 deletions

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"llamactl/pkg/config"
"log"
"net/http/httputil"
"net/http"
"time"
)
@@ -182,15 +182,6 @@ func (i *Instance) GetPort() int {
return i.options.GetPort()
}
// GetProxy returns the reverse proxy for this instance.
// It fails if the instance was built without a proxy component.
func (i *Instance) GetProxy() (*httputil.ReverseProxy, error) {
	if i.proxy != nil {
		return i.proxy.get()
	}
	return nil, fmt.Errorf("instance %s has no proxy component", i.Name)
}
func (i *Instance) IsRemote() bool {
opts := i.GetOptions()
if opts == nil {
@@ -242,6 +233,22 @@ func (i *Instance) ShouldTimeout() bool {
return i.proxy.shouldTimeout()
}
// GetInflightRequests returns the current number of inflight requests.
// An instance without a proxy component tracks no requests, so it reports zero.
func (i *Instance) GetInflightRequests() int32 {
	p := i.proxy
	if p == nil {
		return 0
	}
	return p.getInflightRequests()
}
// ServeHTTP serves HTTP requests through the proxy with request tracking and shutdown handling.
// The actual tracking lives in the proxy component; an instance without one cannot serve.
func (i *Instance) ServeHTTP(w http.ResponseWriter, r *http.Request) error {
	if p := i.proxy; p != nil {
		return p.serveHTTP(w, r)
	}
	return fmt.Errorf("instance %s has no proxy component", i.Name)
}
func (i *Instance) getCommand() string {
opts := i.GetOptions()
if opts == nil {

View File

@@ -171,64 +171,6 @@ func TestSetOptions(t *testing.T) {
}
}
// TestGetProxy verifies that GetProxy lazily builds a reverse proxy and
// that repeated calls return the same cached instance.
func TestGetProxy(t *testing.T) {
	appConfig := &config.AppConfig{
		Backends: config.BackendConfig{
			LlamaCpp: config.BackendSettings{
				Command: "llama-server",
				Args:    []string{},
			},
			MLX: config.BackendSettings{
				Command: "mlx_lm.server",
				Args:    []string{},
			},
			VLLM: config.BackendSettings{
				Command: "vllm",
				Args:    []string{"serve"},
			},
		},
		Instances: config.InstancesConfig{
			LogsDir: "/tmp/test",
		},
		Nodes:     map[string]config.NodeConfig{},
		LocalNode: "main",
	}
	opts := &instance.Options{
		Nodes: map[string]struct{}{"main": {}},
		BackendOptions: backends.Options{
			BackendType: backends.BackendTypeLlamaCpp,
			LlamaServerOptions: &backends.LlamaServerOptions{
				Host: "localhost",
				Port: 8080,
			},
		},
	}

	// No-op status callback; this test is not interested in transitions.
	noopStatusChange := func(oldStatus, newStatus instance.Status) {}

	inst := instance.New("test-instance", appConfig, opts, noopStatusChange)

	// First call should create the proxy.
	first, err := inst.GetProxy()
	if err != nil {
		t.Fatalf("GetProxy failed: %v", err)
	}
	if first == nil {
		t.Error("Expected proxy to be created")
	}

	// Second call must hand back the cached proxy, not a new one.
	second, err := inst.GetProxy()
	if err != nil {
		t.Fatalf("GetProxy failed: %v", err)
	}
	if first != second {
		t.Error("Expected cached proxy to be returned")
	}
}
func TestMarshalJSON(t *testing.T) {
globalConfig := &config.AppConfig{
Backends: config.BackendConfig{
@@ -613,11 +555,6 @@ func TestRemoteInstanceOperations(t *testing.T) {
t.Error("Expected error when restarting remote instance")
}
// GetProxy should succeed for remote instance
if _, err := inst.GetProxy(); err != nil {
t.Error("Expected no error when getting proxy for remote instance")
}
// GetLogs should fail for remote instance
if _, err := inst.GetLogs(10); err == nil {
t.Error("Expected error when getting logs for remote instance")

View File

@@ -132,14 +132,28 @@ func (p *process) stop() error {
p.restartCancel = nil
}
// Set status to stopped first to signal intentional stop
p.instance.SetStatus(Stopped)
// Set status to ShuttingDown first to reject new requests
p.instance.SetStatus(ShuttingDown)
// Get the monitor done channel before releasing the lock
monitorDone := p.monitorDone
p.mu.Unlock()
// Wait for inflight requests to complete (max 30 seconds)
log.Printf("Instance %s shutting down, waiting for inflight requests to complete...", p.instance.Name)
deadline := time.Now().Add(30 * time.Second)
for time.Now().Before(deadline) {
inflight := p.instance.GetInflightRequests()
if inflight == 0 {
break
}
time.Sleep(100 * time.Millisecond)
}
// Now set status to stopped to signal intentional stop
p.instance.SetStatus(Stopped)
// Stop the process with SIGINT if cmd exists
if p.cmd != nil && p.cmd.Process != nil {
if err := p.cmd.Process.Signal(syscall.SIGINT); err != nil {
@@ -156,6 +170,7 @@ func (p *process) stop() error {
select {
case <-monitorDone:
// Process exited normally
log.Printf("Instance %s shut down gracefully", p.instance.Name)
case <-time.After(30 * time.Second):
// Force kill if it doesn't exit within 30 seconds
if p.cmd != nil && p.cmd.Process != nil {

View File

@@ -37,8 +37,9 @@ type proxy struct {
proxyOnce sync.Once
proxyErr error
lastRequestTime atomic.Int64
timeProvider TimeProvider
lastRequestTime atomic.Int64
inflightRequests atomic.Int32
timeProvider TimeProvider
}
// newProxy creates a new Proxy for the given instance
@@ -153,6 +154,23 @@ func (p *proxy) build() (*httputil.ReverseProxy, error) {
return proxy, nil
}
// serveHTTP handles HTTP requests with inflight tracking.
// The counter stays incremented for the full duration of the proxied
// request so that shutdown can wait for active requests to drain.
func (p *proxy) serveHTTP(w http.ResponseWriter, r *http.Request) error {
	rp, err := p.get()
	if err != nil {
		return err
	}

	p.incInflightRequests()
	defer p.decInflightRequests()

	rp.ServeHTTP(w, r)
	return nil
}
// clear resets the proxy, allowing it to be recreated when options change.
func (p *proxy) clear() {
p.mu.Lock()
@@ -160,7 +178,7 @@ func (p *proxy) clear() {
p.proxy = nil
p.proxyErr = nil
p.proxyOnce = sync.Once{} // Reset Once for next GetProxy call
p.proxyOnce = sync.Once{}
}
// updateLastRequestTime updates the last request access time for the instance
@@ -199,3 +217,18 @@ func (p *proxy) shouldTimeout() bool {
// setTimeProvider replaces the proxy's time source (presumably so tests
// can inject a fake clock for timeout checks — confirm against callers).
// NOTE(review): plain assignment, not synchronized — assumed to be called
// before concurrent use; verify.
func (p *proxy) setTimeProvider(tp TimeProvider) {
	p.timeProvider = tp
}
// incInflightRequests increments the inflight request counter.
// Called at the start of serveHTTP and always paired with a deferred
// decInflightRequests, so the count returns to zero when requests finish.
func (p *proxy) incInflightRequests() {
	p.inflightRequests.Add(1)
}
// decInflightRequests decrements the inflight request counter.
// Invoked via defer in serveHTTP, so the slot is released even if the
// proxied handler panics.
func (p *proxy) decInflightRequests() {
	p.inflightRequests.Add(-1)
}
// getInflightRequests returns the current number of inflight requests.
// Backed by an atomic counter, so it is safe to poll from the shutdown
// path while requests are still being served.
func (p *proxy) getInflightRequests() int32 {
	return p.inflightRequests.Load()
}

View File

@@ -14,20 +14,23 @@ const (
Running
Failed
Restarting
ShuttingDown
)
var nameToStatus = map[string]Status{
"stopped": Stopped,
"running": Running,
"failed": Failed,
"restarting": Restarting,
"stopped": Stopped,
"running": Running,
"failed": Failed,
"restarting": Restarting,
"shutting_down": ShuttingDown,
}
var statusToName = map[Status]string{
Stopped: "stopped",
Running: "running",
Failed: "failed",
Restarting: "restarting",
Stopped: "stopped",
Running: "running",
Failed: "failed",
Restarting: "restarting",
ShuttingDown: "shutting_down",
}
// Status enum JSON marshaling methods

View File

@@ -66,17 +66,16 @@ func (h *Handler) LlamaCppUIProxy() http.HandlerFunc {
return
}
proxy, err := inst.GetProxy()
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to get proxy", err.Error())
return
}
if !inst.IsRemote() {
h.stripLlamaCppPrefix(r, inst.Name)
}
proxy.ServeHTTP(w, r)
// Use instance's ServeHTTP which tracks inflight requests and handles shutting down state
err = inst.ServeHTTP(w, r)
if err != nil {
// NOTE(review): serveHTTP returns the error without writing a response here — confirm an error response reaches the client
return
}
}
}
@@ -110,6 +109,12 @@ func (h *Handler) LlamaCppProxy() http.HandlerFunc {
return
}
// Check if instance is shutting down before autostart logic
if inst.GetStatus() == instance.ShuttingDown {
writeError(w, http.StatusServiceUnavailable, "instance_shutting_down", "Instance is shutting down")
return
}
if !inst.IsRemote() && !inst.IsRunning() {
err := h.ensureInstanceRunning(inst)
if err != nil {
@@ -118,17 +123,16 @@ func (h *Handler) LlamaCppProxy() http.HandlerFunc {
}
}
proxy, err := inst.GetProxy()
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to get proxy", err.Error())
return
}
if !inst.IsRemote() {
h.stripLlamaCppPrefix(r, inst.Name)
}
proxy.ServeHTTP(w, r)
// Use instance's ServeHTTP which tracks inflight requests and handles shutting down state
err = inst.ServeHTTP(w, r)
if err != nil {
// Error is already handled in ServeHTTP (response written)
return
}
}
}

View File

@@ -332,12 +332,6 @@ func (h *Handler) InstanceProxy() http.HandlerFunc {
return
}
proxy, err := inst.GetProxy()
if err != nil {
writeError(w, http.StatusInternalServerError, "proxy_failed", "Failed to get proxy: "+err.Error())
return
}
if !inst.IsRemote() {
// Strip the "/api/v1/instances/<name>/proxy" prefix from the request URL
prefix := fmt.Sprintf("/api/v1/instances/%s/proxy", inst.Name)
@@ -348,6 +342,11 @@ func (h *Handler) InstanceProxy() http.HandlerFunc {
r.Header.Set("X-Forwarded-Host", r.Header.Get("Host"))
r.Header.Set("X-Forwarded-Proto", "http")
proxy.ServeHTTP(w, r)
// Use instance's ServeHTTP which tracks inflight requests and handles shutting down state
err = inst.ServeHTTP(w, r)
if err != nil {
// Error is already handled in ServeHTTP (response written)
return
}
}
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"io"
"llamactl/pkg/instance"
"llamactl/pkg/validation"
"net/http"
)
@@ -106,6 +107,12 @@ func (h *Handler) OpenAIProxy() http.HandlerFunc {
return
}
// Check if instance is shutting down before autostart logic
if inst.GetStatus() == instance.ShuttingDown {
writeError(w, http.StatusServiceUnavailable, "instance_shutting_down", "Instance is shutting down")
return
}
if !inst.IsRemote() && !inst.IsRunning() {
err := h.ensureInstanceRunning(inst)
if err != nil {
@@ -114,16 +121,15 @@ func (h *Handler) OpenAIProxy() http.HandlerFunc {
}
}
proxy, err := inst.GetProxy()
if err != nil {
writeError(w, http.StatusInternalServerError, "proxy_failed", err.Error())
return
}
// Recreate the request body from the bytes we read
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
r.ContentLength = int64(len(bodyBytes))
proxy.ServeHTTP(w, r)
// Use instance's ServeHTTP which tracks inflight requests and handles shutting down state
err = inst.ServeHTTP(w, r)
if err != nil {
// NOTE(review): serveHTTP returns the error without writing a response here — confirm an error response reaches the client
return
}
}
}

View File

@@ -21,6 +21,8 @@ const HealthBadge: React.FC<HealthBadgeProps> = ({ health }) => {
return <Loader2 className="h-3 w-3 animate-spin" />;
case "restarting":
return <Loader2 className="h-3 w-3 animate-spin" />;
case "shutting_down":
return <Loader2 className="h-3 w-3 animate-spin" />;
case "stopped":
return <Clock className="h-3 w-3" />;
case "failed":
@@ -36,6 +38,8 @@ const HealthBadge: React.FC<HealthBadgeProps> = ({ health }) => {
return "outline";
case "restarting":
return "outline";
case "shutting_down":
return "outline";
case "stopped":
return "secondary";
case "failed":
@@ -51,6 +55,8 @@ const HealthBadge: React.FC<HealthBadgeProps> = ({ health }) => {
return "Starting";
case "restarting":
return "Restarting";
case "shutting_down":
return "Shutting Down";
case "stopped":
return "Stopped";
case "failed":

View File

@@ -18,7 +18,7 @@ export function useInstanceHealth(instanceName: string, instanceStatus: Instance
// Trigger health check when instance status changes to active states
useEffect(() => {
if (instanceStatus === 'running' || instanceStatus === 'restarting') {
if (instanceStatus === 'running' || instanceStatus === 'restarting' || instanceStatus === 'shutting_down') {
healthService.refreshHealth(instanceName).catch(error => {
console.error(`Failed to refresh health for ${instanceName}:`, error)
})

View File

@@ -5,11 +5,12 @@ type HealthCallback = (health: HealthStatus) => void
// Polling intervals based on health state (in milliseconds)
const POLLING_INTERVALS: Record<HealthState, number> = {
'starting': 5000, // 5 seconds - frequent during startup
'restarting': 5000, // 5 seconds - restart in progress
'ready': 60000, // 60 seconds - stable state
'stopped': 0, // No polling
'failed': 0, // No polling
'starting': 5000, // 5 seconds - frequent during startup
'restarting': 5000, // 5 seconds - restart in progress
'shutting_down': 3000, // 3 seconds - monitor shutdown progress
'ready': 60000, // 60 seconds - stable state
'stopped': 0, // No polling
'failed': 0, // No polling
}
class HealthService {
@@ -96,6 +97,7 @@ class HealthService {
case 'running': return 'starting' // Should not happen as we check HTTP for running
case 'failed': return 'failed'
case 'restarting': return 'restarting'
case 'shutting_down': return 'shutting_down'
}
}

View File

@@ -11,9 +11,9 @@ export const BackendType = {
export type BackendTypeValue = typeof BackendType[keyof typeof BackendType]
export type InstanceStatus = 'running' | 'stopped' | 'failed' | 'restarting'
export type InstanceStatus = 'running' | 'stopped' | 'failed' | 'restarting' | 'shutting_down'
export type HealthState = 'stopped' | 'starting' | 'ready' | 'failed' | 'restarting'
export type HealthState = 'stopped' | 'starting' | 'ready' | 'failed' | 'restarting' | 'shutting_down'
export interface HealthStatus {
state: HealthState