mirror of
https://github.com/lordmathis/llamactl.git
synced 2025-11-05 16:44:22 +00:00
Implement remote proxy handling in instance
This commit is contained in:
@@ -17,6 +17,7 @@ type Instance struct {
|
|||||||
// Global configuration
|
// Global configuration
|
||||||
globalInstanceSettings *config.InstancesConfig
|
globalInstanceSettings *config.InstancesConfig
|
||||||
globalBackendSettings *config.BackendConfig
|
globalBackendSettings *config.BackendConfig
|
||||||
|
globalNodesConfig map[string]config.NodeConfig
|
||||||
localNodeName string `json:"-"` // Name of the local node for remote detection
|
localNodeName string `json:"-"` // Name of the local node for remote detection
|
||||||
|
|
||||||
status *status `json:"-"`
|
status *status `json:"-"`
|
||||||
@@ -29,7 +30,12 @@ type Instance struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new instance with the given name, log path, options and local node name
|
// New creates a new instance with the given name, log path, options and local node name
|
||||||
func New(name string, globalBackendSettings *config.BackendConfig, globalInstanceSettings *config.InstancesConfig, opts *Options, localNodeName string, onStatusChange func(oldStatus, newStatus Status)) *Instance {
|
func New(name string, globalConfig *config.AppConfig, opts *Options, localNodeName string, onStatusChange func(oldStatus, newStatus Status)) *Instance {
|
||||||
|
|
||||||
|
globalInstanceSettings := &globalConfig.Instances
|
||||||
|
globalBackendSettings := &globalConfig.Backends
|
||||||
|
globalNodesConfig := globalConfig.Nodes
|
||||||
|
|
||||||
// Validate and copy options
|
// Validate and copy options
|
||||||
opts.validateAndApplyDefaults(name, globalInstanceSettings)
|
opts.validateAndApplyDefaults(name, globalInstanceSettings)
|
||||||
|
|
||||||
@@ -45,15 +51,21 @@ func New(name string, globalBackendSettings *config.BackendConfig, globalInstanc
|
|||||||
options: options,
|
options: options,
|
||||||
globalInstanceSettings: globalInstanceSettings,
|
globalInstanceSettings: globalInstanceSettings,
|
||||||
globalBackendSettings: globalBackendSettings,
|
globalBackendSettings: globalBackendSettings,
|
||||||
|
globalNodesConfig: globalNodesConfig,
|
||||||
localNodeName: localNodeName,
|
localNodeName: localNodeName,
|
||||||
Created: time.Now().Unix(),
|
Created: time.Now().Unix(),
|
||||||
status: status,
|
status: status,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
instance.proxy, err = newProxy(instance)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Warning: Failed to create proxy for instance", instance.Name, "-", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Only create logger, proxy, and process for local instances
|
// Only create logger, proxy, and process for local instances
|
||||||
if !instance.IsRemote() {
|
if !instance.IsRemote() {
|
||||||
instance.logger = newLogger(name, globalInstanceSettings.LogsDir)
|
instance.logger = newLogger(name, globalInstanceSettings.LogsDir)
|
||||||
instance.proxy = newProxy(instance)
|
|
||||||
instance.process = newProcess(instance)
|
instance.process = newProcess(instance)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -323,14 +335,18 @@ func (i *Instance) UnmarshalJSON(data []byte) error {
|
|||||||
i.options = newOptions(&Options{})
|
i.options = newOptions(&Options{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Recreate the proxy
|
||||||
|
var err error
|
||||||
|
i.proxy, err = newProxy(i)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Warning: Failed to create proxy for instance", i.Name, "-", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Only create logger, proxy, and process for non-remote instances
|
// Only create logger, proxy, and process for non-remote instances
|
||||||
if !i.IsRemote() {
|
if !i.IsRemote() {
|
||||||
if i.logger == nil && i.globalInstanceSettings != nil {
|
if i.logger == nil && i.globalInstanceSettings != nil {
|
||||||
i.logger = newLogger(i.Name, i.globalInstanceSettings.LogsDir)
|
i.logger = newLogger(i.Name, i.globalInstanceSettings.LogsDir)
|
||||||
}
|
}
|
||||||
if i.proxy == nil {
|
|
||||||
i.proxy = newProxy(i)
|
|
||||||
}
|
|
||||||
if i.process == nil {
|
if i.process == nil {
|
||||||
i.process = newProcess(i)
|
i.process = newProcess(i)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,20 +26,78 @@ func (realTimeProvider) Now() time.Time {
|
|||||||
type proxy struct {
|
type proxy struct {
|
||||||
instance *Instance
|
instance *Instance
|
||||||
|
|
||||||
mu sync.RWMutex
|
targetURL *url.URL
|
||||||
proxy *httputil.ReverseProxy
|
apiKey string // For remote instances
|
||||||
proxyOnce sync.Once
|
|
||||||
proxyErr error
|
responseHeaders map[string]string
|
||||||
|
|
||||||
|
mu sync.RWMutex
|
||||||
|
|
||||||
|
proxy *httputil.ReverseProxy
|
||||||
|
proxyOnce sync.Once
|
||||||
|
proxyErr error
|
||||||
|
|
||||||
lastRequestTime atomic.Int64
|
lastRequestTime atomic.Int64
|
||||||
timeProvider TimeProvider
|
timeProvider TimeProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
// newProxy creates a new Proxy for the given instance
|
// newProxy creates a new Proxy for the given instance
|
||||||
func newProxy(instance *Instance) *proxy {
|
func newProxy(instance *Instance) (*proxy, error) {
|
||||||
return &proxy{
|
|
||||||
|
p := &proxy{
|
||||||
instance: instance,
|
instance: instance,
|
||||||
timeProvider: realTimeProvider{},
|
timeProvider: realTimeProvider{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
options := instance.GetOptions()
|
||||||
|
if options == nil {
|
||||||
|
return nil, fmt.Errorf("instance %s has no options set", instance.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if instance.IsRemote() {
|
||||||
|
|
||||||
|
// Take the first remote node as the target for now
|
||||||
|
var nodeName string
|
||||||
|
for node := range options.Nodes {
|
||||||
|
nodeName = node
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if nodeName == "" {
|
||||||
|
return nil, fmt.Errorf("instance %s has no remote nodes defined", p.instance.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
node, ok := p.instance.globalNodesConfig[nodeName]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("remote node %s is not defined", nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.targetURL, err = url.Parse(node.Address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse target URL for remote instance %s: %w", p.instance.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.apiKey = node.APIKey
|
||||||
|
} else {
|
||||||
|
// Get host/port from process
|
||||||
|
host := p.instance.options.GetHost()
|
||||||
|
port := p.instance.options.GetPort()
|
||||||
|
if port == 0 {
|
||||||
|
return nil, fmt.Errorf("instance %s has no port assigned", p.instance.Name)
|
||||||
|
}
|
||||||
|
p.targetURL, err = url.Parse(fmt.Sprintf("http://%s:%d", host, port))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.instance.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get response headers from backend config
|
||||||
|
p.responseHeaders = options.BackendOptions.GetResponseHeaders(p.instance.globalBackendSettings)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// get returns the reverse proxy for this instance, creating it if needed.
|
// get returns the reverse proxy for this instance, creating it if needed.
|
||||||
@@ -56,46 +114,35 @@ func (p *proxy) get() (*httputil.ReverseProxy, error) {
|
|||||||
|
|
||||||
// build creates the reverse proxy based on instance options
|
// build creates the reverse proxy based on instance options
|
||||||
func (p *proxy) build() (*httputil.ReverseProxy, error) {
|
func (p *proxy) build() (*httputil.ReverseProxy, error) {
|
||||||
options := p.instance.GetOptions()
|
|
||||||
if options == nil {
|
proxy := httputil.NewSingleHostReverseProxy(p.targetURL)
|
||||||
return nil, fmt.Errorf("instance %s has no options set", p.instance.Name)
|
|
||||||
|
// Modify the request before sending it to the backend
|
||||||
|
originalDirector := proxy.Director
|
||||||
|
proxy.Director = func(req *http.Request) {
|
||||||
|
originalDirector(req)
|
||||||
|
|
||||||
|
// Update last request time
|
||||||
|
p.updateLastRequestTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remote instances should not use local proxy - they are handled by RemoteInstanceProxy
|
if !p.instance.IsRemote() {
|
||||||
if _, isLocal := options.Nodes[p.instance.localNodeName]; !isLocal {
|
// Add custom headers to the request
|
||||||
return nil, fmt.Errorf("instance %s is a remote instance and should not use local proxy", p.instance.Name)
|
proxy.ModifyResponse = func(resp *http.Response) error {
|
||||||
}
|
// Remove CORS headers from backend response to avoid conflicts
|
||||||
|
// llamactl will add its own CORS headers
|
||||||
|
resp.Header.Del("Access-Control-Allow-Origin")
|
||||||
|
resp.Header.Del("Access-Control-Allow-Methods")
|
||||||
|
resp.Header.Del("Access-Control-Allow-Headers")
|
||||||
|
resp.Header.Del("Access-Control-Allow-Credentials")
|
||||||
|
resp.Header.Del("Access-Control-Max-Age")
|
||||||
|
resp.Header.Del("Access-Control-Expose-Headers")
|
||||||
|
|
||||||
// Get host/port from process
|
for key, value := range p.responseHeaders {
|
||||||
host := p.instance.options.GetHost()
|
resp.Header.Set(key, value)
|
||||||
port := p.instance.options.GetPort()
|
}
|
||||||
if port == 0 {
|
return nil
|
||||||
return nil, fmt.Errorf("instance %s has no port assigned", p.instance.Name)
|
|
||||||
}
|
|
||||||
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d", host, port))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse target URL for instance %s: %w", p.instance.Name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
proxy := httputil.NewSingleHostReverseProxy(targetURL)
|
|
||||||
|
|
||||||
// Get response headers from backend config
|
|
||||||
responseHeaders := options.BackendOptions.GetResponseHeaders(p.instance.globalBackendSettings)
|
|
||||||
|
|
||||||
proxy.ModifyResponse = func(resp *http.Response) error {
|
|
||||||
// Remove CORS headers from backend response to avoid conflicts
|
|
||||||
// llamactl will add its own CORS headers
|
|
||||||
resp.Header.Del("Access-Control-Allow-Origin")
|
|
||||||
resp.Header.Del("Access-Control-Allow-Methods")
|
|
||||||
resp.Header.Del("Access-Control-Allow-Headers")
|
|
||||||
resp.Header.Del("Access-Control-Allow-Credentials")
|
|
||||||
resp.Header.Del("Access-Control-Max-Age")
|
|
||||||
resp.Header.Del("Access-Control-Expose-Headers")
|
|
||||||
|
|
||||||
for key, value := range responseHeaders {
|
|
||||||
resp.Header.Set(key, value)
|
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return proxy, nil
|
return proxy, nil
|
||||||
|
|||||||
Reference in New Issue
Block a user