From e0f176de107f18154c7b2883f703fffe59f0498f Mon Sep 17 00:00:00 2001 From: LordMathis Date: Wed, 1 Oct 2025 20:25:06 +0200 Subject: [PATCH] Enhance instance manager to support remote instance management and update related tests --- cmd/server/main.go | 2 +- pkg/manager/manager.go | 31 ++++++++- pkg/manager/manager_test.go | 8 +-- pkg/manager/operations.go | 111 ++++++++++++++++++++++++--------- pkg/manager/operations_test.go | 2 +- pkg/manager/timeout_test.go | 2 +- 6 files changed, 116 insertions(+), 40 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index e245ebf..de080c7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -58,7 +58,7 @@ func main() { } // Initialize the instance manager - instanceManager := manager.NewInstanceManager(cfg.Backends, cfg.Instances) + instanceManager := manager.NewInstanceManager(cfg.Backends, cfg.Instances, cfg.Nodes) // Create a new handler with the instance manager handler := server.NewHandler(instanceManager, cfg) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e160d6c..c63391a 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -57,14 +57,23 @@ type instanceManager struct { isShutdown bool // Remote instance management - httpClient *http.Client + httpClient *http.Client + instanceNodeMap map[string]*config.NodeConfig // Maps instance name to its node config + nodeConfigMap map[string]*config.NodeConfig // Maps node name to node config for quick lookup } // NewInstanceManager creates a new instance of InstanceManager. -func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig) InstanceManager { +func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig []config.NodeConfig) InstanceManager { if instancesConfig.TimeoutCheckInterval <= 0 { instancesConfig.TimeoutCheckInterval = 5 // Default to 5 minutes if not set } + + // Build node config map for quick lookup + nodeConfigMap := make(map[string]*config.NodeConfig) + for i := range nodesConfig { + nodeConfigMap[nodesConfig[i].Name] = &nodesConfig[i] + } + im := &instanceManager{ instances: make(map[string]*instance.Process), runningInstances: make(map[string]struct{}), @@ -79,6 +88,9 @@ func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig con httpClient: &http.Client{ Timeout: 30 * time.Second, }, + + instanceNodeMap: make(map[string]*config.NodeConfig), + nodeConfigMap: nodeConfigMap, } // Load existing instances from disk @@ -316,3 +328,18 @@ func (im *instanceManager) onStatusChange(name string, oldStatus, newStatus inst delete(im.runningInstances, name) } } + +// getNodeForInstance returns the node configuration for a remote instance +// Returns nil if the instance is not remote or the node is not found +func (im *instanceManager) getNodeForInstance(inst *instance.Process) *config.NodeConfig { + if !inst.IsRemote() { + return nil + } + + // Check if we have a cached mapping + if nodeConfig, exists := im.instanceNodeMap[inst.Name]; exists { + return nodeConfig + } + + return nil +} diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index c629c63..ed9bde5 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -34,7 +34,7 @@ func TestNewInstanceManager(t *testing.T) { TimeoutCheckInterval: 5, } - mgr := manager.NewInstanceManager(backendConfig, cfg) + mgr := manager.NewInstanceManager(backendConfig, cfg, nil) if mgr == nil { t.Fatal("NewInstanceManager returned nil") } @@ -69,7 +69,7 @@ func TestPersistence(t *testing.T) { } // Test instance persistence on creation - manager1 := manager.NewInstanceManager(backendConfig, cfg) + manager1 := manager.NewInstanceManager(backendConfig, cfg, nil) options := &instance.CreateInstanceOptions{ BackendType: backends.BackendTypeLlamaCpp, LlamaServerOptions: &llamacpp.LlamaServerOptions{ @@ -90,7 +90,7 @@ func TestPersistence(t *testing.T) { } // Test loading instances from disk - manager2 := manager.NewInstanceManager(backendConfig, cfg) + manager2 := manager.NewInstanceManager(backendConfig, cfg, nil) instances, err := manager2.ListInstances() if err != nil { t.Fatalf("ListInstances failed: %v", err) @@ -207,5 +207,5 @@ func createTestManager() manager.InstanceManager { DefaultRestartDelay: 5, TimeoutCheckInterval: 5, } - return manager.NewInstanceManager(backendConfig, cfg) + return manager.NewInstanceManager(backendConfig, cfg, nil) } diff --git a/pkg/manager/operations.go b/pkg/manager/operations.go index b3c0d13..0a53f84 100644 --- a/pkg/manager/operations.go +++ b/pkg/manager/operations.go @@ -75,26 +75,37 @@ func (im *instanceManager) CreateInstance(name string, options *instance.CreateI // GetInstance retrieves an instance by its name. func (im *instanceManager) GetInstance(name string) (*instance.Process, error) { im.mu.RLock() - defer im.mu.RUnlock() + inst, exists := im.instances[name] + im.mu.RUnlock() - instance, exists := im.instances[name] if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } - return instance, nil + + // Check if instance is remote and delegate to remote operation + if node := im.getNodeForInstance(inst); node != nil { + return im.GetRemoteInstance(node, name) + } + + return inst, nil } // UpdateInstance updates the options of an existing instance and returns it. // If the instance is running, it will be restarted to apply the new options. func (im *instanceManager) UpdateInstance(name string, options *instance.CreateInstanceOptions) (*instance.Process, error) { im.mu.RLock() - instance, exists := im.instances[name] + inst, exists := im.instances[name] im.mu.RUnlock() if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } + // Check if instance is remote and delegate to remote operation + if node := im.getNodeForInstance(inst); node != nil { + return im.UpdateRemoteInstance(node, name, options) + } + if options == nil { return nil, fmt.Errorf("instance options cannot be nil") } @@ -105,55 +116,63 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.CreateI } // Check if instance is running before updating options - wasRunning := instance.IsRunning() + wasRunning := inst.IsRunning() // If the instance is running, stop it first if wasRunning { - if err := instance.Stop(); err != nil { + if err := inst.Stop(); err != nil { return nil, fmt.Errorf("failed to stop instance %s for update: %w", name, err) } } // Now update the options while the instance is stopped - instance.SetOptions(options) + inst.SetOptions(options) // If it was running before, start it again with the new options if wasRunning { - if err := instance.Start(); err != nil { + if err := inst.Start(); err != nil { return nil, fmt.Errorf("failed to start instance %s after update: %w", name, err) } } im.mu.Lock() defer im.mu.Unlock() - if err := im.persistInstance(instance); err != nil { + if err := im.persistInstance(inst); err != nil { return nil, fmt.Errorf("failed to persist updated instance %s: %w", name, err) } - return instance, nil + return inst, nil } // DeleteInstance removes stopped instance by its name. func (im *instanceManager) DeleteInstance(name string) error { im.mu.Lock() - defer im.mu.Unlock() + inst, exists := im.instances[name] + im.mu.Unlock() - instance, exists := im.instances[name] if !exists { return fmt.Errorf("instance with name %s not found", name) } - if instance.IsRunning() { + // Check if instance is remote and delegate to remote operation + if node := im.getNodeForInstance(inst); node != nil { + return im.DeleteRemoteInstance(node, name) + } + + if inst.IsRunning() { return fmt.Errorf("instance with name %s is still running, stop it before deleting", name) } - delete(im.ports, instance.GetPort()) + im.mu.Lock() + defer im.mu.Unlock() + + delete(im.ports, inst.GetPort()) delete(im.instances, name) // Delete the instance's config file if persistence is enabled - instancePath := filepath.Join(im.instancesConfig.InstancesDir, instance.Name+".json") + instancePath := filepath.Join(im.instancesConfig.InstancesDir, inst.Name+".json") if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to delete config file for instance %s: %w", instance.Name, err) + return fmt.Errorf("failed to delete config file for instance %s: %w", inst.Name, err) } return nil @@ -163,33 +182,39 @@ func (im *instanceManager) DeleteInstance(name string) error { // If the instance is already running, it returns an error. func (im *instanceManager) StartInstance(name string) (*instance.Process, error) { im.mu.RLock() - instance, exists := im.instances[name] + inst, exists := im.instances[name] maxRunningExceeded := len(im.runningInstances) >= im.instancesConfig.MaxRunningInstances && im.instancesConfig.MaxRunningInstances != -1 im.mu.RUnlock() if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } - if instance.IsRunning() { - return instance, fmt.Errorf("instance with name %s is already running", name) + + // Check if instance is remote and delegate to remote operation + if node := im.getNodeForInstance(inst); node != nil { + return im.StartRemoteInstance(node, name) + } + + if inst.IsRunning() { + return inst, fmt.Errorf("instance with name %s is already running", name) } if maxRunningExceeded { return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.instancesConfig.MaxRunningInstances)) } - if err := instance.Start(); err != nil { + if err := inst.Start(); err != nil { return nil, fmt.Errorf("failed to start instance %s: %w", name, err) } im.mu.Lock() defer im.mu.Unlock() - err := im.persistInstance(instance) + err := im.persistInstance(inst) if err != nil { return nil, fmt.Errorf("failed to persist instance %s: %w", name, err) } - return instance, nil + return inst, nil } func (im *instanceManager) IsMaxRunningInstancesReached() bool { @@ -206,49 +231,73 @@ func (im *instanceManager) IsMaxRunningInstancesReached() bool { // StopInstance stops a running instance and returns it. func (im *instanceManager) StopInstance(name string) (*instance.Process, error) { im.mu.RLock() - instance, exists := im.instances[name] + inst, exists := im.instances[name] im.mu.RUnlock() if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } - if !instance.IsRunning() { - return instance, fmt.Errorf("instance with name %s is already stopped", name) + + // Check if instance is remote and delegate to remote operation + if node := im.getNodeForInstance(inst); node != nil { + return im.StopRemoteInstance(node, name) } - if err := instance.Stop(); err != nil { + if !inst.IsRunning() { + return inst, fmt.Errorf("instance with name %s is already stopped", name) + } + + if err := inst.Stop(); err != nil { return nil, fmt.Errorf("failed to stop instance %s: %w", name, err) } im.mu.Lock() defer im.mu.Unlock() - err := im.persistInstance(instance) + err := im.persistInstance(inst) if err != nil { return nil, fmt.Errorf("failed to persist instance %s: %w", name, err) } - return instance, nil + return inst, nil } // RestartInstance stops and then starts an instance, returning the updated instance. func (im *instanceManager) RestartInstance(name string) (*instance.Process, error) { - instance, err := im.StopInstance(name) + im.mu.RLock() + inst, exists := im.instances[name] + im.mu.RUnlock() + + if !exists { + return nil, fmt.Errorf("instance with name %s not found", name) + } + + // Check if instance is remote and delegate to remote operation + if node := im.getNodeForInstance(inst); node != nil { + return im.RestartRemoteInstance(node, name) + } + + inst, err := im.StopInstance(name) if err != nil { return nil, err } - return im.StartInstance(instance.Name) + return im.StartInstance(inst.Name) } // GetInstanceLogs retrieves the logs for a specific instance by its name. func (im *instanceManager) GetInstanceLogs(name string) (string, error) { im.mu.RLock() - _, exists := im.instances[name] + inst, exists := im.instances[name] im.mu.RUnlock() if !exists { return "", fmt.Errorf("instance with name %s not found", name) } + // Check if instance is remote and delegate to remote operation + if node := im.getNodeForInstance(inst); node != nil { + return im.GetRemoteInstanceLogs(node, name) + } + // TODO: Implement actual log retrieval logic return fmt.Sprintf("Logs for instance %s", name), nil } diff --git a/pkg/manager/operations_test.go b/pkg/manager/operations_test.go index 97358c5..da26742 100644 --- a/pkg/manager/operations_test.go +++ b/pkg/manager/operations_test.go @@ -75,7 +75,7 @@ func TestCreateInstance_ValidationAndLimits(t *testing.T) { MaxInstances: 1, // Very low limit for testing TimeoutCheckInterval: 5, } - limitedManager := manager.NewInstanceManager(backendConfig, cfg) + limitedManager := manager.NewInstanceManager(backendConfig, cfg, nil) _, err = limitedManager.CreateInstance("instance1", options) if err != nil { diff --git a/pkg/manager/timeout_test.go b/pkg/manager/timeout_test.go index 08d500c..31b4298 100644 --- a/pkg/manager/timeout_test.go +++ b/pkg/manager/timeout_test.go @@ -23,7 +23,7 @@ func TestTimeoutFunctionality(t *testing.T) { MaxInstances: 5, } - manager := manager.NewInstanceManager(backendConfig, cfg) + manager := manager.NewInstanceManager(backendConfig, cfg, nil) if manager == nil { t.Fatal("Manager should be initialized with timeout checker") }