package instance

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"llamactl/pkg/backends"
	"llamactl/pkg/config"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os/exec"
	"sync"
	"sync/atomic"
	"time"
)

// TimeProvider abstracts the clock so tests can substitute a mock time source.
type TimeProvider interface {
	Now() time.Time
}

// realTimeProvider implements TimeProvider using the system clock.
type realTimeProvider struct{}

// Now returns the current wall-clock time.
func (realTimeProvider) Now() time.Time {
	return time.Now()
}

// Process represents a single managed backend server instance (llama.cpp,
// MLX or vLLM, depending on its options).
type Process struct {
	Name                   string                 `json:"name"`
	options                *CreateInstanceOptions `json:"-"`
	globalInstanceSettings *config.InstancesConfig
	globalBackendSettings  *config.BackendConfig

	// Status
	Status         InstanceStatus `json:"status"`
	onStatusChange func(oldStatus, newStatus InstanceStatus)

	// Creation time
	Created int64 `json:"created,omitempty"` // Unix timestamp when the instance was created

	// Logging file
	logger *InstanceLogger `json:"-"`

	// internal
	cmd      *exec.Cmd              `json:"-"` // Command to run the instance
	ctx      context.Context        `json:"-"` // Context for managing the instance lifecycle
	cancel   context.CancelFunc     `json:"-"` // Function to cancel the context
	stdout   io.ReadCloser          `json:"-"` // Standard output stream
	stderr   io.ReadCloser          `json:"-"` // Standard error stream
	mu       sync.RWMutex           `json:"-"` // RWMutex for better read/write separation
	restarts int                    `json:"-"` // Number of restarts
	proxy    *httputil.ReverseProxy `json:"-"` // Reverse proxy for this instance

	// Restart control
	restartCancel context.CancelFunc `json:"-"` // Cancel function for pending restarts
	monitorDone   chan struct{}      `json:"-"` // Channel to signal monitor goroutine completion

	// Timeout management
	lastRequestTime atomic.Int64 // Unix timestamp of last request
	timeProvider    TimeProvider `json:"-"` // Time provider for testing
}

// NewInstance creates a stopped Process with the given name, global settings,
// options and status-change callback.
//
// ValidateAndApplyDefaults is invoked on the caller's options value; no copy
// is made here, so the instance keeps the pointer it was given. The instance
// logger is created from globalInstanceSettings.LogsDir, so
// globalInstanceSettings must not be nil.
func NewInstance(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, options *CreateInstanceOptions, onStatusChange func(oldStatus, newStatus InstanceStatus)) *Process {
	// Validate options and fill in defaults.
	options.ValidateAndApplyDefaults(name, globalInstanceSettings)

	// Create the instance logger.
	logger := NewInstanceLogger(name, globalInstanceSettings.LogsDir)

	return &Process{
		Name:                   name,
		options:                options,
		globalInstanceSettings: globalInstanceSettings,
		globalBackendSettings:  globalBackendSettings,
		logger:                 logger,
		timeProvider:           realTimeProvider{},
		Created:                time.Now().Unix(),
		Status:                 Stopped,
		onStatusChange:         onStatusChange,
	}
}

// GetOptions returns the instance's current options pointer (not a copy).
func (i *Process) GetOptions() *CreateInstanceOptions {
	i.mu.RLock()
	defer i.mu.RUnlock()
	return i.options
}

// hostPortLocked returns the host and port configured for the instance's
// backend. It returns ("", 0) when the options are nil, the backend type is
// not one of the handled cases, or the backend-specific options are nil.
// The caller must hold i.mu (read or write).
func (i *Process) hostPortLocked() (string, int) {
	if i.options == nil {
		return "", 0
	}

	switch i.options.BackendType {
	case backends.BackendTypeLlamaCpp:
		if o := i.options.LlamaServerOptions; o != nil {
			return o.Host, o.Port
		}
	case backends.BackendTypeMlxLm:
		if o := i.options.MlxServerOptions; o != nil {
			return o.Host, o.Port
		}
	case backends.BackendTypeVllm:
		if o := i.options.VllmServerOptions; o != nil {
			return o.Host, o.Port
		}
	}
	return "", 0
}

// GetPort returns the port configured for the instance's backend, or 0 when
// it cannot be determined.
func (i *Process) GetPort() int {
	i.mu.RLock()
	defer i.mu.RUnlock()

	_, port := i.hostPortLocked()
	return port
}

// GetHost returns the host configured for the instance's backend, or "" when
// it cannot be determined.
func (i *Process) GetHost() string {
	i.mu.RLock()
	defer i.mu.RUnlock()

	host, _ := i.hostPortLocked()
	return host
}

// SetOptions replaces the instance's options after validating them and
// applying defaults. A nil argument is ignored with a logged warning. The
// cached reverse proxy is dropped so that the next GetProxy call rebuilds it
// from the new options.
func (i *Process) SetOptions(options *CreateInstanceOptions) {
	i.mu.Lock()
	defer i.mu.Unlock()

	if options == nil {
		log.Println("Warning: Attempted to set nil options on instance", i.Name)
		return
	}

	// Validate options and fill in defaults; the caller's pointer is stored as is.
	options.ValidateAndApplyDefaults(i.Name, i.globalInstanceSettings)

	i.options = options
	// Clear the proxy so it gets recreated with new options
	i.proxy = nil
}

