Move proxy to separate struct

This commit is contained in:
2025-10-14 22:01:09 +02:00
parent 02909c5153
commit 92a76bc84b
4 changed files with 214 additions and 110 deletions

View File

@@ -8,12 +8,9 @@ import (
"llamactl/pkg/backends" "llamactl/pkg/backends"
"llamactl/pkg/config" "llamactl/pkg/config"
"log" "log"
"net/http"
"net/http/httputil" "net/http/httputil"
"net/url"
"os/exec" "os/exec"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@@ -46,6 +43,9 @@ type Process struct {
// Logging file // Logging file
logger *Logger `json:"-"` logger *Logger `json:"-"`
// Proxy component
proxy *Proxy `json:"-"` // HTTP proxy and request tracking
// internal // internal
cmd *exec.Cmd `json:"-"` // Command to run the instance cmd *exec.Cmd `json:"-"` // Command to run the instance
ctx context.Context `json:"-"` // Context for managing the instance lifecycle ctx context.Context `json:"-"` // Context for managing the instance lifecycle
@@ -54,14 +54,12 @@ type Process struct {
stderr io.ReadCloser `json:"-"` // Standard error stream stderr io.ReadCloser `json:"-"` // Standard error stream
mu sync.RWMutex `json:"-"` // RWMutex for better read/write separation mu sync.RWMutex `json:"-"` // RWMutex for better read/write separation
restarts int `json:"-"` // Number of restarts restarts int `json:"-"` // Number of restarts
proxy *httputil.ReverseProxy `json:"-"` // Reverse proxy for this instance
// Restart control // Restart control
restartCancel context.CancelFunc `json:"-"` // Cancel function for pending restarts restartCancel context.CancelFunc `json:"-"` // Cancel function for pending restarts
monitorDone chan struct{} `json:"-"` // Channel to signal monitor goroutine completion monitorDone chan struct{} `json:"-"` // Channel to signal monitor goroutine completion
// Timeout management // Time provider for testing (kept for backward compatibility during refactor)
lastRequestTime atomic.Int64 // Unix timestamp of last request
timeProvider TimeProvider `json:"-"` // Time provider for testing timeProvider TimeProvider `json:"-"` // Time provider for testing
} }
@@ -73,7 +71,7 @@ func NewInstance(name string, globalBackendSettings *config.BackendConfig, globa
// Create the instance logger // Create the instance logger
logger := NewInstanceLogger(name, globalInstanceSettings.LogsDir) logger := NewInstanceLogger(name, globalInstanceSettings.LogsDir)
return &Process{ instance := &Process{
Name: name, Name: name,
options: options, options: options,
globalInstanceSettings: globalInstanceSettings, globalInstanceSettings: globalInstanceSettings,
@@ -84,6 +82,11 @@ func NewInstance(name string, globalBackendSettings *config.BackendConfig, globa
Status: Stopped, Status: Stopped,
onStatusChange: onStatusChange, onStatusChange: onStatusChange,
} }
// Create Proxy component
instance.proxy = NewProxy(instance)
return instance
} }
func (i *Process) GetOptions() *CreateInstanceOptions { func (i *Process) GetOptions() *CreateInstanceOptions {
@@ -149,88 +152,27 @@ func (i *Process) SetOptions(options *CreateInstanceOptions) {
options.ValidateAndApplyDefaults(i.Name, i.globalInstanceSettings) options.ValidateAndApplyDefaults(i.Name, i.globalInstanceSettings)
i.options = options i.options = options
// Clear the proxy so it gets recreated with new options // Clear the proxy so it gets recreated with new options
i.proxy = nil if i.proxy != nil {
i.proxy.clearProxy()
}
} }
// SetTimeProvider sets a custom time provider for testing // SetTimeProvider sets a custom time provider for testing
func (i *Process) SetTimeProvider(tp TimeProvider) { func (i *Process) SetTimeProvider(tp TimeProvider) {
i.timeProvider = tp i.timeProvider = tp
}
// GetProxy returns the reverse proxy for this instance, creating it if needed
func (i *Process) GetProxy() (*httputil.ReverseProxy, error) {
i.mu.Lock()
defer i.mu.Unlock()
if i.proxy != nil { if i.proxy != nil {
return i.proxy, nil i.proxy.SetTimeProvider(tp)
}
if i.options == nil {
return nil, fmt.Errorf("instance %s has no options set", i.Name)
}
// Remote instances should not use local proxy - they are handled by RemoteInstanceProxy
if len(i.options.Nodes) > 0 {
return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", i.Name)
}
var host string
var port int
switch i.options.BackendType {
case backends.BackendTypeLlamaCpp:
if i.options.LlamaServerOptions != nil {
host = i.options.LlamaServerOptions.Host
port = i.options.LlamaServerOptions.Port
}
case backends.BackendTypeMlxLm:
if i.options.MlxServerOptions != nil {
host = i.options.MlxServerOptions.Host
port = i.options.MlxServerOptions.Port
}
case backends.BackendTypeVllm:
if i.options.VllmServerOptions != nil {
host = i.options.VllmServerOptions.Host
port = i.options.VllmServerOptions.Port
} }
} }
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port)) // GetProxy returns the reverse proxy for this instance, delegating to Proxy component
if err != nil { func (i *Process) GetProxy() (*httputil.ReverseProxy, error) {
return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", i.Name, err) if i.proxy == nil {
return nil, fmt.Errorf("instance %s has no proxy component", i.Name)
} }
return i.proxy.GetProxy()
proxy := httputil.NewSingleHostReverseProxy(targetURL)
var responseHeaders map[string]string
switch i.options.BackendType {
case backends.BackendTypeLlamaCpp:
responseHeaders = i.globalBackendSettings.LlamaCpp.ResponseHeaders
case backends.BackendTypeVllm:
responseHeaders = i.globalBackendSettings.VLLM.ResponseHeaders
case backends.BackendTypeMlxLm:
responseHeaders = i.globalBackendSettings.MLX.ResponseHeaders
}
proxy.ModifyResponse = func(resp *http.Response) error {
// Remove CORS headers from llama-server response to avoid conflicts
// llamactl will add its own CORS headers
resp.Header.Del("Access-Control-Allow-Origin")
resp.Header.Del("Access-Control-Allow-Methods")
resp.Header.Del("Access-Control-Allow-Headers")
resp.Header.Del("Access-Control-Allow-Credentials")
resp.Header.Del("Access-Control-Max-Age")
resp.Header.Del("Access-Control-Expose-Headers")
for key, value := range responseHeaders {
resp.Header.Set(key, value)
}
return nil
}
i.proxy = proxy
return i.proxy, nil
} }
// MarshalJSON implements json.Marshaler for Instance // MarshalJSON implements json.Marshaler for Instance
@@ -297,6 +239,9 @@ func (i *Process) UnmarshalJSON(data []byte) error {
if i.logger == nil && i.globalInstanceSettings != nil { if i.logger == nil && i.globalInstanceSettings != nil {
i.logger = NewInstanceLogger(i.Name, i.globalInstanceSettings.LogsDir) i.logger = NewInstanceLogger(i.Name, i.globalInstanceSettings.LogsDir)
} }
if i.proxy == nil {
i.proxy = NewProxy(i)
}
return nil return nil
} }

