package manager

import (
	"context"
	"fmt"
	"llamactl/pkg/config"
	"llamactl/pkg/instance"
	"log"
	"sync"
	"time"
)

// InstanceManager defines the interface for managing instances of the llama server.
type InstanceManager interface {
	ListInstances() ([]*instance.Instance, error)
	CreateInstance(name string, options *instance.Options) (*instance.Instance, error)
	GetInstance(name string) (*instance.Instance, error)
	UpdateInstance(name string, options *instance.Options) (*instance.Instance, error)
	DeleteInstance(name string) error
	StartInstance(name string) (*instance.Instance, error)
	IsMaxRunningInstancesReached() bool
	StopInstance(name string) (*instance.Instance, error)
	EvictLRUInstance() error
	RestartInstance(name string) (*instance.Instance, error)
	GetInstanceLogs(name string, numLines int) (string, error)
	Shutdown()
}

type instanceManager struct {
	// Components (each with own synchronization)
	registry    *instanceRegistry
	ports       *portAllocator
	persistence *instancePersister
	remote      *remoteManager
	lifecycle   *lifecycleManager

	// Configuration
	instancesConfig config.InstancesConfig
	backendsConfig  config.BackendConfig
	localNodeName   string // Name of the local node

	// Synchronization
	operationMu  sync.Mutex // Protects start/stop/update/delete/restart operations
	shutdownOnce sync.Once
}

// New creates a new instance of InstanceManager.
func New(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig map[string]config.NodeConfig, localNodeName string) InstanceManager {
	if instancesConfig.TimeoutCheckInterval <= 0 {
		instancesConfig.TimeoutCheckInterval = 5 // Default to 5 minutes if not set
	}

	// Initialize components
	registry := NewInstanceRegistry()

	// Initialize port allocator
	portRange := instancesConfig.PortRange
	ports, err := NewPortAllocator(portRange[0], portRange[1])
	if err != nil {
		log.Fatalf("Failed to create port allocator: %v", err)
	}

	// Initialize persistence
	persistence, err := NewInstancePersister(instancesConfig.InstancesDir)
	if err != nil {
		log.Fatalf("Failed to create instance persister: %v", err)
	}

	// Initialize remote manager
	remote := NewRemoteManager(nodesConfig, 30*time.Second)

	// Create manager instance
	im := &instanceManager{
		registry:        registry,
		ports:           ports,
		persistence:     persistence,
		remote:          remote,
		instancesConfig: instancesConfig,
		backendsConfig:  backendsConfig,
		localNodeName:   localNodeName,
	}

	// Initialize lifecycle manager (needs reference to manager for Stop/Evict operations)
	checkInterval := time.Duration(instancesConfig.TimeoutCheckInterval) * time.Minute
	im.lifecycle = NewLifecycleManager(registry, im, checkInterval, true)

	// Load existing instances from disk
	if err := im.loadInstances(); err != nil {
		log.Printf("Error loading instances: %v", err)
	}

	// Start the lifecycle manager
	im.lifecycle.start()

	return im
}
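
// Usage sketch (illustrative only; the configs would come from the caller's own
// sources and the node name "main" below is an assumed placeholder, not a value
// defined in this package):
//
//	var backendsCfg config.BackendConfig
//	var instancesCfg config.InstancesConfig // PortRange, InstancesDir, TimeoutCheckInterval, ...
//	var nodes map[string]config.NodeConfig
//
//	mgr := New(backendsCfg, instancesCfg, nodes, "main")
//	defer mgr.Shutdown()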

// persistInstance saves an instance using the persistence component
func (im *instanceManager) persistInstance(inst *instance.Instance) error {
	return im.persistence.save(inst)
}

func (im *instanceManager) Shutdown() {
	im.shutdownOnce.Do(func() {
		// 1. Stop lifecycle manager (stops timeout checker)
		im.lifecycle.stop()

		// 2. Get running instances (no lock needed - registry handles it)
		running := im.registry.listRunning()

		// 3. Stop local instances concurrently
		var wg sync.WaitGroup
		for _, inst := range running {
			if inst.IsRemote() {
				continue // Skip remote instances
			}
			wg.Add(1)
			go func(inst *instance.Instance) {
				defer wg.Done()
				fmt.Printf("Stopping instance %s...\n", inst.Name)
				if err := inst.Stop(); err != nil {
					fmt.Printf("Error stopping instance %s: %v\n", inst.Name, err)
				}
			}(inst)
		}
		wg.Wait()

		fmt.Println("All instances stopped.")
	})
}

// loadInstances restores all instances from disk using the persistence component
func (im *instanceManager) loadInstances() error {
	// Load all instances from persistence
	instances, err := im.persistence.loadAll()
	if err != nil {
		return fmt.Errorf("failed to load instances: %w", err)
	}

	if len(instances) == 0 {
		return nil
	}

	// Process each loaded instance
	for _, persistedInst := range instances {
		if err := im.loadInstance(persistedInst); err != nil {
			log.Printf("Failed to load instance %s: %v", persistedInst.Name, err)
			continue
		}
	}

	log.Printf("Loaded %d instances from persistence", len(instances))

	// Auto-start instances that have auto-restart enabled
	go im.autoStartInstances()

	return nil
}

// loadInstance loads a single persisted instance and adds it to the registry
func (im *instanceManager) loadInstance(persistedInst *instance.Instance) error {
	name := persistedInst.Name
	options := persistedInst.GetOptions()

	// Check if this is a remote instance (local node not in the Nodes set)
	var isRemote bool
	var nodeName string
	if options != nil {
		if _, isLocal := options.Nodes[im.localNodeName]; !isLocal && len(options.Nodes) > 0 {
			// Get the first node from the set
			for node := range options.Nodes {
				nodeName = node
				isRemote = true
				break
			}
		}
	}

	var statusCallback func(oldStatus, newStatus instance.Status)
	if !isRemote {
		// Only set status callback for local instances
		statusCallback = func(oldStatus, newStatus instance.Status) {
			im.onStatusChange(name, oldStatus, newStatus)
		}
	}

	// Create the instance using instance.New (handles validation, defaults, setup)
	inst := instance.New(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback)

	// Restore persisted fields that instance.New doesn't set
	inst.Created = persistedInst.Created
	inst.SetStatus(persistedInst.GetStatus())

	// Handle remote instance mapping
	if isRemote {
		// Map instance to node in remote manager
		if err := im.remote.setInstanceNode(name, nodeName); err != nil {
			return fmt.Errorf("failed to set instance node: %w", err)
		}
	} else {
		// Allocate port for local instances
		if inst.GetPort() > 0 {
			port := inst.GetPort()
			if err := im.ports.allocateSpecific(port, name); err != nil {
				return fmt.Errorf("port conflict: instance %s wants port %d which is already in use: %w", name, port, err)
			}
		}
	}

	// Add instance to registry
	if err := im.registry.add(inst); err != nil {
		return fmt.Errorf("failed to add instance to registry: %w", err)
	}

	return nil
}
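
// Remote detection in loadInstance above: an instance is treated as remote when
// its Options.Nodes set is non-empty and does not contain localNodeName, and the
// first node found in the set becomes the target. For example (hypothetical
// names), an instance persisted with Nodes containing only "worker-1" and loaded
// on the node named "main" is mapped to "worker-1" in the remote manager, and no
// local port is allocated for it.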

// autoStartInstances starts instances that were running when persisted and have auto-restart enabled
// For instances with auto-restart disabled, it sets their status to Stopped
func (im *instanceManager) autoStartInstances() {
	instances := im.registry.list()

	var instancesToStart []*instance.Instance
	var instancesToStop []*instance.Instance
	for _, inst := range instances {
		if inst.IsRunning() && // Was running when persisted
			inst.GetOptions() != nil &&
			inst.GetOptions().AutoRestart != nil {
			if *inst.GetOptions().AutoRestart {
				instancesToStart = append(instancesToStart, inst)
			} else {
				// Instance was running but auto-restart is disabled, mark as stopped
				instancesToStop = append(instancesToStop, inst)
			}
		}
	}

	// Stop instances that have auto-restart disabled
	for _, inst := range instancesToStop {
		log.Printf("Instance %s was running but auto-restart is disabled, setting status to stopped", inst.Name)
		inst.SetStatus(instance.Stopped)
		im.registry.markStopped(inst.Name)
	}

	// Start instances that have auto-restart enabled
	for _, inst := range instancesToStart {
		log.Printf("Auto-starting instance %s", inst.Name)
		// Reset running state before starting (since Start() expects a stopped instance)
		inst.SetStatus(instance.Stopped)
		im.registry.markStopped(inst.Name)

		// Check if this is a remote instance
		if node, exists := im.remote.getNodeForInstance(inst.Name); exists && node != nil {
			// Remote instance - use remote manager with context
			ctx := context.Background()
			if _, err := im.remote.startInstance(ctx, node, inst.Name); err != nil {
				log.Printf("Failed to auto-start remote instance %s: %v", inst.Name, err)
			}
		} else {
			// Local instance - call Start() directly
			if err := inst.Start(); err != nil {
				log.Printf("Failed to auto-start instance %s: %v", inst.Name, err)
			}
		}
	}
}

func (im *instanceManager) onStatusChange(name string, oldStatus, newStatus instance.Status) {
	if newStatus == instance.Running {
		im.registry.markRunning(name)
	} else {
		im.registry.markStopped(name)
	}
}

// getNodeForInstance returns the node configuration for a remote instance
// Returns nil if the instance is not remote or the node is not found
func (im *instanceManager) getNodeForInstance(inst *instance.Instance) *config.NodeConfig {
	if !inst.IsRemote() {
		return nil
	}

	// Check if we have a node mapping in remote manager
	if nodeConfig, exists := im.remote.getNodeForInstance(inst.Name); exists {
		return nodeConfig
	}

	return nil
}
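
// Sketch of a hypothetical caller (not part of this file): operations such as
// StartInstance can use getNodeForInstance to choose between the local and
// remote paths, mirroring the branch in autoStartInstances above:
//
//	if node := im.getNodeForInstance(inst); node != nil {
//		_, err = im.remote.startInstance(context.Background(), node, inst.Name) // remote path
//	} else {
//		err = inst.Start() // local path
//	}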