diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 762d49e..0bfaec5 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -17,6 +17,7 @@ type Instance struct { // Global configuration globalInstanceSettings *config.InstancesConfig globalBackendSettings *config.BackendConfig + globalNodesConfig map[string]config.NodeConfig localNodeName string `json:"-"` // Name of the local node for remote detection status *status `json:"-"` @@ -29,7 +30,12 @@ type Instance struct { } // New creates a new instance with the given name, log path, options and local node name -func New(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, opts *Options, localNodeName string, onStatusChange func(oldStatus, newStatus Status)) *Instance { +func New(name string, globalConfig *config.AppConfig, opts *Options, localNodeName string, onStatusChange func(oldStatus, newStatus Status)) *Instance { + + globalInstanceSettings := &globalConfig.Instances + globalBackendSettings := &globalConfig.Backends + globalNodesConfig := globalConfig.Nodes + // Validate and copy options opts.validateAndApplyDefaults(name, globalInstanceSettings) @@ -45,15 +51,21 @@ func New(name string, globalBackendSettings *config.BackendConfig, globalInstanc options: options, globalInstanceSettings: globalInstanceSettings, globalBackendSettings: globalBackendSettings, + globalNodesConfig: globalNodesConfig, localNodeName: localNodeName, Created: time.Now().Unix(), status: status, } + var err error + instance.proxy, err = newProxy(instance) + if err != nil { + log.Println("Warning: Failed to create proxy for instance", instance.Name, "-", err) + } + // Only create logger, proxy, and process for local instances if !instance.IsRemote() { instance.logger = newLogger(name, globalInstanceSettings.LogsDir) - instance.proxy = newProxy(instance) instance.process = newProcess(instance) } @@ -323,14 +335,18 @@ func (i *Instance) UnmarshalJSON(data []byte) error { i.options = newOptions(&Options{}) } + // Recreate the proxy + var err error + i.proxy, err = newProxy(i) + if err != nil { + log.Println("Warning: Failed to create proxy for instance", i.Name, "-", err) + } + // Only create logger, proxy, and process for non-remote instances if !i.IsRemote() { if i.logger == nil && i.globalInstanceSettings != nil { i.logger = newLogger(i.Name, i.globalInstanceSettings.LogsDir) } - if i.proxy == nil { - i.proxy = newProxy(i) - } if i.process == nil { i.process = newProcess(i) } diff --git a/pkg/instance/proxy.go b/pkg/instance/proxy.go index a429889..b80be3a 100644 --- a/pkg/instance/proxy.go +++ b/pkg/instance/proxy.go @@ -26,20 +26,78 @@ func (realTimeProvider) Now() time.Time { type proxy struct { instance *Instance - mu sync.RWMutex - proxy *httputil.ReverseProxy - proxyOnce sync.Once - proxyErr error + targetURL *url.URL + apiKey string // For remote instances + + responseHeaders map[string]string + + mu sync.RWMutex + + proxy *httputil.ReverseProxy + proxyOnce sync.Once + proxyErr error + lastRequestTime atomic.Int64 timeProvider TimeProvider } // newProxy creates a new Proxy for the given instance -func newProxy(instance *Instance) *proxy { - return &proxy{ +func newProxy(instance *Instance) (*proxy, error) { + + p := &proxy{ instance: instance, timeProvider: realTimeProvider{}, } + + var err error + + options := instance.GetOptions() + if options == nil { + return nil, fmt.Errorf("instance %s has no options set", instance.Name) + } + + if instance.IsRemote() { + + // Take the first remote node as the target for now + var nodeName string + for node := range options.Nodes { + nodeName = node + break + } + + if nodeName == "" { + return nil, fmt.Errorf("instance %s has no remote nodes defined", p.instance.Name) + } + + node, ok := p.instance.globalNodesConfig[nodeName] + if !ok { + return nil, fmt.Errorf("remote node %s is not defined", nodeName) + } + + p.targetURL, err = url.Parse(node.Address) + if err != nil { + return nil, fmt.Errorf("failed to parse target URL for remote instance %s: %w", p.instance.Name, err) + } + + p.apiKey = node.APIKey + } else { + // Get host/port from process + host := p.instance.options.GetHost() + port := p.instance.options.GetPort() + if port == 0 { + return nil, fmt.Errorf("instance %s has no port assigned", p.instance.Name) + } + p.targetURL, err = url.Parse(fmt.Sprintf("http://%s:%d", host, port)) + if err != nil { + return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.instance.Name, err) + } + + // Get response headers from backend config + p.responseHeaders = options.BackendOptions.GetResponseHeaders(p.instance.globalBackendSettings) + } + + return p, nil + } // get returns the reverse proxy for this instance, creating it if needed. @@ -56,46 +114,35 @@ func (p *proxy) get() (*httputil.ReverseProxy, error) { // build creates the reverse proxy based on instance options func (p *proxy) build() (*httputil.ReverseProxy, error) { - options := p.instance.GetOptions() - if options == nil { - return nil, fmt.Errorf("instance %s has no options set", p.instance.Name) + + proxy := httputil.NewSingleHostReverseProxy(p.targetURL) + + // Modify the request before sending it to the backend + originalDirector := proxy.Director + proxy.Director = func(req *http.Request) { + originalDirector(req) + + // Update last request time + p.updateLastRequestTime() } - // Remote instances should not use local proxy - they are handled by RemoteInstanceProxy - if _, isLocal := options.Nodes[p.instance.localNodeName]; !isLocal { - return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", p.instance.Name) - } + if !p.instance.IsRemote() { + // Add custom headers to the request + proxy.ModifyResponse = func(resp *http.Response) error { + // Remove CORS headers from backend response to avoid conflicts + // llamactl will add its own CORS headers + resp.Header.Del("Access-Control-Allow-Origin") + resp.Header.Del("Access-Control-Allow-Methods") + resp.Header.Del("Access-Control-Allow-Headers") + resp.Header.Del("Access-Control-Allow-Credentials") + resp.Header.Del("Access-Control-Max-Age") + resp.Header.Del("Access-Control-Expose-Headers") - // Get host/port from process - host := p.instance.options.GetHost() - port := p.instance.options.GetPort() - if port == 0 { - return nil, fmt.Errorf("instance %s has no port assigned", p.instance.Name) - } - targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port)) - if err != nil { - return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.instance.Name, err) - } - - proxy := httputil.NewSingleHostReverseProxy(targetURL) - - // Get response headers from backend config - responseHeaders := options.BackendOptions.GetResponseHeaders(p.instance.globalBackendSettings) - - proxy.ModifyResponse = func(resp *http.Response) error { - // Remove CORS headers from backend response to avoid conflicts - // llamactl will add its own CORS headers - resp.Header.Del("Access-Control-Allow-Origin") - resp.Header.Del("Access-Control-Allow-Methods") - resp.Header.Del("Access-Control-Allow-Headers") - resp.Header.Del("Access-Control-Allow-Credentials") - resp.Header.Del("Access-Control-Max-Age") - resp.Header.Del("Access-Control-Expose-Headers") - - for key, value := range responseHeaders { - resp.Header.Set(key, value) + for key, value := range p.responseHeaders { + resp.Header.Set(key, value) + } + return nil } - return nil } return proxy, nil