diff --git a/cmd/server/main.go b/cmd/server/main.go index de080c7..754b1d7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -58,7 +58,7 @@ func main() { } // Initialize the instance manager - instanceManager := manager.NewInstanceManager(cfg.Backends, cfg.Instances, cfg.Nodes) + instanceManager := manager.NewInstanceManager(cfg.Backends, cfg.Instances, cfg.Nodes, cfg.LocalNode) // Create a new handler with the instance manager handler := server.NewHandler(instanceManager, cfg) diff --git a/docs/getting-started/configuration.md b/docs/getting-started/configuration.md index c43efc6..6f9ee98 100644 --- a/docs/getting-started/configuration.md +++ b/docs/getting-started/configuration.md @@ -261,7 +261,7 @@ nodes: # Node configuration map ``` **Node Configuration Fields:** -- `local_node`: Specifies which node in the `nodes` map represents the local node +- `local_node`: Specifies which node in the `nodes` map represents the local node. Must match exactly what other nodes call this node. - `nodes`: Map of node configurations - `address`: HTTP/HTTPS URL of the remote node (empty for local node) - `api_key`: Management API key for authenticating with the remote node diff --git a/docs/getting-started/installation.md b/docs/getting-started/installation.md index 04e0dfd..413e1fc 100644 --- a/docs/getting-started/installation.md +++ b/docs/getting-started/installation.md @@ -162,6 +162,7 @@ go build -o llamactl ./cmd/server For deployments with remote nodes: - Install llamactl on each node using any of the methods above - Configure API keys for authentication between nodes +- Ensure node names are consistent across all configurations ## Verification diff --git a/docs/user-guide/troubleshooting.md b/docs/user-guide/troubleshooting.md index 4b7a507..ca2b4df 100644 --- a/docs/user-guide/troubleshooting.md +++ b/docs/user-guide/troubleshooting.md @@ -143,7 +143,11 @@ This helps determine if the issue is with llamactl or with the underlying llama. api_key: "secure-key" # Must match worker1's management key ``` -2. **Test remote node connectivity:** +2. **Check node name consistency:** + - `local_node` on each node must match what other nodes call it + - Node names are case-sensitive + +3. **Test remote node connectivity:** ```bash curl -H "Authorization: Bearer remote-node-key" \ http://remote-node:8080/api/v1/instances diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index dcebef4..a6c9657 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -35,6 +35,7 @@ type Process struct { options *CreateInstanceOptions `json:"-"` globalInstanceSettings *config.InstancesConfig globalBackendSettings *config.BackendConfig + localNodeName string `json:"-"` // Name of the local node for remote detection // Status Status InstanceStatus `json:"status"` @@ -66,7 +67,7 @@ type Process struct { } // NewInstance creates a new instance with the given name, log path, and options -func NewInstance(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, options *CreateInstanceOptions, onStatusChange func(oldStatus, newStatus InstanceStatus)) *Process { +func NewInstance(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, options *CreateInstanceOptions, localNodeName string, onStatusChange func(oldStatus, newStatus InstanceStatus)) *Process { // Validate and copy options options.ValidateAndApplyDefaults(name, globalInstanceSettings) @@ -78,6 +79,7 @@ func NewInstance(name string, globalBackendSettings *config.BackendConfig, globa options: options, globalInstanceSettings: globalInstanceSettings, globalBackendSettings: globalBackendSettings, + localNodeName: localNodeName, logger: logger, timeProvider: realTimeProvider{}, Created: time.Now().Unix(), @@ -172,7 +174,7 @@ func (i *Process) GetProxy() (*httputil.ReverseProxy, error) { } // Remote instances should not use local proxy - they are handled by RemoteInstanceProxy - if len(i.options.Nodes) > 0 { + if len(i.options.Nodes) > 0 && i.options.Nodes[0] != i.localNodeName { return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", i.Name) } @@ -309,5 +311,16 @@ func (i *Process) IsRemote() bool { return false } - return len(i.options.Nodes) > 0 + // If no nodes specified, it's a local instance + if len(i.options.Nodes) == 0 { + return false + } + + // If the first node is the local node, treat it as a local instance + if i.options.Nodes[0] == i.localNodeName { + return false + } + + // Otherwise, it's a remote instance + return true } diff --git a/pkg/instance/instance_test.go b/pkg/instance/instance_test.go index fc41a94..14ff78d 100644 --- a/pkg/instance/instance_test.go +++ b/pkg/instance/instance_test.go @@ -44,7 +44,7 @@ func TestNewInstance(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) if inst.Name != "test-instance" { t.Errorf("Expected name 'test-instance', got %q", inst.Name) @@ -115,7 +115,7 @@ func TestNewInstance_WithRestartOptions(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - instance := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + instance := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) opts := instance.GetOptions() // Check that explicit values override defaults @@ -164,7 +164,7 @@ func TestSetOptions(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, initialOptions, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, initialOptions, "main", mockOnStatusChange) // Update options newOptions := &instance.CreateInstanceOptions{ @@ -222,7 +222,7 @@ func TestGetProxy(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) // Get proxy for the first time proxy1, err := inst.GetProxy() @@ -277,7 +277,7 @@ func TestMarshalJSON(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - instance := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + instance := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) data, err := json.Marshal(instance) if err != nil { @@ -446,7 +446,7 @@ func TestCreateInstanceOptionsValidation(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - instance := instance.NewInstance("test", backendConfig, globalSettings, options, mockOnStatusChange) + instance := instance.NewInstance("test", backendConfig, globalSettings, options, "main", mockOnStatusChange) opts := instance.GetOptions() if opts.MaxRestarts == nil { diff --git a/pkg/instance/timeout_test.go b/pkg/instance/timeout_test.go index 1171c6a..c4cf6ae 100644 --- a/pkg/instance/timeout_test.go +++ b/pkg/instance/timeout_test.go @@ -56,7 +56,7 @@ func TestUpdateLastRequestTime(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) // Test that UpdateLastRequestTime doesn't panic inst.UpdateLastRequestTime() @@ -88,7 +88,7 @@ func TestShouldTimeout_NotRunning(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) // Instance is not running, should not timeout regardless of configuration if inst.ShouldTimeout() { @@ -132,7 +132,7 @@ func TestShouldTimeout_NoTimeoutConfigured(t *testing.T) { }, } - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) // Simulate running state inst.SetStatus(instance.Running) @@ -169,7 +169,7 @@ func TestShouldTimeout_WithinTimeLimit(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) inst.SetStatus(instance.Running) // Update last request time to now @@ -207,7 +207,7 @@ func TestShouldTimeout_ExceedsTimeLimit(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) inst.SetStatus(instance.Running) // Use MockTimeProvider to simulate old last request time @@ -263,7 +263,7 @@ func TestTimeoutConfiguration_Validation(t *testing.T) { // Mock onStatusChange function mockOnStatusChange := func(oldStatus, newStatus instance.InstanceStatus) {} - inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, mockOnStatusChange) + inst := instance.NewInstance("test-instance", backendConfig, globalSettings, options, "main", mockOnStatusChange) opts := inst.GetOptions() if opts.IdleTimeout == nil || *opts.IdleTimeout != tt.expectedTimeout { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index b944ef3..c402659 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -49,6 +49,7 @@ type instanceManager struct { ports map[int]bool instancesConfig config.InstancesConfig backendsConfig config.BackendConfig + localNodeName string // Name of the local node // Timeout checker timeoutChecker *time.Ticker @@ -63,7 +64,7 @@ type instanceManager struct { } // NewInstanceManager creates a new instance of InstanceManager. -func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig map[string]config.NodeConfig) InstanceManager { +func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig map[string]config.NodeConfig, localNodeName string) InstanceManager { if instancesConfig.TimeoutCheckInterval <= 0 { instancesConfig.TimeoutCheckInterval = 5 // Default to 5 minutes if not set } @@ -81,6 +82,7 @@ func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig con ports: make(map[int]bool), instancesConfig: instancesConfig, backendsConfig: backendsConfig, + localNodeName: localNodeName, timeoutChecker: time.NewTicker(time.Duration(instancesConfig.TimeoutCheckInterval) * time.Minute), shutdownChan: make(chan struct{}), @@ -274,7 +276,8 @@ func (im *instanceManager) loadInstance(name, path string) error { options := persistedInstance.GetOptions() // Check if this is a remote instance - isRemote := options != nil && len(options.Nodes) > 0 + // An instance is remote if Nodes is specified AND the first node is not the local node + isRemote := options != nil && len(options.Nodes) > 0 && options.Nodes[0] != im.localNodeName var statusCallback func(oldStatus, newStatus instance.InstanceStatus) if !isRemote { @@ -285,7 +288,7 @@ func (im *instanceManager) loadInstance(name, path string) error { } // Create new inst using NewInstance (handles validation, defaults, setup) - inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, statusCallback) + inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback) // Restore persisted fields that NewInstance doesn't set inst.Created = persistedInstance.Created diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index e59e2eb..e4a6329 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -34,7 +34,7 @@ func TestNewInstanceManager(t *testing.T) { TimeoutCheckInterval: 5, } - mgr := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + mgr := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") if mgr == nil { t.Fatal("NewInstanceManager returned nil") } @@ -69,7 +69,7 @@ func TestPersistence(t *testing.T) { } // Test instance persistence on creation - manager1 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + manager1 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") options := &instance.CreateInstanceOptions{ BackendType: backends.BackendTypeLlamaCpp, LlamaServerOptions: &llamacpp.LlamaServerOptions{ @@ -90,7 +90,7 @@ func TestPersistence(t *testing.T) { } // Test loading instances from disk - manager2 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + manager2 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") instances, err := manager2.ListInstances() if err != nil { t.Fatalf("ListInstances failed: %v", err) @@ -207,7 +207,7 @@ func createTestManager() manager.InstanceManager { DefaultRestartDelay: 5, TimeoutCheckInterval: 5, } - return manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + return manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") } func TestAutoRestartDisabledInstanceStatus(t *testing.T) { @@ -227,7 +227,7 @@ func TestAutoRestartDisabledInstanceStatus(t *testing.T) { } // Create first manager and instance with auto-restart disabled - manager1 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + manager1 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") autoRestart := false options := &instance.CreateInstanceOptions{ @@ -252,7 +252,7 @@ func TestAutoRestartDisabledInstanceStatus(t *testing.T) { manager1.Shutdown() // Create second manager (simulating restart of llamactl) - manager2 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + manager2 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") // Get the loaded instance loadedInst, err := manager2.GetInstance("test-instance") diff --git a/pkg/manager/operations.go b/pkg/manager/operations.go index a8b5c3f..0fbbbcb 100644 --- a/pkg/manager/operations.go +++ b/pkg/manager/operations.go @@ -99,7 +99,8 @@ func (im *instanceManager) CreateInstance(name string, options *instance.CreateI } // Check if this is a remote instance - isRemote := len(options.Nodes) > 0 + // An instance is remote if Nodes is specified AND the first node is not the local node + isRemote := len(options.Nodes) > 0 && options.Nodes[0] != im.localNodeName var nodeConfig *config.NodeConfig if isRemote { @@ -119,7 +120,7 @@ func (im *instanceManager) CreateInstance(name string, options *instance.CreateI // Create a local stub that preserves the Nodes field for tracking // We keep the original options (with Nodes) so IsRemote() works correctly - inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, nil) + inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, nil) // Update the local stub with all remote data (preserving Nodes) im.updateLocalInstanceFromRemote(inst, remoteInst) @@ -152,7 +153,7 @@ func (im *instanceManager) CreateInstance(name string, options *instance.CreateI im.onStatusChange(name, oldStatus, newStatus) } - inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, statusCallback) + inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback) im.instances[inst.Name] = inst if err := im.persistInstance(inst); err != nil { diff --git a/pkg/manager/operations_test.go b/pkg/manager/operations_test.go index fdeb44f..6743a4b 100644 --- a/pkg/manager/operations_test.go +++ b/pkg/manager/operations_test.go @@ -75,7 +75,7 @@ func TestCreateInstance_ValidationAndLimits(t *testing.T) { MaxInstances: 1, // Very low limit for testing TimeoutCheckInterval: 5, } - limitedManager := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + limitedManager := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") _, err = limitedManager.CreateInstance("instance1", options) if err != nil { diff --git a/pkg/manager/remote_ops.go b/pkg/manager/remote_ops.go index 40b2384..e98e396 100644 --- a/pkg/manager/remote_ops.go +++ b/pkg/manager/remote_ops.go @@ -10,31 +10,10 @@ import ( "net/http" ) -// stripNodesFromOptions creates a copy of the instance options without the Nodes field -// to prevent routing loops when sending requests to remote nodes -func (im *instanceManager) stripNodesFromOptions(options *instance.CreateInstanceOptions) *instance.CreateInstanceOptions { - if options == nil { - return nil - } - - // Create a copy of the options struct - optionsCopy := *options - - // Clear the Nodes field to prevent the remote node from trying to route further - optionsCopy.Nodes = nil - - return &optionsCopy -} - // makeRemoteRequest is a helper function to make HTTP requests to a remote node func (im *instanceManager) makeRemoteRequest(nodeConfig *config.NodeConfig, method, path string, body any) (*http.Response, error) { var reqBody io.Reader if body != nil { - // Strip nodes from CreateInstanceOptions to prevent routing loops - if options, ok := body.(*instance.CreateInstanceOptions); ok { - body = im.stripNodesFromOptions(options) - } - jsonData, err := json.Marshal(body) if err != nil { return nil, fmt.Errorf("failed to marshal request body: %w", err) diff --git a/pkg/manager/remote_ops_test.go b/pkg/manager/remote_ops_test.go deleted file mode 100644 index 94db40b..0000000 --- a/pkg/manager/remote_ops_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package manager - -import ( - "llamactl/pkg/backends" - "llamactl/pkg/instance" - "testing" -) - -func TestStripNodesFromOptions(t *testing.T) { - im := &instanceManager{} - - // Test nil case - if result := im.stripNodesFromOptions(nil); result != nil { - t.Errorf("Expected nil, got %+v", result) - } - - // Test main case: nodes should be stripped, other fields preserved - options := &instance.CreateInstanceOptions{ - BackendType: backends.BackendTypeLlamaCpp, - Nodes: []string{"node1", "node2"}, - Environment: map[string]string{"TEST": "value"}, - } - - result := im.stripNodesFromOptions(options) - - if result.Nodes != nil { - t.Errorf("Expected Nodes to be nil, got %+v", result.Nodes) - } - if result.BackendType != backends.BackendTypeLlamaCpp { - t.Errorf("Expected BackendType preserved") - } - if result.Environment["TEST"] != "value" { - t.Errorf("Expected Environment preserved") - } - // Original should not be modified - if len(options.Nodes) != 2 { - t.Errorf("Original options should not be modified") - } -} diff --git a/pkg/manager/timeout_test.go b/pkg/manager/timeout_test.go index 55cd781..4992370 100644 --- a/pkg/manager/timeout_test.go +++ b/pkg/manager/timeout_test.go @@ -23,7 +23,7 @@ func TestTimeoutFunctionality(t *testing.T) { MaxInstances: 5, } - manager := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}) + manager := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") if manager == nil { t.Fatal("Manager should be initialized with timeout checker") }