// SetTimeProvider sets a custom time provider for testing. It does not take
// i.mu.
func (i *Process) SetTimeProvider(tp TimeProvider) {
	i.timeProvider = tp
}

// GetProxy returns the reverse proxy for this instance, creating and caching
// it on first use.
//
// It returns an error when the instance has no options or when the target URL
// built from the backend host and port cannot be parsed. Responses passing
// through the proxy have their CORS headers removed and the backend's
// configured response headers applied; when no global backend settings are
// available, no extra headers are added.
func (i *Process) GetProxy() (*httputil.ReverseProxy, error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	if i.proxy != nil {
		return i.proxy, nil
	}

	if i.options == nil {
		return nil, fmt.Errorf("instance %s has no options set", i.Name)
	}

	host, port := i.hostPortLocked()

	targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port))
	if err != nil {
		return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", i.Name, err)
	}

	proxy := httputil.NewSingleHostReverseProxy(targetURL)

	// Backend settings may be nil (MarshalJSON guards against the same case),
	// so only look up the configured response headers when they are present.
	var responseHeaders map[string]string
	if settings := i.globalBackendSettings; settings != nil {
		switch i.options.BackendType {
		case backends.BackendTypeLlamaCpp:
			responseHeaders = settings.LlamaCpp.ResponseHeaders
		case backends.BackendTypeVllm:
			responseHeaders = settings.VLLM.ResponseHeaders
		case backends.BackendTypeMlxLm:
			responseHeaders = settings.MLX.ResponseHeaders
		}
	}

	proxy.ModifyResponse = func(resp *http.Response) error {
		// Remove CORS headers from llama-server response to avoid conflicts
		// llamactl will add its own CORS headers
		resp.Header.Del("Access-Control-Allow-Origin")
		resp.Header.Del("Access-Control-Allow-Methods")
		resp.Header.Del("Access-Control-Allow-Headers")
		resp.Header.Del("Access-Control-Allow-Credentials")
		resp.Header.Del("Access-Control-Max-Age")
		resp.Header.Del("Access-Control-Expose-Headers")

		// Ranging over a nil map is a no-op, so this is safe without headers.
		for key, value := range responseHeaders {
			resp.Header.Set(key, value)
		}
		return nil
	}

	i.proxy = proxy

	return i.proxy, nil
}

// MarshalJSON implements json.Marshaler for Process. In addition to the
// tagged exported fields it emits the instance options and a docker_enabled
// flag derived from the global settings of the instance's backend.
func (i *Process) MarshalJSON() ([]byte, error) {
	// Use read lock since we're only reading data
	i.mu.RLock()
	defer i.mu.RUnlock()

	// Determine if docker is enabled for this instance's backend
	var dockerEnabled bool
	if i.options != nil && i.globalBackendSettings != nil {
		switch i.options.BackendType {
		case backends.BackendTypeLlamaCpp:
			docker := i.globalBackendSettings.LlamaCpp.Docker
			dockerEnabled = docker != nil && docker.Enabled
		case backends.BackendTypeVllm:
			docker := i.globalBackendSettings.VLLM.Docker
			dockerEnabled = docker != nil && docker.Enabled
		case backends.BackendTypeMlxLm:
			// MLX does not support docker currently
		}
	}

	// Alias has Process's fields but none of its methods, so marshalling it
	// does not recurse back into MarshalJSON.
	type Alias Process
	return json.Marshal(&struct {
		*Alias
		Options       *CreateInstanceOptions `json:"options,omitempty"`
		DockerEnabled bool                   `json:"docker_enabled,omitempty"`
	}{
		Alias:         (*Alias)(i),
		Options:       i.options,
		DockerEnabled: dockerEnabled,
	})
}

// UnmarshalJSON implements json.Unmarshaler for Process. Exported fields are
// decoded directly into the receiver; options, when present, are validated
// with defaults applied before being stored. It does not take i.mu.
func (i *Process) UnmarshalJSON(data []byte) error {
	// Alias has Process's fields but none of its methods, so decoding into it
	// does not recurse back into UnmarshalJSON.
	type Alias Process
	aux := &struct {
		*Alias
		Options *CreateInstanceOptions `json:"options,omitempty"`
	}{
		Alias: (*Alias)(i),
	}

	if err := json.Unmarshal(data, aux); err != nil {
		return err
	}

	// Handle options with validation and defaults. i.Name has already been
	// populated by the decode above.
	// NOTE(review): i.globalInstanceSettings is whatever the receiver held
	// before decoding (nil for a zero-value Process) - confirm that
	// ValidateAndApplyDefaults tolerates that.
	if aux.Options != nil {
		aux.Options.ValidateAndApplyDefaults(i.Name, i.globalInstanceSettings)
		i.options = aux.Options
	}

	return nil
}

// IsRemote reports whether the instance's options list at least one node.
// It returns false when no options are set.
func (i *Process) IsRemote() bool {
	i.mu.RLock()
	defer i.mu.RUnlock()

	if i.options == nil {
		return false
	}

	return len(i.options.Nodes) > 0
}