Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 190 additions & 18 deletions src/backend/src/controller/semantic_models_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,11 @@ def _build_persistent_caches_atomic(self) -> None:
shutil.rmtree(temp_dir)
raise

# Outside the FileLock: refresh the live search index from the freshly
# rebuilt _cached_concepts. No-op when SearchManager isn't wired yet
# (startup path — SearchManager.build_index() runs once afterwards).
self._reindex_concepts_in_search()

def _compute_taxonomies(self) -> List:
"""Compute taxonomies without caching - used for building persistent cache"""
from src.models.ontology import SemanticModel as SemanticModelOntology
Expand Down Expand Up @@ -2563,6 +2568,47 @@ def get_grouped_concepts(self) -> Dict[str, List[OntologyConcept]]:

return grouped

# ------------------------------------------------------------------
# Global search integration (SearchableAsset)
# ------------------------------------------------------------------

@staticmethod
def _make_glossary_search_item(
    iri: str,
    label: Optional[str],
    comment: Optional[str],
    concept_type: Optional[str],
    synonyms: Optional[List[str]],
    source_context_label: Optional[str],
) -> Optional[SearchIndexItem]:
    """Single source of truth for mapping a concept to a SearchIndexItem.

    Shared by the bulk path (``get_search_index_items``) and the per-mutation
    path (``_upsert_concept_in_search``) so live edits stay shape-consistent
    with the startup snapshot.

    Args:
        iri: Concept IRI. Used for the item id (``glossary-term::<iri>``)
            and, percent-encoded, for the detail link.
        label: Display label. When empty, the IRI's local name (text after
            the last ``#`` / ``/``) is used; when that is empty too, the
            full IRI is used.
        comment: Stored as the item description.
        concept_type: Appended to the tags when non-empty.
        synonyms: Joined with single spaces into ``extra_data["synonyms"]``.
        source_context_label: Added as the first tag unless it is empty or
            the literal ``"Unassigned"``.

    Returns:
        The search item, or ``None`` when ``iri`` is empty.
    """
    if not iri:
        return None

    # An IRI that ends in '#' or '/' (e.g. a namespace IRI) has an empty
    # local name. Fall back to the full IRI so an unlabeled concept never
    # produces a blank title in search results.
    local_name = iri.rsplit('#', 1)[-1].rsplit('/', 1)[-1]
    title = label or local_name or iri

    tags: List[str] = []
    if source_context_label and source_context_label != "Unassigned":
        tags.append(source_context_label)
    if concept_type:
        tags.append(concept_type)

    return SearchIndexItem(
        id=f"glossary-term::{iri}",
        type="glossary-term",
        title=title,
        description=comment,
        # Direct path to ConceptDetailView (route 'browser/:iri'). Avoids
        # the BusinessTermsView ?concept= redirect hop and matches the
        # convention used by concept-detail.tsx / business-terms.tsx.
        # safe='' so that '/', '#' and ':' inside the IRI are encoded too
        # and the IRI stays a single path segment.
        link=f"/concepts/browser/{urllib.parse.quote(iri, safe='')}",
        tags=tags,
        feature_id="semantic-models",
        extra_data={"synonyms": " ".join(synonyms or [])},
    )

