diff --git a/pkg/config/config.go b/pkg/config/config.go index 3701643..1d86f4c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -41,6 +41,7 @@ type AppConfig struct { Backends BackendConfig `yaml:"backends"` Instances InstancesConfig `yaml:"instances"` Auth AuthConfig `yaml:"auth"` + Nodes []NodeConfig `yaml:"nodes,omitempty"` Version string `yaml:"-"` CommitHash string `yaml:"-"` BuildTime string `yaml:"-"` @@ -125,6 +126,12 @@ type AuthConfig struct { ManagementKeys []string `yaml:"management_keys"` } +type NodeConfig struct { + Name string `yaml:"name"` + Address string `yaml:"address"` + APIKey string `yaml:"api_key,omitempty"` +} + // LoadConfig loads configuration with the following precedence: // 1. Hardcoded defaults // 2. Config file diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 228f382..dee38ff 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -287,3 +287,14 @@ func (i *Process) UnmarshalJSON(data []byte) error { return nil } + +func (i *Process) IsRemote() bool { + i.mu.RLock() + defer i.mu.RUnlock() + + if i.options == nil { + return false + } + + return len(i.options.Nodes) > 0 +} diff --git a/pkg/instance/options.go b/pkg/instance/options.go index 62181dd..439f426 100644 --- a/pkg/instance/options.go +++ b/pkg/instance/options.go @@ -27,6 +27,8 @@ type CreateInstanceOptions struct { BackendType backends.BackendType `json:"backend_type"` BackendOptions map[string]any `json:"backend_options,omitempty"` + Nodes []string `json:"nodes,omitempty"` + // Backend-specific options LlamaServerOptions *llamacpp.LlamaServerOptions `json:"-"` MlxServerOptions *mlx.MlxServerOptions `json:"-"` diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 6999643..e160d6c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -6,6 +6,7 @@ import ( "llamactl/pkg/config" "llamactl/pkg/instance" "log" + "net/http" "os" "path/filepath" "strings" @@ -29,6 +30,18 @@ type InstanceManager interface { Shutdown() } +type RemoteManager interface { + ListRemoteInstances(node *config.NodeConfig) ([]*instance.Process, error) + CreateRemoteInstance(node *config.NodeConfig, name string, options *instance.CreateInstanceOptions) (*instance.Process, error) + GetRemoteInstance(node *config.NodeConfig, name string) (*instance.Process, error) + UpdateRemoteInstance(node *config.NodeConfig, name string, options *instance.CreateInstanceOptions) (*instance.Process, error) + DeleteRemoteInstance(node *config.NodeConfig, name string) error + StartRemoteInstance(node *config.NodeConfig, name string) (*instance.Process, error) + StopRemoteInstance(node *config.NodeConfig, name string) (*instance.Process, error) + RestartRemoteInstance(node *config.NodeConfig, name string) (*instance.Process, error) + GetRemoteInstanceLogs(node *config.NodeConfig, name string) (string, error) +} + type instanceManager struct { mu sync.RWMutex instances map[string]*instance.Process @@ -42,6 +55,9 @@ type instanceManager struct { shutdownChan chan struct{} shutdownDone chan struct{} isShutdown bool + + // Remote instance management + httpClient *http.Client } // NewInstanceManager creates a new instance of InstanceManager. @@ -59,6 +75,10 @@ func NewInstanceManager(backendsConfig config.BackendConfig, instancesConfig con timeoutChecker: time.NewTicker(time.Duration(instancesConfig.TimeoutCheckInterval) * time.Minute), shutdownChan: make(chan struct{}), shutdownDone: make(chan struct{}), + + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, } // Load existing instances from disk diff --git a/pkg/manager/remote_ops.go b/pkg/manager/remote_ops.go new file mode 100644 index 0000000..5050737 --- /dev/null +++ b/pkg/manager/remote_ops.go @@ -0,0 +1,228 @@ +package manager + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "llamactl/pkg/config" + "llamactl/pkg/instance" + "net/http" +) + +// makeRemoteRequest is a helper function to make HTTP requests to a remote node +func (im *instanceManager) makeRemoteRequest(nodeConfig *config.NodeConfig, method, path string, body any) (*http.Response, error) { + var reqBody io.Reader + if body != nil { + jsonData, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal request body: %w", err) + } + reqBody = bytes.NewBuffer(jsonData) + } + + url := fmt.Sprintf("%s%s", nodeConfig.Address, path) + req, err := http.NewRequest(method, url, reqBody) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + + if nodeConfig.APIKey != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", nodeConfig.APIKey)) + } + + resp, err := im.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + + return resp, nil +} + +// parseRemoteResponse is a helper function to parse API responses +func parseRemoteResponse(resp *http.Response, result any) error { + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + if result != nil { + if err := json.Unmarshal(body, result); err != nil { + return fmt.Errorf("failed to unmarshal response: %w", err) + } + } + + return nil +} + +// ListRemoteInstances lists all instances on the remote node +func (im *instanceManager) ListRemoteInstances(nodeConfig *config.NodeConfig) ([]*instance.Process, error) { + resp, err := im.makeRemoteRequest(nodeConfig, "GET", "/api/v1/instances/", nil) + if err != nil { + return nil, err + } + + var instances []*instance.Process + if err := parseRemoteResponse(resp, &instances); err != nil { + return nil, err + } + + return instances, nil +} + +// CreateRemoteInstance creates a new instance on the remote node +func (im *instanceManager) CreateRemoteInstance(nodeConfig *config.NodeConfig, name string, options *instance.CreateInstanceOptions) (*instance.Process, error) { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + payload := map[string]any{ + "options": options, + } + + resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, payload) + if err != nil { + return nil, err + } + + var inst instance.Process + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// GetRemoteInstance retrieves an instance by name from the remote node +func (im *instanceManager) GetRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Process, error) { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + resp, err := im.makeRemoteRequest(nodeConfig, "GET", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Process + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// UpdateRemoteInstance updates an existing instance on the remote node +func (im *instanceManager) UpdateRemoteInstance(nodeConfig *config.NodeConfig, name string, options *instance.CreateInstanceOptions) (*instance.Process, error) { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + payload := map[string]any{ + "options": options, + } + + resp, err := im.makeRemoteRequest(nodeConfig, "PUT", path, payload) + if err != nil { + return nil, err + } + + var inst instance.Process + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// DeleteRemoteInstance deletes an instance from the remote node +func (im *instanceManager) DeleteRemoteInstance(nodeConfig *config.NodeConfig, name string) error { + path := fmt.Sprintf("/api/v1/instances/%s/", name) + resp, err := im.makeRemoteRequest(nodeConfig, "DELETE", path, nil) + if err != nil { + return err + } + + return parseRemoteResponse(resp, nil) +} + +// StartRemoteInstance starts an instance on the remote node +func (im *instanceManager) StartRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Process, error) { + path := fmt.Sprintf("/api/v1/instances/%s/start", name) + resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Process + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// StopRemoteInstance stops an instance on the remote node +func (im *instanceManager) StopRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Process, error) { + path := fmt.Sprintf("/api/v1/instances/%s/stop", name) + resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Process + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// RestartRemoteInstance restarts an instance on the remote node +func (im *instanceManager) RestartRemoteInstance(nodeConfig *config.NodeConfig, name string) (*instance.Process, error) { + path := fmt.Sprintf("/api/v1/instances/%s/restart", name) + resp, err := im.makeRemoteRequest(nodeConfig, "POST", path, nil) + if err != nil { + return nil, err + } + + var inst instance.Process + if err := parseRemoteResponse(resp, &inst); err != nil { + return nil, err + } + + return &inst, nil +} + +// GetRemoteInstanceLogs retrieves logs for an instance from the remote node +func (im *instanceManager) GetRemoteInstanceLogs(nodeConfig *config.NodeConfig, name string) (string, error) { + path := fmt.Sprintf("/api/v1/instances/%s/logs", name) + resp, err := im.makeRemoteRequest(nodeConfig, "GET", path, nil) + if err != nil { + return "", err + } + + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) + } + + // Logs endpoint might return plain text or JSON + // Try to parse as JSON first (in case it's wrapped in a response object) + var logResponse struct { + Logs string `json:"logs"` + } + if err := json.Unmarshal(body, &logResponse); err == nil && logResponse.Logs != "" { + return logResponse.Logs, nil + } + + // Otherwise, return as plain text + return string(body), nil +}