diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 6ab771b..63774f1 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -8,12 +8,9 @@ import ( "llamactl/pkg/backends" "llamactl/pkg/config" "log" - "net/http" "net/http/httputil" - "net/url" "os/exec" "sync" - "sync/atomic" "time" ) @@ -46,23 +43,24 @@ type Process struct { // Logging file logger *Logger `json:"-"` + // Proxy component + proxy *Proxy `json:"-"` // HTTP proxy and request tracking + // internal - cmd *exec.Cmd `json:"-"` // Command to run the instance - ctx context.Context `json:"-"` // Context for managing the instance lifecycle - cancel context.CancelFunc `json:"-"` // Function to cancel the context - stdout io.ReadCloser `json:"-"` // Standard output stream - stderr io.ReadCloser `json:"-"` // Standard error stream - mu sync.RWMutex `json:"-"` // RWMutex for better read/write separation - restarts int `json:"-"` // Number of restarts - proxy *httputil.ReverseProxy `json:"-"` // Reverse proxy for this instance + cmd *exec.Cmd `json:"-"` // Command to run the instance + ctx context.Context `json:"-"` // Context for managing the instance lifecycle + cancel context.CancelFunc `json:"-"` // Function to cancel the context + stdout io.ReadCloser `json:"-"` // Standard output stream + stderr io.ReadCloser `json:"-"` // Standard error stream + mu sync.RWMutex `json:"-"` // RWMutex for better read/write separation + restarts int `json:"-"` // Number of restarts // Restart control restartCancel context.CancelFunc `json:"-"` // Cancel function for pending restarts monitorDone chan struct{} `json:"-"` // Channel to signal monitor goroutine completion - // Timeout management - lastRequestTime atomic.Int64 // Unix timestamp of last request - timeProvider TimeProvider `json:"-"` // Time provider for testing + // Time provider for testing (kept for backward compatibility during refactor) + timeProvider TimeProvider `json:"-"` // Time provider for testing } // NewInstance creates a new instance with the given name, log path, and options @@ -73,7 +71,7 @@ func NewInstance(name string, globalBackendSettings *config.BackendConfig, globa // Create the instance logger logger := NewInstanceLogger(name, globalInstanceSettings.LogsDir) - return &Process{ + instance := &Process{ Name: name, options: options, globalInstanceSettings: globalInstanceSettings, @@ -84,6 +82,11 @@ func NewInstance(name string, globalBackendSettings *config.BackendConfig, globa Status: Stopped, onStatusChange: onStatusChange, } + + // Create Proxy component + instance.proxy = NewProxy(instance) + + return instance } func (i *Process) GetOptions() *CreateInstanceOptions { @@ -149,88 +152,27 @@ func (i *Process) SetOptions(options *CreateInstanceOptions) { options.ValidateAndApplyDefaults(i.Name, i.globalInstanceSettings) i.options = options + // Clear the proxy so it gets recreated with new options - i.proxy = nil + if i.proxy != nil { + i.proxy.clearProxy() + } } // SetTimeProvider sets a custom time provider for testing func (i *Process) SetTimeProvider(tp TimeProvider) { i.timeProvider = tp + if i.proxy != nil { + i.proxy.SetTimeProvider(tp) + } } -// GetProxy returns the reverse proxy for this instance, creating it if needed +// GetProxy returns the reverse proxy for this instance, delegating to Proxy component func (i *Process) GetProxy() (*httputil.ReverseProxy, error) { - i.mu.Lock() - defer i.mu.Unlock() - - if i.proxy != nil { - return i.proxy, nil + if i.proxy == nil { + return nil, fmt.Errorf("instance %s has no proxy component", i.Name) } - - if i.options == nil { - return nil, fmt.Errorf("instance %s has no options set", i.Name) - } - - // Remote instances should not use local proxy - they are handled by RemoteInstanceProxy - if len(i.options.Nodes) > 0 { - return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", i.Name) - } - - var host string - var port int - switch i.options.BackendType { - case backends.BackendTypeLlamaCpp: - if i.options.LlamaServerOptions != nil { - host = i.options.LlamaServerOptions.Host - port = i.options.LlamaServerOptions.Port - } - case backends.BackendTypeMlxLm: - if i.options.MlxServerOptions != nil { - host = i.options.MlxServerOptions.Host - port = i.options.MlxServerOptions.Port - } - case backends.BackendTypeVllm: - if i.options.VllmServerOptions != nil { - host = i.options.VllmServerOptions.Host - port = i.options.VllmServerOptions.Port - } - } - - targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port)) - if err != nil { - return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", i.Name, err) - } - - proxy := httputil.NewSingleHostReverseProxy(targetURL) - - var responseHeaders map[string]string - switch i.options.BackendType { - case backends.BackendTypeLlamaCpp: - responseHeaders = i.globalBackendSettings.LlamaCpp.ResponseHeaders - case backends.BackendTypeVllm: - responseHeaders = i.globalBackendSettings.VLLM.ResponseHeaders - case backends.BackendTypeMlxLm: - responseHeaders = i.globalBackendSettings.MLX.ResponseHeaders - } - proxy.ModifyResponse = func(resp *http.Response) error { - // Remove CORS headers from llama-server response to avoid conflicts - // llamactl will add its own CORS headers - resp.Header.Del("Access-Control-Allow-Origin") - resp.Header.Del("Access-Control-Allow-Methods") - resp.Header.Del("Access-Control-Allow-Headers") - resp.Header.Del("Access-Control-Allow-Credentials") - resp.Header.Del("Access-Control-Max-Age") - resp.Header.Del("Access-Control-Expose-Headers") - - for key, value := range responseHeaders { - resp.Header.Set(key, value) - } - return nil - } - - i.proxy = proxy - - return i.proxy, nil + return i.proxy.GetProxy() } // MarshalJSON implements json.Marshaler for Instance @@ -297,6 +239,9 @@ func (i *Process) UnmarshalJSON(data []byte) error { if i.logger == nil && i.globalInstanceSettings != nil { i.logger = NewInstanceLogger(i.Name, i.globalInstanceSettings.LogsDir) } + if i.proxy == nil { + i.proxy = NewProxy(i) + } return nil } diff --git a/pkg/instance/lifecycle.go b/pkg/instance/lifecycle.go index fa37dc3..dfdeab3 100644 --- a/pkg/instance/lifecycle.go +++ b/pkg/instance/lifecycle.go @@ -36,7 +36,9 @@ func (i *Process) Start() error { } // Initialize last request time to current time when starting - i.lastRequestTime.Store(i.timeProvider.Now().Unix()) + if i.proxy != nil { + i.proxy.UpdateLastRequestTime() + } // Create context before building command (needed for CommandContext) i.ctx, i.cancel = context.WithCancel(context.Background()) @@ -111,9 +113,6 @@ func (i *Process) Stop() error { // Set status to stopped first to signal intentional stop i.SetStatus(Stopped) - // Clean up the proxy - i.proxy = nil - // Get the monitor done channel before releasing the lock monitorDone := i.monitorDone @@ -159,8 +158,13 @@ func (i *Process) Stop() error { return nil } +// LastRequestTime returns the last request time as a Unix timestamp +// Delegates to the Proxy component func (i *Process) LastRequestTime() int64 { - return i.lastRequestTime.Load() + if i.proxy == nil { + return 0 + } + return i.proxy.LastRequestTime() } func (i *Process) WaitForHealthy(timeout int) error { diff --git a/pkg/instance/proxy.go b/pkg/instance/proxy.go new file mode 100644 index 0000000..5727edb --- /dev/null +++ b/pkg/instance/proxy.go @@ -0,0 +1,165 @@ +package instance + +import ( + "fmt" + "llamactl/pkg/backends" + "net/http" + "net/http/httputil" + "net/url" + "sync" + "sync/atomic" +) + +// Proxy manages HTTP reverse proxy and request tracking for an instance. +type Proxy struct { + process *Process // Owner reference - Proxy is owned by Process + + mu sync.RWMutex + proxy *httputil.ReverseProxy + proxyOnce sync.Once + proxyErr error + lastRequestTime atomic.Int64 + timeProvider TimeProvider +} + +// NewProxy creates a new Proxy for the given process +func NewProxy(process *Process) *Proxy { + return &Proxy{ + process: process, + timeProvider: realTimeProvider{}, + } +} + +// GetProxy returns the reverse proxy for this instance, creating it if needed. +// Uses sync.Once to ensure thread-safe one-time initialization. +func (p *Proxy) GetProxy() (*httputil.ReverseProxy, error) { + // sync.Once guarantees buildProxy() is called exactly once + // Other callers block until first initialization completes + p.proxyOnce.Do(func() { + p.proxy, p.proxyErr = p.buildProxy() + }) + + return p.proxy, p.proxyErr +} + +// buildProxy creates the reverse proxy based on instance options +func (p *Proxy) buildProxy() (*httputil.ReverseProxy, error) { + options := p.process.GetOptions() + if options == nil { + return nil, fmt.Errorf("instance %s has no options set", p.process.Name) + } + + // Remote instances should not use local proxy - they are handled by RemoteInstanceProxy + if len(options.Nodes) > 0 { + return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", p.process.Name) + } + + // Get host/port from options + var host string + var port int + switch options.BackendType { + case backends.BackendTypeLlamaCpp: + if options.LlamaServerOptions != nil { + host = options.LlamaServerOptions.Host + port = options.LlamaServerOptions.Port + } + case backends.BackendTypeMlxLm: + if options.MlxServerOptions != nil { + host = options.MlxServerOptions.Host + port = options.MlxServerOptions.Port + } + case backends.BackendTypeVllm: + if options.VllmServerOptions != nil { + host = options.VllmServerOptions.Host + port = options.VllmServerOptions.Port + } + } + + if host == "" { + host = "localhost" + } + + targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port)) + if err != nil { + return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.process.Name, err) + } + + proxy := httputil.NewSingleHostReverseProxy(targetURL) + + // Get response headers from backend config + var responseHeaders map[string]string + switch options.BackendType { + case backends.BackendTypeLlamaCpp: + responseHeaders = p.process.globalBackendSettings.LlamaCpp.ResponseHeaders + case backends.BackendTypeVllm: + responseHeaders = p.process.globalBackendSettings.VLLM.ResponseHeaders + case backends.BackendTypeMlxLm: + responseHeaders = p.process.globalBackendSettings.MLX.ResponseHeaders + } + + proxy.ModifyResponse = func(resp *http.Response) error { + // Remove CORS headers from backend response to avoid conflicts + // llamactl will add its own CORS headers + resp.Header.Del("Access-Control-Allow-Origin") + resp.Header.Del("Access-Control-Allow-Methods") + resp.Header.Del("Access-Control-Allow-Headers") + resp.Header.Del("Access-Control-Allow-Credentials") + resp.Header.Del("Access-Control-Max-Age") + resp.Header.Del("Access-Control-Expose-Headers") + + for key, value := range responseHeaders { + resp.Header.Set(key, value) + } + return nil + } + + return proxy, nil +} + +// clearProxy resets the proxy, allowing it to be recreated when options change. +// This resets the sync.Once so the next GetProxy call will rebuild the proxy. +func (p *Proxy) clearProxy() { + p.mu.Lock() + defer p.mu.Unlock() + + p.proxy = nil + p.proxyErr = nil + p.proxyOnce = sync.Once{} // Reset Once for next GetProxy call +} + +// UpdateLastRequestTime updates the last request access time for the instance +func (p *Proxy) UpdateLastRequestTime() { + lastRequestTime := p.timeProvider.Now().Unix() + p.lastRequestTime.Store(lastRequestTime) +} + +// LastRequestTime returns the last request time as a Unix timestamp +func (p *Proxy) LastRequestTime() int64 { + return p.lastRequestTime.Load() +} + +// ShouldTimeout checks if the instance should timeout based on idle time +func (p *Proxy) ShouldTimeout() bool { + if !p.process.IsRunning() { + return false + } + + options := p.process.GetOptions() + if options == nil || options.IdleTimeout == nil || *options.IdleTimeout <= 0 { + return false + } + + // Check if the last request time exceeds the idle timeout + lastRequest := p.lastRequestTime.Load() + idleTimeoutMinutes := *options.IdleTimeout + + // Convert timeout from minutes to seconds for comparison + idleTimeoutSeconds := int64(idleTimeoutMinutes * 60) + + return (p.timeProvider.Now().Unix() - lastRequest) > idleTimeoutSeconds +} + +// SetTimeProvider sets a custom time provider for testing +func (p *Proxy) SetTimeProvider(tp TimeProvider) { + p.timeProvider = tp +} diff --git a/pkg/instance/timeout.go b/pkg/instance/timeout.go index 94cdc16..acfdc58 100644 --- a/pkg/instance/timeout.go +++ b/pkg/instance/timeout.go @@ -1,28 +1,18 @@ package instance // UpdateLastRequestTime updates the last request access time for the instance via proxy +// Delegates to the Proxy component func (i *Process) UpdateLastRequestTime() { - i.mu.Lock() - defer i.mu.Unlock() - - lastRequestTime := i.timeProvider.Now().Unix() - i.lastRequestTime.Store(lastRequestTime) + if i.proxy != nil { + i.proxy.UpdateLastRequestTime() + } } +// ShouldTimeout checks if the instance should timeout based on idle time +// Delegates to the Proxy component func (i *Process) ShouldTimeout() bool { - i.mu.RLock() - defer i.mu.RUnlock() - - if !i.IsRunning() || i.options.IdleTimeout == nil || *i.options.IdleTimeout <= 0 { + if i.proxy == nil { return false } - - // Check if the last request time exceeds the idle timeout - lastRequest := i.lastRequestTime.Load() - idleTimeoutMinutes := *i.options.IdleTimeout - - // Convert timeout from minutes to seconds for comparison - idleTimeoutSeconds := int64(idleTimeoutMinutes * 60) - - return (i.timeProvider.Now().Unix() - lastRequest) > idleTimeoutSeconds + return i.proxy.ShouldTimeout() }