Split manager into multiple structs

2025-10-20 21:55:50 +02:00
parent 91d956203d
commit ffb4b49c94
13 changed files with 1307 additions and 701 deletions


@@ -1,44 +1,28 @@
package manager
import (
"context"
"fmt"
"llamactl/pkg/instance"
"llamactl/pkg/validation"
"os"
"path/filepath"
"log"
)
type MaxRunningInstancesError error
// updateLocalInstanceFromRemote updates the local stub instance with data from the remote instance
// while preserving the Nodes field to maintain remote instance tracking
func (im *instanceManager) updateLocalInstanceFromRemote(localInst *instance.Instance, remoteInst *instance.Instance) {
if localInst == nil || remoteInst == nil {
return
}
// Get the remote instance options
remoteOptions := remoteInst.GetOptions()
if remoteOptions == nil {
return
}
- // Preserve the Nodes field from the local instance
- localOptions := localInst.GetOptions()
- var preservedNodes map[string]struct{}
- if localOptions != nil && len(localOptions.Nodes) > 0 {
- preservedNodes = make(map[string]struct{}, len(localOptions.Nodes))
- for node := range localOptions.Nodes {
- preservedNodes[node] = struct{}{}
- }
- }
- // Create a copy of remote options and restore the Nodes field
- updatedOptions := *remoteOptions
- updatedOptions.Nodes = preservedNodes
// Update the local instance with all remote data
- localInst.SetOptions(&updatedOptions)
+ localInst.SetOptions(remoteOptions)
localInst.SetStatus(remoteInst.GetStatus())
localInst.Created = remoteInst.Created
}
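
Note that the simplified one-line body only keeps remote-node tracking intact if instance.SetOptions itself preserves the Nodes field. A minimal sketch of that assumption (hypothetical; the actual implementation lives in pkg/instance and is not part of this diff):

// Hypothetical sketch: SetOptions preserving Nodes internally, which the
// simplified updateLocalInstanceFromRemote above relies on.
func (i *Instance) SetOptions(opts *Options) {
	i.mu.Lock()
	defer i.mu.Unlock()
	if opts != nil && i.options != nil && opts.Nodes == nil {
		opts.Nodes = i.options.Nodes // keep remote-node tracking intact
	}
	i.options = opts
}
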
@@ -46,17 +30,13 @@ func (im *instanceManager) updateLocalInstanceFromRemote(localInst *instance.Ins
// ListInstances returns a list of all instances managed by the instance manager.
// For remote instances, this fetches the live state from remote nodes and updates local stubs.
func (im *instanceManager) ListInstances() ([]*instance.Instance, error) {
- im.mu.RLock()
- localInstances := make([]*instance.Instance, 0, len(im.instances))
- for _, inst := range im.instances {
- localInstances = append(localInstances, inst)
- }
- im.mu.RUnlock()
+ instances := im.registry.List()
// Update remote instances with live state
- for _, inst := range localInstances {
+ ctx := context.Background()
+ for _, inst := range instances {
if node := im.getNodeForInstance(inst); node != nil {
- remoteInst, err := im.GetRemoteInstance(node, inst.Name)
+ remoteInst, err := im.remote.GetInstance(ctx, node, inst.Name)
if err != nil {
// Log error but continue with stale data
// Don't fail the entire list operation due to one remote failure
@@ -64,13 +44,11 @@ func (im *instanceManager) ListInstances() ([]*instance.Instance, error) {
}
// Update the local stub with all remote data (preserving Nodes)
- im.mu.Lock()
im.updateLocalInstanceFromRemote(inst, remoteInst)
- im.mu.Unlock()
}
}
- return localInstances, nil
+ return instances, nil
}
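
ListInstances now reads a snapshot from the registry component instead of copying a locked map by hand. A rough sketch of the registry surface implied by the call sites in this diff (assumed; the authoritative definitions live in the new registry file of this commit):

// Inferred from call sites in this file; not the authoritative definition.
type instanceRegistry interface {
	Get(name string) (*instance.Instance, bool)
	Add(inst *instance.Instance) error // rejects duplicate names
	Remove(name string) error
	List() []*instance.Instance // snapshot, safe to iterate without a lock
	ListRunning() []*instance.Instance
	Count() int
}
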
// CreateInstance creates a new instance with the given options and returns it.
@@ -90,11 +68,8 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
return nil, err
}
- im.mu.Lock()
- defer im.mu.Unlock()
// Check if instance with this name already exists (must be globally unique)
- if im.instances[name] != nil {
+ if _, exists := im.registry.Get(name); exists {
return nil, fmt.Errorf("instance with name %s already exists", name)
}
@@ -107,14 +82,18 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
break
}
- // Validate that the node exists
- nodeConfig, exists := im.nodeConfigMap[nodeName]
+ // Create the remote instance on the remote node
+ ctx := context.Background()
+ nodeConfig, exists := im.remote.GetNodeForInstance(nodeName)
if !exists {
- return nil, fmt.Errorf("node %s not found", nodeName)
+ // Try to set the node if it doesn't exist yet
+ if err := im.remote.SetInstanceNode(name, nodeName); err != nil {
+ return nil, fmt.Errorf("node %s not found", nodeName)
+ }
+ nodeConfig, _ = im.remote.GetNodeForInstance(name)
}
// Create the remote instance on the remote node
- remoteInst, err := im.CreateRemoteInstance(nodeConfig, name, options)
+ remoteInst, err := im.remote.CreateInstance(ctx, nodeConfig, name, options)
if err != nil {
return nil, err
}
@@ -126,12 +105,20 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
// Update the local stub with all remote data (preserving Nodes)
im.updateLocalInstanceFromRemote(inst, remoteInst)
- // Add to local tracking maps (but don't count towards limits)
- im.instances[name] = inst
- im.instanceNodeMap[name] = nodeConfig
+ // Map instance to node
+ if err := im.remote.SetInstanceNode(name, nodeName); err != nil {
+ return nil, fmt.Errorf("failed to map instance to node: %w", err)
+ }
+ // Add to registry (doesn't count towards local limits)
+ if err := im.registry.Add(inst); err != nil {
+ return nil, fmt.Errorf("failed to add instance to registry: %w", err)
+ }
// Persist the remote instance locally for tracking across restarts
if err := im.persistInstance(inst); err != nil {
+ // Rollback: remove from registry
+ im.registry.Remove(name)
return nil, fmt.Errorf("failed to persist remote instance %s: %w", name, err)
}
@@ -140,14 +127,34 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
// Local instance creation
// Check max instances limit for local instances only
- localInstanceCount := len(im.instances) - len(im.instanceNodeMap)
+ totalInstances := im.registry.Count()
+ remoteCount := 0
+ for _, inst := range im.registry.List() {
+ if inst.IsRemote() {
+ remoteCount++
+ }
+ }
+ localInstanceCount := totalInstances - remoteCount
if localInstanceCount >= im.instancesConfig.MaxInstances && im.instancesConfig.MaxInstances != -1 {
return nil, fmt.Errorf("maximum number of instances (%d) reached", im.instancesConfig.MaxInstances)
}
// Assign and validate port for backend-specific options
- if err := im.assignAndValidatePort(options); err != nil {
- return nil, err
+ currentPort := im.getPortFromOptions(options)
+ var allocatedPort int
+ if currentPort == 0 {
+ // Allocate a port if not specified
+ allocatedPort, err = im.ports.Allocate(name)
+ if err != nil {
+ return nil, fmt.Errorf("failed to allocate port: %w", err)
+ }
+ im.setPortInOptions(options, allocatedPort)
+ } else {
+ // Use the specified port
+ if err := im.ports.AllocateSpecific(currentPort, name); err != nil {
+ return nil, fmt.Errorf("port %d is already in use: %w", currentPort, err)
+ }
+ allocatedPort = currentPort
+ }
statusCallback := func(oldStatus, newStatus instance.Status) {
@@ -155,10 +162,17 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
}
inst := instance.New(name, &im.backendsConfig, &im.instancesConfig, options, im.localNodeName, statusCallback)
- im.instances[inst.Name] = inst
+ // Add to registry
+ if err := im.registry.Add(inst); err != nil {
+ // Rollback: release port
+ im.ports.Release(allocatedPort)
+ return nil, fmt.Errorf("failed to add instance to registry: %w", err)
+ }
+ // Persist instance (best-effort, don't fail if persistence fails)
if err := im.persistInstance(inst); err != nil {
- return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
+ log.Printf("Warning: failed to persist instance %s: %v", name, err)
}
return inst, nil
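
The old im.ports map of booleans is replaced by a port allocator component. Its surface, as implied by the calls above and in UpdateInstance/DeleteInstance below (assumed signatures; the concrete type ships elsewhere in this commit):

// Inferred from call sites in this file; signatures are assumptions.
type portAllocator interface {
	Allocate(instanceName string) (int, error) // pick a free port from the configured range
	AllocateSpecific(port int, instanceName string) error // fail if already taken
	Release(port int) error
	ReleaseByInstance(instanceName string) // free whatever the instance holds
}
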
@@ -167,25 +181,21 @@ func (im *instanceManager) CreateInstance(name string, options *instance.Options
// GetInstance retrieves an instance by its name.
// For remote instances, this fetches the live state from the remote node and updates the local stub.
func (im *instanceManager) GetInstance(name string) (*instance.Instance, error) {
- im.mu.RLock()
- inst, exists := im.instances[name]
- im.mu.RUnlock()
+ inst, exists := im.registry.Get(name)
if !exists {
return nil, fmt.Errorf("instance with name %s not found", name)
}
// Check if instance is remote and fetch live state
if node := im.getNodeForInstance(inst); node != nil {
- remoteInst, err := im.GetRemoteInstance(node, name)
+ ctx := context.Background()
+ remoteInst, err := im.remote.GetInstance(ctx, node, name)
if err != nil {
return nil, err
}
// Update the local stub with all remote data (preserving Nodes)
- im.mu.Lock()
im.updateLocalInstanceFromRemote(inst, remoteInst)
- im.mu.Unlock()
// Return the local stub (preserving Nodes field)
return inst, nil
@@ -197,29 +207,23 @@ func (im *instanceManager) GetInstance(name string) (*instance.Instance, error)
// UpdateInstance updates the options of an existing instance and returns it.
// If the instance is running, it will be restarted to apply the new options.
func (im *instanceManager) UpdateInstance(name string, options *instance.Options) (*instance.Instance, error) {
- im.mu.RLock()
- inst, exists := im.instances[name]
- im.mu.RUnlock()
+ inst, exists := im.registry.Get(name)
if !exists {
return nil, fmt.Errorf("instance with name %s not found", name)
}
// Check if instance is remote and delegate to remote operation
if node := im.getNodeForInstance(inst); node != nil {
- remoteInst, err := im.UpdateRemoteInstance(node, name, options)
+ ctx := context.Background()
+ remoteInst, err := im.remote.UpdateInstance(ctx, node, name, options)
if err != nil {
return nil, err
}
// Update the local stub with all remote data (preserving Nodes)
- im.mu.Lock()
im.updateLocalInstanceFromRemote(inst, remoteInst)
- im.mu.Unlock()
// Persist the updated remote instance locally
- im.mu.Lock()
- defer im.mu.Unlock()
if err := im.persistInstance(inst); err != nil {
return nil, fmt.Errorf("failed to persist updated remote instance %s: %w", name, err)
}
@@ -236,6 +240,42 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.Options
return nil, err
}
+ // Lock for local instance operations to prevent races
+ im.operationMu.Lock()
+ defer im.operationMu.Unlock()
+ // Handle port changes
+ oldPort := inst.GetPort()
+ newPort := im.getPortFromOptions(options)
+ var allocatedPort int
+ if newPort != oldPort {
+ // Port is changing - need to release old and allocate new
+ if newPort == 0 {
+ // Auto-allocate new port
+ allocatedPort, err = im.ports.Allocate(name)
+ if err != nil {
+ return nil, fmt.Errorf("failed to allocate new port: %w", err)
+ }
+ im.setPortInOptions(options, allocatedPort)
+ } else {
+ // Use specified port
+ if err := im.ports.AllocateSpecific(newPort, name); err != nil {
+ return nil, fmt.Errorf("failed to allocate port %d: %w", newPort, err)
+ }
+ allocatedPort = newPort
+ }
+ // Release old port
+ if oldPort > 0 {
+ if err := im.ports.Release(oldPort); err != nil {
+ // Rollback new port allocation
+ im.ports.Release(allocatedPort)
+ return nil, fmt.Errorf("failed to release old port %d: %w", oldPort, err)
+ }
+ }
+ }
// Check if instance is running before updating options
wasRunning := inst.IsRunning()
@@ -256,8 +296,6 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.Options
}
}
- im.mu.Lock()
- defer im.mu.Unlock()
if err := im.persistInstance(inst); err != nil {
return nil, fmt.Errorf("failed to persist updated instance %s: %w", name, err)
}
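
The port-change branch above deliberately allocates the replacement port before releasing the old one, so any failure can be rolled back without leaving the instance portless. Condensed, the pattern is (names as in the diff):

// Condensed from UpdateInstance above: acquire the new port first,
// then release the old; on a release failure, undo the new allocation.
if err := im.ports.AllocateSpecific(newPort, name); err != nil {
	return nil, err // old port still held, instance unchanged
}
if oldPort > 0 {
	if err := im.ports.Release(oldPort); err != nil {
		im.ports.Release(newPort) // rollback
		return nil, err
	}
}
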
@@ -267,60 +305,50 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.Options
// DeleteInstance removes a stopped instance by name.
func (im *instanceManager) DeleteInstance(name string) error {
- im.mu.Lock()
- inst, exists := im.instances[name]
- im.mu.Unlock()
+ inst, exists := im.registry.Get(name)
if !exists {
return fmt.Errorf("instance with name %s not found", name)
}
// Check if instance is remote and delegate to remote operation
if node := im.getNodeForInstance(inst); node != nil {
- err := im.DeleteRemoteInstance(node, name)
+ ctx := context.Background()
+ err := im.remote.DeleteInstance(ctx, node, name)
if err != nil {
return err
}
// Clean up local tracking
- im.mu.Lock()
- defer im.mu.Unlock()
- delete(im.instances, name)
- delete(im.instanceNodeMap, name)
+ im.remote.RemoveInstance(name)
+ im.registry.Remove(name)
- // Delete the instance's config file if persistence is enabled
- // Re-validate instance name for security (defense in depth)
- validatedName, err := validation.ValidateInstanceName(name)
- if err != nil {
- return fmt.Errorf("invalid instance name for file deletion: %w", err)
- }
- instancePath := filepath.Join(im.instancesConfig.InstancesDir, validatedName+".json")
- if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) {
- return fmt.Errorf("failed to delete config file for remote instance %s: %w", validatedName, err)
+ // Delete the instance's persistence file
+ if err := im.persistence.Delete(name); err != nil {
+ return fmt.Errorf("failed to delete config file for remote instance %s: %w", name, err)
}
return nil
}
+ // Lock for local instance operations to prevent races
+ im.operationMu.Lock()
+ defer im.operationMu.Unlock()
if inst.IsRunning() {
return fmt.Errorf("instance with name %s is still running, stop it before deleting", name)
}
- im.mu.Lock()
- defer im.mu.Unlock()
+ // Release port (use ReleaseByInstance for proper cleanup)
+ im.ports.ReleaseByInstance(name)
- delete(im.ports, inst.GetPort())
- delete(im.instances, name)
- // Delete the instance's config file if persistence is enabled
- // Re-validate instance name for security (defense in depth)
- validatedName, err := validation.ValidateInstanceName(inst.Name)
- if err != nil {
- return fmt.Errorf("invalid instance name for file deletion: %w", err)
+ // Remove from registry
+ if err := im.registry.Remove(name); err != nil {
+ return fmt.Errorf("failed to remove instance from registry: %w", err)
}
- instancePath := filepath.Join(im.instancesConfig.InstancesDir, validatedName+".json")
- if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) {
- return fmt.Errorf("failed to delete config file for instance %s: %w", validatedName, err)
+ // Delete persistence file
+ if err := im.persistence.Delete(name); err != nil {
+ return fmt.Errorf("failed to delete config file for instance %s: %w", name, err)
}
return nil
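
persistence.Delete replaces the inline ValidateInstanceName plus os.Remove sequence at both call sites, which only stays safe if the persister revalidates names internally. A sketch of the assumed surface:

// Assumed persister surface; name validation (the old defense-in-depth
// check) is presumed to move inside the component.
type instancePersister interface {
	Save(inst *instance.Instance) error // write <InstancesDir>/<name>.json
	Delete(name string) error // validate name, remove file; a missing file is not an error
}
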
@@ -329,45 +357,35 @@ func (im *instanceManager) DeleteInstance(name string) error {
// StartInstance starts a stopped instance and returns it.
// If the instance is already running, it returns an error.
func (im *instanceManager) StartInstance(name string) (*instance.Instance, error) {
- im.mu.RLock()
- inst, exists := im.instances[name]
- im.mu.RUnlock()
+ inst, exists := im.registry.Get(name)
if !exists {
return nil, fmt.Errorf("instance with name %s not found", name)
}
// Check if instance is remote and delegate to remote operation
if node := im.getNodeForInstance(inst); node != nil {
- remoteInst, err := im.StartRemoteInstance(node, name)
+ ctx := context.Background()
+ remoteInst, err := im.remote.StartInstance(ctx, node, name)
if err != nil {
return nil, err
}
// Update the local stub with all remote data (preserving Nodes)
- im.mu.Lock()
im.updateLocalInstanceFromRemote(inst, remoteInst)
- im.mu.Unlock()
return inst, nil
}
+ // Lock for local instance operations to prevent races
+ im.operationMu.Lock()
+ defer im.operationMu.Unlock()
if inst.IsRunning() {
return inst, fmt.Errorf("instance with name %s is already running", name)
}
- // Check max running instances limit for local instances only
- im.mu.RLock()
- localRunningCount := 0
- for instName := range im.runningInstances {
- if _, isRemote := im.instanceNodeMap[instName]; !isRemote {
- localRunningCount++
- }
- }
- maxRunningExceeded := localRunningCount >= im.instancesConfig.MaxRunningInstances && im.instancesConfig.MaxRunningInstances != -1
- im.mu.RUnlock()
- if maxRunningExceeded {
+ if im.IsMaxRunningInstancesReached() {
return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.instancesConfig.MaxRunningInstances))
}
@@ -375,52 +393,55 @@ func (im *instanceManager) StartInstance(name string) (*instance.Instance, error
return nil, fmt.Errorf("failed to start instance %s: %w", name, err)
}
- im.mu.Lock()
- defer im.mu.Unlock()
- err := im.persistInstance(inst)
- if err != nil {
- return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
+ // Persist instance (best-effort, don't fail if persistence fails)
+ if err := im.persistInstance(inst); err != nil {
+ log.Printf("Warning: failed to persist instance %s: %v", name, err)
}
return inst, nil
}
func (im *instanceManager) IsMaxRunningInstancesReached() bool {
- im.mu.RLock()
- defer im.mu.RUnlock()
- if im.instancesConfig.MaxRunningInstances != -1 && len(im.runningInstances) >= im.instancesConfig.MaxRunningInstances {
- return true
+ if im.instancesConfig.MaxRunningInstances == -1 {
+ return false
}
- return false
+ // Count only local running instances (each node has its own limits)
+ localRunningCount := 0
+ for _, inst := range im.registry.ListRunning() {
+ if !inst.IsRemote() {
+ localRunningCount++
+ }
+ }
+ return localRunningCount >= im.instancesConfig.MaxRunningInstances
}
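
A worked illustration of the check above, with hypothetical values:

// Illustration (hypothetical values): MaxRunningInstances = 2.
// registry.ListRunning() -> [localA, localB, remoteC], all running.
// remoteC is skipped by !inst.IsRemote(), so localRunningCount == 2
// and the method reports the local limit as reached; remote instances
// count against their own node's limit instead.
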
// StopInstance stops a running instance and returns it.
func (im *instanceManager) StopInstance(name string) (*instance.Instance, error) {
- im.mu.RLock()
- inst, exists := im.instances[name]
- im.mu.RUnlock()
+ inst, exists := im.registry.Get(name)
if !exists {
return nil, fmt.Errorf("instance with name %s not found", name)
}
// Check if instance is remote and delegate to remote operation
if node := im.getNodeForInstance(inst); node != nil {
- remoteInst, err := im.StopRemoteInstance(node, name)
+ ctx := context.Background()
+ remoteInst, err := im.remote.StopInstance(ctx, node, name)
if err != nil {
return nil, err
}
// Update the local stub with all remote data (preserving Nodes)
- im.mu.Lock()
im.updateLocalInstanceFromRemote(inst, remoteInst)
- im.mu.Unlock()
return inst, nil
}
+ // Lock for local instance operations to prevent races
+ im.operationMu.Lock()
+ defer im.operationMu.Unlock()
if !inst.IsRunning() {
return inst, fmt.Errorf("instance with name %s is already stopped", name)
}
@@ -429,11 +450,9 @@ func (im *instanceManager) StopInstance(name string) (*instance.Instance, error)
return nil, fmt.Errorf("failed to stop instance %s: %w", name, err)
}
- im.mu.Lock()
- defer im.mu.Unlock()
- err := im.persistInstance(inst)
- if err != nil {
- return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
+ // Persist instance (best-effort, don't fail if persistence fails)
+ if err := im.persistInstance(inst); err != nil {
+ log.Printf("Warning: failed to persist instance %s: %v", name, err)
}
return inst, nil
@@ -441,49 +460,60 @@ func (im *instanceManager) StopInstance(name string) (*instance.Instance, error)
// RestartInstance stops and then starts an instance, returning the updated instance.
func (im *instanceManager) RestartInstance(name string) (*instance.Instance, error) {
- im.mu.RLock()
- inst, exists := im.instances[name]
- im.mu.RUnlock()
+ inst, exists := im.registry.Get(name)
if !exists {
return nil, fmt.Errorf("instance with name %s not found", name)
}
// Check if instance is remote and delegate to remote operation
if node := im.getNodeForInstance(inst); node != nil {
- remoteInst, err := im.RestartRemoteInstance(node, name)
+ ctx := context.Background()
+ remoteInst, err := im.remote.RestartInstance(ctx, node, name)
if err != nil {
return nil, err
}
// Update the local stub with all remote data (preserving Nodes)
- im.mu.Lock()
im.updateLocalInstanceFromRemote(inst, remoteInst)
- im.mu.Unlock()
return inst, nil
}
- inst, err := im.StopInstance(name)
- if err != nil {
- return nil, err
+ // Lock for the entire restart operation to ensure atomicity
+ im.operationMu.Lock()
+ defer im.operationMu.Unlock()
+ // Stop the instance
+ if inst.IsRunning() {
+ if err := inst.Stop(); err != nil {
+ return nil, fmt.Errorf("failed to stop instance %s: %w", name, err)
+ }
}
- return im.StartInstance(inst.Name)
+ // Start the instance
+ if err := inst.Start(); err != nil {
+ return nil, fmt.Errorf("failed to start instance %s: %w", name, err)
+ }
+ // Persist the restarted instance
+ if err := im.persistInstance(inst); err != nil {
+ log.Printf("Warning: failed to persist instance %s: %v", name, err)
+ }
+ return inst, nil
}
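
All remote delegation now funnels through a single remote-operations component. Collecting the call sites in this file gives roughly the surface below; the signatures are assumptions, and the node type is written here as a placeholder *config.NodeConfig (whatever getNodeForInstance actually returns):

// Inferred from call sites in this diff; signatures are assumptions.
type remoteOps interface {
	GetInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error)
	CreateInstance(ctx context.Context, node *config.NodeConfig, name string, opts *instance.Options) (*instance.Instance, error)
	UpdateInstance(ctx context.Context, node *config.NodeConfig, name string, opts *instance.Options) (*instance.Instance, error)
	DeleteInstance(ctx context.Context, node *config.NodeConfig, name string) error
	StartInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error)
	StopInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error)
	RestartInstance(ctx context.Context, node *config.NodeConfig, name string) (*instance.Instance, error)
	GetInstanceLogs(ctx context.Context, node *config.NodeConfig, name string, numLines int) (string, error)
	GetNodeForInstance(name string) (*config.NodeConfig, bool)
	SetInstanceNode(name, node string) error
	RemoveInstance(name string)
}
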
// GetInstanceLogs retrieves the logs for a specific instance by its name.
func (im *instanceManager) GetInstanceLogs(name string, numLines int) (string, error) {
- im.mu.RLock()
- inst, exists := im.instances[name]
- im.mu.RUnlock()
+ inst, exists := im.registry.Get(name)
if !exists {
return "", fmt.Errorf("instance with name %s not found", name)
}
// Check if instance is remote and delegate to remote operation
if node := im.getNodeForInstance(inst); node != nil {
- return im.GetRemoteInstanceLogs(node, name, numLines)
+ ctx := context.Background()
+ return im.remote.GetInstanceLogs(ctx, node, name, numLines)
}
// Get logs from the local instance
@@ -500,27 +530,7 @@ func (im *instanceManager) setPortInOptions(options *instance.Options, port int)
options.BackendOptions.SetPort(port)
}
- // assignAndValidatePort assigns a port if not specified and validates it's not in use
- func (im *instanceManager) assignAndValidatePort(options *instance.Options) error {
- currentPort := im.getPortFromOptions(options)
- if currentPort == 0 {
- // Assign a port if not specified
- port, err := im.getNextAvailablePort()
- if err != nil {
- return fmt.Errorf("failed to get next available port: %w", err)
- }
- im.setPortInOptions(options, port)
- // Mark the port as used
- im.ports[port] = true
- } else {
- // Validate the specified port
- if _, exists := im.ports[currentPort]; exists {
- return fmt.Errorf("port %d is already in use", currentPort)
- }
- // Mark the port as used
- im.ports[currentPort] = true
- }
- return nil
+ // EvictLRUInstance finds and stops the least recently used running instance.
+ func (im *instanceManager) EvictLRUInstance() error {
+ return im.lifecycle.EvictLRU()
}
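
EvictLRUInstance is now a thin wrapper over the lifecycle component. A plausible sketch of EvictLRU under stated assumptions (least-recently-used by last request time, local running instances only; field and method names here are hypothetical, not the commit's actual code):

// Hypothetical sketch of the lifecycle component's eviction.
func (l *lifecycleManager) EvictLRU() error {
	var lru *instance.Instance
	for _, inst := range l.registry.ListRunning() {
		if inst.IsRemote() {
			continue // remote instances are governed by their own node
		}
		if lru == nil || inst.GetLastRequestTime() < lru.GetLastRequestTime() {
			lru = inst
		}
	}
	if lru == nil {
		return fmt.Errorf("no running local instance to evict")
	}
	return lru.Stop()
}
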