Implement caching for remote instance proxies and enhance proxy request handling

This commit is contained in:
2025-10-07 18:44:23 +02:00
parent 554796391b
commit aae3f84d49
2 changed files with 57 additions and 51 deletions

View File

@@ -4,6 +4,8 @@ import (
"llamactl/pkg/config" "llamactl/pkg/config"
"llamactl/pkg/manager" "llamactl/pkg/manager"
"net/http" "net/http"
"net/http/httputil"
"sync"
"time" "time"
) )
@@ -11,6 +13,8 @@ type Handler struct {
InstanceManager manager.InstanceManager InstanceManager manager.InstanceManager
cfg config.AppConfig cfg config.AppConfig
httpClient *http.Client httpClient *http.Client
remoteProxies map[string]*httputil.ReverseProxy // Cache of remote proxies by instance name
remoteProxiesMu sync.RWMutex
} }
func NewHandler(im manager.InstanceManager, cfg config.AppConfig) *Handler { func NewHandler(im manager.InstanceManager, cfg config.AppConfig) *Handler {
@@ -20,5 +24,6 @@ func NewHandler(im manager.InstanceManager, cfg config.AppConfig) *Handler {
httpClient: &http.Client{ httpClient: &http.Client{
Timeout: 30 * time.Second, Timeout: 30 * time.Second,
}, },
remoteProxies: make(map[string]*httputil.ReverseProxy),
} }
} }

View File

@@ -3,11 +3,12 @@ package server
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"llamactl/pkg/config" "llamactl/pkg/config"
"llamactl/pkg/instance" "llamactl/pkg/instance"
"llamactl/pkg/manager" "llamactl/pkg/manager"
"net/http" "net/http"
"net/http/httputil"
"net/url"
"strconv" "strconv"
"strings" "strings"
@@ -361,6 +362,12 @@ func (h *Handler) ProxyToInstance() http.HandlerFunc {
return return
} }
// Check if this is a remote instance
if inst.IsRemote() {
h.RemoteInstanceProxy(w, r, name, inst)
return
}
if !inst.IsRunning() { if !inst.IsRunning() {
http.Error(w, "Instance is not running", http.StatusServiceUnavailable) http.Error(w, "Instance is not running", http.StatusServiceUnavailable)
return return
@@ -399,59 +406,53 @@ func (h *Handler) RemoteInstanceProxy(w http.ResponseWriter, r *http.Request, na
} }
nodeName := options.Nodes[0] nodeName := options.Nodes[0]
var nodeConfig *config.NodeConfig
for i := range h.cfg.Nodes { // Check if we have a cached proxy for this instance
if h.cfg.Nodes[i].Name == nodeName { h.remoteProxiesMu.RLock()
nodeConfig = &h.cfg.Nodes[i] proxy, exists := h.remoteProxies[name]
break h.remoteProxiesMu.RUnlock()
if !exists {
// Find node configuration
var nodeConfig *config.NodeConfig
for i := range h.cfg.Nodes {
if h.cfg.Nodes[i].Name == nodeName {
nodeConfig = &h.cfg.Nodes[i]
break
}
} }
}
if nodeConfig == nil { if nodeConfig == nil {
http.Error(w, fmt.Sprintf("Node %s not found", nodeName), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Node %s not found", nodeName), http.StatusInternalServerError)
return return
}
// Strip the "/api/v1/instances/<name>/proxy" prefix from the request URL
prefix := fmt.Sprintf("/api/v1/instances/%s/proxy", name)
proxyPath := r.URL.Path[len(prefix):]
// Build the remote URL
remoteURL := fmt.Sprintf("%s/api/v1/instances/%s/proxy%s", nodeConfig.Address, name, proxyPath)
// Create a new request to the remote node
req, err := http.NewRequest(r.Method, remoteURL, r.Body)
if err != nil {
http.Error(w, "Failed to create remote request: "+err.Error(), http.StatusInternalServerError)
return
}
// Copy headers
req.Header = r.Header.Clone()
// Add API key if configured
if nodeConfig.APIKey != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", nodeConfig.APIKey))
}
// Forward the request
resp, err := h.httpClient.Do(req)
if err != nil {
http.Error(w, "Failed to proxy to remote instance: "+err.Error(), http.StatusBadGateway)
return
}
defer resp.Body.Close()
// Copy response headers
for key, values := range resp.Header {
for _, value := range values {
w.Header().Add(key, value)
} }
// Create reverse proxy to remote node
targetURL, err := url.Parse(nodeConfig.Address)
if err != nil {
http.Error(w, "Failed to parse node address: "+err.Error(), http.StatusInternalServerError)
return
}
proxy = httputil.NewSingleHostReverseProxy(targetURL)
// Modify request before forwarding
originalDirector := proxy.Director
apiKey := nodeConfig.APIKey // Capture for closure
proxy.Director = func(req *http.Request) {
originalDirector(req)
// Add API key if configured
if apiKey != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", apiKey))
}
}
// Cache the proxy
h.remoteProxiesMu.Lock()
h.remoteProxies[name] = proxy
h.remoteProxiesMu.Unlock()
} }
// Copy status code // Forward the request using the cached proxy
w.WriteHeader(resp.StatusCode) proxy.ServeHTTP(w, r)
// Copy response body
io.Copy(w, resp.Body)
} }