def get_search_index_items(self) -> List[SearchIndexItem]:
"""Build SearchIndexItem entries for every indexed ontology concept.

Expand All @@ -2576,29 +2622,127 @@ def get_search_index_items(self) -> List[SearchIndexItem]:
grouped = self.get_grouped_concepts()
for source_context, concepts in grouped.items():
for c in concepts:
if not c.iri:
continue
title = c.label or c.iri.rsplit('#', 1)[-1].rsplit('/', 1)[-1]
tags: List[str] = []
if source_context and source_context != "Unassigned":
tags.append(source_context)
if c.concept_type:
tags.append(c.concept_type)
items.append(SearchIndexItem(
id=f"glossary-term::{c.iri}",
type="glossary-term",
title=title,
description=c.comment,
link=f"/concepts/browser?concept={urllib.parse.quote(c.iri, safe='')}",
tags=tags,
feature_id="semantic-models",
extra_data={"synonyms": " ".join(c.synonyms or [])},
))
item = self._make_glossary_search_item(
iri=c.iri,
label=c.label,
comment=c.comment,
concept_type=c.concept_type,
synonyms=c.synonyms,
source_context_label=source_context,
)
if item:
items.append(item)
logger.info(f"Prepared {len(items)} glossary terms for search index.")
except Exception as e:
logger.error(f"Failed to build glossary-term search index: {e}", exc_info=True)
return items

def _upsert_concept_in_search(self, concept_iri: str) -> None:
    """Insert or refresh one concept in the live search index.

    Does nothing when no ``SearchManager`` is attached (e.g., during
    startup), when ``concept_iri`` is empty, or when ``get_concept`` finds
    nothing. The concept is fetched through ``get_concept`` rather than
    ``_cached_concepts`` so this still works right after the cache has been
    invalidated. Any failure is logged as a warning and swallowed so the
    calling mutation is not affected.
    """
    if not (self._search_manager is not None and concept_iri):
        return

    try:
        record = self.get_concept(concept_iri)
        if not record:
            return

        # Turn the stored source context into its display label; concepts
        # with no (or an unresolvable) context are tagged "Unassigned".
        source_label = "Unassigned"
        stored_context = record.get("source_context")
        if stored_context:
            source_label = (
                self._extract_source_context(stored_context) or "Unassigned"
            )

        search_item = self._make_glossary_search_item(
            iri=record["iri"],
            label=record.get("label"),
            comment=record.get("comment"),
            concept_type=record.get("concept_type"),
            synonyms=record.get("synonyms"),
            source_context_label=source_label,
        )
        if not search_item:
            return
        self._notify_index_upsert(search_item)
    except Exception as exc:
        logger.warning(
            f"Failed to upsert concept {concept_iri} in search index: {exc}"
        )

def _remove_concept_from_search(self, concept_iri: str) -> None:
    """Evict one concept's ``glossary-term`` entry from the live search index.

    Does nothing when no ``SearchManager`` is attached or ``concept_iri`` is
    empty. Failures are logged as warnings and never propagated.
    """
    if not (self._search_manager is not None and concept_iri):
        return

    try:
        # Same id scheme that _make_glossary_search_item assigns on insert.
        self._notify_index_remove(f"glossary-term::{concept_iri}")
    except Exception as exc:
        logger.warning(
            f"Failed to remove concept {concept_iri} from search index: {exc}"
        )

def _reindex_concepts_in_search(self) -> None:
    """Replace all glossary-term search entries with ``_cached_concepts``.

    Used after whole-graph rebuilds (taxonomy enable/disable, semantic model
    upload/delete, etc.) to keep the search index consistent without forcing
    a full ``SearchManager.build_index()``. No-op when the SearchManager is
    not yet wired (startup) or when ``_cached_concepts`` is unavailable.

    The replacement items are built *before* anything is removed, so an
    error while mapping a concept leaves the existing index entries in
    place instead of wiping every glossary term. Errors are logged and
    swallowed; this method never raises.
    """
    if self._search_manager is None:
        return
    if self._cached_concepts is None:
        return
    try:
        # 1) Build the complete replacement set up front (no side effects).
        # NOTE(review): the per-concept upsert path passes source_context
        # through _extract_source_context; here it is used as-is. Confirm
        # both yield the same tag value.
        fresh_items = []
        for c in self._cached_concepts:
            item = self._make_glossary_search_item(
                iri=c.iri,
                label=c.label,
                comment=c.comment,
                concept_type=c.concept_type,
                synonyms=c.synonyms,
                source_context_label=c.source_context or "Unassigned",
            )
            if item:
                fresh_items.append(item)

        # 2) Purge stale glossary-term entries (handles deletions /
        # disables). Iterate over a snapshot because removal presumably
        # mutates the underlying index.
        for existing in list(self._search_manager.index):
            if getattr(existing, "type", None) == "glossary-term":
                self._notify_index_remove(existing.id)

        # 3) Publish the freshly built entries.
        for item in fresh_items:
            self._notify_index_upsert(item)

        logger.info(
            f"Reindexed {len(fresh_items)} glossary terms in search after bulk rebuild"
        )
    except Exception as e:
        logger.error(
            f"Failed to reindex glossary terms in search: {e}", exc_info=True
        )

def _collect_concept_iris_in_context(self, context_iri: str) -> List[str]:
    """List the distinct, non-blank subjects that carry an ``rdf:type`` in a context.

    ``delete_collection`` calls this before wiping the named graph so it
    knows which entries to evict from the search index afterwards.

    Subjects are returned as strings in first-seen order. If enumeration
    fails, the error is logged at debug level and whatever was gathered up
    to that point (possibly nothing) is returned.
    """
    collected: List[str] = []
    try:
        already_added: set = set()
        named_graph = self._graph.get_context(URIRef(context_iri))
        for subject in named_graph.subjects(RDF.type, None):
            # Blank nodes have no stable IRI to evict by.
            if isinstance(subject, BNode):
                continue
            subject_iri = str(subject)
            if subject_iri not in already_added:
                already_added.add(subject_iri)
                collected.append(subject_iri)
    except Exception as exc:
        logger.debug(
            f"Failed to enumerate concept IRIs in context {context_iri}: {exc}"
        )
    return collected

def get_properties_grouped(self) -> Dict[str, List[Dict[str, Any]]]:
"""Return all RDF/OWL properties grouped by their source context name.

