mirror of
https://github.com/lordmathis/llamactl.git
synced 2025-11-06 00:54:23 +00:00
Enhance instance manager to support remote instance management and update related tests
This commit is contained in:
@@ -58,7 +58,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize the instance manager
|
// Initialize the instance manager
|
||||||
instanceManager := manager.NewInstanceManager(cfg.Backends, cfg.Instances)
|
instanceManager := manager.NewInstanceManager(cfg.Backends, cfg.Instances, cfg.Nodes)
|
||||||
|
|
||||||
// Create a new handler with the instance manager
|
// Create a new handler with the instance manager
|
||||||
handler := server.NewHandler(instanceManager, cfg)
|
handler := server.NewHandler(instanceManager, cfg)
|
||||||
|
|||||||
@@ -57,14 +57,23 @@ type instanceManager struct {
|
|||||||
isShutdown bool
|
isShutdown bool
|
||||||
|
|
||||||
// Remote instance management
|
// Remote instance management
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
instanceNodeMap map[string]*config.NodeConfig // Maps instance name to its node config
|
||||||
|
nodeConfigMap map[string]*config.NodeConfig // Maps node name to node config for quick lookup
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInstanceManager creates a new instance of InstanceManager.
|
// NewInstanceManager creates a new instance of InstanceManager.
|
||||||
func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig) InstanceManager {
|
func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig config.InstancesConfig, nodesConfig []config.NodeConfig) InstanceManager {
|
||||||
if instancesConfig.TimeoutCheckInterval <= 0 {
|
if instancesConfig.TimeoutCheckInterval <= 0 {
|
||||||
instancesConfig.TimeoutCheckInterval = 5 // Default to 5 minutes if not set
|
instancesConfig.TimeoutCheckInterval = 5 // Default to 5 minutes if not set
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build node config map for quick lookup
|
||||||
|
nodeConfigMap := make(map[string]*config.NodeConfig)
|
||||||
|
for i := range nodesConfig {
|
||||||
|
nodeConfigMap[nodesConfig[i].Name] = &nodesConfig[i]
|
||||||
|
}
|
||||||
|
|
||||||
im := &instanceManager{
|
im := &instanceManager{
|
||||||
instances: make(map[string]*instance.Process),
|
instances: make(map[string]*instance.Process),
|
||||||
runningInstances: make(map[string]struct{}),
|
runningInstances: make(map[string]struct{}),
|
||||||
@@ -79,6 +88,9 @@ func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig con
|
|||||||
httpClient: &http.Client{
|
httpClient: &http.Client{
|
||||||
Timeout: 30 * time.Second,
|
Timeout: 30 * time.Second,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
instanceNodeMap: make(map[string]*config.NodeConfig),
|
||||||
|
nodeConfigMap: nodeConfigMap,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load existing instances from disk
|
// Load existing instances from disk
|
||||||
@@ -316,3 +328,18 @@ func (im *instanceManager) onStatusChange(name string, oldStatus, newStatus inst
|
|||||||
delete(im.runningInstances, name)
|
delete(im.runningInstances, name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getNodeForInstance returns the node configuration for a remote instance
|
||||||
|
// Returns nil if the instance is not remote or the node is not found
|
||||||
|
func (im *instanceManager) getNodeForInstance(inst *instance.Process) *config.NodeConfig {
|
||||||
|
if !inst.IsRemote() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have a cached mapping
|
||||||
|
if nodeConfig, exists := im.instanceNodeMap[inst.Name]; exists {
|
||||||
|
return nodeConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ func TestNewInstanceManager(t *testing.T) {
|
|||||||
TimeoutCheckInterval: 5,
|
TimeoutCheckInterval: 5,
|
||||||
}
|
}
|
||||||
|
|
||||||
mgr := manager.NewInstanceManager(backendConfig, cfg)
|
mgr := manager.NewInstanceManager(backendConfig, cfg, nil)
|
||||||
if mgr == nil {
|
if mgr == nil {
|
||||||
t.Fatal("NewInstanceManager returned nil")
|
t.Fatal("NewInstanceManager returned nil")
|
||||||
}
|
}
|
||||||
@@ -69,7 +69,7 @@ func TestPersistence(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test instance persistence on creation
|
// Test instance persistence on creation
|
||||||
manager1 := manager.NewInstanceManager(backendConfig, cfg)
|
manager1 := manager.NewInstanceManager(backendConfig, cfg, nil)
|
||||||
options := &instance.CreateInstanceOptions{
|
options := &instance.CreateInstanceOptions{
|
||||||
BackendType: backends.BackendTypeLlamaCpp,
|
BackendType: backends.BackendTypeLlamaCpp,
|
||||||
LlamaServerOptions: &llamacpp.LlamaServerOptions{
|
LlamaServerOptions: &llamacpp.LlamaServerOptions{
|
||||||
@@ -90,7 +90,7 @@ func TestPersistence(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test loading instances from disk
|
// Test loading instances from disk
|
||||||
manager2 := manager.NewInstanceManager(backendConfig, cfg)
|
manager2 := manager.NewInstanceManager(backendConfig, cfg, nil)
|
||||||
instances, err := manager2.ListInstances()
|
instances, err := manager2.ListInstances()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ListInstances failed: %v", err)
|
t.Fatalf("ListInstances failed: %v", err)
|
||||||
@@ -207,5 +207,5 @@ func createTestManager() manager.InstanceManager {
|
|||||||
DefaultRestartDelay: 5,
|
DefaultRestartDelay: 5,
|
||||||
TimeoutCheckInterval: 5,
|
TimeoutCheckInterval: 5,
|
||||||
}
|
}
|
||||||
return manager.NewInstanceManager(backendConfig, cfg)
|
return manager.NewInstanceManager(backendConfig, cfg, nil)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,26 +75,37 @@ func (im *instanceManager) CreateInstance(name string, options *instance.CreateI
|
|||||||
// GetInstance retrieves an instance by its name.
|
// GetInstance retrieves an instance by its name.
|
||||||
func (im *instanceManager) GetInstance(name string) (*instance.Process, error) {
|
func (im *instanceManager) GetInstance(name string) (*instance.Process, error) {
|
||||||
im.mu.RLock()
|
im.mu.RLock()
|
||||||
defer im.mu.RUnlock()
|
inst, exists := im.instances[name]
|
||||||
|
im.mu.RUnlock()
|
||||||
|
|
||||||
instance, exists := im.instances[name]
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, fmt.Errorf("instance with name %s not found", name)
|
return nil, fmt.Errorf("instance with name %s not found", name)
|
||||||
}
|
}
|
||||||
return instance, nil
|
|
||||||
|
// Check if instance is remote and delegate to remote operation
|
||||||
|
if node := im.getNodeForInstance(inst); node != nil {
|
||||||
|
return im.GetRemoteInstance(node, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return inst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateInstance updates the options of an existing instance and returns it.
|
// UpdateInstance updates the options of an existing instance and returns it.
|
||||||
// If the instance is running, it will be restarted to apply the new options.
|
// If the instance is running, it will be restarted to apply the new options.
|
||||||
func (im *instanceManager) UpdateInstance(name string, options *instance.CreateInstanceOptions) (*instance.Process, error) {
|
func (im *instanceManager) UpdateInstance(name string, options *instance.CreateInstanceOptions) (*instance.Process, error) {
|
||||||
im.mu.RLock()
|
im.mu.RLock()
|
||||||
instance, exists := im.instances[name]
|
inst, exists := im.instances[name]
|
||||||
im.mu.RUnlock()
|
im.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, fmt.Errorf("instance with name %s not found", name)
|
return nil, fmt.Errorf("instance with name %s not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if instance is remote and delegate to remote operation
|
||||||
|
if node := im.getNodeForInstance(inst); node != nil {
|
||||||
|
return im.UpdateRemoteInstance(node, name, options)
|
||||||
|
}
|
||||||
|
|
||||||
if options == nil {
|
if options == nil {
|
||||||
return nil, fmt.Errorf("instance options cannot be nil")
|
return nil, fmt.Errorf("instance options cannot be nil")
|
||||||
}
|
}
|
||||||
@@ -105,55 +116,63 @@ func (im *instanceManager) UpdateInstance(name string, options *instance.CreateI
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if instance is running before updating options
|
// Check if instance is running before updating options
|
||||||
wasRunning := instance.IsRunning()
|
wasRunning := inst.IsRunning()
|
||||||
|
|
||||||
// If the instance is running, stop it first
|
// If the instance is running, stop it first
|
||||||
if wasRunning {
|
if wasRunning {
|
||||||
if err := instance.Stop(); err != nil {
|
if err := inst.Stop(); err != nil {
|
||||||
return nil, fmt.Errorf("failed to stop instance %s for update: %w", name, err)
|
return nil, fmt.Errorf("failed to stop instance %s for update: %w", name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now update the options while the instance is stopped
|
// Now update the options while the instance is stopped
|
||||||
instance.SetOptions(options)
|
inst.SetOptions(options)
|
||||||
|
|
||||||
// If it was running before, start it again with the new options
|
// If it was running before, start it again with the new options
|
||||||
if wasRunning {
|
if wasRunning {
|
||||||
if err := instance.Start(); err != nil {
|
if err := inst.Start(); err != nil {
|
||||||
return nil, fmt.Errorf("failed to start instance %s after update: %w", name, err)
|
return nil, fmt.Errorf("failed to start instance %s after update: %w", name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
im.mu.Lock()
|
im.mu.Lock()
|
||||||
defer im.mu.Unlock()
|
defer im.mu.Unlock()
|
||||||
if err := im.persistInstance(instance); err != nil {
|
if err := im.persistInstance(inst); err != nil {
|
||||||
return nil, fmt.Errorf("failed to persist updated instance %s: %w", name, err)
|
return nil, fmt.Errorf("failed to persist updated instance %s: %w", name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return instance, nil
|
return inst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteInstance removes stopped instance by its name.
|
// DeleteInstance removes stopped instance by its name.
|
||||||
func (im *instanceManager) DeleteInstance(name string) error {
|
func (im *instanceManager) DeleteInstance(name string) error {
|
||||||
im.mu.Lock()
|
im.mu.Lock()
|
||||||
defer im.mu.Unlock()
|
inst, exists := im.instances[name]
|
||||||
|
im.mu.Unlock()
|
||||||
|
|
||||||
instance, exists := im.instances[name]
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return fmt.Errorf("instance with name %s not found", name)
|
return fmt.Errorf("instance with name %s not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if instance.IsRunning() {
|
// Check if instance is remote and delegate to remote operation
|
||||||
|
if node := im.getNodeForInstance(inst); node != nil {
|
||||||
|
return im.DeleteRemoteInstance(node, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if inst.IsRunning() {
|
||||||
return fmt.Errorf("instance with name %s is still running, stop it before deleting", name)
|
return fmt.Errorf("instance with name %s is still running, stop it before deleting", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(im.ports, instance.GetPort())
|
im.mu.Lock()
|
||||||
|
defer im.mu.Unlock()
|
||||||
|
|
||||||
|
delete(im.ports, inst.GetPort())
|
||||||
delete(im.instances, name)
|
delete(im.instances, name)
|
||||||
|
|
||||||
// Delete the instance's config file if persistence is enabled
|
// Delete the instance's config file if persistence is enabled
|
||||||
instancePath := filepath.Join(im.instancesConfig.InstancesDir, instance.Name+".json")
|
instancePath := filepath.Join(im.instancesConfig.InstancesDir, inst.Name+".json")
|
||||||
if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) {
|
if err := os.Remove(instancePath); err != nil && !os.IsNotExist(err) {
|
||||||
return fmt.Errorf("failed to delete config file for instance %s: %w", instance.Name, err)
|
return fmt.Errorf("failed to delete config file for instance %s: %w", inst.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -163,33 +182,39 @@ func (im *instanceManager) DeleteInstance(name string) error {
|
|||||||
// If the instance is already running, it returns an error.
|
// If the instance is already running, it returns an error.
|
||||||
func (im *instanceManager) StartInstance(name string) (*instance.Process, error) {
|
func (im *instanceManager) StartInstance(name string) (*instance.Process, error) {
|
||||||
im.mu.RLock()
|
im.mu.RLock()
|
||||||
instance, exists := im.instances[name]
|
inst, exists := im.instances[name]
|
||||||
maxRunningExceeded := len(im.runningInstances) >= im.instancesConfig.MaxRunningInstances && im.instancesConfig.MaxRunningInstances != -1
|
maxRunningExceeded := len(im.runningInstances) >= im.instancesConfig.MaxRunningInstances && im.instancesConfig.MaxRunningInstances != -1
|
||||||
im.mu.RUnlock()
|
im.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, fmt.Errorf("instance with name %s not found", name)
|
return nil, fmt.Errorf("instance with name %s not found", name)
|
||||||
}
|
}
|
||||||
if instance.IsRunning() {
|
|
||||||
return instance, fmt.Errorf("instance with name %s is already running", name)
|
// Check if instance is remote and delegate to remote operation
|
||||||
|
if node := im.getNodeForInstance(inst); node != nil {
|
||||||
|
return im.StartRemoteInstance(node, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if inst.IsRunning() {
|
||||||
|
return inst, fmt.Errorf("instance with name %s is already running", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maxRunningExceeded {
|
if maxRunningExceeded {
|
||||||
return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.instancesConfig.MaxRunningInstances))
|
return nil, MaxRunningInstancesError(fmt.Errorf("maximum number of running instances (%d) reached", im.instancesConfig.MaxRunningInstances))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := instance.Start(); err != nil {
|
if err := inst.Start(); err != nil {
|
||||||
return nil, fmt.Errorf("failed to start instance %s: %w", name, err)
|
return nil, fmt.Errorf("failed to start instance %s: %w", name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
im.mu.Lock()
|
im.mu.Lock()
|
||||||
defer im.mu.Unlock()
|
defer im.mu.Unlock()
|
||||||
err := im.persistInstance(instance)
|
err := im.persistInstance(inst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
|
return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return instance, nil
|
return inst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (im *instanceManager) IsMaxRunningInstancesReached() bool {
|
func (im *instanceManager) IsMaxRunningInstancesReached() bool {
|
||||||
@@ -206,49 +231,73 @@ func (im *instanceManager) IsMaxRunningInstancesReached() bool {
|
|||||||
// StopInstance stops a running instance and returns it.
|
// StopInstance stops a running instance and returns it.
|
||||||
func (im *instanceManager) StopInstance(name string) (*instance.Process, error) {
|
func (im *instanceManager) StopInstance(name string) (*instance.Process, error) {
|
||||||
im.mu.RLock()
|
im.mu.RLock()
|
||||||
instance, exists := im.instances[name]
|
inst, exists := im.instances[name]
|
||||||
im.mu.RUnlock()
|
im.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, fmt.Errorf("instance with name %s not found", name)
|
return nil, fmt.Errorf("instance with name %s not found", name)
|
||||||
}
|
}
|
||||||
if !instance.IsRunning() {
|
|
||||||
return instance, fmt.Errorf("instance with name %s is already stopped", name)
|
// Check if instance is remote and delegate to remote operation
|
||||||
|
if node := im.getNodeForInstance(inst); node != nil {
|
||||||
|
return im.StopRemoteInstance(node, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := instance.Stop(); err != nil {
|
if !inst.IsRunning() {
|
||||||
|
return inst, fmt.Errorf("instance with name %s is already stopped", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := inst.Stop(); err != nil {
|
||||||
return nil, fmt.Errorf("failed to stop instance %s: %w", name, err)
|
return nil, fmt.Errorf("failed to stop instance %s: %w", name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
im.mu.Lock()
|
im.mu.Lock()
|
||||||
defer im.mu.Unlock()
|
defer im.mu.Unlock()
|
||||||
err := im.persistInstance(instance)
|
err := im.persistInstance(inst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
|
return nil, fmt.Errorf("failed to persist instance %s: %w", name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return instance, nil
|
return inst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestartInstance stops and then starts an instance, returning the updated instance.
|
// RestartInstance stops and then starts an instance, returning the updated instance.
|
||||||
func (im *instanceManager) RestartInstance(name string) (*instance.Process, error) {
|
func (im *instanceManager) RestartInstance(name string) (*instance.Process, error) {
|
||||||
instance, err := im.StopInstance(name)
|
im.mu.RLock()
|
||||||
|
inst, exists := im.instances[name]
|
||||||
|
im.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("instance with name %s not found", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if instance is remote and delegate to remote operation
|
||||||
|
if node := im.getNodeForInstance(inst); node != nil {
|
||||||
|
return im.RestartRemoteInstance(node, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
inst, err := im.StopInstance(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return im.StartInstance(instance.Name)
|
return im.StartInstance(inst.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetInstanceLogs retrieves the logs for a specific instance by its name.
|
// GetInstanceLogs retrieves the logs for a specific instance by its name.
|
||||||
func (im *instanceManager) GetInstanceLogs(name string) (string, error) {
|
func (im *instanceManager) GetInstanceLogs(name string) (string, error) {
|
||||||
im.mu.RLock()
|
im.mu.RLock()
|
||||||
_, exists := im.instances[name]
|
inst, exists := im.instances[name]
|
||||||
im.mu.RUnlock()
|
im.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return "", fmt.Errorf("instance with name %s not found", name)
|
return "", fmt.Errorf("instance with name %s not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if instance is remote and delegate to remote operation
|
||||||
|
if node := im.getNodeForInstance(inst); node != nil {
|
||||||
|
return im.GetRemoteInstanceLogs(node, name)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Implement actual log retrieval logic
|
// TODO: Implement actual log retrieval logic
|
||||||
return fmt.Sprintf("Logs for instance %s", name), nil
|
return fmt.Sprintf("Logs for instance %s", name), nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ func TestCreateInstance_ValidationAndLimits(t *testing.T) {
|
|||||||
MaxInstances: 1, // Very low limit for testing
|
MaxInstances: 1, // Very low limit for testing
|
||||||
TimeoutCheckInterval: 5,
|
TimeoutCheckInterval: 5,
|
||||||
}
|
}
|
||||||
limitedManager := manager.NewInstanceManager(backendConfig, cfg)
|
limitedManager := manager.NewInstanceManager(backendConfig, cfg, nil)
|
||||||
|
|
||||||
_, err = limitedManager.CreateInstance("instance1", options)
|
_, err = limitedManager.CreateInstance("instance1", options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ func TestTimeoutFunctionality(t *testing.T) {
|
|||||||
MaxInstances: 5,
|
MaxInstances: 5,
|
||||||
}
|
}
|
||||||
|
|
||||||
manager := manager.NewInstanceManager(backendConfig, cfg)
|
manager := manager.NewInstanceManager(backendConfig, cfg, nil)
|
||||||
if manager == nil {
|
if manager == nil {
|
||||||
t.Fatal("Manager should be initialized with timeout checker")
|
t.Fatal("Manager should be initialized with timeout checker")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user