Merge pull request #71 from lordmathis/refactor/proxy

refactor: Move all proxy handling to instance package
Committed 2025-10-25 16:32:32 +02:00 via GitHub
12 changed files with 352 additions and 422 deletions

View File

@@ -58,7 +58,7 @@ func main() {
 	}
 	// Initialize the instance manager
-	instanceManager := manager.New(cfg.Backends, cfg.Instances, cfg.Nodes, cfg.LocalNode)
+	instanceManager := manager.New(&cfg)
 	// Create a new handler with the instance manager
 	handler := server.NewHandler(instanceManager, cfg)
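For orientation, a minimal sketch of the new wiring. The inline config literal and the server import path are assumptions for illustration; the real binary builds cfg from its config loader before this point.

package main

import (
	"llamactl/pkg/config"
	"llamactl/pkg/manager"
	"llamactl/pkg/server"
)

func main() {
	// Illustrative config; every field used here appears in this diff.
	cfg := config.AppConfig{
		Backends:  config.BackendConfig{LlamaCpp: config.BackendSettings{Command: "llama-server"}},
		Instances: config.InstancesConfig{PortRange: [2]int{8000, 9000}, InstancesDir: "/tmp/llamactl", LogsDir: "/tmp/llamactl"},
		Nodes:     map[string]config.NodeConfig{},
		LocalNode: "main",
	}

	instanceManager := manager.New(&cfg) // the manager now receives the whole config
	handler := server.NewHandler(instanceManager, cfg)
	_ = handler
}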

View File

@@ -17,6 +17,7 @@ type Instance struct {
 	// Global configuration
 	globalInstanceSettings *config.InstancesConfig
 	globalBackendSettings  *config.BackendConfig
+	globalNodesConfig      map[string]config.NodeConfig
 	localNodeName          string  `json:"-"` // Name of the local node for remote detection
 	status                 *status `json:"-"`
@@ -29,7 +30,13 @@ type Instance struct {
 }
 // New creates a new instance with the given name, log path, options and local node name
-func New(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, opts *Options, localNodeName string, onStatusChange func(oldStatus, newStatus Status)) *Instance {
+func New(name string, globalConfig *config.AppConfig, opts *Options, onStatusChange func(oldStatus, newStatus Status)) *Instance {
+	globalInstanceSettings := &globalConfig.Instances
+	globalBackendSettings := &globalConfig.Backends
+	globalNodesConfig := globalConfig.Nodes
+	localNodeName := globalConfig.LocalNode
 	// Validate and copy options
 	opts.validateAndApplyDefaults(name, globalInstanceSettings)
@@ -45,15 +52,21 @@ func New(name string, globalBackendSettings *config.BackendConfig, globalInstanc
 		options:                options,
 		globalInstanceSettings: globalInstanceSettings,
 		globalBackendSettings:  globalBackendSettings,
+		globalNodesConfig:      globalNodesConfig,
 		localNodeName:          localNodeName,
 		Created:                time.Now().Unix(),
 		status:                 status,
 	}
+	var err error
+	instance.proxy, err = newProxy(instance)
+	if err != nil {
+		log.Println("Warning: Failed to create proxy for instance", instance.Name, "-", err)
+	}
 	// Only create logger, proxy, and process for local instances
 	if !instance.IsRemote() {
 		instance.logger = newLogger(name, globalInstanceSettings.LogsDir)
-		instance.proxy = newProxy(instance)
 		instance.process = newProcess(instance)
 	}
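The constructor now takes the whole *config.AppConfig and derives the per-subsystem settings itself, and it builds the proxy for every instance, remote ones included. A hedged, test-style usage sketch; import paths are assumed from the rest of this diff and the test name is illustrative:

package instance_test

import (
	"testing"

	"llamactl/pkg/backends"
	"llamactl/pkg/config"
	"llamactl/pkg/instance"
)

func TestNewSignatureSketch(t *testing.T) {
	globalConfig := &config.AppConfig{
		Backends:  config.BackendConfig{LlamaCpp: config.BackendSettings{Command: "llama-server"}},
		Instances: config.InstancesConfig{LogsDir: "/tmp/test"},
		Nodes:     map[string]config.NodeConfig{},
		LocalNode: "main",
	}
	opts := &instance.Options{
		BackendOptions: backends.Options{
			BackendType:        backends.BackendTypeLlamaCpp,
			LlamaServerOptions: &backends.LlamaServerOptions{Model: "/path/to/model.gguf"},
		},
	}
	// A nil onStatusChange callback is accepted, as in several tests below.
	inst := instance.New("sketch", globalConfig, opts, nil)
	if inst.Name != "sketch" {
		t.Fatalf("expected name 'sketch', got %q", inst.Name)
	}
}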
@@ -175,14 +188,6 @@ func (i *Instance) GetProxy() (*httputil.ReverseProxy, error) {
return nil, fmt.Errorf("instance %s has no proxy component", i.Name) return nil, fmt.Errorf("instance %s has no proxy component", i.Name)
} }
// Remote instances should not use local proxy - they are handled by RemoteInstanceProxy
opts := i.GetOptions()
if opts != nil && len(opts.Nodes) > 0 {
if _, isLocal := opts.Nodes[i.localNodeName]; !isLocal {
return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", i.Name)
}
}
return i.proxy.get() return i.proxy.get()
} }
@@ -307,34 +312,5 @@ func (i *Instance) UnmarshalJSON(data []byte) error {
 	i.status = aux.Status
 	i.options = aux.Options
-	// Handle options with validation and defaults
-	if i.options != nil {
-		opts := i.options.get()
-		if opts != nil {
-			opts.validateAndApplyDefaults(i.Name, i.globalInstanceSettings)
-		}
-	}
-	// Initialize fields that are not serialized or may be nil
-	if i.status == nil {
-		i.status = newStatus(Stopped)
-	}
-	if i.options == nil {
-		i.options = newOptions(&Options{})
-	}
-	// Only create logger, proxy, and process for non-remote instances
-	if !i.IsRemote() {
-		if i.logger == nil && i.globalInstanceSettings != nil {
-			i.logger = newLogger(i.Name, i.globalInstanceSettings.LogsDir)
-		}
-		if i.proxy == nil {
-			i.proxy = newProxy(i)
-		}
-		if i.process == nil {
-			i.process = newProcess(i)
-		}
-	}
 	return nil
 }

View File

@@ -11,26 +11,29 @@ import (
) )
func TestNewInstance(t *testing.T) { func TestNewInstance(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{ Backends: config.BackendConfig{
Command: "llama-server", LlamaCpp: config.BackendSettings{
Args: []string{}, Command: "llama-server",
Args: []string{},
},
MLX: config.BackendSettings{
Command: "mlx_lm.server",
Args: []string{},
},
VLLM: config.BackendSettings{
Command: "vllm",
Args: []string{"serve"},
},
}, },
MLX: config.BackendSettings{ Instances: config.InstancesConfig{
Command: "mlx_lm.server", LogsDir: "/tmp/test",
Args: []string{}, DefaultAutoRestart: true,
DefaultMaxRestarts: 3,
DefaultRestartDelay: 5,
}, },
VLLM: config.BackendSettings{ Nodes: map[string]config.NodeConfig{},
Command: "vllm", LocalNode: "main",
Args: []string{"serve"},
},
}
globalSettings := &config.InstancesConfig{
LogsDir: "/tmp/test",
DefaultAutoRestart: true,
DefaultMaxRestarts: 3,
DefaultRestartDelay: 5,
} }
options := &instance.Options{ options := &instance.Options{
@@ -46,7 +49,7 @@ func TestNewInstance(t *testing.T) {
// Mock onStatusChange function // Mock onStatusChange function
mockOnStatusChange := func(oldStatus, newStatus instance.Status) {} mockOnStatusChange := func(oldStatus, newStatus instance.Status) {}
inst := instance.New("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) inst := instance.New("test-instance", globalConfig, options, mockOnStatusChange)
if inst.Name != "test-instance" { if inst.Name != "test-instance" {
t.Errorf("Expected name 'test-instance', got %q", inst.Name) t.Errorf("Expected name 'test-instance', got %q", inst.Name)
@@ -79,8 +82,8 @@ func TestNewInstance(t *testing.T) {
autoRestart := false autoRestart := false
maxRestarts := 10 maxRestarts := 10
optionsWithOverrides := &instance.Options{ optionsWithOverrides := &instance.Options{
AutoRestart: &autoRestart, AutoRestart: &autoRestart,
MaxRestarts: &maxRestarts, MaxRestarts: &maxRestarts,
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
LlamaServerOptions: &backends.LlamaServerOptions{ LlamaServerOptions: &backends.LlamaServerOptions{
@@ -89,7 +92,7 @@ func TestNewInstance(t *testing.T) {
}, },
} }
inst2 := instance.New("test-override", backendConfig, globalSettings, optionsWithOverrides, "main", mockOnStatusChange) inst2 := instance.New("test-override", globalConfig, optionsWithOverrides, mockOnStatusChange)
opts2 := inst2.GetOptions() opts2 := inst2.GetOptions()
if opts2.AutoRestart == nil || *opts2.AutoRestart { if opts2.AutoRestart == nil || *opts2.AutoRestart {
@@ -101,26 +104,29 @@ func TestNewInstance(t *testing.T) {
} }
func TestSetOptions(t *testing.T) { func TestSetOptions(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{ Backends: config.BackendConfig{
Command: "llama-server", LlamaCpp: config.BackendSettings{
Args: []string{}, Command: "llama-server",
Args: []string{},
},
MLX: config.BackendSettings{
Command: "mlx_lm.server",
Args: []string{},
},
VLLM: config.BackendSettings{
Command: "vllm",
Args: []string{"serve"},
},
}, },
MLX: config.BackendSettings{ Instances: config.InstancesConfig{
Command: "mlx_lm.server", LogsDir: "/tmp/test",
Args: []string{}, DefaultAutoRestart: true,
DefaultMaxRestarts: 3,
DefaultRestartDelay: 5,
}, },
VLLM: config.BackendSettings{ Nodes: map[string]config.NodeConfig{},
Command: "vllm", LocalNode: "main",
Args: []string{"serve"},
},
}
globalSettings := &config.InstancesConfig{
LogsDir: "/tmp/test",
DefaultAutoRestart: true,
DefaultMaxRestarts: 3,
DefaultRestartDelay: 5,
} }
initialOptions := &instance.Options{ initialOptions := &instance.Options{
@@ -136,7 +142,7 @@ func TestSetOptions(t *testing.T) {
// Mock onStatusChange function // Mock onStatusChange function
mockOnStatusChange := func(oldStatus, newStatus instance.Status) {} mockOnStatusChange := func(oldStatus, newStatus instance.Status) {}
inst := instance.New("test-instance", backendConfig, globalSettings, initialOptions, "main", mockOnStatusChange) inst := instance.New("test-instance", globalConfig, initialOptions, mockOnStatusChange)
// Update options // Update options
newOptions := &instance.Options{ newOptions := &instance.Options{
@@ -166,23 +172,26 @@ func TestSetOptions(t *testing.T) {
} }
func TestGetProxy(t *testing.T) { func TestGetProxy(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{ Backends: config.BackendConfig{
Command: "llama-server", LlamaCpp: config.BackendSettings{
Args: []string{}, Command: "llama-server",
Args: []string{},
},
MLX: config.BackendSettings{
Command: "mlx_lm.server",
Args: []string{},
},
VLLM: config.BackendSettings{
Command: "vllm",
Args: []string{"serve"},
},
}, },
MLX: config.BackendSettings{ Instances: config.InstancesConfig{
Command: "mlx_lm.server", LogsDir: "/tmp/test",
Args: []string{},
}, },
VLLM: config.BackendSettings{ Nodes: map[string]config.NodeConfig{},
Command: "vllm", LocalNode: "main",
Args: []string{"serve"},
},
}
globalSettings := &config.InstancesConfig{
LogsDir: "/tmp/test",
} }
options := &instance.Options{ options := &instance.Options{
@@ -199,7 +208,7 @@ func TestGetProxy(t *testing.T) {
// Mock onStatusChange function // Mock onStatusChange function
mockOnStatusChange := func(oldStatus, newStatus instance.Status) {} mockOnStatusChange := func(oldStatus, newStatus instance.Status) {}
inst := instance.New("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) inst := instance.New("test-instance", globalConfig, options, mockOnStatusChange)
// Get proxy for the first time // Get proxy for the first time
proxy1, err := inst.GetProxy() proxy1, err := inst.GetProxy()
@@ -221,10 +230,14 @@ func TestGetProxy(t *testing.T) {
} }
func TestMarshalJSON(t *testing.T) { func TestMarshalJSON(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"}, Backends: config.BackendConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"},
},
Instances: config.InstancesConfig{LogsDir: "/tmp/test"},
Nodes: map[string]config.NodeConfig{},
LocalNode: "main",
} }
globalSettings := &config.InstancesConfig{LogsDir: "/tmp/test"}
options := &instance.Options{ options := &instance.Options{
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
@@ -235,7 +248,7 @@ func TestMarshalJSON(t *testing.T) {
}, },
} }
inst := instance.New("test-instance", backendConfig, globalSettings, options, "main", nil) inst := instance.New("test-instance", globalConfig, options, nil)
data, err := json.Marshal(inst) data, err := json.Marshal(inst)
if err != nil { if err != nil {
@@ -342,23 +355,26 @@ func TestCreateOptionsValidation(t *testing.T) {
}, },
} }
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{ Backends: config.BackendConfig{
Command: "llama-server", LlamaCpp: config.BackendSettings{
Args: []string{}, Command: "llama-server",
Args: []string{},
},
MLX: config.BackendSettings{
Command: "mlx_lm.server",
Args: []string{},
},
VLLM: config.BackendSettings{
Command: "vllm",
Args: []string{"serve"},
},
}, },
MLX: config.BackendSettings{ Instances: config.InstancesConfig{
Command: "mlx_lm.server", LogsDir: "/tmp/test",
Args: []string{},
}, },
VLLM: config.BackendSettings{ Nodes: map[string]config.NodeConfig{},
Command: "vllm", LocalNode: "main",
Args: []string{"serve"},
},
}
globalSettings := &config.InstancesConfig{
LogsDir: "/tmp/test",
} }
for _, tt := range tests { for _, tt := range tests {
@@ -377,7 +393,7 @@ func TestCreateOptionsValidation(t *testing.T) {
// Mock onStatusChange function // Mock onStatusChange function
mockOnStatusChange := func(oldStatus, newStatus instance.Status) {} mockOnStatusChange := func(oldStatus, newStatus instance.Status) {}
instance := instance.New("test", backendConfig, globalSettings, options, "main", mockOnStatusChange) instance := instance.New("test", globalConfig, options, mockOnStatusChange)
opts := instance.GetOptions() opts := instance.GetOptions()
if opts.MaxRestarts == nil { if opts.MaxRestarts == nil {
@@ -396,10 +412,14 @@ func TestCreateOptionsValidation(t *testing.T) {
} }
func TestStatusChangeCallback(t *testing.T) { func TestStatusChangeCallback(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"}, Backends: config.BackendConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"},
},
Instances: config.InstancesConfig{LogsDir: "/tmp/test"},
Nodes: map[string]config.NodeConfig{},
LocalNode: "main",
} }
globalSettings := &config.InstancesConfig{LogsDir: "/tmp/test"}
options := &instance.Options{ options := &instance.Options{
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
@@ -418,7 +438,7 @@ func TestStatusChangeCallback(t *testing.T) {
callbackCalled = true callbackCalled = true
} }
inst := instance.New("test", backendConfig, globalSettings, options, "main", onStatusChange) inst := instance.New("test", globalConfig, options, onStatusChange)
inst.SetStatus(instance.Running) inst.SetStatus(instance.Running)
@@ -434,10 +454,14 @@ func TestStatusChangeCallback(t *testing.T) {
} }
func TestSetOptions_NodesPreserved(t *testing.T) { func TestSetOptions_NodesPreserved(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"}, Backends: config.BackendConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"},
},
Instances: config.InstancesConfig{LogsDir: "/tmp/test"},
Nodes: map[string]config.NodeConfig{},
LocalNode: "main",
} }
globalSettings := &config.InstancesConfig{LogsDir: "/tmp/test"}
tests := []struct { tests := []struct {
name string name string
@@ -477,7 +501,7 @@ func TestSetOptions_NodesPreserved(t *testing.T) {
}, },
} }
inst := instance.New("test", backendConfig, globalSettings, options, "main", nil) inst := instance.New("test", globalConfig, options, nil)
// Attempt to update nodes (should be ignored) // Attempt to update nodes (should be ignored)
updateOptions := &instance.Options{ updateOptions := &instance.Options{
@@ -512,10 +536,14 @@ func TestSetOptions_NodesPreserved(t *testing.T) {
} }
func TestProcessErrorCases(t *testing.T) { func TestProcessErrorCases(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"}, Backends: config.BackendConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"},
},
Instances: config.InstancesConfig{LogsDir: "/tmp/test"},
Nodes: map[string]config.NodeConfig{},
LocalNode: "main",
} }
globalSettings := &config.InstancesConfig{LogsDir: "/tmp/test"}
options := &instance.Options{ options := &instance.Options{
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
@@ -525,7 +553,7 @@ func TestProcessErrorCases(t *testing.T) {
}, },
} }
inst := instance.New("test", backendConfig, globalSettings, options, "main", nil) inst := instance.New("test", globalConfig, options, nil)
// Stop when not running should return error // Stop when not running should return error
err := inst.Stop() err := inst.Stop()
@@ -544,10 +572,16 @@ func TestProcessErrorCases(t *testing.T) {
} }
func TestRemoteInstanceOperations(t *testing.T) { func TestRemoteInstanceOperations(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"}, Backends: config.BackendConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"},
},
Instances: config.InstancesConfig{LogsDir: "/tmp/test"},
Nodes: map[string]config.NodeConfig{
"remote-node": {Address: "http://remote-node:8080"},
},
LocalNode: "main",
} }
globalSettings := &config.InstancesConfig{LogsDir: "/tmp/test"}
options := &instance.Options{ options := &instance.Options{
Nodes: map[string]struct{}{"remote-node": {}}, // Remote instance Nodes: map[string]struct{}{"remote-node": {}}, // Remote instance
BackendOptions: backends.Options{ BackendOptions: backends.Options{
@@ -558,7 +592,7 @@ func TestRemoteInstanceOperations(t *testing.T) {
}, },
} }
inst := instance.New("remote-test", backendConfig, globalSettings, options, "main", nil) inst := instance.New("remote-test", globalConfig, options, nil)
if !inst.IsRemote() { if !inst.IsRemote() {
t.Error("Expected instance to be remote") t.Error("Expected instance to be remote")
@@ -580,8 +614,8 @@ func TestRemoteInstanceOperations(t *testing.T) {
 	}
 	// GetProxy should fail for remote instance
-	if _, err := inst.GetProxy(); err == nil {
-		t.Error("Expected error when getting proxy for remote instance")
+	if _, err := inst.GetProxy(); err != nil {
+		t.Error("Expected no error when getting proxy for remote instance")
 	}
 	// GetLogs should fail for remote instance
@@ -591,14 +625,18 @@ func TestRemoteInstanceOperations(t *testing.T) {
} }
func TestIdleTimeout(t *testing.T) { func TestIdleTimeout(t *testing.T) {
backendConfig := &config.BackendConfig{ globalConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"}, Backends: config.BackendConfig{
LlamaCpp: config.BackendSettings{Command: "llama-server"},
},
Instances: config.InstancesConfig{LogsDir: "/tmp/test"},
Nodes: map[string]config.NodeConfig{},
LocalNode: "main",
} }
globalSettings := &config.InstancesConfig{LogsDir: "/tmp/test"}
t.Run("not running never times out", func(t *testing.T) { t.Run("not running never times out", func(t *testing.T) {
timeout := 1 timeout := 1
inst := instance.New("test", backendConfig, globalSettings, &instance.Options{ inst := instance.New("test", globalConfig, &instance.Options{
IdleTimeout: &timeout, IdleTimeout: &timeout,
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
@@ -606,7 +644,7 @@ func TestIdleTimeout(t *testing.T) {
Model: "/path/to/model.gguf", Model: "/path/to/model.gguf",
}, },
}, },
}, "main", nil) }, nil)
if inst.ShouldTimeout() { if inst.ShouldTimeout() {
t.Error("Non-running instance should never timeout") t.Error("Non-running instance should never timeout")
@@ -614,7 +652,7 @@ func TestIdleTimeout(t *testing.T) {
}) })
t.Run("no timeout configured", func(t *testing.T) { t.Run("no timeout configured", func(t *testing.T) {
inst := instance.New("test", backendConfig, globalSettings, &instance.Options{ inst := instance.New("test", globalConfig, &instance.Options{
IdleTimeout: nil, // No timeout IdleTimeout: nil, // No timeout
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
@@ -622,7 +660,7 @@ func TestIdleTimeout(t *testing.T) {
Model: "/path/to/model.gguf", Model: "/path/to/model.gguf",
}, },
}, },
}, "main", nil) }, nil)
inst.SetStatus(instance.Running) inst.SetStatus(instance.Running)
if inst.ShouldTimeout() { if inst.ShouldTimeout() {
@@ -632,15 +670,17 @@ func TestIdleTimeout(t *testing.T) {
t.Run("timeout exceeded", func(t *testing.T) { t.Run("timeout exceeded", func(t *testing.T) {
timeout := 1 // 1 minute timeout := 1 // 1 minute
inst := instance.New("test", backendConfig, globalSettings, &instance.Options{ inst := instance.New("test", globalConfig, &instance.Options{
IdleTimeout: &timeout, IdleTimeout: &timeout,
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
LlamaServerOptions: &backends.LlamaServerOptions{ LlamaServerOptions: &backends.LlamaServerOptions{
Model: "/path/to/model.gguf", Model: "/path/to/model.gguf",
Host: "localhost",
Port: 8080,
}, },
}, },
}, "main", nil) }, nil)
inst.SetStatus(instance.Running) inst.SetStatus(instance.Running)
// Use mock time provider // Use mock time provider
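All of the test hunks above make the same mechanical change: one config.AppConfig literal replaces the separate backend and instance configs. A hedged sketch of the fixture they repeat; the helper itself is illustrative (the tests inline the literal), but every field name is taken from this diff:

func testAppConfig() *config.AppConfig {
	return &config.AppConfig{
		Backends: config.BackendConfig{
			LlamaCpp: config.BackendSettings{Command: "llama-server", Args: []string{}},
			MLX:      config.BackendSettings{Command: "mlx_lm.server", Args: []string{}},
			VLLM:     config.BackendSettings{Command: "vllm", Args: []string{"serve"}},
		},
		Instances: config.InstancesConfig{
			LogsDir:             "/tmp/test",
			DefaultAutoRestart:  true,
			DefaultMaxRestarts:  3,
			DefaultRestartDelay: 5,
		},
		Nodes:     map[string]config.NodeConfig{},
		LocalNode: "main",
	}
}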

View File

@@ -26,20 +26,78 @@ func (realTimeProvider) Now() time.Time {
 type proxy struct {
 	instance *Instance
 
-	mu        sync.RWMutex
-	proxy     *httputil.ReverseProxy
-	proxyOnce sync.Once
-	proxyErr  error
+	targetURL *url.URL
+	apiKey    string // For remote instances
+
+	responseHeaders map[string]string
+
+	mu        sync.RWMutex
+	proxy     *httputil.ReverseProxy
+	proxyOnce sync.Once
+	proxyErr  error
 
 	lastRequestTime atomic.Int64
 	timeProvider    TimeProvider
 }
 // newProxy creates a new Proxy for the given instance
-func newProxy(instance *Instance) *proxy {
-	return &proxy{
+func newProxy(instance *Instance) (*proxy, error) {
+	p := &proxy{
 		instance:     instance,
 		timeProvider: realTimeProvider{},
 	}
+
+	var err error
+
+	options := instance.GetOptions()
+	if options == nil {
+		return nil, fmt.Errorf("instance %s has no options set", instance.Name)
+	}
+
+	if instance.IsRemote() {
+		// Take the first remote node as the target for now
+		var nodeName string
+		for node := range options.Nodes {
+			nodeName = node
+			break
+		}
+
+		if nodeName == "" {
+			return nil, fmt.Errorf("instance %s has no remote nodes defined", p.instance.Name)
+		}
+
+		node, ok := p.instance.globalNodesConfig[nodeName]
+		if !ok {
+			return nil, fmt.Errorf("remote node %s is not defined", nodeName)
+		}
+
+		p.targetURL, err = url.Parse(node.Address)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse target URL for remote instance %s: %w", p.instance.Name, err)
+		}
+
+		p.apiKey = node.APIKey
+	} else {
+		// Get host/port from process
+		host := p.instance.options.GetHost()
+		port := p.instance.options.GetPort()
+		if port == 0 {
+			return nil, fmt.Errorf("instance %s has no port assigned", p.instance.Name)
+		}
+
+		p.targetURL, err = url.Parse(fmt.Sprintf("http://%s:%d", host, port))
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.instance.Name, err)
+		}
+
+		// Get response headers from backend config
+		p.responseHeaders = options.BackendOptions.GetResponseHeaders(p.instance.globalBackendSettings)
+	}
+
+	return p, nil
 }
 // get returns the reverse proxy for this instance, creating it if needed.
@@ -56,46 +114,40 @@ func (p *proxy) get() (*httputil.ReverseProxy, error) {
 // build creates the reverse proxy based on instance options
 func (p *proxy) build() (*httputil.ReverseProxy, error) {
-	options := p.instance.GetOptions()
-	if options == nil {
-		return nil, fmt.Errorf("instance %s has no options set", p.instance.Name)
-	}
-
-	// Remote instances should not use local proxy - they are handled by RemoteInstanceProxy
-	if _, isLocal := options.Nodes[p.instance.localNodeName]; !isLocal {
-		return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", p.instance.Name)
-	}
-
-	// Get host/port from process
-	host := p.instance.options.GetHost()
-	port := p.instance.options.GetPort()
-	if port == 0 {
-		return nil, fmt.Errorf("instance %s has no port assigned", p.instance.Name)
-	}
-
-	targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port))
-	if err != nil {
-		return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.instance.Name, err)
-	}
-
-	proxy := httputil.NewSingleHostReverseProxy(targetURL)
-
-	// Get response headers from backend config
-	responseHeaders := options.BackendOptions.GetResponseHeaders(p.instance.globalBackendSettings)
-
-	proxy.ModifyResponse = func(resp *http.Response) error {
-		// Remove CORS headers from backend response to avoid conflicts
-		// llamactl will add its own CORS headers
-		resp.Header.Del("Access-Control-Allow-Origin")
-		resp.Header.Del("Access-Control-Allow-Methods")
-		resp.Header.Del("Access-Control-Allow-Headers")
-		resp.Header.Del("Access-Control-Allow-Credentials")
-		resp.Header.Del("Access-Control-Max-Age")
-		resp.Header.Del("Access-Control-Expose-Headers")
-
-		for key, value := range responseHeaders {
-			resp.Header.Set(key, value)
+	proxy := httputil.NewSingleHostReverseProxy(p.targetURL)
+
+	// Modify the request before sending it to the backend
+	originalDirector := proxy.Director
+	proxy.Director = func(req *http.Request) {
+		originalDirector(req)
+
+		// Add API key header for remote instances
+		if p.instance.IsRemote() && p.apiKey != "" {
+			req.Header.Set("Authorization", "Bearer "+p.apiKey)
+		}
+
+		// Update last request time
+		p.updateLastRequestTime()
+	}
+
+	if !p.instance.IsRemote() {
+		// Add custom headers to the request
+		proxy.ModifyResponse = func(resp *http.Response) error {
+			// Remove CORS headers from backend response to avoid conflicts
+			// llamactl will add its own CORS headers
+			resp.Header.Del("Access-Control-Allow-Origin")
+			resp.Header.Del("Access-Control-Allow-Methods")
+			resp.Header.Del("Access-Control-Allow-Headers")
+			resp.Header.Del("Access-Control-Allow-Credentials")
+			resp.Header.Del("Access-Control-Max-Age")
+			resp.Header.Del("Access-Control-Expose-Headers")
+
+			for key, value := range p.responseHeaders {
+				resp.Header.Set(key, value)
+			}
+			return nil
 		}
-		return nil
 	}
 
 	return proxy, nil
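The Director-wrapping technique used in build() above, in a self-contained form. Names and the token value here are illustrative and not part of llamactl; the point is that the default Director is preserved and a bearer token is injected before the request is forwarded.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
)

func main() {
	// Backend that echoes the Authorization header it received.
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Header.Get("Authorization"))
	}))
	defer backend.Close()

	target, err := url.Parse(backend.URL)
	if err != nil {
		log.Fatal(err)
	}

	// Same shape as proxy.build(): wrap the default Director, add the key.
	apiKey := "secret-node-key"
	rp := httputil.NewSingleHostReverseProxy(target)
	originalDirector := rp.Director
	rp.Director = func(req *http.Request) {
		originalDirector(req)
		if apiKey != "" {
			req.Header.Set("Authorization", "Bearer "+apiKey)
		}
	}

	front := httptest.NewServer(rp)
	defer front.Close()

	resp, err := http.Get(front.URL)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b)) // prints: Bearer secret-node-key
}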

View File

@@ -35,9 +35,7 @@ type instanceManager struct {
 	lifecycle *lifecycleManager
 	// Configuration
-	instancesConfig config.InstancesConfig
-	backendsConfig  config.BackendConfig
-	localNodeName   string // Name of the local node
+	globalConfig *config.AppConfig
 	// Synchronization
 	instanceLocks sync.Map // map[string]*sync.Mutex - per-instance locks for concurrent operations
@@ -45,43 +43,42 @@ type instanceManager struct {
 }
 // New creates a new instance of InstanceManager.
-func New(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig map[string]config.NodeConfig, localNodeName string) InstanceManager {
-	if instancesConfig.TimeoutCheckInterval <= 0 {
-		instancesConfig.TimeoutCheckInterval = 5 // Default to 5 minutes if not set
+func New(globalConfig *config.AppConfig) InstanceManager {
+
+	if globalConfig.Instances.TimeoutCheckInterval <= 0 {
+		globalConfig.Instances.TimeoutCheckInterval = 5 // Default to 5 minutes if not set
 	}
 	// Initialize components
 	registry := newInstanceRegistry()
 	// Initialize port allocator
-	portRange := instancesConfig.PortRange
+	portRange := globalConfig.Instances.PortRange
 	ports, err := newPortAllocator(portRange[0], portRange[1])
 	if err != nil {
 		log.Fatalf("Failed to create port allocator: %v", err)
 	}
 	// Initialize persistence
-	persistence, err := newInstancePersister(instancesConfig.InstancesDir)
+	persistence, err := newInstancePersister(globalConfig.Instances.InstancesDir)
 	if err != nil {
 		log.Fatalf("Failed to create instance persister: %v", err)
 	}
 	// Initialize remote manager
-	remote := newRemoteManager(nodesConfig, 30*time.Second)
+	remote := newRemoteManager(globalConfig.Nodes, 30*time.Second)
 	// Create manager instance
 	im := &instanceManager{
 		registry:    registry,
 		ports:       ports,
 		persistence: persistence,
 		remote:      remote,
-		instancesConfig: instancesConfig,
-		backendsConfig:  backendsConfig,
-		localNodeName:   localNodeName,
+		globalConfig: globalConfig,
 	}
 	// Initialize lifecycle manager (needs reference to manager for Stop/Evict operations)
-	checkInterval := time.Duration(instancesConfig.TimeoutCheckInterval) * time.Minute
+	checkInterval := time.Duration(globalConfig.Instances.TimeoutCheckInterval) * time.Minute
 	im.lifecycle = newLifecycleManager(registry, im, checkInterval, true)
 	// Load existing instances from disk
@@ -165,7 +162,7 @@ func (im *instanceManager) loadInstance(persistedInst *instance.Instance) error
 	var isRemote bool
 	var nodeName string
 	if options != nil {
-		if _, isLocal := options.Nodes[im.localNodeName]; !isLocal && len(options.Nodes) > 0 {
+		if _, isLocal := options.Nodes[im.globalConfig.LocalNode]; !isLocal && len(options.Nodes) > 0 {
 			// Get the first node from the set
 			for node := range options.Nodes {
 				nodeName = node
@@ -184,7 +181,7 @@ func (im *instanceManager) loadInstance(persistedInst *instance.Instance) error
 	}
 	// Create new inst using NewInstance (handles validation, defaults, setup)
-	inst := instance.New(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback)
+	inst := instance.New(name, im.globalConfig, options, statusCallback)
 	// Restore persisted fields that NewInstance doesn't set
 	inst.Created = persistedInst.Created
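For reference, the config.AppConfig fields this file now reads, reconstructed from the hunks in this PR. This is a shape sketch only, not compilable on its own; the real struct very likely carries more fields plus YAML/JSON tags.

type AppConfig struct {
	Backends  BackendConfig         // per-backend commands and args
	Instances InstancesConfig       // ports, dirs, defaults, limits, timeout check interval
	Nodes     map[string]NodeConfig // remote nodes: Address, APIKey
	LocalNode string                // name of this node, used for remote detection
}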

View File

@@ -14,11 +14,10 @@ import (
func TestManager_PersistsAndLoadsInstances(t *testing.T) { func TestManager_PersistsAndLoadsInstances(t *testing.T) {
tempDir := t.TempDir() tempDir := t.TempDir()
cfg := createPersistenceConfig(tempDir) appConfig := createTestAppConfig(tempDir)
backendConfig := createBackendConfig()
// Create instance and check file was created // Create instance and check file was created
manager1 := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") manager1 := manager.New(appConfig)
options := &instance.Options{ options := &instance.Options{
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
@@ -40,7 +39,7 @@ func TestManager_PersistsAndLoadsInstances(t *testing.T) {
} }
// Load instances from disk // Load instances from disk
manager2 := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") manager2 := manager.New(appConfig)
instances, err := manager2.ListInstances() instances, err := manager2.ListInstances()
if err != nil { if err != nil {
t.Fatalf("ListInstances failed: %v", err) t.Fatalf("ListInstances failed: %v", err)
@@ -55,10 +54,9 @@ func TestManager_PersistsAndLoadsInstances(t *testing.T) {
func TestDeleteInstance_RemovesPersistenceFile(t *testing.T) { func TestDeleteInstance_RemovesPersistenceFile(t *testing.T) {
tempDir := t.TempDir() tempDir := t.TempDir()
cfg := createPersistenceConfig(tempDir) appConfig := createTestAppConfig(tempDir)
backendConfig := createBackendConfig()
mgr := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") mgr := manager.New(appConfig)
options := &instance.Options{ options := &instance.Options{
BackendOptions: backends.Options{ BackendOptions: backends.Options{
BackendType: backends.BackendTypeLlamaCpp, BackendType: backends.BackendTypeLlamaCpp,
@@ -135,39 +133,57 @@ func TestConcurrentAccess(t *testing.T) {
} }
// Helper functions for test configuration // Helper functions for test configuration
func createBackendConfig() config.BackendConfig { func createTestAppConfig(instancesDir string) *config.AppConfig {
// Use 'sleep' as a test command instead of 'llama-server' // Use 'sleep' as a test command instead of 'llama-server'
// This allows tests to run in CI environments without requiring actual LLM binaries // This allows tests to run in CI environments without requiring actual LLM binaries
// The sleep command will be invoked with model paths and other args, which it ignores // The sleep command will be invoked with model paths and other args, which it ignores
return config.BackendConfig{ return &config.AppConfig{
LlamaCpp: config.BackendSettings{ Backends: config.BackendConfig{
Command: "sleep", LlamaCpp: config.BackendSettings{
Command: "sleep",
},
MLX: config.BackendSettings{
Command: "sleep",
},
}, },
MLX: config.BackendSettings{ Instances: config.InstancesConfig{
Command: "sleep", PortRange: [2]int{8000, 9000},
InstancesDir: instancesDir,
LogsDir: instancesDir,
MaxInstances: 10,
MaxRunningInstances: 10,
DefaultAutoRestart: true,
DefaultMaxRestarts: 3,
DefaultRestartDelay: 5,
TimeoutCheckInterval: 5,
}, },
} LocalNode: "main",
} Nodes: map[string]config.NodeConfig{},
func createPersistenceConfig(dir string) config.InstancesConfig {
return config.InstancesConfig{
PortRange: [2]int{8000, 9000},
InstancesDir: dir,
MaxInstances: 10,
TimeoutCheckInterval: 5,
} }
} }
func createTestManager() manager.InstanceManager { func createTestManager() manager.InstanceManager {
cfg := config.InstancesConfig{ appConfig := &config.AppConfig{
PortRange: [2]int{8000, 9000}, Backends: config.BackendConfig{
LogsDir: "/tmp/test", LlamaCpp: config.BackendSettings{
MaxInstances: 10, Command: "sleep",
MaxRunningInstances: 10, },
DefaultAutoRestart: true, MLX: config.BackendSettings{
DefaultMaxRestarts: 3, Command: "sleep",
DefaultRestartDelay: 5, },
TimeoutCheckInterval: 5, },
Instances: config.InstancesConfig{
PortRange: [2]int{8000, 9000},
LogsDir: "/tmp/test",
MaxInstances: 10,
MaxRunningInstances: 10,
DefaultAutoRestart: true,
DefaultMaxRestarts: 3,
DefaultRestartDelay: 5,
TimeoutCheckInterval: 5,
},
LocalNode: "main",
Nodes: map[string]config.NodeConfig{},
} }
return manager.New(createBackendConfig(), cfg, map[string]config.NodeConfig{}, "main") return manager.New(appConfig)
} }
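Usage of the reworked helpers with the single-argument constructor, as exercised by the persistence test above. A hedged sketch: the test name, instance name, and model path are illustrative; the flow mirrors TestManager_PersistsAndLoadsInstances.

func TestPersistReloadSketch(t *testing.T) {
	appConfig := createTestAppConfig(t.TempDir())

	options := &instance.Options{
		BackendOptions: backends.Options{
			BackendType:        backends.BackendTypeLlamaCpp,
			LlamaServerOptions: &backends.LlamaServerOptions{Model: "/path/to/model.gguf"},
		},
	}

	manager1 := manager.New(appConfig)
	if _, err := manager1.CreateInstance("test-instance", options); err != nil {
		t.Fatalf("CreateInstance failed: %v", err)
	}

	// A fresh manager over the same config (and instances dir) reloads it from disk.
	manager2 := manager.New(appConfig)
	instances, err := manager2.ListInstances()
	if err != nil {
		t.Fatalf("ListInstances failed: %v", err)
	}
	if len(instances) != 1 {
		t.Fatalf("expected 1 instance, got %d", len(instances))
	}
}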

View File

@@ -68,7 +68,7 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
 	}
 	// Check if this is a remote instance (local node not in the Nodes set)
-	if _, isLocal := options.Nodes[im.localNodeName]; !isLocal && len(options.Nodes) > 0 {
+	if _, isLocal := options.Nodes[im.globalConfig.LocalNode]; !isLocal && len(options.Nodes) > 0 {
 		// Get the first node from the set
 		var nodeName string
 		for node := range options.Nodes {
@@ -94,7 +94,7 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
 		// Create a local stub that preserves the Nodes field for tracking
 		// We keep the original options (with Nodes) so IsRemote() works correctly
-		inst := instance.New(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, nil)
+		inst := instance.New(name, im.globalConfig, options, nil)
 		// Update the local stub with all remote data (preserving Nodes)
 		im.updateLocalInstanceFromRemote(inst, remoteInst)
@@ -129,8 +129,8 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
 		}
 	}
 	localInstanceCount := totalInstances - remoteCount
-	if localInstanceCount >= im.instancesConfig.MaxInstances && im.instancesConfig.MaxInstances != -1 {
-		return nil, fmt.Errorf("maximum number of instances (%d) reached", im.instancesConfig.MaxInstances)
+	if localInstanceCount >= im.globalConfig.Instances.MaxInstances && im.globalConfig.Instances.MaxInstances != -1 {
+		return nil, fmt.Errorf("maximum number of instances (%d) reached", im.globalConfig.Instances.MaxInstances)
 	}
 	// Assign and validate port for backend-specific options
@@ -155,7 +155,7 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
 		im.onStatusChange(name, oldStatus, newStatus)
 	}
-	inst := instance.New(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback)
+	inst := instance.New(name, im.globalConfig, options, statusCallback)
 	// Add to registry
 	if err := im.registry.add(inst); err != nil {
@@ -384,7 +384,7 @@ func (im *instanceManager) StartInstance(name string) (*instance.Instance, error
 	// Check max running instances limit for local instances only
 	if im.IsMaxRunningInstancesReached() {
-		return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.instancesConfig.MaxRunningInstances))
+		return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.globalConfig.Instances.MaxRunningInstances))
 	}
 	if err := inst.Start(); err != nil {
@@ -400,7 +400,7 @@ func (im *instanceManager) StartInstance(name string) (*instance.Instance, error
 }
 func (im *instanceManager) IsMaxRunningInstancesReached() bool {
-	if im.instancesConfig.MaxRunningInstances == -1 {
+	if im.globalConfig.Instances.MaxRunningInstances == -1 {
 		return false
 	}
@@ -412,7 +412,7 @@ func (im *instanceManager) IsMaxRunningInstancesReached() bool {
 		}
 	}
-	return localRunningCount >= im.instancesConfig.MaxRunningInstances
+	return localRunningCount >= im.globalConfig.Instances.MaxRunningInstances
 }
 // StopInstance stops a running instance and returns it.

View File

@@ -36,17 +36,21 @@ func TestCreateInstance_FailsWithDuplicateName(t *testing.T) {
} }
func TestCreateInstance_FailsWhenMaxInstancesReached(t *testing.T) { func TestCreateInstance_FailsWhenMaxInstancesReached(t *testing.T) {
backendConfig := config.BackendConfig{ appConfig := &config.AppConfig{
LlamaCpp: config.BackendSettings{ Backends: config.BackendConfig{
Command: "llama-server", LlamaCpp: config.BackendSettings{
Command: "llama-server",
},
}, },
Instances: config.InstancesConfig{
PortRange: [2]int{8000, 9000},
MaxInstances: 1, // Very low limit for testing
TimeoutCheckInterval: 5,
},
LocalNode: "main",
Nodes: map[string]config.NodeConfig{},
} }
cfg := config.InstancesConfig{ limitedManager := manager.New(appConfig)
PortRange: [2]int{8000, 9000},
MaxInstances: 1, // Very low limit for testing
TimeoutCheckInterval: 5,
}
limitedManager := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main")
options := &instance.Options{ options := &instance.Options{
BackendOptions: backends.Options{ BackendOptions: backends.Options{

View File

@@ -4,8 +4,6 @@ import (
"llamactl/pkg/config" "llamactl/pkg/config"
"llamactl/pkg/manager" "llamactl/pkg/manager"
"net/http" "net/http"
"net/http/httputil"
"sync"
"time" "time"
) )
@@ -13,8 +11,6 @@ type Handler struct {
 	InstanceManager manager.InstanceManager
 	cfg             config.AppConfig
 	httpClient      *http.Client
-	remoteProxies   map[string]*httputil.ReverseProxy // Cache of remote proxies by instance name
-	remoteProxiesMu sync.RWMutex
 }
 func NewHandler(im manager.InstanceManager, cfg config.AppConfig) *Handler {
@@ -24,6 +20,5 @@ func NewHandler(im manager.InstanceManager, cfg config.AppConfig) *Handler {
 		httpClient: &http.Client{
 			Timeout: 30 * time.Second,
 		},
-		remoteProxies: make(map[string]*httputil.ReverseProxy),
 	}
 }

View File

@@ -49,7 +49,7 @@ func (h *Handler) LlamaCppProxy(onDemandStart bool) http.HandlerFunc {
 			return
 		}
-		if !inst.IsRunning() {
+		if !inst.IsRemote() && !inst.IsRunning() {
 			if !(onDemandStart && options.OnDemandStart != nil && *options.OnDemandStart) {
 				http.Error(w, "Instance is not running", http.StatusServiceUnavailable)
@@ -88,12 +88,11 @@ func (h *Handler) LlamaCppProxy(onDemandStart bool) http.HandlerFunc {
 			return
 		}
-		// Strip the "/llama-cpp/<name>" prefix from the request URL
-		prefix := fmt.Sprintf("/llama-cpp/%s", validatedName)
-		r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix)
-
-		// Update the last request time for the instance
-		inst.UpdateLastRequestTime()
+		if !inst.IsRemote() {
+			// Strip the "/llama-cpp/<name>" prefix from the request URL
+			prefix := fmt.Sprintf("/llama-cpp/%s", validatedName)
+			r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix)
+		}
 
 		proxy.ServeHTTP(w, r)
 	}

View File

@@ -7,8 +7,6 @@ import (
"llamactl/pkg/manager" "llamactl/pkg/manager"
"llamactl/pkg/validation" "llamactl/pkg/validation"
"net/http" "net/http"
"net/http/httputil"
"net/url"
"strconv" "strconv"
"strings" "strings"
@@ -375,12 +373,6 @@ func (h *Handler) ProxyToInstance() http.HandlerFunc {
 		return
 	}
-	// Check if this is a remote instance
-	if inst.IsRemote() {
-		h.RemoteInstanceProxy(w, r, validatedName, inst)
-		return
-	}
-
 	if !inst.IsRunning() {
 		http.Error(w, "Instance is not running", http.StatusServiceUnavailable)
 		return
@@ -393,12 +385,12 @@ func (h *Handler) ProxyToInstance() http.HandlerFunc {
 			return
 		}
-		// Strip the "/api/v1/instances/<name>/proxy" prefix from the request URL
-		prefix := fmt.Sprintf("/api/v1/instances/%s/proxy", validatedName)
-		r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix)
-
-		// Update the last request time for the instance
-		inst.UpdateLastRequestTime()
+		// Check if this is a remote instance
+		if !inst.IsRemote() {
+			// Strip the "/api/v1/instances/<name>/proxy" prefix from the request URL
+			prefix := fmt.Sprintf("/api/v1/instances/%s/proxy", validatedName)
+			r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix)
+		}
 
 		// Set forwarded headers
 		r.Header.Set("X-Forwarded-Host", r.Header.Get("Host"))
@@ -408,66 +400,3 @@ func (h *Handler) ProxyToInstance() http.HandlerFunc {
proxy.ServeHTTP(w, r) proxy.ServeHTTP(w, r)
} }
} }
// RemoteInstanceProxy proxies requests to a remote instance
func (h *Handler) RemoteInstanceProxy(w http.ResponseWriter, r *http.Request, name string, inst *instance.Instance) {
// Get the node name from instance options
options := inst.GetOptions()
if options == nil {
http.Error(w, "Instance has no options configured", http.StatusInternalServerError)
return
}
// Get the first node from the set
var nodeName string
for node := range options.Nodes {
nodeName = node
break
}
if nodeName == "" {
http.Error(w, "Instance has no node configured", http.StatusInternalServerError)
return
}
// Check if we have a cached proxy for this node
h.remoteProxiesMu.RLock()
proxy, exists := h.remoteProxies[nodeName]
h.remoteProxiesMu.RUnlock()
if !exists {
// Find node configuration
nodeConfig, exists := h.cfg.Nodes[nodeName]
if !exists {
http.Error(w, fmt.Sprintf("Node %s not found", nodeName), http.StatusInternalServerError)
return
}
// Create reverse proxy to remote node
targetURL, err := url.Parse(nodeConfig.Address)
if err != nil {
http.Error(w, "Failed to parse node address: "+err.Error(), http.StatusInternalServerError)
return
}
proxy = httputil.NewSingleHostReverseProxy(targetURL)
// Modify request before forwarding
originalDirector := proxy.Director
apiKey := nodeConfig.APIKey // Capture for closure
proxy.Director = func(req *http.Request) {
originalDirector(req)
// Add API key if configured
if apiKey != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", apiKey))
}
}
// Cache the proxy by node name
h.remoteProxiesMu.Lock()
h.remoteProxies[nodeName] = proxy
h.remoteProxiesMu.Unlock()
}
// Forward the request using the cached proxy
proxy.ServeHTTP(w, r)
}
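With RemoteInstanceProxy removed, every handler follows one flow: resolve the instance, take its proxy, and strip the routing prefix only for local instances; remote targets keep the original path and get the node API key from the proxy's Director (see pkg/instance above). A hedged sketch of that shared flow; the helper name and error text are illustrative:

func proxyToInstanceSketch(w http.ResponseWriter, r *http.Request, inst *instance.Instance, validatedName string) {
	proxy, err := inst.GetProxy() // single path for local and remote instances
	if err != nil {
		http.Error(w, "Failed to get proxy: "+err.Error(), http.StatusInternalServerError)
		return
	}

	if !inst.IsRemote() {
		// Local targets expect the path relative to the instance root.
		prefix := fmt.Sprintf("/api/v1/instances/%s/proxy", validatedName)
		r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix)
	}
	// Remote targets keep the full path; the instance's proxy injects the
	// node's API key in its Director.

	proxy.ServeHTTP(w, r)
}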

View File

@@ -3,13 +3,9 @@ package server
 import (
 	"bytes"
 	"encoding/json"
-	"fmt"
 	"io"
-	"llamactl/pkg/instance"
 	"llamactl/pkg/validation"
 	"net/http"
-	"net/http/httputil"
-	"net/url"
 )
 // OpenAIListInstances godoc
@@ -100,15 +96,7 @@ func (h *Handler) OpenAIProxy() http.HandlerFunc {
 			return
 		}
-		// Check if this is a remote instance
-		if inst.IsRemote() {
-			// Restore the body for the remote proxy
-			r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
-			h.RemoteOpenAIProxy(w, r, validatedName, inst)
-			return
-		}
-
-		if !inst.IsRunning() {
+		if !inst.IsRemote() && !inst.IsRunning() {
 			options := inst.GetOptions()
 			allowOnDemand := options != nil && options.OnDemandStart != nil && *options.OnDemandStart
 			if !allowOnDemand {
@@ -148,9 +136,6 @@ func (h *Handler) OpenAIProxy() http.HandlerFunc {
 			return
 		}
-		// Update last request time for the instance
-		inst.UpdateLastRequestTime()
-
 		// Recreate the request body from the bytes we read
 		r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
 		r.ContentLength = int64(len(bodyBytes))
@@ -158,66 +143,3 @@ func (h *Handler) OpenAIProxy() http.HandlerFunc {
proxy.ServeHTTP(w, r) proxy.ServeHTTP(w, r)
} }
} }
// RemoteOpenAIProxy proxies OpenAI-compatible requests to a remote instance
func (h *Handler) RemoteOpenAIProxy(w http.ResponseWriter, r *http.Request, modelName string, inst *instance.Instance) {
// Get the node name from instance options
options := inst.GetOptions()
if options == nil {
http.Error(w, "Instance has no options configured", http.StatusInternalServerError)
return
}
// Get the first node from the set
var nodeName string
for node := range options.Nodes {
nodeName = node
break
}
if nodeName == "" {
http.Error(w, "Instance has no node configured", http.StatusInternalServerError)
return
}
// Check if we have a cached proxy for this node
h.remoteProxiesMu.RLock()
proxy, exists := h.remoteProxies[nodeName]
h.remoteProxiesMu.RUnlock()
if !exists {
// Find node configuration
nodeConfig, exists := h.cfg.Nodes[nodeName]
if !exists {
http.Error(w, fmt.Sprintf("Node %s not found", nodeName), http.StatusInternalServerError)
return
}
// Create reverse proxy to remote node
targetURL, err := url.Parse(nodeConfig.Address)
if err != nil {
http.Error(w, "Failed to parse node address: "+err.Error(), http.StatusInternalServerError)
return
}
proxy = httputil.NewSingleHostReverseProxy(targetURL)
// Modify request before forwarding
originalDirector := proxy.Director
apiKey := nodeConfig.APIKey // Capture for closure
proxy.Director = func(req *http.Request) {
originalDirector(req)
// Add API key if configured
if apiKey != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", apiKey))
}
}
// Cache the proxy
h.remoteProxiesMu.Lock()
h.remoteProxies[nodeName] = proxy
h.remoteProxiesMu.Unlock()
}
// Forward the request using the cached proxy
proxy.ServeHTTP(w, r)
}
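For completeness, a hedged client-side sketch of the OpenAI-compatible path this handler serves. The removed helper's modelName parameter suggests the request's "model" field selects the instance; the listen address and route below are illustrative assumptions, not taken from this diff.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	payload := map[string]any{
		"model": "my-instance", // assumed: instance name doubles as the model name
		"messages": []map[string]string{
			{"role": "user", "content": "Hello"},
		},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		log.Fatal(err)
	}

	// Illustrative endpoint; llamactl's actual address and route are configured elsewhere.
	resp, err := http.Post("http://localhost:8080/v1/chat/completions", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}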