Skip to content

Commit b61308e

Browse files
committed
cleanups
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent 715aace commit b61308e

9 files changed

Lines changed: 64 additions & 195 deletions

File tree

core/application/application.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ func (a *Application) FileManager() *storage.FileManager {
155155
return a.fileManager
156156
}
157157

158+
// HealthMonitor returns the health monitor, or nil if not in distributed mode.
159+
func (a *Application) HealthMonitor() *nodes.HealthMonitor {
160+
return a.healthMon
161+
}
162+
158163
// IsDistributed returns true if the application is running in distributed mode.
159164
func (a *Application) IsDistributed() bool {
160165
return a.applicationConfig.Distributed.Enabled

core/cli/run.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,12 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
520520
xlog.Error("error while stopping all grpc backends", "error", err)
521521
}
522522
// Clean up distributed services
523+
if app.HealthMonitor() != nil {
524+
app.HealthMonitor().Stop()
525+
}
526+
if app.JobDispatcher() != nil {
527+
app.JobDispatcher().Stop()
528+
}
523529
if app.NatsClient() != nil {
524530
app.NatsClient().Close()
525531
}

core/http/endpoints/localai/nodes.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,6 @@ func DeleteModelOnNodeEndpoint(unloader *nodes.RemoteUnloaderAdapter, registry *
316316
if err := unloader.StopBackend(nodeID); err != nil {
317317
// Non-fatal
318318
}
319-
// Send model.delete to remove files
320-
subject := "nodes." + nodeID + ".model.delete"
321-
_ = subject // delete is done via DeleteModelFiles which fans out
322319
registry.RemoveNodeModel(nodeID, req.ModelName)
323320
return c.JSON(http.StatusOK, map[string]string{"message": "model deleted from node"})
324321
}

core/services/agent_pool.go

