mirror of
https://github.com/lordmathis/llamactl.git
synced 2025-11-06 00:54:23 +00:00
Unexport struct methods
This commit is contained in:
@@ -12,29 +12,27 @@ import (
|
|||||||
|
|
||||||
// Instance represents a running instance of the llama server
|
// Instance represents a running instance of the llama server
|
||||||
type Instance struct {
|
type Instance struct {
|
||||||
// Immutable identity (no locking needed after creation)
|
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Created int64 `json:"created,omitempty"` // Unix timestamp when the instance was created
|
Created int64 `json:"created,omitempty"` // Unix timestamp when the instance was created
|
||||||
|
|
||||||
// Mutable state - each owns its own lock
|
// Global configuration
|
||||||
status *status `json:"-"` // unexported - status owns its lock
|
|
||||||
options *options `json:"-"` // unexported - options owns its lock
|
|
||||||
|
|
||||||
// Global configuration (read-only, no lock needed)
|
|
||||||
globalInstanceSettings *config.InstancesConfig
|
globalInstanceSettings *config.InstancesConfig
|
||||||
globalBackendSettings *config.BackendConfig
|
globalBackendSettings *config.BackendConfig
|
||||||
localNodeName string `json:"-"` // Name of the local node for remote detection
|
localNodeName string `json:"-"` // Name of the local node for remote detection
|
||||||
|
|
||||||
// Components (can be nil for remote instances or when stopped)
|
status *status `json:"-"`
|
||||||
process *process `json:"-"` // nil for remote instances, nil when stopped
|
options *options `json:"-"`
|
||||||
proxy *proxy `json:"-"` // nil for remote instances, created on demand
|
|
||||||
logger *logger `json:"-"` // nil for remote instances
|
// Components (can be nil for remote instances)
|
||||||
|
process *process `json:"-"`
|
||||||
|
proxy *proxy `json:"-"`
|
||||||
|
logger *logger `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new instance with the given name, log path, and options
|
// New creates a new instance with the given name, log path, options and local node name
|
||||||
func New(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, opts *Options, localNodeName string, onStatusChange func(oldStatus, newStatus Status)) *Instance {
|
func New(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, opts *Options, localNodeName string, onStatusChange func(oldStatus, newStatus Status)) *Instance {
|
||||||
// Validate and copy options
|
// Validate and copy options
|
||||||
opts.ValidateAndApplyDefaults(name, globalInstanceSettings)
|
opts.validateAndApplyDefaults(name, globalInstanceSettings)
|
||||||
|
|
||||||
// Create status wrapper
|
// Create status wrapper
|
||||||
status := newStatus(Stopped)
|
status := newStatus(Stopped)
|
||||||
@@ -54,45 +52,76 @@ func New(name string, globalBackendSettings *config.BackendConfig, globalInstanc
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Only create logger, proxy, and process for local instances
|
// Only create logger, proxy, and process for local instances
|
||||||
// Remote instances are metadata only (no logger, proxy, or process)
|
|
||||||
if !instance.IsRemote() {
|
if !instance.IsRemote() {
|
||||||
instance.logger = NewLogger(name, globalInstanceSettings.LogsDir)
|
instance.logger = newLogger(name, globalInstanceSettings.LogsDir)
|
||||||
instance.proxy = NewProxy(instance)
|
instance.proxy = newProxy(instance)
|
||||||
instance.process = newProcess(instance)
|
instance.process = newProcess(instance)
|
||||||
}
|
}
|
||||||
|
|
||||||
return instance
|
return instance
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetOptions returns the current options, delegating to options component
|
// Start starts the instance
|
||||||
|
func (i *Instance) Start() error {
|
||||||
|
if i.process == nil {
|
||||||
|
return fmt.Errorf("instance %s has no process component (remote instances cannot be started locally)", i.Name)
|
||||||
|
}
|
||||||
|
return i.process.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the instance
|
||||||
|
func (i *Instance) Stop() error {
|
||||||
|
if i.process == nil {
|
||||||
|
return fmt.Errorf("instance %s has no process component (remote instances cannot be stopped locally)", i.Name)
|
||||||
|
}
|
||||||
|
return i.process.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restart restarts the instance
|
||||||
|
func (i *Instance) Restart() error {
|
||||||
|
if i.process == nil {
|
||||||
|
return fmt.Errorf("instance %s has no process component (remote instances cannot be restarted locally)", i.Name)
|
||||||
|
}
|
||||||
|
return i.process.restart()
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForHealthy waits for the instance to become healthy
|
||||||
|
func (i *Instance) WaitForHealthy(timeout int) error {
|
||||||
|
if i.process == nil {
|
||||||
|
return fmt.Errorf("instance %s has no process component (remote instances cannot be health checked locally)", i.Name)
|
||||||
|
}
|
||||||
|
return i.process.waitForHealthy(timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOptions returns the current options
|
||||||
func (i *Instance) GetOptions() *Options {
|
func (i *Instance) GetOptions() *Options {
|
||||||
if i.options == nil {
|
if i.options == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return i.options.Get()
|
return i.options.get()
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetStatus returns the current status, delegating to status component
|
// GetStatus returns the current status
|
||||||
func (i *Instance) GetStatus() Status {
|
func (i *Instance) GetStatus() Status {
|
||||||
if i.status == nil {
|
if i.status == nil {
|
||||||
return Stopped
|
return Stopped
|
||||||
}
|
}
|
||||||
return i.status.Get()
|
return i.status.get()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetStatus sets the status, delegating to status component
|
// SetStatus sets the status
|
||||||
func (i *Instance) SetStatus(s Status) {
|
func (i *Instance) SetStatus(s Status) {
|
||||||
if i.status != nil {
|
if i.status != nil {
|
||||||
i.status.Set(s)
|
i.status.set(s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsRunning returns true if the status is Running, delegating to status component
|
// IsRunning returns true if the status is Running
|
||||||
func (i *Instance) IsRunning() bool {
|
func (i *Instance) IsRunning() bool {
|
||||||
if i.status == nil {
|
if i.status == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return i.status.IsRunning()
|
return i.status.isRunning()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Instance) GetPort() int {
|
func (i *Instance) GetPort() int {
|
||||||
@@ -137,7 +166,7 @@ func (i *Instance) GetHost() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetOptions sets the options, delegating to options component
|
// SetOptions sets the options
|
||||||
func (i *Instance) SetOptions(opts *Options) {
|
func (i *Instance) SetOptions(opts *Options) {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
log.Println("Warning: Attempted to set nil options on instance", i.Name)
|
log.Println("Warning: Attempted to set nil options on instance", i.Name)
|
||||||
@@ -145,32 +174,31 @@ func (i *Instance) SetOptions(opts *Options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Preserve the original nodes to prevent changing instance location
|
// Preserve the original nodes to prevent changing instance location
|
||||||
if i.options != nil && i.options.Get() != nil && i.options.Get().Nodes != nil {
|
if i.options != nil && i.options.get() != nil && i.options.get().Nodes != nil {
|
||||||
opts.Nodes = i.options.Get().Nodes
|
opts.Nodes = i.options.get().Nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate and copy options
|
// Validate and copy options
|
||||||
opts.ValidateAndApplyDefaults(i.Name, i.globalInstanceSettings)
|
opts.validateAndApplyDefaults(i.Name, i.globalInstanceSettings)
|
||||||
|
|
||||||
if i.options != nil {
|
if i.options != nil {
|
||||||
i.options.Set(opts)
|
i.options.set(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear the proxy so it gets recreated with new options
|
// Clear the proxy so it gets recreated with new options
|
||||||
if i.proxy != nil {
|
if i.proxy != nil {
|
||||||
i.proxy.clearProxy()
|
i.proxy.clear()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetTimeProvider sets a custom time provider for testing
|
// SetTimeProvider sets a custom time provider for testing
|
||||||
// Delegates to the Proxy component
|
|
||||||
func (i *Instance) SetTimeProvider(tp TimeProvider) {
|
func (i *Instance) SetTimeProvider(tp TimeProvider) {
|
||||||
if i.proxy != nil {
|
if i.proxy != nil {
|
||||||
i.proxy.SetTimeProvider(tp)
|
i.proxy.setTimeProvider(tp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetProxy returns the reverse proxy for this instance, delegating to Proxy component
|
// GetProxy returns the reverse proxy for this instance
|
||||||
func (i *Instance) GetProxy() (*httputil.ReverseProxy, error) {
|
func (i *Instance) GetProxy() (*httputil.ReverseProxy, error) {
|
||||||
if i.proxy == nil {
|
if i.proxy == nil {
|
||||||
return nil, fmt.Errorf("instance %s has no proxy component", i.Name)
|
return nil, fmt.Errorf("instance %s has no proxy component", i.Name)
|
||||||
@@ -184,7 +212,93 @@ func (i *Instance) GetProxy() (*httputil.ReverseProxy, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return i.proxy.GetProxy()
|
return i.proxy.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Instance) IsRemote() bool {
|
||||||
|
opts := i.GetOptions()
|
||||||
|
if opts == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no nodes specified, it's a local instance
|
||||||
|
if len(opts.Nodes) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the local node is in the nodes map, treat it as a local instance
|
||||||
|
if _, isLocal := opts.Nodes[i.localNodeName]; isLocal {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, it's a remote instance
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLogs retrieves the last n lines of logs from the instance
|
||||||
|
func (i *Instance) GetLogs(num_lines int) (string, error) {
|
||||||
|
if i.logger == nil {
|
||||||
|
return "", fmt.Errorf("instance %s has no logger (remote instances don't have logs)", i.Name)
|
||||||
|
}
|
||||||
|
return i.logger.getLogs(num_lines)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LastRequestTime returns the last request time as a Unix timestamp
|
||||||
|
func (i *Instance) LastRequestTime() int64 {
|
||||||
|
if i.proxy == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return i.proxy.getLastRequestTime()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateLastRequestTime updates the last request access time for the instance via proxy
|
||||||
|
func (i *Instance) UpdateLastRequestTime() {
|
||||||
|
if i.proxy != nil {
|
||||||
|
i.proxy.updateLastRequestTime()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ShouldTimeout checks if the instance should timeout based on idle time
|
||||||
|
func (i *Instance) ShouldTimeout() bool {
|
||||||
|
if i.proxy == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return i.proxy.shouldTimeout()
|
||||||
|
}
|
||||||
|
|
||||||
|
// getBackendHostPort extracts the host and port from instance options
|
||||||
|
// Returns the configured host and port for the backend
|
||||||
|
func (i *Instance) getBackendHostPort() (string, int) {
|
||||||
|
opts := i.GetOptions()
|
||||||
|
if opts == nil {
|
||||||
|
return "localhost", 0
|
||||||
|
}
|
||||||
|
|
||||||
|
var host string
|
||||||
|
var port int
|
||||||
|
switch opts.BackendType {
|
||||||
|
case backends.BackendTypeLlamaCpp:
|
||||||
|
if opts.LlamaServerOptions != nil {
|
||||||
|
host = opts.LlamaServerOptions.Host
|
||||||
|
port = opts.LlamaServerOptions.Port
|
||||||
|
}
|
||||||
|
case backends.BackendTypeMlxLm:
|
||||||
|
if opts.MlxServerOptions != nil {
|
||||||
|
host = opts.MlxServerOptions.Host
|
||||||
|
port = opts.MlxServerOptions.Port
|
||||||
|
}
|
||||||
|
case backends.BackendTypeVllm:
|
||||||
|
if opts.VllmServerOptions != nil {
|
||||||
|
host = opts.VllmServerOptions.Host
|
||||||
|
port = opts.VllmServerOptions.Port
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if host == "" {
|
||||||
|
host = "localhost"
|
||||||
|
}
|
||||||
|
|
||||||
|
return host, port
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalJSON implements json.Marshaler for Instance
|
// MarshalJSON implements json.Marshaler for Instance
|
||||||
@@ -209,7 +323,6 @@ func (i *Instance) MarshalJSON() ([]byte, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Explicitly serialize to maintain backward compatible JSON format
|
|
||||||
return json.Marshal(&struct {
|
return json.Marshal(&struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Status *status `json:"status"`
|
Status *status `json:"status"`
|
||||||
@@ -247,9 +360,9 @@ func (i *Instance) UnmarshalJSON(data []byte) error {
|
|||||||
|
|
||||||
// Handle options with validation and defaults
|
// Handle options with validation and defaults
|
||||||
if i.options != nil {
|
if i.options != nil {
|
||||||
opts := i.options.Get()
|
opts := i.options.get()
|
||||||
if opts != nil {
|
if opts != nil {
|
||||||
opts.ValidateAndApplyDefaults(i.Name, i.globalInstanceSettings)
|
opts.validateAndApplyDefaults(i.Name, i.globalInstanceSettings)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -262,13 +375,12 @@ func (i *Instance) UnmarshalJSON(data []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Only create logger, proxy, and process for non-remote instances
|
// Only create logger, proxy, and process for non-remote instances
|
||||||
// Remote instances are metadata only (no logger, proxy, or process)
|
|
||||||
if !i.IsRemote() {
|
if !i.IsRemote() {
|
||||||
if i.logger == nil && i.globalInstanceSettings != nil {
|
if i.logger == nil && i.globalInstanceSettings != nil {
|
||||||
i.logger = NewLogger(i.Name, i.globalInstanceSettings.LogsDir)
|
i.logger = newLogger(i.Name, i.globalInstanceSettings.LogsDir)
|
||||||
}
|
}
|
||||||
if i.proxy == nil {
|
if i.proxy == nil {
|
||||||
i.proxy = NewProxy(i)
|
i.proxy = newProxy(i)
|
||||||
}
|
}
|
||||||
if i.process == nil {
|
if i.process == nil {
|
||||||
i.process = newProcess(i)
|
i.process = newProcess(i)
|
||||||
@@ -277,106 +389,3 @@ func (i *Instance) UnmarshalJSON(data []byte) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Instance) IsRemote() bool {
|
|
||||||
opts := i.GetOptions()
|
|
||||||
if opts == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// If no nodes specified, it's a local instance
|
|
||||||
if len(opts.Nodes) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the local node is in the nodes map, treat it as a local instance
|
|
||||||
if _, isLocal := opts.Nodes[i.localNodeName]; isLocal {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise, it's a remote instance
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *Instance) GetLogs(num_lines int) (string, error) {
|
|
||||||
if i.logger == nil {
|
|
||||||
return "", fmt.Errorf("instance %s has no logger (remote instances don't have logs)", i.Name)
|
|
||||||
}
|
|
||||||
return i.logger.GetLogs(num_lines)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start starts the instance, delegating to process component
|
|
||||||
func (i *Instance) Start() error {
|
|
||||||
if i.process == nil {
|
|
||||||
return fmt.Errorf("instance %s has no process component (remote instances cannot be started locally)", i.Name)
|
|
||||||
}
|
|
||||||
return i.process.Start()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop stops the instance, delegating to process component
|
|
||||||
func (i *Instance) Stop() error {
|
|
||||||
if i.process == nil {
|
|
||||||
return fmt.Errorf("instance %s has no process component (remote instances cannot be stopped locally)", i.Name)
|
|
||||||
}
|
|
||||||
return i.process.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restart restarts the instance, delegating to process component
|
|
||||||
func (i *Instance) Restart() error {
|
|
||||||
if i.process == nil {
|
|
||||||
return fmt.Errorf("instance %s has no process component (remote instances cannot be restarted locally)", i.Name)
|
|
||||||
}
|
|
||||||
return i.process.Restart()
|
|
||||||
}
|
|
||||||
|
|
||||||
// WaitForHealthy waits for the instance to become healthy, delegating to process component
|
|
||||||
func (i *Instance) WaitForHealthy(timeout int) error {
|
|
||||||
if i.process == nil {
|
|
||||||
return fmt.Errorf("instance %s has no process component (remote instances cannot be health checked locally)", i.Name)
|
|
||||||
}
|
|
||||||
return i.process.WaitForHealthy(timeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LastRequestTime returns the last request time as a Unix timestamp
|
|
||||||
// Delegates to the Proxy component
|
|
||||||
func (i *Instance) LastRequestTime() int64 {
|
|
||||||
if i.proxy == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return i.proxy.LastRequestTime()
|
|
||||||
}
|
|
||||||
|
|
||||||
// getBackendHostPort extracts the host and port from instance options
|
|
||||||
// Returns the configured host and port for the backend
|
|
||||||
func (i *Instance) getBackendHostPort() (string, int) {
|
|
||||||
opts := i.GetOptions()
|
|
||||||
if opts == nil {
|
|
||||||
return "localhost", 0
|
|
||||||
}
|
|
||||||
|
|
||||||
var host string
|
|
||||||
var port int
|
|
||||||
switch opts.BackendType {
|
|
||||||
case backends.BackendTypeLlamaCpp:
|
|
||||||
if opts.LlamaServerOptions != nil {
|
|
||||||
host = opts.LlamaServerOptions.Host
|
|
||||||
port = opts.LlamaServerOptions.Port
|
|
||||||
}
|
|
||||||
case backends.BackendTypeMlxLm:
|
|
||||||
if opts.MlxServerOptions != nil {
|
|
||||||
host = opts.MlxServerOptions.Host
|
|
||||||
port = opts.MlxServerOptions.Port
|
|
||||||
}
|
|
||||||
case backends.BackendTypeVllm:
|
|
||||||
if opts.VllmServerOptions != nil {
|
|
||||||
host = opts.VllmServerOptions.Host
|
|
||||||
port = opts.VllmServerOptions.Port
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if host == "" {
|
|
||||||
host = "localhost"
|
|
||||||
}
|
|
||||||
|
|
||||||
return host, port
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -18,15 +18,15 @@ type logger struct {
|
|||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLogger(name string, logDir string) *logger {
|
func newLogger(name string, logDir string) *logger {
|
||||||
return &logger{
|
return &logger{
|
||||||
name: name,
|
name: name,
|
||||||
logDir: logDir,
|
logDir: logDir,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create creates and opens the log files for stdout and stderr
|
// create creates and opens the log files for stdout and stderr
|
||||||
func (i *logger) Create() error {
|
func (i *logger) create() error {
|
||||||
i.mu.Lock()
|
i.mu.Lock()
|
||||||
defer i.mu.Unlock()
|
defer i.mu.Unlock()
|
||||||
|
|
||||||
@@ -56,8 +56,8 @@ func (i *logger) Create() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLogs retrieves the last n lines of logs from the instance
|
// getLogs retrieves the last n lines of logs from the instance
|
||||||
func (i *logger) GetLogs(num_lines int) (string, error) {
|
func (i *logger) getLogs(num_lines int) (string, error) {
|
||||||
i.mu.RLock()
|
i.mu.RLock()
|
||||||
defer i.mu.RUnlock()
|
defer i.mu.RUnlock()
|
||||||
|
|
||||||
@@ -97,8 +97,8 @@ func (i *logger) GetLogs(num_lines int) (string, error) {
|
|||||||
return strings.Join(lines[start:], "\n"), nil
|
return strings.Join(lines[start:], "\n"), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeLogFile closes the log files
|
// close closes the log files
|
||||||
func (i *logger) Close() {
|
func (i *logger) close() {
|
||||||
i.mu.Lock()
|
i.mu.Lock()
|
||||||
defer i.mu.Unlock()
|
defer i.mu.Unlock()
|
||||||
|
|
||||||
|
|||||||
@@ -51,15 +51,15 @@ func newOptions(opts *Options) *options {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a copy of the current options
|
// get returns a copy of the current options
|
||||||
func (o *options) Get() *Options {
|
func (o *options) get() *Options {
|
||||||
o.mu.RLock()
|
o.mu.RLock()
|
||||||
defer o.mu.RUnlock()
|
defer o.mu.RUnlock()
|
||||||
return o.opts
|
return o.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set updates the options
|
// set updates the options
|
||||||
func (o *options) Set(opts *Options) {
|
func (o *options) set(opts *Options) {
|
||||||
o.mu.Lock()
|
o.mu.Lock()
|
||||||
defer o.mu.Unlock()
|
defer o.mu.Unlock()
|
||||||
o.opts = opts
|
o.opts = opts
|
||||||
@@ -222,8 +222,8 @@ func (c *Options) MarshalJSON() ([]byte, error) {
|
|||||||
return json.Marshal(aux)
|
return json.Marshal(aux)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValidateAndApplyDefaults validates the instance options and applies constraints
|
// validateAndApplyDefaults validates the instance options and applies constraints
|
||||||
func (c *Options) ValidateAndApplyDefaults(name string, globalSettings *config.InstancesConfig) {
|
func (c *Options) validateAndApplyDefaults(name string, globalSettings *config.InstancesConfig) {
|
||||||
// Validate and apply constraints
|
// Validate and apply constraints
|
||||||
if c.MaxRestarts != nil && *c.MaxRestarts < 0 {
|
if c.MaxRestarts != nil && *c.MaxRestarts < 0 {
|
||||||
log.Printf("Instance %s MaxRestarts value (%d) cannot be negative, setting to 0", name, *c.MaxRestarts)
|
log.Printf("Instance %s MaxRestarts value (%d) cannot be negative, setting to 0", name, *c.MaxRestarts)
|
||||||
@@ -261,7 +261,8 @@ func (c *Options) ValidateAndApplyDefaults(name string, globalSettings *config.I
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Options) GetCommand(backendConfig *config.BackendSettings) string {
|
// getCommand builds the command to run the backend
|
||||||
|
func (c *Options) getCommand(backendConfig *config.BackendSettings) string {
|
||||||
|
|
||||||
if backendConfig.Docker != nil && backendConfig.Docker.Enabled && c.BackendType != backends.BackendTypeMlxLm {
|
if backendConfig.Docker != nil && backendConfig.Docker.Enabled && c.BackendType != backends.BackendTypeMlxLm {
|
||||||
return "docker"
|
return "docker"
|
||||||
@@ -270,8 +271,8 @@ func (c *Options) GetCommand(backendConfig *config.BackendSettings) string {
|
|||||||
return backendConfig.Command
|
return backendConfig.Command
|
||||||
}
|
}
|
||||||
|
|
||||||
// BuildCommandArgs builds command line arguments for the backend
|
// buildCommandArgs builds command line arguments for the backend
|
||||||
func (c *Options) BuildCommandArgs(backendConfig *config.BackendSettings) []string {
|
func (c *Options) buildCommandArgs(backendConfig *config.BackendSettings) []string {
|
||||||
|
|
||||||
var args []string
|
var args []string
|
||||||
|
|
||||||
@@ -314,7 +315,8 @@ func (c *Options) BuildCommandArgs(backendConfig *config.BackendSettings) []stri
|
|||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Options) BuildEnvironment(backendConfig *config.BackendSettings) map[string]string {
|
// buildEnvironment builds the environment variables for the backend process
|
||||||
|
func (c *Options) buildEnvironment(backendConfig *config.BackendSettings) map[string]string {
|
||||||
env := map[string]string{}
|
env := map[string]string{}
|
||||||
|
|
||||||
if backendConfig.Environment != nil {
|
if backendConfig.Environment != nil {
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ import (
|
|||||||
"llamactl/pkg/config"
|
"llamactl/pkg/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// process manages the OS process lifecycle for a local instance (unexported).
|
// process manages the OS process lifecycle for a local instance.
|
||||||
// process owns its complete lifecycle including auto-restart logic.
|
// process owns its complete lifecycle including auto-restart logic.
|
||||||
type process struct {
|
type process struct {
|
||||||
instance *Instance // Back-reference for SetStatus, GetOptions
|
instance *Instance // Back-reference for SetStatus, GetOptions
|
||||||
@@ -28,7 +28,7 @@ type process struct {
|
|||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
stdout io.ReadCloser
|
stdout io.ReadCloser
|
||||||
stderr io.ReadCloser
|
stderr io.ReadCloser
|
||||||
restarts int // process owns restart counter
|
restarts int
|
||||||
restartCancel context.CancelFunc
|
restartCancel context.CancelFunc
|
||||||
monitorDone chan struct{}
|
monitorDone chan struct{}
|
||||||
}
|
}
|
||||||
@@ -40,8 +40,8 @@ func newProcess(instance *Instance) *process {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the OS process and returns an error if it fails.
|
// start starts the OS process and returns an error if it fails.
|
||||||
func (p *process) Start() error {
|
func (p *process) start() error {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
@@ -62,14 +62,14 @@ func (p *process) Start() error {
|
|||||||
|
|
||||||
// Initialize last request time to current time when starting
|
// Initialize last request time to current time when starting
|
||||||
if p.instance.proxy != nil {
|
if p.instance.proxy != nil {
|
||||||
p.instance.proxy.UpdateLastRequestTime()
|
p.instance.proxy.updateLastRequestTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create context before building command (needed for CommandContext)
|
// Create context before building command (needed for CommandContext)
|
||||||
p.ctx, p.cancel = context.WithCancel(context.Background())
|
p.ctx, p.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
// Create log files
|
// Create log files
|
||||||
if err := p.instance.logger.Create(); err != nil {
|
if err := p.instance.logger.create(); err != nil {
|
||||||
return fmt.Errorf("failed to create log files: %w", err)
|
return fmt.Errorf("failed to create log files: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,13 +87,13 @@ func (p *process) Start() error {
|
|||||||
var err error
|
var err error
|
||||||
p.stdout, err = p.cmd.StdoutPipe()
|
p.stdout, err = p.cmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.instance.logger.Close()
|
p.instance.logger.close()
|
||||||
return fmt.Errorf("failed to get stdout pipe: %w", err)
|
return fmt.Errorf("failed to get stdout pipe: %w", err)
|
||||||
}
|
}
|
||||||
p.stderr, err = p.cmd.StderrPipe()
|
p.stderr, err = p.cmd.StderrPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.stdout.Close()
|
p.stdout.Close()
|
||||||
p.instance.logger.Close()
|
p.instance.logger.close()
|
||||||
return fmt.Errorf("failed to get stderr pipe: %w", err)
|
return fmt.Errorf("failed to get stderr pipe: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -114,8 +114,8 @@ func (p *process) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop terminates the subprocess without restarting
|
// stop terminates the subprocess without restarting
|
||||||
func (p *process) Stop() error {
|
func (p *process) stop() error {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
|
|
||||||
if !p.instance.IsRunning() {
|
if !p.instance.IsRunning() {
|
||||||
@@ -152,7 +152,7 @@ func (p *process) Stop() error {
|
|||||||
|
|
||||||
// If no process exists, we can return immediately
|
// If no process exists, we can return immediately
|
||||||
if p.cmd == nil || monitorDone == nil {
|
if p.cmd == nil || monitorDone == nil {
|
||||||
p.instance.logger.Close()
|
p.instance.logger.close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -178,15 +178,15 @@ func (p *process) Stop() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.instance.logger.Close()
|
p.instance.logger.close()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restart manually restarts the process (resets restart counter)
|
// restart manually restarts the process (resets restart counter)
|
||||||
func (p *process) Restart() error {
|
func (p *process) restart() error {
|
||||||
// Stop the process first
|
// Stop the process first
|
||||||
if err := p.Stop(); err != nil {
|
if err := p.stop(); err != nil {
|
||||||
// If it's not running, that's ok - we'll just start it
|
// If it's not running, that's ok - we'll just start it
|
||||||
if err.Error() != fmt.Sprintf("instance %s is not running", p.instance.Name) {
|
if err.Error() != fmt.Sprintf("instance %s is not running", p.instance.Name) {
|
||||||
return fmt.Errorf("failed to stop instance during restart: %w", err)
|
return fmt.Errorf("failed to stop instance during restart: %w", err)
|
||||||
@@ -199,11 +199,11 @@ func (p *process) Restart() error {
|
|||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
|
|
||||||
// Start the process
|
// Start the process
|
||||||
return p.Start()
|
return p.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForHealthy waits for the process to become healthy
|
// waitForHealthy waits for the process to become healthy
|
||||||
func (p *process) WaitForHealthy(timeout int) error {
|
func (p *process) waitForHealthy(timeout int) error {
|
||||||
if !p.instance.IsRunning() {
|
if !p.instance.IsRunning() {
|
||||||
return fmt.Errorf("instance %s is not running", p.instance.Name)
|
return fmt.Errorf("instance %s is not running", p.instance.Name)
|
||||||
}
|
}
|
||||||
@@ -284,7 +284,7 @@ func (p *process) monitorProcess() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
p.instance.SetStatus(Stopped)
|
p.instance.SetStatus(Stopped)
|
||||||
p.instance.logger.Close()
|
p.instance.logger.close()
|
||||||
|
|
||||||
// Cancel any existing restart context since we're handling a new exit
|
// Cancel any existing restart context since we're handling a new exit
|
||||||
if p.restartCancel != nil {
|
if p.restartCancel != nil {
|
||||||
@@ -373,7 +373,7 @@ func (p *process) handleAutoRestart(err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Restart the instance
|
// Restart the instance
|
||||||
if err := p.Start(); err != nil {
|
if err := p.start(); err != nil {
|
||||||
log.Printf("Failed to restart instance %s: %v", p.instance.Name, err)
|
log.Printf("Failed to restart instance %s: %v", p.instance.Name, err)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("Successfully restarted instance %s", p.instance.Name)
|
log.Printf("Successfully restarted instance %s", p.instance.Name)
|
||||||
@@ -399,13 +399,13 @@ func (p *process) buildCommand() (*exec.Cmd, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Build the environment variables
|
// Build the environment variables
|
||||||
env := opts.BuildEnvironment(backendConfig)
|
env := opts.buildEnvironment(backendConfig)
|
||||||
|
|
||||||
// Get the command to execute
|
// Get the command to execute
|
||||||
command := opts.GetCommand(backendConfig)
|
command := opts.getCommand(backendConfig)
|
||||||
|
|
||||||
// Build command arguments
|
// Build command arguments
|
||||||
args := opts.BuildCommandArgs(backendConfig)
|
args := opts.buildCommandArgs(backendConfig)
|
||||||
|
|
||||||
// Create the exec.Cmd
|
// Create the exec.Cmd
|
||||||
cmd := exec.CommandContext(p.ctx, command, args...)
|
cmd := exec.CommandContext(p.ctx, command, args...)
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ func (realTimeProvider) Now() time.Time {
|
|||||||
|
|
||||||
// proxy manages HTTP reverse proxy and request tracking for an instance.
|
// proxy manages HTTP reverse proxy and request tracking for an instance.
|
||||||
type proxy struct {
|
type proxy struct {
|
||||||
process *Instance // Owner reference - Proxy is owned by Process
|
instance *Instance
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
proxy *httputil.ReverseProxy
|
proxy *httputil.ReverseProxy
|
||||||
@@ -35,44 +35,44 @@ type proxy struct {
|
|||||||
timeProvider TimeProvider
|
timeProvider TimeProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewProxy creates a new Proxy for the given process
|
// newProxy creates a new Proxy for the given instance
|
||||||
func NewProxy(process *Instance) *proxy {
|
func newProxy(instance *Instance) *proxy {
|
||||||
return &proxy{
|
return &proxy{
|
||||||
process: process,
|
instance: instance,
|
||||||
timeProvider: realTimeProvider{},
|
timeProvider: realTimeProvider{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetProxy returns the reverse proxy for this instance, creating it if needed.
|
// get returns the reverse proxy for this instance, creating it if needed.
|
||||||
// Uses sync.Once to ensure thread-safe one-time initialization.
|
// Uses sync.Once to ensure thread-safe one-time initialization.
|
||||||
func (p *proxy) GetProxy() (*httputil.ReverseProxy, error) {
|
func (p *proxy) get() (*httputil.ReverseProxy, error) {
|
||||||
// sync.Once guarantees buildProxy() is called exactly once
|
// sync.Once guarantees buildProxy() is called exactly once
|
||||||
// Other callers block until first initialization completes
|
// Other callers block until first initialization completes
|
||||||
p.proxyOnce.Do(func() {
|
p.proxyOnce.Do(func() {
|
||||||
p.proxy, p.proxyErr = p.buildProxy()
|
p.proxy, p.proxyErr = p.build()
|
||||||
})
|
})
|
||||||
|
|
||||||
return p.proxy, p.proxyErr
|
return p.proxy, p.proxyErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildProxy creates the reverse proxy based on instance options
|
// build creates the reverse proxy based on instance options
|
||||||
func (p *proxy) buildProxy() (*httputil.ReverseProxy, error) {
|
func (p *proxy) build() (*httputil.ReverseProxy, error) {
|
||||||
options := p.process.GetOptions()
|
options := p.instance.GetOptions()
|
||||||
if options == nil {
|
if options == nil {
|
||||||
return nil, fmt.Errorf("instance %s has no options set", p.process.Name)
|
return nil, fmt.Errorf("instance %s has no options set", p.instance.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remote instances should not use local proxy - they are handled by RemoteInstanceProxy
|
// Remote instances should not use local proxy - they are handled by RemoteInstanceProxy
|
||||||
if len(options.Nodes) > 0 {
|
if len(options.Nodes) > 0 {
|
||||||
return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", p.process.Name)
|
return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", p.instance.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get host/port from process
|
// Get host/port from process
|
||||||
host, port := p.process.getBackendHostPort()
|
host, port := p.instance.getBackendHostPort()
|
||||||
|
|
||||||
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port))
|
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.process.Name, err)
|
return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.instance.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := httputil.NewSingleHostReverseProxy(targetURL)
|
proxy := httputil.NewSingleHostReverseProxy(targetURL)
|
||||||
@@ -81,11 +81,11 @@ func (p *proxy) buildProxy() (*httputil.ReverseProxy, error) {
|
|||||||
var responseHeaders map[string]string
|
var responseHeaders map[string]string
|
||||||
switch options.BackendType {
|
switch options.BackendType {
|
||||||
case backends.BackendTypeLlamaCpp:
|
case backends.BackendTypeLlamaCpp:
|
||||||
responseHeaders = p.process.globalBackendSettings.LlamaCpp.ResponseHeaders
|
responseHeaders = p.instance.globalBackendSettings.LlamaCpp.ResponseHeaders
|
||||||
case backends.BackendTypeVllm:
|
case backends.BackendTypeVllm:
|
||||||
responseHeaders = p.process.globalBackendSettings.VLLM.ResponseHeaders
|
responseHeaders = p.instance.globalBackendSettings.VLLM.ResponseHeaders
|
||||||
case backends.BackendTypeMlxLm:
|
case backends.BackendTypeMlxLm:
|
||||||
responseHeaders = p.process.globalBackendSettings.MLX.ResponseHeaders
|
responseHeaders = p.instance.globalBackendSettings.MLX.ResponseHeaders
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy.ModifyResponse = func(resp *http.Response) error {
|
proxy.ModifyResponse = func(resp *http.Response) error {
|
||||||
@@ -107,9 +107,8 @@ func (p *proxy) buildProxy() (*httputil.ReverseProxy, error) {
|
|||||||
return proxy, nil
|
return proxy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// clearProxy resets the proxy, allowing it to be recreated when options change.
|
// clear resets the proxy, allowing it to be recreated when options change.
|
||||||
// This resets the sync.Once so the next GetProxy call will rebuild the proxy.
|
func (p *proxy) clear() {
|
||||||
func (p *proxy) clearProxy() {
|
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
@@ -118,24 +117,24 @@ func (p *proxy) clearProxy() {
|
|||||||
p.proxyOnce = sync.Once{} // Reset Once for next GetProxy call
|
p.proxyOnce = sync.Once{} // Reset Once for next GetProxy call
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateLastRequestTime updates the last request access time for the instance
|
// updateLastRequestTime updates the last request access time for the instance
|
||||||
func (p *proxy) UpdateLastRequestTime() {
|
func (p *proxy) updateLastRequestTime() {
|
||||||
lastRequestTime := p.timeProvider.Now().Unix()
|
lastRequestTime := p.timeProvider.Now().Unix()
|
||||||
p.lastRequestTime.Store(lastRequestTime)
|
p.lastRequestTime.Store(lastRequestTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LastRequestTime returns the last request time as a Unix timestamp
|
// getLastRequestTime returns the last request time as a Unix timestamp
|
||||||
func (p *proxy) LastRequestTime() int64 {
|
func (p *proxy) getLastRequestTime() int64 {
|
||||||
return p.lastRequestTime.Load()
|
return p.lastRequestTime.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShouldTimeout checks if the instance should timeout based on idle time
|
// shouldTimeout checks if the instance should timeout based on idle time
|
||||||
func (p *proxy) ShouldTimeout() bool {
|
func (p *proxy) shouldTimeout() bool {
|
||||||
if !p.process.IsRunning() {
|
if !p.instance.IsRunning() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
options := p.process.GetOptions()
|
options := p.instance.GetOptions()
|
||||||
if options == nil || options.IdleTimeout == nil || *options.IdleTimeout <= 0 {
|
if options == nil || options.IdleTimeout == nil || *options.IdleTimeout <= 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -150,7 +149,7 @@ func (p *proxy) ShouldTimeout() bool {
|
|||||||
return (p.timeProvider.Now().Unix() - lastRequest) > idleTimeoutSeconds
|
return (p.timeProvider.Now().Unix() - lastRequest) > idleTimeoutSeconds
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetTimeProvider sets a custom time provider for testing
|
// setTimeProvider sets a custom time provider for testing
|
||||||
func (p *proxy) SetTimeProvider(tp TimeProvider) {
|
func (p *proxy) setTimeProvider(tp TimeProvider) {
|
||||||
p.timeProvider = tp
|
p.timeProvider = tp
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -69,15 +69,15 @@ func newStatus(initial Status) *status {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the current status
|
// get returns the current status
|
||||||
func (st *status) Get() Status {
|
func (st *status) get() Status {
|
||||||
st.mu.RLock()
|
st.mu.RLock()
|
||||||
defer st.mu.RUnlock()
|
defer st.mu.RUnlock()
|
||||||
return st.s
|
return st.s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set updates the status and triggers the onStatusChange callback if set
|
// set updates the status and triggers the onStatusChange callback if set
|
||||||
func (st *status) Set(newStatus Status) {
|
func (st *status) set(newStatus Status) {
|
||||||
st.mu.Lock()
|
st.mu.Lock()
|
||||||
oldStatus := st.s
|
oldStatus := st.s
|
||||||
st.s = newStatus
|
st.s = newStatus
|
||||||
@@ -90,8 +90,8 @@ func (st *status) Set(newStatus Status) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsRunning returns true if the status is Running
|
// isRunning returns true if the status is Running
|
||||||
func (st *status) IsRunning() bool {
|
func (st *status) isRunning() bool {
|
||||||
st.mu.RLock()
|
st.mu.RLock()
|
||||||
defer st.mu.RUnlock()
|
defer st.mu.RUnlock()
|
||||||
return st.s == Running
|
return st.s == Running
|
||||||
|
|||||||
@@ -1,18 +0,0 @@
|
|||||||
package instance
|
|
||||||
|
|
||||||
// UpdateLastRequestTime updates the last request access time for the instance via proxy
|
|
||||||
// Delegates to the Proxy component
|
|
||||||
func (i *Instance) UpdateLastRequestTime() {
|
|
||||||
if i.proxy != nil {
|
|
||||||
i.proxy.UpdateLastRequestTime()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ShouldTimeout checks if the instance should timeout based on idle time
|
|
||||||
// Delegates to the Proxy component
|
|
||||||
func (i *Instance) ShouldTimeout() bool {
|
|
||||||
if i.proxy == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return i.proxy.ShouldTimeout()
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user