View File

@@ -36,7 +36,9 @@ func (i *Process) Start() error {
} }
// Initialize last request time to current time when starting // Initialize last request time to current time when starting
i.lastRequestTime.Store(i.timeProvider.Now().Unix()) if i.proxy != nil {
i.proxy.UpdateLastRequestTime()
}
// Create context before building command (needed for CommandContext) // Create context before building command (needed for CommandContext)
i.ctx, i.cancel = context.WithCancel(context.Background()) i.ctx, i.cancel = context.WithCancel(context.Background())
@@ -111,9 +113,6 @@ func (i *Process) Stop() error {
// Set status to stopped first to signal intentional stop // Set status to stopped first to signal intentional stop
i.SetStatus(Stopped) i.SetStatus(Stopped)
// Clean up the proxy
i.proxy = nil
// Get the monitor done channel before releasing the lock // Get the monitor done channel before releasing the lock
monitorDone := i.monitorDone monitorDone := i.monitorDone
@@ -159,8 +158,13 @@ func (i *Process) Stop() error {
return nil return nil
} }
// LastRequestTime returns the last request time as a Unix timestamp
// Delegates to the Proxy component
func (i *Process) LastRequestTime() int64 { func (i *Process) LastRequestTime() int64 {
return i.lastRequestTime.Load() if i.proxy == nil {
return 0
}
return i.proxy.LastRequestTime()
} }
func (i *Process) WaitForHealthy(timeout int) error { func (i *Process) WaitForHealthy(timeout int) error {

165
pkg/instance/proxy.go Normal file
View File

@@ -0,0 +1,165 @@
package instance
import (
"fmt"
"llamactl/pkg/backends"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"sync/atomic"
)
// Proxy manages HTTP reverse proxy and request tracking for an instance.
type Proxy struct {
process *Process // Owner reference - Proxy is owned by Process
mu sync.RWMutex
proxy *httputil.ReverseProxy
proxyOnce sync.Once
proxyErr error
lastRequestTime atomic.Int64
timeProvider TimeProvider
}
// NewProxy creates a new Proxy for the given process
func NewProxy(process *Process) *Proxy {
return &Proxy{
process: process,
timeProvider: realTimeProvider{},
}
}
// GetProxy returns the reverse proxy for this instance, creating it if needed.
// Uses sync.Once to ensure thread-safe one-time initialization.
func (p *Proxy) GetProxy() (*httputil.ReverseProxy, error) {
// sync.Once guarantees buildProxy() is called exactly once
// Other callers block until first initialization completes
p.proxyOnce.Do(func() {
p.proxy, p.proxyErr = p.buildProxy()
})
return p.proxy, p.proxyErr
}
// buildProxy creates the reverse proxy based on instance options
func (p *Proxy) buildProxy() (*httputil.ReverseProxy, error) {
options := p.process.GetOptions()
if options == nil {
return nil, fmt.Errorf("instance %s has no options set", p.process.Name)
}
// Remote instances should not use local proxy - they are handled by RemoteInstanceProxy
if len(options.Nodes) > 0 {
return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", p.process.Name)
}
// Get host/port from options
var host string
var port int
switch options.BackendType {
case backends.BackendTypeLlamaCpp:
if options.LlamaServerOptions != nil {
host = options.LlamaServerOptions.Host
port = options.LlamaServerOptions.Port
}
case backends.BackendTypeMlxLm:
if options.MlxServerOptions != nil {
host = options.MlxServerOptions.Host
port = options.MlxServerOptions.Port
}
case backends.BackendTypeVllm:
if options.VllmServerOptions != nil {
host = options.VllmServerOptions.Host
port = options.VllmServerOptions.Port
}
}
if host == "" {
host = "localhost"
}
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port))
if err != nil {
return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.process.Name, err)
}
proxy := httputil.NewSingleHostReverseProxy(targetURL)
// Get response headers from backend config
var responseHeaders map[string]string
switch options.BackendType {
case backends.BackendTypeLlamaCpp:
responseHeaders = p.process.globalBackendSettings.LlamaCpp.ResponseHeaders
case backends.BackendTypeVllm:
responseHeaders = p.process.globalBackendSettings.VLLM.ResponseHeaders
case backends.BackendTypeMlxLm:
responseHeaders = p.process.globalBackendSettings.MLX.ResponseHeaders
}
proxy.ModifyResponse = func(resp *http.Response) error {
// Remove CORS headers from backend response to avoid conflicts
// llamactl will add its own CORS headers
resp.Header.Del("Access-Control-Allow-Origin")
resp.Header.Del("Access-Control-Allow-Methods")
resp.Header.Del("Access-Control-Allow-Headers")
resp.Header.Del("Access-Control-Allow-Credentials")
resp.Header.Del("Access-Control-Max-Age")
resp.Header.Del("Access-Control-Expose-Headers")
for key, value := range responseHeaders {
resp.Header.Set(key, value)
}
return nil
}
return proxy, nil
}
// clearProxy resets the proxy, allowing it to be recreated when options change.
// This resets the sync.Once so the next GetProxy call will rebuild the proxy.
func (p *Proxy) clearProxy() {
p.mu.Lock()
defer p.mu.Unlock()
p.proxy = nil
p.proxyErr = nil
p.proxyOnce = sync.Once{} // Reset Once for next GetProxy call
}
// UpdateLastRequestTime updates the last request access time for the instance
func (p *Proxy) UpdateLastRequestTime() {
lastRequestTime := p.timeProvider.Now().Unix()
p.lastRequestTime.Store(lastRequestTime)
}
// LastRequestTime returns the last request time as a Unix timestamp
func (p *Proxy) LastRequestTime() int64 {
return p.lastRequestTime.Load()
}
// ShouldTimeout checks if the instance should timeout based on idle time
func (p *Proxy) ShouldTimeout() bool {
if !p.process.IsRunning() {
return false
}
options := p.process.GetOptions()
if options == nil || options.IdleTimeout == nil || *options.IdleTimeout <= 0 {
return false
}
// Check if the last request time exceeds the idle timeout
lastRequest := p.lastRequestTime.Load()
idleTimeoutMinutes := *options.IdleTimeout
// Convert timeout from minutes to seconds for comparison
idleTimeoutSeconds := int64(idleTimeoutMinutes * 60)
return (p.timeProvider.Now().Unix() - lastRequest) > idleTimeoutSeconds
}
// SetTimeProvider sets a custom time provider for testing
func (p *Proxy) SetTimeProvider(tp TimeProvider) {
p.timeProvider = tp
}

