package manager

import (
	"fmt"
	"llamactl/pkg/backends"
	"llamactl/pkg/config"
	"llamactl/pkg/instance"
	"llamactl/pkg/validation"
	"os"
	"path/filepath"
)

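// MaxRunningInstancesError is returned by StartInstance when the configured
// maximum number of running instances has already been reached.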
type MaxRunningInstancesError error

// updateLocalInstanceFromRemote updates the local stub instance with data from the remote instance
// while preserving the Nodes field to maintain remote instance tracking.
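// Every call site in this file acquires im.mu before invoking this helper.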
func (im *instanceManager) updateLocalInstanceFromRemote(localInst *instance.Process, remoteInst *instance.Process) {
	if localInst == nil || remoteInst == nil {
		return
	}

	// Get the remote instance options
	remoteOptions := remoteInst.GetOptions()
	if remoteOptions == nil {
		return
	}

	// Preserve the Nodes field from the local instance
	localOptions := localInst.GetOptions()
	var preservedNodes []string
	if localOptions != nil && len(localOptions.Nodes) > 0 {
		preservedNodes = make([]string, len(localOptions.Nodes))
		copy(preservedNodes, localOptions.Nodes)
	}

	// Create a copy of remote options and restore the Nodes field
	updatedOptions := *remoteOptions
	updatedOptions.Nodes = preservedNodes

	// Update the local instance with all remote data
	localInst.SetOptions(&updatedOptions)
	localInst.Status = remoteInst.Status
	localInst.Created = remoteInst.Created
}

// ListInstances returns a list of all instances managed by the instance manager.
// For remote instances, this fetches the live state from remote nodes and updates local stubs.
func (im *instanceManager) ListInstances() ([]*instance.Process, error) {
	im.mu.RLock()
	localInstances := make([]*instance.Process, 0, len(im.instances))
	for _, inst := range im.instances {
		localInstances = append(localInstances, inst)
	}
	im.mu.RUnlock()

	// Update remote instances with live state
	for _, inst := range localInstances {
		if node := im.getNodeForInstance(inst); node != nil {
			remoteInst, err := im.GetRemoteInstance(node, inst.Name)
			if err != nil {
				// Skip the update and continue with stale data;
				// don't fail the entire list operation due to one remote failure
				continue
			}

			// Update the local stub with all remote data (preserving Nodes)
			im.mu.Lock()
			im.updateLocalInstanceFromRemote(inst, remoteInst)
			im.mu.Unlock()
		}
	}

	return localInstances, nil
}

// CreateInstance creates a new instance with the given options and returns it.
// The instance is initially in a "stopped" state.
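//
// Usage sketch (hypothetical caller; the mgr value, model name, and node name
// are illustrative assumptions, not part of this file):
//
//	opts := &instance.CreateInstanceOptions{
//		BackendType: backends.BackendTypeLlamaCpp,
//		// A non-empty Nodes slice, e.g. Nodes: []string{"worker-1"},
//		// creates the instance on that remote node instead of locally.
//	}
//	inst, err := mgr.CreateInstance("my-model", opts)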
func (im *instanceManager) CreateInstance(name string, options *instance.CreateInstanceOptions) (*instance.Process, error) {
	if options == nil {
		return nil, fmt.Errorf("instance options cannot be nil")
	}

	name, err := validation.ValidateInstanceName(name)
	if err != nil {
		return nil, err
	}

	err = validation.ValidateInstanceOptions(options)
	if err != nil {
		return nil, err
	}

	im.mu.Lock()
	defer im.mu.Unlock()

	// Check if instance with this name already exists (must be globally unique)
	if im.instances[name] != nil {
		return nil, fmt.Errorf("instance with name %s already exists", name)
	}

	// Check if this is a remote instance
	isRemote := len(options.Nodes) > 0
	var nodeConfig *config.NodeConfig

	if isRemote {
		// Validate that the node exists
		nodeName := options.Nodes[0] // Use first node for now
		var exists bool
		nodeConfig, exists = im.nodeConfigMap[nodeName]
		if !exists {
			return nil, fmt.Errorf("node %s not found", nodeName)
		}

		// Create the remote instance on the remote node
		remoteInst, err := im.CreateRemoteInstance(nodeConfig, name, options)
		if err != nil {
			return nil, err
		}

		// Create a local stub that preserves the Nodes field for tracking.
		// We keep the original options (with Nodes) so IsRemote() works correctly.
		inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, nil)

		// Update the local stub with all remote data (preserving Nodes)
		im.updateLocalInstanceFromRemote(inst, remoteInst)

		// Add to local tracking maps (but don't count towards limits)
		im.instances[name] = inst
		im.instanceNodeMap[name] = nodeConfig

		// Persist the remote instance locally for tracking across restarts
		if err := im.persistInstance(inst); err != nil {
			return nil, fmt.Errorf("failed to persist remote instance %s: %w", name, err)
		}

		return inst, nil
	}

	// Local instance creation
	// Check max instances limit for local instances only
	localInstanceCount := len(im.instances) - len(im.instanceNodeMap)
	if localInstanceCount >= im.instancesConfig.MaxInstances && im.instancesConfig.MaxInstances != -1 {
		return nil, fmt.Errorf("maximum number of instances (%d) reached", im.instancesConfig.MaxInstances)
	}

	// Assign and validate port for backend-specific options
	if err := im.assignAndValidatePort(options); err != nil {
		return nil, err
	}

	statusCallback := func(oldStatus, newStatus instance.InstanceStatus) {
		im.onStatusChange(name, oldStatus, newStatus)
	}

	inst := instance.NewInstance(name, &im.backendsConfig, &im.instancesConfig, options, statusCallback)
	im.instances[inst.Name] = inst

	if err := im.persistInstance(inst); err != nil {
		return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
	}

	return inst, nil
}

// GetInstance retrieves an instance by its name.
// For remote instances, this fetches the live state from the remote node and updates the local stub.
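// Unlike ListInstances, a failed fetch from the remote node is returned as an
// error rather than silently serving stale local data.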
func (im *instanceManager) GetInstance(name string) (*instance.Process, error) {
	im.mu.RLock()
	inst, exists := im.instances[name]
	im.mu.RUnlock()

	if !exists {
		return nil, fmt.Errorf("instance with name %s not found", name)
	}

	// Check if instance is remote and fetch live state
	if node := im.getNodeForInstance(inst); node != nil {
		remoteInst, err := im.GetRemoteInstance(node, name)
		if err != nil {
			return nil, err
		}

		// Update the local stub with all remote data (preserving Nodes)
		im.mu.Lock()
		im.updateLocalInstanceFromRemote(inst, remoteInst)
		im.mu.Unlock()

		// Return the local stub (preserving Nodes field)
		return inst, nil
	}

	return inst, nil
}

// UpdateInstance updates the options of an existing instance and returns it.
// If the instance is running, it will be restarted to apply the new options.
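// Options for a remote instance are forwarded to the owning node as-is and are
// not validated locally.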
func (im *instanceManager) UpdateInstance(name string, options *instance.CreateInstanceOptions) (*instance.Process, error) {
	im.mu.RLock()
	inst, exists := im.instances[name]
	im.mu.RUnlock()

	if !exists {
		return nil, fmt.Errorf("instance with name %s not found", name)
	}

	// Check if instance is remote and delegate to remote operation
	if node := im.getNodeForInstance(inst); node != nil {
		remoteInst, err := im.UpdateRemoteInstance(node, name, options)
		if err != nil {
			return nil, err
		}

		// Update the local stub with all remote data (preserving Nodes)
		// and persist the updated remote instance locally
		im.mu.Lock()
		defer im.mu.Unlock()
		im.updateLocalInstanceFromRemote(inst, remoteInst)
		if err := im.persistInstance(inst); err != nil {
			return nil, fmt.Errorf("failed to persist updated remote instance %s: %w", name, err)
		}

		return inst, nil
	}

	if options == nil {
		return nil, fmt.Errorf("instance options cannot be nil")
	}

	err := validation.ValidateInstanceOptions(options)
	if err != nil {
		return nil, err
	}

	// Check if instance is running before updating options
	wasRunning := inst.IsRunning()

	// If the instance is running, stop it first
	if wasRunning {
		if err := inst.Stop(); err != nil {
			return nil, fmt.Errorf("failed to stop instance %s for update: %w", name, err)
		}
	}

	// Now update the options while the instance is stopped
	inst.SetOptions(options)

	// If it was running before, start it again with the new options
	if wasRunning {
		if err := inst.Start(); err != nil {
			return nil, fmt.Errorf("failed to start instance %s after update: %w", name, err)
		}
	}

	im.mu.Lock()
	defer im.mu.Unlock()
	if err := im.persistInstance(inst); err != nil {
		return nil, fmt.Errorf("failed to persist updated instance %s: %w", name, err)
	}

	return inst, nil
}

// DeleteInstance removes a stopped instance by its name.
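// For remote instances the deletion is forwarded to the owning node before the
// local stub and its persisted config file are removed; for local instances
// the reserved port is also released for reuse.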
func (im *instanceManager) DeleteInstance(name string) error {
	im.mu.Lock()
	inst, exists := im.instances[name]
	im.mu.Unlock()

	if !exists {
		return fmt.Errorf("instance with name %s not found", name)
	}

	// Check if instance is remote and delegate to remote operation
	if node := im.getNodeForInstance(inst); node != nil {
		err := im.DeleteRemoteInstance(node, name)
		if err != nil {
			return err
		}

		// Clean up local tracking
		im.mu.Lock()
		defer im.mu.Unlock()
		delete(im.instances, name)
		delete(im.instanceNodeMap, name)

		// Delete the instance's config file if persistence is enabled
		// Re-validate instance name for security (defense in depth)
		validatedName, err := validation.ValidateInstanceName(name)
		if err != nil {
			return fmt.Errorf("invalid instance name for file deletion: %w", err)
		}
		instancePath := filepath.Join(im.instancesConfig.InstancesDir, validatedName+".json")
		if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("failed to delete config file for remote instance %s: %w", validatedName, err)
		}

		return nil
	}

	if inst.IsRunning() {
		return fmt.Errorf("instance with name %s is still running, stop it before deleting", name)
	}

	im.mu.Lock()
	defer im.mu.Unlock()

	delete(im.ports, inst.GetPort())
	delete(im.instances, name)

	// Delete the instance's config file if persistence is enabled
	// Re-validate instance name for security (defense in depth)
	validatedName, err := validation.ValidateInstanceName(inst.Name)
	if err != nil {
		return fmt.Errorf("invalid instance name for file deletion: %w", err)
	}
	instancePath := filepath.Join(im.instancesConfig.InstancesDir, validatedName+".json")
	if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to delete config file for instance %s: %w", validatedName, err)
	}

	return nil
}

// StartInstance starts a stopped instance and returns it.
// If the instance is already running, it returns an error.
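// For remote instances the start request is delegated to the owning node; the
// MaxRunningInstances limit is enforced here against local instances only.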
func (im *instanceManager) StartInstance(name string) (*instance.Process, error) {
	im.mu.RLock()
	inst, exists := im.instances[name]
	im.mu.RUnlock()

	if !exists {
		return nil, fmt.Errorf("instance with name %s not found", name)
	}

	// Check if instance is remote and delegate to remote operation
	if node := im.getNodeForInstance(inst); node != nil {
		remoteInst, err := im.StartRemoteInstance(node, name)
		if err != nil {
			return nil, err
		}

		// Update the local stub with all remote data (preserving Nodes)
		im.mu.Lock()
		im.updateLocalInstanceFromRemote(inst, remoteInst)
		im.mu.Unlock()

		return inst, nil
	}

	if inst.IsRunning() {
		return inst, fmt.Errorf("instance with name %s is already running", name)
	}

	// Check max running instances limit for local instances only
	im.mu.RLock()
	localRunningCount := 0
	for instName := range im.runningInstances {
		if _, isRemote := im.instanceNodeMap[instName]; !isRemote {
			localRunningCount++
		}
	}
	maxRunningExceeded := localRunningCount >= im.instancesConfig.MaxRunningInstances && im.instancesConfig.MaxRunningInstances != -1
	im.mu.RUnlock()

	if maxRunningExceeded {
		return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.instancesConfig.MaxRunningInstances))
	}

	if err := inst.Start(); err != nil {
		return nil, fmt.Errorf("failed to start instance %s: %w", name, err)
	}

	im.mu.Lock()
	defer im.mu.Unlock()
	err := im.persistInstance(inst)
	if err != nil {
		return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
	}

	return inst, nil
}

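// IsMaxRunningInstancesReached reports whether the configured limit on running
// instances has been reached. Note that it counts all tracked running
// instances, including remote ones, whereas StartInstance enforces the limit
// against local instances only.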
func (im *instanceManager) IsMaxRunningInstancesReached() bool {
	im.mu.RLock()
	defer im.mu.RUnlock()

	if im.instancesConfig.MaxRunningInstances != -1 && len(im.runningInstances) >= im.instancesConfig.MaxRunningInstances {
		return true
	}

	return false
}

// StopInstance stops a running instance and returns it.
func (im *instanceManager) StopInstance(name string) (*instance.Process, error) {
	im.mu.RLock()
	inst, exists := im.instances[name]
	im.mu.RUnlock()

	if !exists {
		return nil, fmt.Errorf("instance with name %s not found", name)
	}

	// Check if instance is remote and delegate to remote operation
	if node := im.getNodeForInstance(inst); node != nil {
		remoteInst, err := im.StopRemoteInstance(node, name)
		if err != nil {
			return nil, err
		}

		// Update the local stub with all remote data (preserving Nodes)
		im.mu.Lock()
		im.updateLocalInstanceFromRemote(inst, remoteInst)
		im.mu.Unlock()

		return inst, nil
	}

	if !inst.IsRunning() {
		return inst, fmt.Errorf("instance with name %s is already stopped", name)
	}

	if err := inst.Stop(); err != nil {
		return nil, fmt.Errorf("failed to stop instance %s: %w", name, err)
	}

	im.mu.Lock()
	defer im.mu.Unlock()
	err := im.persistInstance(inst)
	if err != nil {
		return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
	}

	return inst, nil
}

// RestartInstance stops and then starts an instance, returning the updated instance.
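// For local instances this is implemented as StopInstance followed by
// StartInstance, so the MaxRunningInstances limit is re-checked before the
// instance comes back up.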
func (im *instanceManager) RestartInstance(name string) (*instance.Process, error) {
	im.mu.RLock()
	inst, exists := im.instances[name]
	im.mu.RUnlock()

	if !exists {
		return nil, fmt.Errorf("instance with name %s not found", name)
	}

	// Check if instance is remote and delegate to remote operation
	if node := im.getNodeForInstance(inst); node != nil {
		remoteInst, err := im.RestartRemoteInstance(node, name)
		if err != nil {
			return nil, err
		}

		// Update the local stub with all remote data (preserving Nodes)
		im.mu.Lock()
		im.updateLocalInstanceFromRemote(inst, remoteInst)
		im.mu.Unlock()

		return inst, nil
	}

	inst, err := im.StopInstance(name)
	if err != nil {
		return nil, err
	}
	return im.StartInstance(inst.Name)
}

// GetInstanceLogs retrieves the logs for a specific instance by its name.
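//
// For example (hypothetical caller; mgr and the instance name are illustrative):
//
//	logs, err := mgr.GetInstanceLogs("my-model", 100)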
func (im *instanceManager) GetInstanceLogs(name string, numLines int) (string, error) {
	im.mu.RLock()
	inst, exists := im.instances[name]
	im.mu.RUnlock()

	if !exists {
		return "", fmt.Errorf("instance with name %s not found", name)
	}

	// Check if instance is remote and delegate to remote operation
	if node := im.getNodeForInstance(inst); node != nil {
		return im.GetRemoteInstanceLogs(node, name, numLines)
	}

	// Get logs from the local instance
	return inst.GetLogs(numLines)
}

// getPortFromOptions extracts the port from backend-specific options.
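// A return value of 0 means no port is set; assignAndValidatePort treats 0 as
// a request to assign the next available port.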
func (im *instanceManager) getPortFromOptions(options *instance.CreateInstanceOptions) int {
	switch options.BackendType {
	case backends.BackendTypeLlamaCpp:
		if options.LlamaServerOptions != nil {
			return options.LlamaServerOptions.Port
		}
	case backends.BackendTypeMlxLm:
		if options.MlxServerOptions != nil {
			return options.MlxServerOptions.Port
		}
	case backends.BackendTypeVllm:
		if options.VllmServerOptions != nil {
			return options.VllmServerOptions.Port
		}
	}
	return 0
}

// setPortInOptions sets the port in backend-specific options.
func (im *instanceManager) setPortInOptions(options *instance.CreateInstanceOptions, port int) {
	switch options.BackendType {
	case backends.BackendTypeLlamaCpp:
		if options.LlamaServerOptions != nil {
			options.LlamaServerOptions.Port = port
		}
	case backends.BackendTypeMlxLm:
		if options.MlxServerOptions != nil {
			options.MlxServerOptions.Port = port
		}
	case backends.BackendTypeVllm:
		if options.VllmServerOptions != nil {
			options.VllmServerOptions.Port = port
		}
	}
}

// assignAndValidatePort assigns a port if not specified and validates it's not in use.
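// The caller must hold im.mu; CreateInstance acquires it before calling.
// Ports reserved here are released again in DeleteInstance.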
func (im *instanceManager) assignAndValidatePort(options *instance.CreateInstanceOptions) error {
	currentPort := im.getPortFromOptions(options)

	if currentPort == 0 {
		// Assign a port if not specified
		port, err := im.getNextAvailablePort()
		if err != nil {
			return fmt.Errorf("failed to get next available port: %w", err)
		}
		im.setPortInOptions(options, port)
		// Mark the port as used
		im.ports[port] = true
	} else {
		// Validate the specified port
		if _, exists := im.ports[currentPort]; exists {
			return fmt.Errorf("port %d is already in use", currentPort)
		}
		// Mark the port as used
		im.ports[currentPort] = true
	}

	return nil
}