From ffb4b49c9479ad87ba73c5e88796f385929fcbd5 Mon Sep 17 00:00:00 2001 From: LordMathis Date: Mon, 20 Oct 2025 21:55:50 +0200 Subject: [PATCH] Split manager into multiple structs --- cmd/server/main.go | 2 +- pkg/manager/lifecycle.go | 152 ++++++++++++++ pkg/manager/manager.go | 341 +++++++++++-------------------- pkg/manager/manager_test.go | 12 +- pkg/manager/operations.go | 362 +++++++++++++++++---------------- pkg/manager/operations_test.go | 2 +- pkg/manager/persistence.go | 241 ++++++++++++++++++++++ pkg/manager/ports.go | 184 +++++++++++++++++ pkg/manager/registry.go | 131 ++++++++++++ pkg/manager/remote.go | 283 ++++++++++++++++++++++++++ pkg/manager/remote_ops.go | 222 -------------------- pkg/manager/timeout.go | 74 ------- pkg/manager/timeout_test.go | 2 +- 13 files changed, 1307 insertions(+), 701 deletions(-) create mode 100644 pkg/manager/lifecycle.go create mode 100644 pkg/manager/persistence.go create mode 100644 pkg/manager/ports.go create mode 100644 pkg/manager/registry.go create mode 100644 pkg/manager/remote.go delete mode 100644 pkg/manager/remote_ops.go delete mode 100644 pkg/manager/timeout.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 754b1d7..843a594 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -58,7 +58,7 @@ func main() { } // Initialize the instance manager - instanceManager := manager.NewInstanceManager(cfg.Backends, cfg.Instances, cfg.Nodes, cfg.LocalNode) + instanceManager := manager.New(cfg.Backends, cfg.Instances, cfg.Nodes, cfg.LocalNode) // Create a new handler with the instance manager handler := server.NewHandler(instanceManager, cfg) diff --git a/pkg/manager/lifecycle.go b/pkg/manager/lifecycle.go new file mode 100644 index 0000000..4045022 --- /dev/null +++ b/pkg/manager/lifecycle.go @@ -0,0 +1,152 @@ +package manager + +import ( + "fmt" + "llamactl/pkg/instance" + "log" + "sync" + "time" +) + +// lifecycleManager handles background timeout checking and LRU eviction. +// It properly coordinates shutdown to prevent races with the timeout checker. +type lifecycleManager struct { + registry *instanceRegistry + manager InstanceManager // For calling Stop/Evict operations + + ticker *time.Ticker + checkInterval time.Duration + enableLRU bool + + shutdownChan chan struct{} + shutdownDone chan struct{} + shutdownOnce sync.Once +} + +// NewLifecycleManager creates a new lifecycle manager. +func NewLifecycleManager( + registry *instanceRegistry, + manager InstanceManager, + checkInterval time.Duration, + enableLRU bool, +) *lifecycleManager { + if checkInterval <= 0 { + checkInterval = 5 * time.Minute // Default to 5 minutes + } + + return &lifecycleManager{ + registry: registry, + manager: manager, + ticker: time.NewTicker(checkInterval), + checkInterval: checkInterval, + enableLRU: enableLRU, + shutdownChan: make(chan struct{}), + shutdownDone: make(chan struct{}), + } +} + +// Start begins the timeout checking loop in a goroutine. +func (l *lifecycleManager) Start() { + go l.timeoutCheckLoop() +} + +// Stop gracefully stops the lifecycle manager. +// This ensures the timeout checker completes before instance cleanup begins. +func (l *lifecycleManager) Stop() { + l.shutdownOnce.Do(func() { + close(l.shutdownChan) + <-l.shutdownDone // Wait for checker to finish (prevents shutdown race) + l.ticker.Stop() + }) +} + +// timeoutCheckLoop is the main loop that periodically checks for timeouts. 
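+// It exits when shutdownChan is closed and signals completion by closing
+// shutdownDone, which Stop waits on before stopping the ticker.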
+func (l *lifecycleManager) timeoutCheckLoop() { + defer close(l.shutdownDone) // Signal completion + + for { + select { + case <-l.ticker.C: + l.checkTimeouts() + case <-l.shutdownChan: + return // Exit goroutine on shutdown + } + } +} + +// checkTimeouts checks all instances for timeout and stops those that have timed out. +func (l *lifecycleManager) checkTimeouts() { + // Get all instances from registry + instances := l.registry.List() + + var timeoutInstances []string + + // Identify instances that should timeout + for _, inst := range instances { + // Skip remote instances - they are managed by their respective nodes + if inst.IsRemote() { + continue + } + + // Only check running instances + if !l.registry.IsRunning(inst.Name) { + continue + } + + if inst.ShouldTimeout() { + timeoutInstances = append(timeoutInstances, inst.Name) + } + } + + // Stop the timed-out instances + for _, name := range timeoutInstances { + log.Printf("Instance %s has timed out, stopping it", name) + if _, err := l.manager.StopInstance(name); err != nil { + log.Printf("Error stopping instance %s: %v", name, err) + } else { + log.Printf("Instance %s stopped successfully", name) + } + } +} + +// EvictLRU finds and stops the least recently used running instance. +// This is called when max running instances limit is reached. +func (l *lifecycleManager) EvictLRU() error { + if !l.enableLRU { + return fmt.Errorf("LRU eviction is not enabled") + } + + // Get all running instances + runningInstances := l.registry.ListRunning() + + var lruInstance *instance.Instance + + for _, inst := range runningInstances { + // Skip remote instances - they are managed by their respective nodes + if inst.IsRemote() { + continue + } + + // Skip instances without idle timeout + if inst.GetOptions() != nil && inst.GetOptions().IdleTimeout != nil && *inst.GetOptions().IdleTimeout <= 0 { + continue + } + + if lruInstance == nil { + lruInstance = inst + } + + if inst.LastRequestTime() < lruInstance.LastRequestTime() { + lruInstance = inst + } + } + + if lruInstance == nil { + return fmt.Errorf("failed to find lru instance") + } + + // Evict the LRU instance + log.Printf("Evicting LRU instance %s", lruInstance.Name) + _, err := l.manager.StopInstance(lruInstance.Name) + return err +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 894b8df..5863f46 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -1,15 +1,11 @@ package manager import ( - "encoding/json" + "context" "fmt" "llamactl/pkg/config" "llamactl/pkg/instance" "log" - "net/http" - "os" - "path/filepath" - "strings" "sync" "time" ) @@ -43,243 +39,145 @@ type RemoteManager interface { } type instanceManager struct { - mu sync.RWMutex - instances map[string]*instance.Instance - runningInstances map[string]struct{} - ports map[int]bool - instancesConfig config.InstancesConfig - backendsConfig config.BackendConfig - localNodeName string // Name of the local node + // Components (each with own synchronization) + registry *instanceRegistry + ports *portAllocator + persistence *instancePersister + remote *remoteManager + lifecycle *lifecycleManager - // Timeout checker - timeoutChecker *time.Ticker - shutdownChan chan struct{} - shutdownDone chan struct{} - isShutdown bool + // Configuration + instancesConfig config.InstancesConfig + backendsConfig config.BackendConfig + localNodeName string // Name of the local node - // Remote instance management - httpClient *http.Client - instanceNodeMap map[string]*config.NodeConfig // Maps instance name to its node 
config - nodeConfigMap map[string]*config.NodeConfig // Maps node name to node config for quick lookup + // Synchronization + operationMu sync.Mutex // Protects start/stop/update/delete/restart operations + shutdownOnce sync.Once } -// NewInstanceManager creates a new instance of InstanceManager. -func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig map[string]config.NodeConfig, localNodeName string) InstanceManager { +// New creates a new instance of InstanceManager. +func New(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig map[string]config.NodeConfig, localNodeName string) InstanceManager { if instancesConfig.TimeoutCheckInterval <= 0 { instancesConfig.TimeoutCheckInterval = 5 // Default to 5 minutes if not set } - // Build node config map for quick lookup - nodeConfigMap := make(map[string]*config.NodeConfig) - for name := range nodesConfig { - nodeCopy := nodesConfig[name] - nodeConfigMap[name] = &nodeCopy + // Initialize components + registry := NewInstanceRegistry() + + // Initialize port allocator + portRange := instancesConfig.PortRange + ports, err := NewPortAllocator(portRange[0], portRange[1]) + if err != nil { + log.Fatalf("Failed to create port allocator: %v", err) } + // Initialize persistence + persistence, err := NewInstancePersister(instancesConfig.InstancesDir) + if err != nil { + log.Fatalf("Failed to create instance persister: %v", err) + } + + // Initialize remote manager + remote := NewRemoteManager(nodesConfig, 30*time.Second) + + // Create manager instance im := &instanceManager{ - instances: make(map[string]*instance.Instance), - runningInstances: make(map[string]struct{}), - ports: make(map[int]bool), - instancesConfig: instancesConfig, - backendsConfig: backendsConfig, - localNodeName: localNodeName, - - timeoutChecker: time.NewTicker(time.Duration(instancesConfig.TimeoutCheckInterval) * time.Minute), - shutdownChan: make(chan struct{}), - shutdownDone: make(chan struct{}), - - httpClient: &http.Client{ - Timeout: 30 * time.Second, - }, - - instanceNodeMap: make(map[string]*config.NodeConfig), - nodeConfigMap: nodeConfigMap, + registry: registry, + ports: ports, + persistence: persistence, + remote: remote, + instancesConfig: instancesConfig, + backendsConfig: backendsConfig, + localNodeName: localNodeName, } + // Initialize lifecycle manager (needs reference to manager for Stop/Evict operations) + checkInterval := time.Duration(instancesConfig.TimeoutCheckInterval) * time.Minute + im.lifecycle = NewLifecycleManager(registry, im, checkInterval, true) + // Load existing instances from disk if err := im.loadInstances(); err != nil { log.Printf("Error loading instances: %v", err) } - // Start the timeout checker goroutine after initialization is complete - go func() { - defer close(im.shutdownDone) - - for { - select { - case <-im.timeoutChecker.C: - im.checkAllTimeouts() - case <-im.shutdownChan: - return // Exit goroutine on shutdown - } - } - }() + // Start the lifecycle manager + im.lifecycle.Start() return im } -func (im *instanceManager) getNextAvailablePort() (int, error) { - portRange := im.instancesConfig.PortRange - - for port := portRange[0]; port <= portRange[1]; port++ { - if !im.ports[port] { - im.ports[port] = true - return port, nil - } - } - - return 0, fmt.Errorf("no available ports in the specified range") -} - -// persistInstance saves an instance to its JSON file -func (im *instanceManager) persistInstance(instance *instance.Instance) error { - 
if im.instancesConfig.InstancesDir == "" { - return nil // Persistence disabled - } - - instancePath := filepath.Join(im.instancesConfig.InstancesDir, instance.Name+".json") - tempPath := instancePath + ".tmp" - - // Serialize instance to JSON - jsonData, err := json.MarshalIndent(instance, "", " ") - if err != nil { - return fmt.Errorf("failed to marshal instance %s: %w", instance.Name, err) - } - - // Write to temporary file first - if err := os.WriteFile(tempPath, jsonData, 0644); err != nil { - return fmt.Errorf("failed to write temp file for instance %s: %w", instance.Name, err) - } - - // Atomic rename - if err := os.Rename(tempPath, instancePath); err != nil { - os.Remove(tempPath) // Clean up temp file - return fmt.Errorf("failed to rename temp file for instance %s: %w", instance.Name, err) - } - - return nil +// persistInstance saves an instance using the persistence component +func (im *instanceManager) persistInstance(inst *instance.Instance) error { + return im.persistence.Save(inst) } func (im *instanceManager) Shutdown() { - im.mu.Lock() + im.shutdownOnce.Do(func() { + // 1. Stop lifecycle manager (stops timeout checker) + im.lifecycle.Stop() - // Check if already shutdown - if im.isShutdown { - im.mu.Unlock() - return - } - im.isShutdown = true + // 2. Get running instances (no lock needed - registry handles it) + running := im.registry.ListRunning() - // Signal the timeout checker to stop - close(im.shutdownChan) - - // Create a list of running instances to stop - var runningInstances []*instance.Instance - var runningNames []string - for name, inst := range im.instances { - if inst.IsRunning() { - runningInstances = append(runningInstances, inst) - runningNames = append(runningNames, name) - } - } - - // Release lock before stopping instances to avoid deadlock - im.mu.Unlock() - - // Wait for the timeout checker goroutine to actually stop - <-im.shutdownDone - - // Now stop the ticker - if im.timeoutChecker != nil { - im.timeoutChecker.Stop() - } - - // Stop instances without holding the manager lock - var wg sync.WaitGroup - wg.Add(len(runningInstances)) - - for i, inst := range runningInstances { - go func(name string, inst *instance.Instance) { - defer wg.Done() - fmt.Printf("Stopping instance %s...\n", name) - // Attempt to stop the instance gracefully - if err := inst.Stop(); err != nil { - fmt.Printf("Error stopping instance %s: %v\n", name, err) + // 3. 
Stop local instances concurrently + var wg sync.WaitGroup + for _, inst := range running { + if inst.IsRemote() { + continue // Skip remote instances } - }(runningNames[i], inst) - } - - wg.Wait() - fmt.Println("All instances stopped.") + wg.Add(1) + go func(inst *instance.Instance) { + defer wg.Done() + fmt.Printf("Stopping instance %s...\n", inst.Name) + if err := inst.Stop(); err != nil { + fmt.Printf("Error stopping instance %s: %v\n", inst.Name, err) + } + }(inst) + } + wg.Wait() + fmt.Println("All instances stopped.") + }) } -// loadInstances restores all instances from disk +// loadInstances restores all instances from disk using the persistence component func (im *instanceManager) loadInstances() error { - if im.instancesConfig.InstancesDir == "" { - return nil // Persistence disabled - } - - // Check if instances directory exists - if _, err := os.Stat(im.instancesConfig.InstancesDir); os.IsNotExist(err) { - return nil // No instances directory, start fresh - } - - // Read all JSON files from instances directory - files, err := os.ReadDir(im.instancesConfig.InstancesDir) + // Load all instances from persistence + instances, err := im.persistence.LoadAll() if err != nil { - return fmt.Errorf("failed to read instances directory: %w", err) + return fmt.Errorf("failed to load instances: %w", err) } - loadedCount := 0 - for _, file := range files { - if file.IsDir() || !strings.HasSuffix(file.Name(), ".json") { + if len(instances) == 0 { + return nil + } + + // Process each loaded instance + for _, persistedInst := range instances { + if err := im.loadInstance(persistedInst); err != nil { + log.Printf("Failed to load instance %s: %v", persistedInst.Name, err) continue } - - instanceName := strings.TrimSuffix(file.Name(), ".json") - instancePath := filepath.Join(im.instancesConfig.InstancesDir, file.Name()) - - if err := im.loadInstance(instanceName, instancePath); err != nil { - log.Printf("Failed to load instance %s: %v", instanceName, err) - continue - } - - loadedCount++ } - if loadedCount > 0 { - log.Printf("Loaded %d instances from persistence", loadedCount) - // Auto-start instances that have auto-restart enabled - go im.autoStartInstances() - } + log.Printf("Loaded %d instances from persistence", len(instances)) + + // Auto-start instances that have auto-restart enabled + go im.autoStartInstances() return nil } -// loadInstance loads a single instance from its JSON file -func (im *instanceManager) loadInstance(name, path string) error { - data, err := os.ReadFile(path) - if err != nil { - return fmt.Errorf("failed to read instance file: %w", err) - } - - var persistedInstance instance.Instance - if err := json.Unmarshal(data, &persistedInstance); err != nil { - return fmt.Errorf("failed to unmarshal instance: %w", err) - } - - // Validate the instance name matches the filename - if persistedInstance.Name != name { - return fmt.Errorf("instance name mismatch: file=%s, instance.Name=%s", name, persistedInstance.Name) - } - - options := persistedInstance.GetOptions() +// loadInstance loads a single persisted instance and adds it to the registry +func (im *instanceManager) loadInstance(persistedInst *instance.Instance) error { + name := persistedInst.Name + options := persistedInst.GetOptions() // Check if this is a remote instance (local node not in the Nodes set) var isRemote bool var nodeName string if options != nil { - if _, isLocal := options.Nodes[im.localNodeName]; !isLocal { + if _, isLocal := options.Nodes[im.localNodeName]; !isLocal && len(options.Nodes) > 0 { // Get the 
first node from the set for node := range options.Nodes { nodeName = node @@ -293,7 +191,7 @@ func (im *instanceManager) loadInstance(name, path string) error { if !isRemote { // Only set status callback for local instances statusCallback = func(oldStatus, newStatus instance.Status) { - im.onStatusChange(persistedInstance.Name, oldStatus, newStatus) + im.onStatusChange(name, oldStatus, newStatus) } } @@ -301,38 +199,42 @@ func (im *instanceManager) loadInstance(name, path string) error { inst := instance.New(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback) // Restore persisted fields that NewInstance doesn't set - inst.Created = persistedInstance.Created - inst.SetStatus(persistedInstance.GetStatus()) + inst.Created = persistedInst.Created + inst.SetStatus(persistedInst.GetStatus()) // Handle remote instance mapping if isRemote { - nodeConfig, exists := im.nodeConfigMap[nodeName] - if !exists { - return fmt.Errorf("node %s not found for remote instance %s", nodeName, name) + // Map instance to node in remote manager + if err := im.remote.SetInstanceNode(name, nodeName); err != nil { + return fmt.Errorf("failed to set instance node: %w", err) } - im.instanceNodeMap[name] = nodeConfig } else { - // Check for port conflicts only for local instances + // Allocate port for local instances if inst.GetPort() > 0 { port := inst.GetPort() - if im.ports[port] { - return fmt.Errorf("port conflict: instance %s wants port %d which is already in use", name, port) + if err := im.ports.AllocateSpecific(port, name); err != nil { + return fmt.Errorf("port conflict: instance %s wants port %d which is already in use: %w", name, port, err) } - im.ports[port] = true } } - im.instances[name] = inst + // Add instance to registry + if err := im.registry.Add(inst); err != nil { + return fmt.Errorf("failed to add instance to registry: %w", err) + } + return nil } // autoStartInstances starts instances that were running when persisted and have auto-restart enabled // For instances with auto-restart disabled, it sets their status to Stopped func (im *instanceManager) autoStartInstances() { - im.mu.RLock() + instances := im.registry.List() + var instancesToStart []*instance.Instance var instancesToStop []*instance.Instance - for _, inst := range im.instances { + + for _, inst := range instances { if inst.IsRunning() && // Was running when persisted inst.GetOptions() != nil && inst.GetOptions().AutoRestart != nil { @@ -344,12 +246,12 @@ func (im *instanceManager) autoStartInstances() { } } } - im.mu.RUnlock() // Stop instances that have auto-restart disabled for _, inst := range instancesToStop { log.Printf("Instance %s was running but auto-restart is disabled, setting status to stopped", inst.Name) inst.SetStatus(instance.Stopped) + im.registry.MarkStopped(inst.Name) } // Start instances that have auto-restart enabled @@ -357,11 +259,13 @@ func (im *instanceManager) autoStartInstances() { log.Printf("Auto-starting instance %s", inst.Name) // Reset running state before starting (since Start() expects stopped instance) inst.SetStatus(instance.Stopped) + im.registry.MarkStopped(inst.Name) // Check if this is a remote instance - if node := im.getNodeForInstance(inst); node != nil { - // Remote instance - use StartRemoteInstance - if _, err := im.StartRemoteInstance(node, inst.Name); err != nil { + if node, exists := im.remote.GetNodeForInstance(inst.Name); exists && node != nil { + // Remote instance - use remote manager with context + ctx := context.Background() + if _, err := 
im.remote.StartInstance(ctx, node, inst.Name); err != nil { log.Printf("Failed to auto-start remote instance %s: %v", inst.Name, err) } } else { @@ -374,13 +278,10 @@ func (im *instanceManager) autoStartInstances() { } func (im *instanceManager) onStatusChange(name string, oldStatus, newStatus instance.Status) { - im.mu.Lock() - defer im.mu.Unlock() - if newStatus == instance.Running { - im.runningInstances[name] = struct{}{} + im.registry.MarkRunning(name) } else { - delete(im.runningInstances, name) + im.registry.MarkStopped(name) } } @@ -391,8 +292,8 @@ func (im *instanceManager) getNodeForInstance(inst *instance.Instance) *config.N return nil } - // Check if we have a cached mapping - if nodeConfig, exists := im.instanceNodeMap[inst.Name]; exists { + // Check if we have a node mapping in remote manager + if nodeConfig, exists := im.remote.GetNodeForInstance(inst.Name); exists { return nodeConfig } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 8c1be1c..997e0f6 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -33,7 +33,7 @@ func TestNewInstanceManager(t *testing.T) { TimeoutCheckInterval: 5, } - mgr := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + mgr := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") if mgr == nil { t.Fatal("NewInstanceManager returned nil") } @@ -68,7 +68,7 @@ func TestPersistence(t *testing.T) { } // Test instance persistence on creation - manager1 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + manager1 := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") options := &instance.Options{ BackendOptions: backends.Options{ BackendType: backends.BackendTypeLlamaCpp, @@ -91,7 +91,7 @@ func TestPersistence(t *testing.T) { } // Test loading instances from disk - manager2 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + manager2 := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") instances, err := manager2.ListInstances() if err != nil { t.Fatalf("ListInstances failed: %v", err) @@ -212,7 +212,7 @@ func createTestManager() manager.InstanceManager { DefaultRestartDelay: 5, TimeoutCheckInterval: 5, } - return manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + return manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") } func TestAutoRestartDisabledInstanceStatus(t *testing.T) { @@ -232,7 +232,7 @@ func TestAutoRestartDisabledInstanceStatus(t *testing.T) { } // Create first manager and instance with auto-restart disabled - manager1 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + manager1 := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") autoRestart := false options := &instance.Options{ @@ -259,7 +259,7 @@ func TestAutoRestartDisabledInstanceStatus(t *testing.T) { manager1.Shutdown() // Create second manager (simulating restart of llamactl) - manager2 := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + manager2 := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") // Get the loaded instance loadedInst, err := manager2.GetInstance("test-instance") diff --git a/pkg/manager/operations.go b/pkg/manager/operations.go index fd150e8..51099ad 100644 --- a/pkg/manager/operations.go +++ b/pkg/manager/operations.go @@ -1,44 +1,28 @@ package manager 
import ( + "context" "fmt" "llamactl/pkg/instance" "llamactl/pkg/validation" - "os" - "path/filepath" + "log" ) type MaxRunningInstancesError error // updateLocalInstanceFromRemote updates the local stub instance with data from the remote instance -// while preserving the Nodes field to maintain remote instance tracking func (im *instanceManager) updateLocalInstanceFromRemote(localInst *instance.Instance, remoteInst *instance.Instance) { if localInst == nil || remoteInst == nil { return } - // Get the remote instance options remoteOptions := remoteInst.GetOptions() if remoteOptions == nil { return } - // Preserve the Nodes field from the local instance - localOptions := localInst.GetOptions() - var preservedNodes map[string]struct{} - if localOptions != nil && len(localOptions.Nodes) > 0 { - preservedNodes = make(map[string]struct{}, len(localOptions.Nodes)) - for node := range localOptions.Nodes { - preservedNodes[node] = struct{}{} - } - } - - // Create a copy of remote options and restore the Nodes field - updatedOptions := *remoteOptions - updatedOptions.Nodes = preservedNodes - // Update the local instance with all remote data - localInst.SetOptions(&updatedOptions) + localInst.SetOptions(remoteOptions) localInst.SetStatus(remoteInst.GetStatus()) localInst.Created = remoteInst.Created } @@ -46,17 +30,13 @@ func (im *instanceManager) updateLocalInstanceFromRemote(localInst *instance.Ins // ListInstances returns a list of all instances managed by the instance manager. // For remote instances, this fetches the live state from remote nodes and updates local stubs. func (im *instanceManager) ListInstances() ([]*instance.Instance, error) { - im.mu.RLock() - localInstances := make([]*instance.Instance, 0, len(im.instances)) - for _, inst := range im.instances { - localInstances = append(localInstances, inst) - } - im.mu.RUnlock() + instances := im.registry.List() // Update remote instances with live state - for _, inst := range localInstances { + ctx := context.Background() + for _, inst := range instances { if node := im.getNodeForInstance(inst); node != nil { - remoteInst, err := im.GetRemoteInstance(node, inst.Name) + remoteInst, err := im.remote.GetInstance(ctx, node, inst.Name) if err != nil { // Log error but continue with stale data // Don't fail the entire list operation due to one remote failure @@ -64,13 +44,11 @@ func (im *instanceManager) ListInstances() ([]*instance.Instance, error) { } // Update the local stub with all remote data (preserving Nodes) - im.mu.Lock() im.updateLocalInstanceFromRemote(inst, remoteInst) - im.mu.Unlock() } } - return localInstances, nil + return instances, nil } // CreateInstance creates a new instance with the given options and returns it. 
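+// Instances whose options target a remote node are created on that node and
+// kept locally only as stubs; local instances get a port allocated (automatic
+// or explicit) before being added to the registry.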
@@ -90,11 +68,8 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options return nil, err } - im.mu.Lock() - defer im.mu.Unlock() - // Check if instance with this name already exists (must be globally unique) - if im.instances[name] != nil { + if _, exists := im.registry.Get(name); exists { return nil, fmt.Errorf("instance with name %s already exists", name) } @@ -107,14 +82,18 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options break } - // Validate that the node exists - nodeConfig, exists := im.nodeConfigMap[nodeName] + // Create the remote instance on the remote node + ctx := context.Background() + nodeConfig, exists := im.remote.GetNodeForInstance(nodeName) if !exists { - return nil, fmt.Errorf("node %s not found", nodeName) + // Try to set the node if it doesn't exist yet + if err := im.remote.SetInstanceNode(name, nodeName); err != nil { + return nil, fmt.Errorf("node %s not found", nodeName) + } + nodeConfig, _ = im.remote.GetNodeForInstance(name) } - // Create the remote instance on the remote node - remoteInst, err := im.CreateRemoteInstance(nodeConfig, name, options) + remoteInst, err := im.remote.CreateInstance(ctx, nodeConfig, name, options) if err != nil { return nil, err } @@ -126,12 +105,20 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options // Update the local stub with all remote data (preserving Nodes) im.updateLocalInstanceFromRemote(inst, remoteInst) - // Add to local tracking maps (but don't count towards limits) - im.instances[name] = inst - im.instanceNodeMap[name] = nodeConfig + // Map instance to node + if err := im.remote.SetInstanceNode(name, nodeName); err != nil { + return nil, fmt.Errorf("failed to map instance to node: %w", err) + } + + // Add to registry (doesn't count towards local limits) + if err := im.registry.Add(inst); err != nil { + return nil, fmt.Errorf("failed to add instance to registry: %w", err) + } // Persist the remote instance locally for tracking across restarts if err := im.persistInstance(inst); err != nil { + // Rollback: remove from registry + im.registry.Remove(name) return nil, fmt.Errorf("failed to persist remote instance %s: %w", name, err) } @@ -140,14 +127,34 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options // Local instance creation // Check max instances limit for local instances only - localInstanceCount := len(im.instances) - len(im.instanceNodeMap) + totalInstances := im.registry.Count() + remoteCount := 0 + for _, inst := range im.registry.List() { + if inst.IsRemote() { + remoteCount++ + } + } + localInstanceCount := totalInstances - remoteCount if localInstanceCount >= im.instancesConfig.MaxInstances && im.instancesConfig.MaxInstances != -1 { return nil, fmt.Errorf("maximum number of instances (%d) reached", im.instancesConfig.MaxInstances) } // Assign and validate port for backend-specific options - if err := im.assignAndValidatePort(options); err != nil { - return nil, err + currentPort := im.getPortFromOptions(options) + var allocatedPort int + if currentPort == 0 { + // Allocate a port if not specified + allocatedPort, err = im.ports.Allocate(name) + if err != nil { + return nil, fmt.Errorf("failed to allocate port: %w", err) + } + im.setPortInOptions(options, allocatedPort) + } else { + // Use the specified port + if err := im.ports.AllocateSpecific(currentPort, name); err != nil { + return nil, fmt.Errorf("port %d is already in use: %w", currentPort, err) + } + allocatedPort = currentPort } 
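+	// Note: if adding the instance to the registry below fails, the port
+	// allocated above is released again, so a failed create does not leak it.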
statusCallback := func(oldStatus, newStatus instance.Status) { @@ -155,10 +162,17 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options } inst := instance.New(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback) - im.instances[inst.Name] = inst + // Add to registry + if err := im.registry.Add(inst); err != nil { + // Rollback: release port + im.ports.Release(allocatedPort) + return nil, fmt.Errorf("failed to add instance to registry: %w", err) + } + + // Persist instance (best-effort, don't fail if persistence fails) if err := im.persistInstance(inst); err != nil { - return nil, fmt.Errorf("failed to persist instance %s: %w", name, err) + log.Printf("Warning: failed to persist instance %s: %v", name, err) } return inst, nil @@ -167,25 +181,21 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options // GetInstance retrieves an instance by its name. // For remote instances, this fetches the live state from the remote node and updates the local stub. func (im *instanceManager) GetInstance(name string) (*instance.Instance, error) { - im.mu.RLock() - inst, exists := im.instances[name] - im.mu.RUnlock() - + inst, exists := im.registry.Get(name) if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } // Check if instance is remote and fetch live state if node := im.getNodeForInstance(inst); node != nil { - remoteInst, err := im.GetRemoteInstance(node, name) + ctx := context.Background() + remoteInst, err := im.remote.GetInstance(ctx, node, name) if err != nil { return nil, err } // Update the local stub with all remote data (preserving Nodes) - im.mu.Lock() im.updateLocalInstanceFromRemote(inst, remoteInst) - im.mu.Unlock() // Return the local stub (preserving Nodes field) return inst, nil @@ -197,29 +207,23 @@ func (im *instanceManager) GetInstance(name string) (*instance.Instance, error) // UpdateInstance updates the options of an existing instance and returns it. // If the instance is running, it will be restarted to apply the new options. 
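+// When the requested port differs from the current one, the new port is
+// allocated (and the old one released) before the restart happens.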
func (im *instanceManager) UpdateInstance(name string, options *instance.Options) (*instance.Instance, error) { - im.mu.RLock() - inst, exists := im.instances[name] - im.mu.RUnlock() - + inst, exists := im.registry.Get(name) if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } // Check if instance is remote and delegate to remote operation if node := im.getNodeForInstance(inst); node != nil { - remoteInst, err := im.UpdateRemoteInstance(node, name, options) + ctx := context.Background() + remoteInst, err := im.remote.UpdateInstance(ctx, node, name, options) if err != nil { return nil, err } // Update the local stub with all remote data (preserving Nodes) - im.mu.Lock() im.updateLocalInstanceFromRemote(inst, remoteInst) - im.mu.Unlock() // Persist the updated remote instance locally - im.mu.Lock() - defer im.mu.Unlock() if err := im.persistInstance(inst); err != nil { return nil, fmt.Errorf("failed to persist updated remote instance %s: %w", name, err) } @@ -236,6 +240,42 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.Options return nil, err } + // Lock for local instance operations to prevent races + im.operationMu.Lock() + defer im.operationMu.Unlock() + + // Handle port changes + oldPort := inst.GetPort() + newPort := im.getPortFromOptions(options) + var allocatedPort int + + if newPort != oldPort { + // Port is changing - need to release old and allocate new + if newPort == 0 { + // Auto-allocate new port + allocatedPort, err = im.ports.Allocate(name) + if err != nil { + return nil, fmt.Errorf("failed to allocate new port: %w", err) + } + im.setPortInOptions(options, allocatedPort) + } else { + // Use specified port + if err := im.ports.AllocateSpecific(newPort, name); err != nil { + return nil, fmt.Errorf("failed to allocate port %d: %w", newPort, err) + } + allocatedPort = newPort + } + + // Release old port + if oldPort > 0 { + if err := im.ports.Release(oldPort); err != nil { + // Rollback new port allocation + im.ports.Release(allocatedPort) + return nil, fmt.Errorf("failed to release old port %d: %w", oldPort, err) + } + } + } + // Check if instance is running before updating options wasRunning := inst.IsRunning() @@ -256,8 +296,6 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.Options } } - im.mu.Lock() - defer im.mu.Unlock() if err := im.persistInstance(inst); err != nil { return nil, fmt.Errorf("failed to persist updated instance %s: %w", name, err) } @@ -267,60 +305,50 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.Options // DeleteInstance removes stopped instance by its name. 
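+// Remote instances are deleted on their node and then dropped from local
+// tracking; local instances must already be stopped, and their ports and
+// persistence file are released as part of deletion.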
func (im *instanceManager) DeleteInstance(name string) error { - im.mu.Lock() - inst, exists := im.instances[name] - im.mu.Unlock() - + inst, exists := im.registry.Get(name) if !exists { return fmt.Errorf("instance with name %s not found", name) } // Check if instance is remote and delegate to remote operation if node := im.getNodeForInstance(inst); node != nil { - err := im.DeleteRemoteInstance(node, name) + ctx := context.Background() + err := im.remote.DeleteInstance(ctx, node, name) if err != nil { return err } // Clean up local tracking - im.mu.Lock() - defer im.mu.Unlock() - delete(im.instances, name) - delete(im.instanceNodeMap, name) + im.remote.RemoveInstance(name) + im.registry.Remove(name) - // Delete the instance's config file if persistence is enabled - // Re-validate instance name for security (defense in depth) - validatedName, err := validation.ValidateInstanceName(name) - if err != nil { - return fmt.Errorf("invalid instance name for file deletion: %w", err) - } - instancePath := filepath.Join(im.instancesConfig.InstancesDir, validatedName+".json") - if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to delete config file for remote instance %s: %w", validatedName, err) + // Delete the instance's persistence file + if err := im.persistence.Delete(name); err != nil { + return fmt.Errorf("failed to delete config file for remote instance %s: %w", name, err) } return nil } + // Lock for local instance operations to prevent races + im.operationMu.Lock() + defer im.operationMu.Unlock() + if inst.IsRunning() { return fmt.Errorf("instance with name %s is still running, stop it before deleting", name) } - im.mu.Lock() - defer im.mu.Unlock() + // Release port (use ReleaseByInstance for proper cleanup) + im.ports.ReleaseByInstance(name) - delete(im.ports, inst.GetPort()) - delete(im.instances, name) - - // Delete the instance's config file if persistence is enabled - // Re-validate instance name for security (defense in depth) - validatedName, err := validation.ValidateInstanceName(inst.Name) - if err != nil { - return fmt.Errorf("invalid instance name for file deletion: %w", err) + // Remove from registry + if err := im.registry.Remove(name); err != nil { + return fmt.Errorf("failed to remove instance from registry: %w", err) } - instancePath := filepath.Join(im.instancesConfig.InstancesDir, validatedName+".json") - if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to delete config file for instance %s: %w", validatedName, err) + + // Delete persistence file + if err := im.persistence.Delete(name); err != nil { + return fmt.Errorf("failed to delete config file for instance %s: %w", name, err) } return nil @@ -329,45 +357,35 @@ func (im *instanceManager) DeleteInstance(name string) error { // StartInstance starts a stopped instance and returns it. // If the instance is already running, it returns an error. 
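+// Remote instances are started on their owning node; local starts fail with a
+// MaxRunningInstancesError once the configured running-instance limit is hit.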
func (im *instanceManager) StartInstance(name string) (*instance.Instance, error) { - im.mu.RLock() - inst, exists := im.instances[name] - im.mu.RUnlock() - + inst, exists := im.registry.Get(name) if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } // Check if instance is remote and delegate to remote operation if node := im.getNodeForInstance(inst); node != nil { - remoteInst, err := im.StartRemoteInstance(node, name) + ctx := context.Background() + remoteInst, err := im.remote.StartInstance(ctx, node, name) if err != nil { return nil, err } // Update the local stub with all remote data (preserving Nodes) - im.mu.Lock() im.updateLocalInstanceFromRemote(inst, remoteInst) - im.mu.Unlock() return inst, nil } + // Lock for local instance operations to prevent races + im.operationMu.Lock() + defer im.operationMu.Unlock() + if inst.IsRunning() { return inst, fmt.Errorf("instance with name %s is already running", name) } // Check max running instances limit for local instances only - im.mu.RLock() - localRunningCount := 0 - for instName := range im.runningInstances { - if _, isRemote := im.instanceNodeMap[instName]; !isRemote { - localRunningCount++ - } - } - maxRunningExceeded := localRunningCount >= im.instancesConfig.MaxRunningInstances && im.instancesConfig.MaxRunningInstances != -1 - im.mu.RUnlock() - - if maxRunningExceeded { + if im.IsMaxRunningInstancesReached() { return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.instancesConfig.MaxRunningInstances)) } @@ -375,52 +393,55 @@ func (im *instanceManager) StartInstance(name string) (*instance.Instance, error return nil, fmt.Errorf("failed to start instance %s: %w", name, err) } - im.mu.Lock() - defer im.mu.Unlock() - err := im.persistInstance(inst) - if err != nil { - return nil, fmt.Errorf("failed to persist instance %s: %w", name, err) + // Persist instance (best-effort, don't fail if persistence fails) + if err := im.persistInstance(inst); err != nil { + log.Printf("Warning: failed to persist instance %s: %v", name, err) } return inst, nil } func (im *instanceManager) IsMaxRunningInstancesReached() bool { - im.mu.RLock() - defer im.mu.RUnlock() - - if im.instancesConfig.MaxRunningInstances != -1 && len(im.runningInstances) >= im.instancesConfig.MaxRunningInstances { - return true + if im.instancesConfig.MaxRunningInstances == -1 { + return false } - return false + // Count only local running instances (each node has its own limits) + localRunningCount := 0 + for _, inst := range im.registry.ListRunning() { + if !inst.IsRemote() { + localRunningCount++ + } + } + + return localRunningCount >= im.instancesConfig.MaxRunningInstances } // StopInstance stops a running instance and returns it. 
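+// Remote instances are stopped on their owning node; for local instances a
+// failure to persist the stopped state is logged rather than returned.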
func (im *instanceManager) StopInstance(name string) (*instance.Instance, error) { - im.mu.RLock() - inst, exists := im.instances[name] - im.mu.RUnlock() - + inst, exists := im.registry.Get(name) if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } // Check if instance is remote and delegate to remote operation if node := im.getNodeForInstance(inst); node != nil { - remoteInst, err := im.StopRemoteInstance(node, name) + ctx := context.Background() + remoteInst, err := im.remote.StopInstance(ctx, node, name) if err != nil { return nil, err } // Update the local stub with all remote data (preserving Nodes) - im.mu.Lock() im.updateLocalInstanceFromRemote(inst, remoteInst) - im.mu.Unlock() return inst, nil } + // Lock for local instance operations to prevent races + im.operationMu.Lock() + defer im.operationMu.Unlock() + if !inst.IsRunning() { return inst, fmt.Errorf("instance with name %s is already stopped", name) } @@ -429,11 +450,9 @@ func (im *instanceManager) StopInstance(name string) (*instance.Instance, error) return nil, fmt.Errorf("failed to stop instance %s: %w", name, err) } - im.mu.Lock() - defer im.mu.Unlock() - err := im.persistInstance(inst) - if err != nil { - return nil, fmt.Errorf("failed to persist instance %s: %w", name, err) + // Persist instance (best-effort, don't fail if persistence fails) + if err := im.persistInstance(inst); err != nil { + log.Printf("Warning: failed to persist instance %s: %v", name, err) } return inst, nil @@ -441,49 +460,60 @@ func (im *instanceManager) StopInstance(name string) (*instance.Instance, error) // RestartInstance stops and then starts an instance, returning the updated instance. func (im *instanceManager) RestartInstance(name string) (*instance.Instance, error) { - im.mu.RLock() - inst, exists := im.instances[name] - im.mu.RUnlock() - + inst, exists := im.registry.Get(name) if !exists { return nil, fmt.Errorf("instance with name %s not found", name) } // Check if instance is remote and delegate to remote operation if node := im.getNodeForInstance(inst); node != nil { - remoteInst, err := im.RestartRemoteInstance(node, name) + ctx := context.Background() + remoteInst, err := im.remote.RestartInstance(ctx, node, name) if err != nil { return nil, err } // Update the local stub with all remote data (preserving Nodes) - im.mu.Lock() im.updateLocalInstanceFromRemote(inst, remoteInst) - im.mu.Unlock() return inst, nil } - inst, err := im.StopInstance(name) - if err != nil { - return nil, err + // Lock for the entire restart operation to ensure atomicity + im.operationMu.Lock() + defer im.operationMu.Unlock() + + // Stop the instance + if inst.IsRunning() { + if err := inst.Stop(); err != nil { + return nil, fmt.Errorf("failed to stop instance %s: %w", name, err) + } } - return im.StartInstance(inst.Name) + + // Start the instance + if err := inst.Start(); err != nil { + return nil, fmt.Errorf("failed to start instance %s: %w", name, err) + } + + // Persist the restarted instance + if err := im.persistInstance(inst); err != nil { + log.Printf("Warning: failed to persist instance %s: %v", name, err) + } + + return inst, nil } // GetInstanceLogs retrieves the logs for a specific instance by its name. 
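+// Logs for remote instances are fetched from the owning node; otherwise they
+// are read from the local instance.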
func (im *instanceManager) GetInstanceLogs(name string, numLines int) (string, error) { - im.mu.RLock() - inst, exists := im.instances[name] - im.mu.RUnlock() - + inst, exists := im.registry.Get(name) if !exists { return "", fmt.Errorf("instance with name %s not found", name) } // Check if instance is remote and delegate to remote operation if node := im.getNodeForInstance(inst); node != nil { - return im.GetRemoteInstanceLogs(node, name, numLines) + ctx := context.Background() + return im.remote.GetInstanceLogs(ctx, node, name, numLines) } // Get logs from the local instance @@ -500,27 +530,7 @@ func (im *instanceManager) setPortInOptions(options *instance.Options, port int) options.BackendOptions.SetPort(port) } -// assignAndValidatePort assigns a port if not specified and validates it's not in use -func (im *instanceManager) assignAndValidatePort(options *instance.Options) error { - currentPort := im.getPortFromOptions(options) - - if currentPort == 0 { - // Assign a port if not specified - port, err := im.getNextAvailablePort() - if err != nil { - return fmt.Errorf("failed to get next available port: %w", err) - } - im.setPortInOptions(options, port) - // Mark the port as used - im.ports[port] = true - } else { - // Validate the specified port - if _, exists := im.ports[currentPort]; exists { - return fmt.Errorf("port %d is already in use", currentPort) - } - // Mark the port as used - im.ports[currentPort] = true - } - - return nil +// EvictLRUInstance finds and stops the least recently used running instance. +func (im *instanceManager) EvictLRUInstance() error { + return im.lifecycle.EvictLRU() } diff --git a/pkg/manager/operations_test.go b/pkg/manager/operations_test.go index 3a0651d..7523c5b 100644 --- a/pkg/manager/operations_test.go +++ b/pkg/manager/operations_test.go @@ -78,7 +78,7 @@ func TestCreateInstance_ValidationAndLimits(t *testing.T) { MaxInstances: 1, // Very low limit for testing TimeoutCheckInterval: 5, } - limitedManager := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + limitedManager := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") _, err = limitedManager.CreateInstance("instance1", options) if err != nil { diff --git a/pkg/manager/persistence.go b/pkg/manager/persistence.go new file mode 100644 index 0000000..a2853cb --- /dev/null +++ b/pkg/manager/persistence.go @@ -0,0 +1,241 @@ +package manager + +import ( + "encoding/json" + "fmt" + "llamactl/pkg/instance" + "log" + "os" + "path/filepath" + "strings" + "sync" +) + +// instancePersister provides atomic file-based persistence with durability guarantees. +type instancePersister struct { + mu sync.Mutex + instancesDir string + enabled bool +} + +// NewInstancePersister creates a new instance persister. +// If instancesDir is empty, persistence is disabled. 
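+//
+// A minimal usage sketch (the directory path and inst are illustrative):
+//
+//	p, err := NewInstancePersister("/var/lib/llamactl/instances")
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+//	_ = p.Save(inst)      // atomic write: temp file + fsync + rename
+//	all, _ := p.LoadAll() // every readable instance file in the directory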
+func NewInstancePersister(instancesDir string) (*instancePersister, error) { + if instancesDir == "" { + return &instancePersister{ + enabled: false, + }, nil + } + + // Ensure the instances directory exists + if err := os.MkdirAll(instancesDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create instances directory: %w", err) + } + + return &instancePersister{ + instancesDir: instancesDir, + enabled: true, + }, nil +} + +// Save persists an instance to disk with atomic write +func (p *instancePersister) Save(inst *instance.Instance) error { + if !p.enabled { + return nil + } + + if inst == nil { + return fmt.Errorf("cannot save nil instance") + } + + // Validate instance name to prevent path traversal + if err := p.validateInstanceName(inst.Name); err != nil { + return err + } + + p.mu.Lock() + defer p.mu.Unlock() + + instancePath := filepath.Join(p.instancesDir, inst.Name+".json") + tempPath := instancePath + ".tmp" + + // Serialize instance to JSON + jsonData, err := json.MarshalIndent(inst, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal instance %s: %w", inst.Name, err) + } + + // Create temporary file + tempFile, err := os.OpenFile(tempPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return fmt.Errorf("failed to create temp file for instance %s: %w", inst.Name, err) + } + + // Write data to temporary file + if _, err := tempFile.Write(jsonData); err != nil { + tempFile.Close() + os.Remove(tempPath) + return fmt.Errorf("failed to write temp file for instance %s: %w", inst.Name, err) + } + + // Sync to disk before rename to ensure durability + if err := tempFile.Sync(); err != nil { + tempFile.Close() + os.Remove(tempPath) + return fmt.Errorf("failed to sync temp file for instance %s: %w", inst.Name, err) + } + + // Close the file + if err := tempFile.Close(); err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to close temp file for instance %s: %w", inst.Name, err) + } + + // Atomic rename (this is atomic on POSIX systems) + if err := os.Rename(tempPath, instancePath); err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to rename temp file for instance %s: %w", inst.Name, err) + } + + return nil +} + +// Load loads a single instance from disk by name. +func (p *instancePersister) Load(name string) (*instance.Instance, error) { + if !p.enabled { + return nil, fmt.Errorf("persistence is disabled") + } + + if err := p.validateInstanceName(name); err != nil { + return nil, err + } + + p.mu.Lock() + defer p.mu.Unlock() + + instancePath := filepath.Join(p.instancesDir, name+".json") + + inst, err := p.loadInstanceFile(name, instancePath) + if err != nil { + if os.IsNotExist(err) { + return nil, fmt.Errorf("instance %s not found", name) + } + return nil, err + } + + return inst, nil +} + +// Delete removes an instance's persistence file from disk. +func (p *instancePersister) Delete(name string) error { + if !p.enabled { + return nil + } + + if err := p.validateInstanceName(name); err != nil { + return err + } + + p.mu.Lock() + defer p.mu.Unlock() + + instancePath := filepath.Join(p.instancesDir, name+".json") + + if err := os.Remove(instancePath); err != nil { + if os.IsNotExist(err) { + // Not an error if file doesn't exist + return nil + } + return fmt.Errorf("failed to delete instance file for %s: %w", name, err) + } + + return nil +} + +// LoadAll loads all persisted instances from disk. +// Returns a slice of instances and any errors encountered during loading. 
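+// Files that cannot be read or unmarshalled are skipped and logged; they do
+// not fail the whole load.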
+func (p *instancePersister) LoadAll() ([]*instance.Instance, error) { + if !p.enabled { + return nil, nil + } + + p.mu.Lock() + defer p.mu.Unlock() + + // Check if instances directory exists + if _, err := os.Stat(p.instancesDir); os.IsNotExist(err) { + return nil, nil // No instances directory, return empty list + } + + // Read all JSON files from instances directory + files, err := os.ReadDir(p.instancesDir) + if err != nil { + return nil, fmt.Errorf("failed to read instances directory: %w", err) + } + + instances := make([]*instance.Instance, 0) + var loadErrors []string + + for _, file := range files { + if file.IsDir() || !strings.HasSuffix(file.Name(), ".json") { + continue + } + + instanceName := strings.TrimSuffix(file.Name(), ".json") + instancePath := filepath.Join(p.instancesDir, file.Name()) + + inst, err := p.loadInstanceFile(instanceName, instancePath) + if err != nil { + log.Printf("Failed to load instance %s: %v", instanceName, err) + loadErrors = append(loadErrors, fmt.Sprintf("%s: %v", instanceName, err)) + continue + } + + instances = append(instances, inst) + } + + if len(loadErrors) > 0 { + log.Printf("Loaded %d instances with %d errors", len(instances), len(loadErrors)) + } else if len(instances) > 0 { + log.Printf("Loaded %d instances from persistence", len(instances)) + } + + return instances, nil +} + +// loadInstanceFile is an internal helper that loads a single instance file. +// Note: This assumes the mutex is already held by the caller. +func (p *instancePersister) loadInstanceFile(name, path string) (*instance.Instance, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read instance file: %w", err) + } + + var inst instance.Instance + if err := json.Unmarshal(data, &inst); err != nil { + return nil, fmt.Errorf("failed to unmarshal instance: %w", err) + } + + // Validate the instance name matches the filename + if inst.Name != name { + return nil, fmt.Errorf("instance name mismatch: file=%s, instance.Name=%s", name, inst.Name) + } + + return &inst, nil +} + +// validateInstanceName ensures the instance name is safe for filesystem operations. +func (p *instancePersister) validateInstanceName(name string) error { + if name == "" { + return fmt.Errorf("instance name cannot be empty") + } + + cleaned := filepath.Clean(name) + + // After cleaning, name should not contain any path separators + if cleaned != name || strings.Contains(cleaned, string(filepath.Separator)) { + return fmt.Errorf("invalid instance name: %s", name) + } + + return nil +} diff --git a/pkg/manager/ports.go b/pkg/manager/ports.go new file mode 100644 index 0000000..a77ff0e --- /dev/null +++ b/pkg/manager/ports.go @@ -0,0 +1,184 @@ +package manager + +import ( + "fmt" + "math/bits" + "sync" +) + +// portAllocator provides efficient port allocation using a bitmap for O(1) operations. +// The bitmap approach prevents unbounded memory growth and simplifies port management. +type portAllocator struct { + mu sync.Mutex + + // Bitmap for O(1) allocation/release + // Each bit represents a port (1 = allocated, 0 = free) + bitmap []uint64 // Each uint64 covers 64 ports + + // Map port to instance name for cleanup operations + allocated map[int]string + + minPort int + maxPort int + rangeSize int +} + +// NewPortAllocator creates a new port allocator for the given port range. +// Returns an error if the port range is invalid. 
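+//
+// A small usage sketch (range and instance names are illustrative):
+//
+//	alloc, _ := NewPortAllocator(8000, 8099)
+//	port, _ := alloc.Allocate("model-a")        // first free port in range
+//	_ = alloc.AllocateSpecific(8080, "model-b") // errors if 8080 is taken
+//	alloc.ReleaseByInstance("model-a")          // frees all ports held by model-a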
+func NewPortAllocator(minPort, maxPort int) (*portAllocator, error) { + if minPort <= 0 || maxPort <= 0 { + return nil, fmt.Errorf("invalid port range: min=%d, max=%d (must be > 0)", minPort, maxPort) + } + if minPort > maxPort { + return nil, fmt.Errorf("invalid port range: min=%d > max=%d", minPort, maxPort) + } + + rangeSize := maxPort - minPort + 1 + bitmapSize := (rangeSize + 63) / 64 // Round up to nearest uint64 + + return &portAllocator{ + bitmap: make([]uint64, bitmapSize), + allocated: make(map[int]string), + minPort: minPort, + maxPort: maxPort, + rangeSize: rangeSize, + }, nil +} + +// Allocate finds and allocates the first available port for the given instance. +// Returns the allocated port or an error if no ports are available. +func (p *portAllocator) Allocate(instanceName string) (int, error) { + if instanceName == "" { + return 0, fmt.Errorf("instance name cannot be empty") + } + + p.mu.Lock() + defer p.mu.Unlock() + + port, err := p.findFirstFreeBit() + if err != nil { + return 0, err + } + + p.setBit(port) + p.allocated[port] = instanceName + + return port, nil +} + +// AllocateSpecific allocates a specific port for the given instance. +// Returns an error if the port is already allocated or out of range. +func (p *portAllocator) AllocateSpecific(port int, instanceName string) error { + if instanceName == "" { + return fmt.Errorf("instance name cannot be empty") + } + if port < p.minPort || port > p.maxPort { + return fmt.Errorf("port %d is out of range [%d-%d]", port, p.minPort, p.maxPort) + } + + p.mu.Lock() + defer p.mu.Unlock() + + if p.isBitSet(port) { + return fmt.Errorf("port %d is already allocated", port) + } + + p.setBit(port) + p.allocated[port] = instanceName + + return nil +} + +// Release releases a specific port, making it available for reuse. +// Returns an error if the port is not allocated. +func (p *portAllocator) Release(port int) error { + if port < p.minPort || port > p.maxPort { + return fmt.Errorf("port %d is out of range [%d-%d]", port, p.minPort, p.maxPort) + } + + p.mu.Lock() + defer p.mu.Unlock() + + if !p.isBitSet(port) { + return fmt.Errorf("port %d is not allocated", port) + } + + p.clearBit(port) + delete(p.allocated, port) + + return nil +} + +// ReleaseByInstance releases all ports allocated to the given instance. +// This is useful for cleanup when deleting or updating an instance. +// Returns the number of ports released. +func (p *portAllocator) ReleaseByInstance(instanceName string) int { + if instanceName == "" { + return 0 + } + + p.mu.Lock() + defer p.mu.Unlock() + + portsToRelease := make([]int, 0) + for port, name := range p.allocated { + if name == instanceName { + portsToRelease = append(portsToRelease, port) + } + } + + for _, port := range portsToRelease { + p.clearBit(port) + delete(p.allocated, port) + } + + return len(portsToRelease) +} + +// --- Internal bitmap operations --- + +// portToBitPos converts a port number to bitmap array index and bit position. +func (p *portAllocator) portToBitPos(port int) (index int, bit uint) { + offset := port - p.minPort + index = offset / 64 + bit = uint(offset % 64) + return +} + +// setBit marks a port as allocated in the bitmap. +func (p *portAllocator) setBit(port int) { + index, bit := p.portToBitPos(port) + p.bitmap[index] |= (1 << bit) +} + +// clearBit marks a port as free in the bitmap. +func (p *portAllocator) clearBit(port int) { + index, bit := p.portToBitPos(port) + p.bitmap[index] &^= (1 << bit) +} + +// isBitSet checks if a port is allocated in the bitmap. 
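+// For example, with minPort 8000, port 8070 has offset 70 and therefore maps
+// to bitmap index 1 (70/64) and bit 6 (70%64).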
+func (p *portAllocator) isBitSet(port int) bool { + index, bit := p.portToBitPos(port) + return (p.bitmap[index] & (1 << bit)) != 0 +} + +// findFirstFreeBit scans the bitmap to find the first unallocated port. +// Returns the port number or an error if no ports are available. +func (p *portAllocator) findFirstFreeBit() (int, error) { + for i, word := range p.bitmap { + if word != ^uint64(0) { // Not all bits are set (some ports are free) + // Find the first 0 bit in this word + // XOR with all 1s to flip bits, then find first 1 (which was 0) + bit := bits.TrailingZeros64(^word) + port := p.minPort + (i * 64) + bit + + // Ensure we don't go beyond maxPort due to bitmap rounding + if port <= p.maxPort { + return port, nil + } + } + } + + return 0, fmt.Errorf("no available ports in range [%d-%d]", p.minPort, p.maxPort) +} diff --git a/pkg/manager/registry.go b/pkg/manager/registry.go new file mode 100644 index 0000000..41d801b --- /dev/null +++ b/pkg/manager/registry.go @@ -0,0 +1,131 @@ +package manager + +import ( + "fmt" + "llamactl/pkg/instance" + "sync" +) + +// instanceRegistry provides thread-safe storage and lookup of instances +// with running state tracking using lock-free sync.Map for status checks. +type instanceRegistry struct { + mu sync.RWMutex + instances map[string]*instance.Instance + running sync.Map // map[string]struct{} - lock-free for status checks +} + +// NewInstanceRegistry creates a new instance registry. +func NewInstanceRegistry() *instanceRegistry { + return &instanceRegistry{ + instances: make(map[string]*instance.Instance), + } +} + +// Get retrieves an instance by name. +// Returns the instance and true if found, nil and false otherwise. +func (r *instanceRegistry) Get(name string) (*instance.Instance, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + inst, exists := r.instances[name] + return inst, exists +} + +// List returns a snapshot copy of all instances to prevent external mutation. +func (r *instanceRegistry) List() []*instance.Instance { + r.mu.RLock() + defer r.mu.RUnlock() + + result := make([]*instance.Instance, 0, len(r.instances)) + for _, inst := range r.instances { + result = append(result, inst) + } + return result +} + +// ListRunning returns a snapshot of all currently running instances. +func (r *instanceRegistry) ListRunning() []*instance.Instance { + r.mu.RLock() + defer r.mu.RUnlock() + + result := make([]*instance.Instance, 0) + for name, inst := range r.instances { + if _, isRunning := r.running.Load(name); isRunning { + result = append(result, inst) + } + } + return result +} + +// Add adds a new instance to the registry. +// Returns an error if an instance with the same name already exists. +func (r *instanceRegistry) Add(inst *instance.Instance) error { + if inst == nil { + return fmt.Errorf("cannot add nil instance") + } + + r.mu.Lock() + defer r.mu.Unlock() + + if _, exists := r.instances[inst.Name]; exists { + return fmt.Errorf("instance %s already exists", inst.Name) + } + + r.instances[inst.Name] = inst + + // Initialize running state if the instance is running + if inst.IsRunning() { + r.running.Store(inst.Name, struct{}{}) + } + + return nil +} + +// Remove removes an instance from the registry. +// Returns an error if the instance doesn't exist. 
+func (r *instanceRegistry) Remove(name string) error { + r.mu.Lock() + defer r.mu.Unlock() + + if _, exists := r.instances[name]; !exists { + return fmt.Errorf("instance %s not found", name) + } + + delete(r.instances, name) + r.running.Delete(name) + + return nil +} + +// MarkRunning marks an instance as running using lock-free sync.Map. +func (r *instanceRegistry) MarkRunning(name string) { + r.running.Store(name, struct{}{}) +} + +// MarkStopped marks an instance as stopped using lock-free sync.Map. +func (r *instanceRegistry) MarkStopped(name string) { + r.running.Delete(name) +} + +// IsRunning checks if an instance is running using lock-free sync.Map. +func (r *instanceRegistry) IsRunning(name string) bool { + _, isRunning := r.running.Load(name) + return isRunning +} + +// Count returns the total number of instances in the registry. +func (r *instanceRegistry) Count() int { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.instances) +} + +// CountRunning returns the number of currently running instances. +func (r *instanceRegistry) CountRunning() int { + count := 0 + r.running.Range(func(key, value any) bool { + count++ + return true + }) + return count +} diff --git a/pkg/manager/remote.go b/pkg/manager/remote.go new file mode 100644 index 0000000..9cb147c --- /dev/null +++ b/pkg/manager/remote.go @@ -0,0 +1,283 @@ +package manager + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "llamactl/pkg/config" + "llamactl/pkg/instance" + "net/http" + "sync" + "time" +) + +// remoteManager handles HTTP operations for remote instances. +type remoteManager struct { + mu sync.RWMutex + client *http.Client + nodeMap map[string]*config.NodeConfig // node name -> node config + instanceToNode map[string]*config.NodeConfig // instance name -> node config +} + +// NewRemoteManager creates a new remote manager. +func NewRemoteManager(nodes map[string]config.NodeConfig, timeout time.Duration) *remoteManager { + if timeout <= 0 { + timeout = 30 * time.Second + } + + // Build node config map + nodeMap := make(map[string]*config.NodeConfig) + for name := range nodes { + nodeCopy := nodes[name] + nodeMap[name] = &nodeCopy + } + + return &remoteManager{ + client: &http.Client{ + Timeout: timeout, + }, + nodeMap: nodeMap, + instanceToNode: make(map[string]*config.NodeConfig), + } +} + +// GetNodeForInstance returns the node configuration for a given instance. +// Returns nil if the instance is not mapped to any node. +func (rm *remoteManager) GetNodeForInstance(instanceName string) (*config.NodeConfig, bool) { + rm.mu.RLock() + defer rm.mu.RUnlock() + + node, exists := rm.instanceToNode[instanceName] + return node, exists +} + +// SetInstanceNode maps an instance to a specific node. +// Returns an error if the node doesn't exist. +func (rm *remoteManager) SetInstanceNode(instanceName, nodeName string) error { + rm.mu.Lock() + defer rm.mu.Unlock() + + node, exists := rm.nodeMap[nodeName] + if !exists { + return fmt.Errorf("node %s not found", nodeName) + } + + rm.instanceToNode[instanceName] = node + return nil +} + +// RemoveInstance removes the instance-to-node mapping. +func (rm *remoteManager) RemoveInstance(instanceName string) { + rm.mu.Lock() + defer rm.mu.Unlock() + + delete(rm.instanceToNode, instanceName) +} + +// --- HTTP request helpers --- + +// makeRemoteRequest creates and executes an HTTP request to a remote node with context support. 
+func (rm *remoteManager) makeRemoteRequest(ctx context.Context, nodeConfig *config.NodeConfig, method, path string, body any) (*http.Response, error) { + var reqBody io.Reader + if body != nil { + jsonData, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal request body: %w", err) + } + reqBody = bytes.NewBuffer(jsonData) + } + + url := fmt.Sprintf("%s%s", nodeConfig.Address, path) + req, err := http.NewRequestWithContext(ctx, method, url, reqBody) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + + if nodeConfig.APIKey != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", nodeConfig.APIKey)) + } + + resp, err := rm.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + + return resp, nil +} + +// parseRemoteResponse parses an HTTP response and unmarshals the result. +func parseRemoteResponse(resp *http.Response, result any) error { + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + if result != nil { + if err := json.Unmarshal(body, result); err != nil { + return fmt.Errorf("failed to unmarshal response: %w", err) + } + } + + return nil +} + +// --- Remote CRUD operations --- + +// ListInstances lists all instances on a remote node. +func (rm *remoteManager) ListInstances(ctx context.Context, node *config.NodeConfig) ([]*instance.Instance, error) { + resp, err := rm.makeRemoteRequest(ctx, node, "GET", "/api/v1/instances/", nil) + if err != nil { + return nil, err + } + + var instances []*instance.Instance + if err := parseRemoteResponse(resp, &instances); err != nil { + return nil, err + } + + return instances, nil +} + +// CreateInstance creates a new instance on a remote node. +func (rm *remoteManager) CreateInstance(ctx context.Context, node *config.NodeConfig, name string, opts *instance.Options) (*instance.Instance, error) { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + + resp, err := rm.makeRemoteRequest(ctx, node, "POST", path, opts) + if err != nil { + return nil, err + } + + var inst instance.Instance + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// GetInstance retrieves an instance by name from a remote node. +func (rm *remoteManager) GetInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error) { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + resp, err := rm.makeRemoteRequest(ctx, node, "GET", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Instance + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// UpdateInstance updates an existing instance on a remote node. 
+func (rm *remoteManager) UpdateInstance(ctx context.Context, node *config.NodeConfig, name string, opts *instance.Options) (*instance.Instance, error) { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + + resp, err := rm.makeRemoteRequest(ctx, node, "PUT", path, opts) + if err != nil { + return nil, err + } + + var inst instance.Instance + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// DeleteInstance deletes an instance from a remote node. +func (rm *remoteManager) DeleteInstance(ctx context.Context, node *config.NodeConfig, name string) error { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + resp, err := rm.makeRemoteRequest(ctx, node, "DELETE", path, nil) + if err != nil { + return err + } + + return parseRemoteResponse(resp, nil) +} + +// StartInstance starts an instance on a remote node. +func (rm *remoteManager) StartInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error) { + path := fmt.Sprintf("/api/v1/instances/%s/start", name) + resp, err := rm.makeRemoteRequest(ctx, node, "POST", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Instance + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// StopInstance stops an instance on a remote node. +func (rm *remoteManager) StopInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error) { + path := fmt.Sprintf("/api/v1/instances/%s/stop", name) + resp, err := rm.makeRemoteRequest(ctx, node, "POST", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Instance + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// RestartInstance restarts an instance on a remote node. +func (rm *remoteManager) RestartInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error) { + path := fmt.Sprintf("/api/v1/instances/%s/restart", name) + resp, err := rm.makeRemoteRequest(ctx, node, "POST", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Instance + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// GetInstanceLogs retrieves logs for an instance from a remote node. 
+func (rm *remoteManager) GetInstanceLogs(ctx context.Context, node *config.NodeConfig, name string, numLines int) (string, error) { + path := fmt.Sprintf("/api/v1/instances/%s/logs?lines=%d", name, numLines) + resp, err := rm.makeRemoteRequest(ctx, node, "GET", path, nil) + if err != nil { + return "", err + } + + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + // Logs endpoint returns plain text (Content-Type: text/plain) + return string(body), nil +} diff --git a/pkg/manager/remote_ops.go b/pkg/manager/remote_ops.go deleted file mode 100644 index 143c317..0000000 --- a/pkg/manager/remote_ops.go +++ /dev/null @@ -1,222 +0,0 @@ -package manager - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "llamactl/pkg/config" - "llamactl/pkg/instance" - "net/http" -) - -// makeRemoteRequest is a helper function to make HTTP requests to a remote node -func (im *instanceManager) makeRemoteRequest(nodeConfig *config.NodeConfig, method, path string, body any) (*http.Response, error) { - var reqBody io.Reader - if body != nil { - jsonData, err := json.Marshal(body) - if err != nil { - return nil, fmt.Errorf("failed to marshal request body: %w", err) - } - reqBody = bytes.NewBuffer(jsonData) - } - - url := fmt.Sprintf("%s%s", nodeConfig.Address, path) - req, err := http.NewRequest(method, url, reqBody) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - if body != nil { - req.Header.Set("Content-Type", "application/json") - } - - if nodeConfig.APIKey != "" { - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", nodeConfig.APIKey)) - } - - resp, err := im.httpClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to execute request: %w", err) - } - - return resp, nil -} - -// parseRemoteResponse is a helper function to parse API responses -func parseRemoteResponse(resp *http.Response, result any) error { - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("failed to read response body: %w", err) - } - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) - } - - if result != nil { - if err := json.Unmarshal(body, result); err != nil { - return fmt.Errorf("failed to unmarshal response: %w", err) - } - } - - return nil -} - -// ListRemoteInstances lists all instances on the remote node -func (im *instanceManager) ListRemoteInstances(nodeConfig *config.NodeConfig) ([]*instance.Instance, error) { - resp, err := im.makeRemoteRequest(nodeConfig, "GET", "/api/v1/instances/", nil) - if err != nil { - return nil, err - } - - var instances []*instance.Instance - if err := parseRemoteResponse(resp, &instances); err != nil { - return nil, err - } - - return instances, nil -} - -// CreateRemoteInstance creates a new instance on the remote node -func (im *instanceManager) CreateRemoteInstance(nodeConfig *config.NodeConfig, name string, options *instance.Options) (*instance.Instance, error) { - path := fmt.Sprintf("/api/v1/instances/%s/", name) - - resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, options) - if err != nil { - return nil, err - } - - var inst instance.Instance - if err := parseRemoteResponse(resp, &inst); err != nil { - return nil, err - 
} - - return &inst, nil -} - -// GetRemoteInstance retrieves an instance by name from the remote node -func (im *instanceManager) GetRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Instance, error) { - path := fmt.Sprintf("/api/v1/instances/%s/", name) - resp, err := im.makeRemoteRequest(nodeConfig, "GET", path, nil) - if err != nil { - return nil, err - } - - var inst instance.Instance - if err := parseRemoteResponse(resp, &inst); err != nil { - return nil, err - } - - return &inst, nil -} - -// UpdateRemoteInstance updates an existing instance on the remote node -func (im *instanceManager) UpdateRemoteInstance(nodeConfig *config.NodeConfig, name string, options *instance.Options) (*instance.Instance, error) { - path := fmt.Sprintf("/api/v1/instances/%s/", name) - - resp, err := im.makeRemoteRequest(nodeConfig, "PUT", path, options) - if err != nil { - return nil, err - } - - var inst instance.Instance - if err := parseRemoteResponse(resp, &inst); err != nil { - return nil, err - } - - return &inst, nil -} - -// DeleteRemoteInstance deletes an instance from the remote node -func (im *instanceManager) DeleteRemoteInstance(nodeConfig *config.NodeConfig, name string) error { - path := fmt.Sprintf("/api/v1/instances/%s/", name) - resp, err := im.makeRemoteRequest(nodeConfig, "DELETE", path, nil) - if err != nil { - return err - } - - return parseRemoteResponse(resp, nil) -} - -// StartRemoteInstance starts an instance on the remote node -func (im *instanceManager) StartRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Instance, error) { - path := fmt.Sprintf("/api/v1/instances/%s/start", name) - resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, nil) - if err != nil { - return nil, err - } - - var inst instance.Instance - if err := parseRemoteResponse(resp, &inst); err != nil { - return nil, err - } - - return &inst, nil -} - -// StopRemoteInstance stops an instance on the remote node -func (im *instanceManager) StopRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Instance, error) { - path := fmt.Sprintf("/api/v1/instances/%s/stop", name) - resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, nil) - if err != nil { - return nil, err - } - - var inst instance.Instance - if err := parseRemoteResponse(resp, &inst); err != nil { - return nil, err - } - - return &inst, nil -} - -// RestartRemoteInstance restarts an instance on the remote node -func (im *instanceManager) RestartRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Instance, error) { - path := fmt.Sprintf("/api/v1/instances/%s/restart", name) - resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, nil) - if err != nil { - return nil, err - } - - var inst instance.Instance - if err := parseRemoteResponse(resp, &inst); err != nil { - return nil, err - } - - return &inst, nil -} - -// GetRemoteInstanceLogs retrieves logs for an instance from the remote node -func (im *instanceManager) GetRemoteInstanceLogs(nodeConfig *config.NodeConfig, name string, numLines int) (string, error) { - path := fmt.Sprintf("/api/v1/instances/%s/logs?lines=%d", name, numLines) - resp, err := im.makeRemoteRequest(nodeConfig, "GET", path, nil) - if err != nil { - return "", err - } - - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", fmt.Errorf("failed to read response body: %w", err) - } - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return "", fmt.Errorf("API request failed with status %d: %s", 
resp.StatusCode, string(body)) - } - - // Logs endpoint might return plain text or JSON - // Try to parse as JSON first (in case it's wrapped in a response object) - var logResponse struct { - Logs string `json:"logs"` - } - if err := json.Unmarshal(body, &logResponse); err == nil && logResponse.Logs != "" { - return logResponse.Logs, nil - } - - // Otherwise, return as plain text - return string(body), nil -} diff --git a/pkg/manager/timeout.go b/pkg/manager/timeout.go deleted file mode 100644 index 2e0314a..0000000 --- a/pkg/manager/timeout.go +++ /dev/null @@ -1,74 +0,0 @@ -package manager - -import ( - "fmt" - "llamactl/pkg/instance" - "log" -) - -func (im *instanceManager) checkAllTimeouts() { - im.mu.RLock() - var timeoutInstances []string - - // Identify instances that should timeout - for _, inst := range im.instances { - // Skip remote instances - they are managed by their respective nodes - if inst.IsRemote() { - continue - } - - if inst.ShouldTimeout() { - timeoutInstances = append(timeoutInstances, inst.Name) - } - } - im.mu.RUnlock() // Release read lock before calling StopInstance - - // Stop the timed-out instances - for _, name := range timeoutInstances { - log.Printf("Instance %s has timed out, stopping it", name) - if _, err := im.StopInstance(name); err != nil { - log.Printf("Error stopping instance %s: %v", name, err) - } else { - log.Printf("Instance %s stopped successfully", name) - } - } -} - -// EvictLRUInstance finds and stops the least recently used running instance. -func (im *instanceManager) EvictLRUInstance() error { - im.mu.RLock() - var lruInstance *instance.Instance - - for name := range im.runningInstances { - inst := im.instances[name] - if inst == nil { - continue - } - - // Skip remote instances - they are managed by their respective nodes - if inst.IsRemote() { - continue - } - - if inst.GetOptions() != nil && inst.GetOptions().IdleTimeout != nil && *inst.GetOptions().IdleTimeout <= 0 { - continue // Skip instances without idle timeout - } - - if lruInstance == nil { - lruInstance = inst - } - - if inst.LastRequestTime() < lruInstance.LastRequestTime() { - lruInstance = inst - } - } - im.mu.RUnlock() - - if lruInstance == nil { - return fmt.Errorf("failed to find lru instance") - } - - // Evict Instance - _, err := im.StopInstance(lruInstance.Name) - return err -} diff --git a/pkg/manager/timeout_test.go b/pkg/manager/timeout_test.go index d1c3a47..e05c400 100644 --- a/pkg/manager/timeout_test.go +++ b/pkg/manager/timeout_test.go @@ -22,7 +22,7 @@ func TestTimeoutFunctionality(t *testing.T) { MaxInstances: 5, } - manager := manager.NewInstanceManager(backendConfig, cfg, map[string]config.NodeConfig{}, "main") + manager := manager.New(backendConfig, cfg, map[string]config.NodeConfig{}, "main") if manager == nil { t.Fatal("Manager should be initialized with timeout checker") }
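
The following sketch is illustrative and not part of the diff above. Written as an in-package test (the allocator type is unexported), it shows how the bitmap-backed portAllocator introduced in this change is expected to behave; the port range and instance names are invented, and only functions added by the patch are called.

package manager

import "testing"

// Illustrative in-package test; it exercises only functions introduced in this
// patch (NewPortAllocator, Allocate, AllocateSpecific, ReleaseByInstance).
func TestPortAllocatorSketch(t *testing.T) {
	alloc, err := NewPortAllocator(8000, 8003) // 4-port range fits in one bitmap word
	if err != nil {
		t.Fatalf("NewPortAllocator: %v", err)
	}

	// First-fit allocation starts at the bottom of the range.
	port, err := alloc.Allocate("llama-a")
	if err != nil || port != 8000 {
		t.Fatalf("want port 8000, got %d (err=%v)", port, err)
	}

	// A specific port can be reserved while it is free and in range,
	// and double allocation is rejected.
	if err := alloc.AllocateSpecific(8002, "llama-b"); err != nil {
		t.Fatalf("AllocateSpecific: %v", err)
	}
	if err := alloc.AllocateSpecific(8002, "llama-c"); err == nil {
		t.Fatal("expected error: port 8002 is already allocated")
	}

	// Releasing by instance clears every bit that instance holds.
	if n := alloc.ReleaseByInstance("llama-a"); n != 1 {
		t.Fatalf("want 1 port released, got %d", n)
	}

	// Port 8000 is free again, so the next first-fit allocation reuses it.
	if port, _ = alloc.Allocate("llama-d"); port != 8000 {
		t.Fatalf("want port 8000 reused, got %d", port)
	}
}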
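
A similar illustrative sketch, again not part of the diff, for the new instanceRegistry. It focuses on the sync.Map-backed running-state tracking; the instance name is invented, and Add/Remove are deliberately left out because constructing an instance.Instance is outside the scope of the sketch.

package manager

import "testing"

// Illustrative in-package test of running-state tracking. In real use,
// MarkRunning accompanies an instance that was previously Added.
func TestRegistryRunningStateSketch(t *testing.T) {
	reg := NewInstanceRegistry()

	// Running state lives in a sync.Map, so these calls avoid the registry's
	// RWMutex and stay cheap on request hot paths.
	reg.MarkRunning("llama-a")
	if !reg.IsRunning("llama-a") {
		t.Fatal("expected llama-a to be marked running")
	}
	if got := reg.CountRunning(); got != 1 {
		t.Fatalf("want 1 running instance, got %d", got)
	}

	reg.MarkStopped("llama-a")
	if reg.IsRunning("llama-a") {
		t.Fatal("expected llama-a to be marked stopped")
	}
}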
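
Finally, an illustrative sketch (not part of the diff) of wiring the new remoteManager from inside package manager. The node name, address, API key, and instance name are placeholders; config.NodeConfig is assumed to expose the Address and APIKey fields that makeRemoteRequest reads, and only constructors and methods added in this change are called.

package manager

import (
	"context"
	"log"
	"time"

	"llamactl/pkg/config"
)

// Hypothetical wiring of the new remoteManager; all literal values below are
// placeholders for illustration only.
func remoteManagerSketch() {
	nodes := map[string]config.NodeConfig{
		"worker-1": {
			Address: "http://10.0.0.2:8080", // placeholder address
			APIKey:  "example-key",          // placeholder key
		},
	}

	rm := NewRemoteManager(nodes, 10*time.Second)

	// Map an instance to the node that owns it; unknown node names are rejected.
	if err := rm.SetInstanceNode("my-model", "worker-1"); err != nil {
		log.Fatalf("SetInstanceNode: %v", err)
	}

	node, ok := rm.GetNodeForInstance("my-model")
	if !ok {
		log.Fatal("instance is not mapped to any node")
	}

	// Every remote call takes a context, so callers control cancellation and deadlines.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	instances, err := rm.ListInstances(ctx, node)
	if err != nil {
		log.Fatalf("ListInstances: %v", err)
	}
	log.Printf("worker-1 reports %d instances", len(instances))
}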