View File

@@ -1,28 +1,18 @@
package instance package instance
// UpdateLastRequestTime updates the last request access time for the instance via proxy // UpdateLastRequestTime updates the last request access time for the instance via proxy
// Delegates to the Proxy component
func (i *Process) UpdateLastRequestTime() { func (i *Process) UpdateLastRequestTime() {
i.mu.Lock() if i.proxy != nil {
defer i.mu.Unlock() i.proxy.UpdateLastRequestTime()
}
lastRequestTime := i.timeProvider.Now().Unix()
i.lastRequestTime.Store(lastRequestTime)
} }
// ShouldTimeout checks if the instance should timeout based on idle time
// Delegates to the Proxy component
func (i *Process) ShouldTimeout() bool { func (i *Process) ShouldTimeout() bool {
i.mu.RLock() if i.proxy == nil {
defer i.mu.RUnlock()
if !i.IsRunning() || i.options.IdleTimeout == nil || *i.options.IdleTimeout <= 0 {
return false return false
} }
return i.proxy.ShouldTimeout()
// Check if the last request time exceeds the idle timeout
lastRequest := i.lastRequestTime.Load()
idleTimeoutMinutes := *i.options.IdleTimeout
// Convert timeout from minutes to seconds for comparison
idleTimeoutSeconds := int64(idleTimeoutMinutes * 60)
return (i.timeProvider.Now().Unix() - lastRequest) > idleTimeoutSeconds
} }