Lines changed: 3 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -374,83 +374,11 @@ func (s *AgentPoolService) SetAgentStore(store AgentConfigStore) {
374374

375375
// --- Agent CRUD ---
376376

377-
func (s *AgentPoolService) ListAgents() map[string]bool {
378-
statuses := map[string]bool{}
379-
agents := s.pool.List()
380-
for _, a := range agents {
381-
ag := s.pool.GetAgent(a)
382-
if ag == nil {
383-
continue
384-
}
385-
statuses[a] = !ag.Paused()
386-
}
387-
return statuses
388-
}
389-
390-
func (s *AgentPoolService) CreateAgent(config *state.AgentConfig) error {
391-
if config.Name == "" {
392-
return fmt.Errorf("name is required")
393-
}
394-
return s.pool.CreateAgent(config.Name, config)
395-
}
396-
397377
func (s *AgentPoolService) GetAgent(name string) *agent.Agent {
398-
return s.pool.GetAgent(name)
399-
}
400-
401-
func (s *AgentPoolService) GetAgentConfig(name string) *state.AgentConfig {
402-
return s.pool.GetConfig(name)
403-
}
404-
405-
func (s *AgentPoolService) UpdateAgent(name string, config *state.AgentConfig) error {
406-
old := s.pool.GetConfig(name)
407-
if old == nil {
408-
return fmt.Errorf("agent not found: %s", name)
409-
}
410-
return s.pool.RecreateAgent(name, config)
411-
}
412-
413-
func (s *AgentPoolService) DeleteAgent(name string) error {
414-
return s.pool.Remove(name)
415-
}
416-
417-
func (s *AgentPoolService) PauseAgent(name string) error {
418-
ag := s.pool.GetAgent(name)
419-
if ag == nil {
420-
return fmt.Errorf("agent not found: %s", name)
421-
}
422-
ag.Pause()
423-
return nil
424-
}
425-
426-
func (s *AgentPoolService) ResumeAgent(name string) error {
427-
ag := s.pool.GetAgent(name)
428-
if ag == nil {
429-
return fmt.Errorf("agent not found: %s", name)
430-
}
431-
ag.Resume()
432-
return nil
433-
}
434-
435-
func (s *AgentPoolService) GetAgentStatus(name string) *state.Status {
436-
return s.pool.GetStatusHistory(name)
437-
}
438-
439-
func (s *AgentPoolService) GetAgentObservables(name string) ([]coreTypes.Observable, error) {
440-
ag := s.pool.GetAgent(name)
441-
if ag == nil {
442-
return nil, fmt.Errorf("agent not found: %s", name)
443-
}
444-
return ag.Observer().History(), nil
445-
}
446-
447-
func (s *AgentPoolService) ClearAgentObservables(name string) error {
448-
ag := s.pool.GetAgent(name)
449-
if ag == nil {
450-
return fmt.Errorf("agent not found: %s", name)
378+
if s.pool == nil {
379+
return nil
451380
}
452-
ag.Observer().ClearHistory()
453-
return nil
381+
return s.pool.GetAgent(name)
454382
}
455383

456384
// Chat sends a message to an agent and returns immediately. Responses come via SSE.
@@ -616,10 +544,6 @@ func (s *AgentPoolService) collectAndCopyMetadata(metadata map[string]any, userI
616544
}
617545
}
618546

619-
func (s *AgentPoolService) GetSSEManager(name string) sse.Manager {
620-
return s.pool.GetManager(name)
621-
}
622-
623547
func (s *AgentPoolService) GetConfigMeta() state.AgentConfigMeta {
624548
return s.configMeta
625549
}
@@ -667,88 +591,6 @@ func (s *AgentPoolService) ExportAgent(name string) ([]byte, error) {
667591
return nil, fmt.Errorf("agent not found: %s", name)
668592
}
669593

670-
// ImportAgent creates an agent from JSON config data.
671-
func (s *AgentPoolService) ImportAgent(data []byte) error {
672-
var cfg state.AgentConfig
673-
if err := json.Unmarshal(data, &cfg); err != nil {
674-
return fmt.Errorf("invalid agent config: %w", err)
675-
}
676-
if cfg.Name == "" {
677-
return fmt.Errorf("agent name is required")
678-
}
679-
if s.pool == nil {
680-
// Distributed mode — save to DB only
681-
if s.agentStore != nil {
682-
return s.agentStore.SaveConfig(&agents.AgentConfigRecord{
683-
Name: cfg.Name,
684-
ConfigJSON: string(data),
685-
Status: "active",
686-
})
687-
}
688-
return fmt.Errorf("cannot import agent: no pool or store available")
689-
}
690-
return s.pool.CreateAgent(cfg.Name, &cfg)
691-
}
692-
693-
694-
// --- Collections ---
695-
696-
func (s *AgentPoolService) CollectionsBackend() collections.Backend {
697-
return s.collectionsBackend
698-
}
699-
700-
func (s *AgentPoolService) ListCollections() ([]string, error) {
701-
return s.collectionsBackend.ListCollections()
702-
}
703-
704-
func (s *AgentPoolService) CreateCollection(name string) error {
705-
return s.collectionsBackend.CreateCollection(name)
706-
}
707-
708-
func (s *AgentPoolService) UploadToCollection(collection, filename string, fileBody io.Reader) (string, error) {
709-
return s.collectionsBackend.Upload(collection, filename, fileBody)
710-
}
711-
712-
func (s *AgentPoolService) ListCollectionEntries(collection string) ([]string, error) {
713-
return s.collectionsBackend.ListEntries(collection)
714-
}
715-
716-
func (s *AgentPoolService) GetCollectionEntryContent(collection, entry string) (string, int, error) {
717-
return s.collectionsBackend.GetEntryContent(collection, entry)
718-
}
719-
720-
func (s *AgentPoolService) SearchCollection(collection, query string, maxResults int) ([]collections.SearchResult, error) {
721-
return s.collectionsBackend.Search(collection, query, maxResults)
722-
}
723-
724-
func (s *AgentPoolService) ResetCollection(collection string) error {
725-
return s.collectionsBackend.Reset(collection)
726-
}
727-
728-
func (s *AgentPoolService) DeleteCollectionEntry(collection, entry string) ([]string, error) {
729-
return s.collectionsBackend.DeleteEntry(collection, entry)
730-
}
731-
732-
func (s *AgentPoolService) AddCollectionSource(collection, sourceURL string, intervalMin int) error {
733-
return s.collectionsBackend.AddSource(collection, sourceURL, intervalMin)
734-
}
735-
736-
func (s *AgentPoolService) RemoveCollectionSource(collection, sourceURL string) error {
737-
return s.collectionsBackend.RemoveSource(collection, sourceURL)
738-
}
739-
740-
func (s *AgentPoolService) ListCollectionSources(collection string) ([]collections.SourceInfo, error) {
741-
return s.collectionsBackend.ListSources(collection)
742-
}
743-
744-
func (s *AgentPoolService) CollectionEntryExists(collection, entry string) bool {
745-
return s.collectionsBackend.EntryExists(collection, entry)
746-
}
747-
748-
func (s *AgentPoolService) GetCollectionEntryFilePath(collection, entry string) (string, error) {
749-
return s.collectionsBackend.GetEntryFilePath(collection, entry)
750-
}
751-
752594
// --- User Services ---
753595

754596
// SetUserServicesManager sets the user services manager for per-user scoping.

core/services/agents/events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (b *EventBridge) DeregisterCancel(agentName, userID string) {
147147

148148
// StartCancelListener subscribes to NATS cancel events (broadcast to all instances).
149149
func (b *EventBridge) StartCancelListener() (*nats.Subscription, error) {
150-
return b.nats.Subscribe("agent.*.cancel", func(data []byte) {
150+
return b.nats.Subscribe(messaging.SubjectAgentCancelWildcard, func(data []byte) {
151151
var evt AgentCancelEvent
152152
if json.Unmarshal(data, &evt) != nil {
153153
return

core/services/gallery.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/mudler/LocalAI/core/config"
99
"github.com/mudler/LocalAI/core/gallery"
1010
"github.com/mudler/LocalAI/core/services/distributed"
11+
"github.com/mudler/LocalAI/core/services/messaging"
1112
"github.com/mudler/LocalAI/pkg/model"
1213
"github.com/mudler/LocalAI/pkg/system"
1314
)
@@ -118,7 +119,7 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
118119

119120
// Publish progress to NATS in distributed mode
120121
if g.natsClient != nil {
121-
g.natsClient.Publish("gallery."+s+".progress", op)
122+
g.natsClient.Publish(messaging.SubjectGalleryProgress(s), op)
122123
}
123124
}
124125

@@ -156,7 +157,7 @@ func (g *GalleryService) CancelOperation(id string) error {
156157

157158
// Publish cancellation to NATS in distributed mode
158159
if g.natsClient != nil {
159-
g.natsClient.Publish("gallery."+id+".cancel", map[string]string{"id": id})
160+
g.natsClient.Publish(messaging.SubjectGalleryCancel(id), map[string]string{"id": id})
160161
}
161162

162163
// Update status to reflect cancellation

core/services/jobs/dispatcher.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/mudler/LocalAI/core/services/messaging"
1212
"github.com/mudler/xlog"
1313
"github.com/nats-io/nats.go"
14+
"github.com/robfig/cron/v3"
1415
"gorm.io/gorm"
1516
)
1617

@@ -72,8 +73,10 @@ type Dispatcher struct {
7273
cancelRegistry sync.Map // jobID -> context.CancelFunc
7374

7475
// NATS subscriptions
75-
jobSub *nats.Subscription
76-
cancelSub *nats.Subscription
76+
jobSub *nats.Subscription
77+
cancelSub *nats.Subscription
78+
resultSub *nats.Subscription
79+
progressSub *nats.Subscription
7780

7881
// Lifecycle
7982
ctx context.Context
@@ -114,7 +117,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
114117
}
115118

116119
// Subscribe to cancel events (broadcast to all — each instance checks its registry)
117-
d.cancelSub, err = d.nats.Subscribe("jobs.*.cancel", func(data []byte) {
120+
d.cancelSub, err = d.nats.Subscribe(messaging.SubjectJobCancelWildcard, func(data []byte) {
118121
var evt CancelEvent
119122
if json.Unmarshal(data, &evt) != nil {
120123
return
@@ -130,7 +133,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
130133

131134
// Subscribe to job result events from workers (persist to DB)
132135
if d.store != nil {
133-
d.nats.Subscribe("jobs.*.result", func(data []byte) {
136+
d.resultSub, _ = d.nats.Subscribe(messaging.SubjectJobResultWildcard, func(data []byte) {
134137
var evt JobResultEvent
135138
if json.Unmarshal(data, &evt) != nil {
136139
return
@@ -139,7 +142,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
139142
})
140143

141144
// Subscribe to trace events from workers (persist to DB)
142-
d.nats.Subscribe("jobs.*.progress", func(data []byte) {
145+
d.progressSub, _ = d.nats.Subscribe(messaging.SubjectJobProgressWildcard, func(data []byte) {
143146
var evt ProgressEvent
144147
if json.Unmarshal(data, &evt) != nil {
145148
return
@@ -168,6 +171,12 @@ func (d *Dispatcher) Stop() {
168171
if d.cancelSub != nil {
169172
d.cancelSub.Unsubscribe()
170173
}
174+
if d.resultSub != nil {
175+
d.resultSub.Unsubscribe()
176+
}
177+
if d.progressSub != nil {
178+
d.progressSub.Unsubscribe()
179+
}
171180
}
172181

173182
// Enqueue publishes a job to the NATS queue for distributed processing.
@@ -389,23 +398,27 @@ func (d *Dispatcher) runDueCronTasks() {
389398
}
390399
}
391400

392-
// isCronDue checks if a cron task should run now.
393-
// Simple implementation: check if no job ran in the last interval.
401+
// cronParser supports standard 5-field cron expressions and descriptors like @every 5m.
402+
var cronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
403+
404+
// isCronDue checks if a cron task should run now by parsing the cron expression
405+
// and comparing the next scheduled run against the last execution time.
394406
func (d *Dispatcher) isCronDue(task TaskRecord) bool {
407+
schedule, err := cronParser.Parse(task.Cron)
408+
if err != nil {
409+
xlog.Warn("Invalid cron expression, skipping task", "taskID", task.ID, "cron", task.Cron, "error", err)
410+
return false
411+
}
412+
395413
// Find the most recent job for this task triggered by cron
396414
var lastJob JobRecord
397-
err := d.db.Where("task_id = ? AND triggered_by = ?", task.ID, "cron").
415+
err = d.db.Where("task_id = ? AND triggered_by = ?", task.ID, "cron").
398416
Order("created_at DESC").First(&lastJob).Error
399417
if err != nil {
400418
// No previous job — it's due
401419
return true
402420
}
403421

404-
// Parse cron to determine interval (simplified: use 15s minimum)
405-
// A full cron parser would be better, but for MVP just check
406-
// if enough time has passed since the last run
407-
elapsed := time.Since(lastJob.CreatedAt)
408-
409-
// Minimum interval: don't run more than once per minute
410-
return elapsed > 1*time.Minute
422+
nextRun := schedule.Next(lastJob.CreatedAt)
423+
return time.Now().After(nextRun)
411424
}

0 commit comments

Comments
 (0)