Expand Down Expand Up @@ -3054,6 +3198,11 @@ def delete_collection(self, collection_iri: str, deleted_by: Optional[str] = Non
if existing.get("source_type") == "imported":
raise ValueError("Cannot delete imported collections. Disable editing instead.")

# Snapshot concept IRIs in this collection BEFORE we wipe the context,
# so we can evict each from the live search index. Must happen before
# the in-memory graph removal a few lines below.
concept_iris_to_evict = self._collect_concept_iris_in_context(collection_iri)

# Remove collection metadata from meta context
rdf_triples_repo.remove_by_subject(self._db, collection_iri, META_CONTEXT)

Expand All @@ -3076,6 +3225,10 @@ def delete_collection(self, collection_iri: str, deleted_by: Optional[str] = Non
self._db.commit()
self._invalidate_cache()

# Evict every concept that lived under this collection from search.
for iri in concept_iris_to_evict:
self._remove_concept_from_search(iri)

return True

# ========================================================================
Expand Down Expand Up @@ -3340,6 +3493,7 @@ def create_concept(
logger.warning(f"Failed to add owner {owner_user} to concept: {e}")

self._invalidate_cache()
self._upsert_concept_in_search(concept_iri)

return self.get_concept(concept_iri)

Expand Down Expand Up @@ -3723,6 +3877,7 @@ def update_concept(

self._db.commit()
self._invalidate_cache()
self._upsert_concept_in_search(concept_iri)

return self.get_concept(concept_iri)

Expand Down Expand Up @@ -3766,6 +3921,7 @@ def delete_concept(self, concept_iri: str, deleted_by: Optional[str] = None) ->

self._db.commit()
self._invalidate_cache()
self._remove_concept_from_search(concept_iri)

return True

Expand Down Expand Up @@ -4019,6 +4175,7 @@ def update_concept_status(

self._db.commit()
self._invalidate_cache()
self._upsert_concept_in_search(concept_iri)

return self.get_concept(concept_iri)

Expand Down Expand Up @@ -4147,6 +4304,21 @@ def import_rdf_to_collection(

self._invalidate_cache()

# Upsert each imported concept into the live search index. We pull
# subjects from the just-parsed temp_graph (rather than re-querying
# the whole context) so a partial import doesn't reindex unrelated
# pre-existing concepts.
if self._search_manager is not None:
seen: set = set()
for subj in temp_graph.subjects(RDF.type, None):
if isinstance(subj, BNode):
continue
s = str(subj)
if s in seen:
continue
seen.add(s)
self._upsert_concept_in_search(s)

return count

# ========================================================================
Expand Down