Skip to content

Commit a2fc92d

Browse files
authored
Merge pull request #134 from vcon-dev/feature/optimize-ingest-indexing
Optimize vCon ingest: index by parties only, remove default TTL
2 parents 570eab9 + 8e6bead commit a2fc92d

3 files changed

Lines changed: 88 additions & 106 deletions

File tree

server/api.py

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -631,11 +631,6 @@ async def post_vcon(
631631
Stores the vCon in Redis and indexes it for searching. The vCon is added to a sorted
632632
set for timestamp-based retrieval and indexed by party information for searching.
633633
Optionally adds the vCon UUID to specified ingress lists for immediate processing.
634-
635-
The vCon is stored with a default TTL of VCON_REDIS_EXPIRY seconds (default 3600s/1 hour).
636-
This means vCons will automatically expire from Redis cache unless persisted to a
637-
storage backend or the expiry is updated. Configure VCON_REDIS_EXPIRY environment
638-
variable to change the default expiry time.
639634
640635
Args:
641636
inbound_vcon: The vCon to store
@@ -659,16 +654,12 @@ async def post_vcon(
659654

660655
logger.debug(f"Storing vCon {inbound_vcon.uuid} ({len(dict_vcon)} bytes)")
661656
await redis_async.json().set(key, "$", dict_vcon)
662-
663-
# Set default expiry on newly created vCons
664-
await redis_async.expire(key, VCON_REDIS_EXPIRY)
665-
logger.debug(f"Set TTL of {VCON_REDIS_EXPIRY}s on vCon {inbound_vcon.uuid}")
666-
657+
667658
logger.debug(f"Adding vCon {inbound_vcon.uuid} to sorted set")
668659
await add_vcon_to_set(key, timestamp)
669660

670661
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
671-
await index_vcon(inbound_vcon.uuid)
662+
await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])
672663

673664
# Add to ingress lists if specified
674665
if ingress_lists:
@@ -720,9 +711,7 @@ async def external_ingress_vcon(
720711
- Multiple API keys can be configured for the same ingress list
721712
722713
The submitted vCon is stored, indexed, and automatically queued for processing
723-
in the specified ingress list. The vCon is stored with a default TTL of
724-
VCON_REDIS_EXPIRY seconds (default 3600s/1 hour), after which it will expire
725-
from Redis cache unless persisted to a storage backend.
714+
in the specified ingress list.
726715
727716
Args:
728717
request: FastAPI Request object for accessing headers
@@ -760,16 +749,12 @@ async def external_ingress_vcon(
760749
f"Storing vCon {inbound_vcon.uuid} ({len(dict_vcon)} bytes) via external ingress"
761750
)
762751
await redis_async.json().set(key, "$", dict_vcon)
763-
764-
# Set default expiry on newly created vCons
765-
await redis_async.expire(key, VCON_REDIS_EXPIRY)
766-
logger.debug(f"Set TTL of {VCON_REDIS_EXPIRY}s on vCon {inbound_vcon.uuid}")
767752

768753
logger.debug(f"Adding vCon {inbound_vcon.uuid} to sorted set")
769754
await add_vcon_to_set(key, timestamp)
770755

771756
logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
772-
await index_vcon(inbound_vcon.uuid)
757+
await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])
773758

774759
# Always add to the specified ingress list (required for this endpoint)
775760
vcon_uuid_str = str(inbound_vcon.uuid)
@@ -1057,25 +1042,17 @@ async def get_dlq_vcons(
10571042
raise HTTPException(status_code=500, detail="Failed to read DLQ")
10581043

10591044

1060-
async def index_vcon(uuid: UUID) -> None:
1061-
"""Index a vCon for searching.
1045+
async def index_vcon_parties(vcon_uuid: str, parties: list) -> None:
1046+
"""Index a vCon's parties for searching.
10621047
1063-
Adds the vCon to the sorted set and indexes it by party information
1064-
(tel, mailto, name) for searching. All indexed keys will expire after
1065-
VCON_INDEX_EXPIRY seconds.
1048+
Indexes by party information (tel, mailto, name). All indexed keys
1049+
will expire after VCON_INDEX_EXPIRY seconds.
10661050
10671051
Args:
1068-
uuid: UUID of the vCon to index
1052+
vcon_uuid: UUID string of the vCon
1053+
parties: List of party dicts from the vCon
10691054
"""
1070-
key = f"vcon:{uuid}"
1071-
vcon = await redis_async.json().get(key)
1072-
created_at = datetime.fromisoformat(vcon["created_at"])
1073-
timestamp = int(created_at.timestamp())
1074-
vcon_uuid = vcon["uuid"]
1075-
await add_vcon_to_set(key, timestamp)
1076-
1077-
# Index by party information with expiration
1078-
for party in vcon["parties"]:
1055+
for party in parties:
10791056
if party.get("tel"):
10801057
tel_key = f"tel:{party['tel']}"
10811058
await redis_async.sadd(tel_key, vcon_uuid)
@@ -1090,6 +1067,25 @@ async def index_vcon(uuid: UUID) -> None:
10901067
await redis_async.expire(name_key, VCON_INDEX_EXPIRY)
10911068

10921069

1070+
async def index_vcon(uuid: UUID) -> None:
1071+
"""Index a vCon for searching (reads from Redis).
1072+
1073+
Reads the vCon from Redis, adds it to the sorted set, and indexes
1074+
by party information. Used for bulk re-indexing. For the ingest path,
1075+
use index_vcon_parties() directly to avoid redundant Redis reads.
1076+
1077+
Args:
1078+
uuid: UUID of the vCon to index
1079+
"""
1080+
key = f"vcon:{uuid}"
1081+
vcon = await redis_async.json().get(key)
1082+
created_at = datetime.fromisoformat(vcon["created_at"])
1083+
timestamp = int(created_at.timestamp())
1084+
vcon_uuid = vcon["uuid"]
1085+
await add_vcon_to_set(key, timestamp)
1086+
await index_vcon_parties(vcon_uuid, vcon["parties"])
1087+
1088+
10931089
@api_router.get(
10941090
"/index_vcons",
10951091
status_code=200,

server/tests/test_external_ingress.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ def test_successful_submission_single_api_key(
4747
# Configure mocks
4848
mock_get_ingress_auth.return_value = {self.ingress_list: self.valid_api_key}
4949

50-
# Mock Redis client properly
50+
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
5151
mock_redis = MagicMock()
5252
mock_json = MagicMock()
5353
mock_json.set = AsyncMock()
5454
mock_redis.json.return_value = mock_json
5555
mock_redis.expire = AsyncMock()
56+
mock_redis.sadd = AsyncMock()
5657
mock_redis.rpush = AsyncMock()
5758

5859
# Set the global redis_async directly in the api module
@@ -77,12 +78,10 @@ def test_successful_submission_single_api_key(
7778

7879
# Verify Redis operations were called
7980
mock_json.set.assert_called_once()
80-
mock_redis.expire.assert_called_once() # Verify expiry was set
8181
mock_redis.rpush.assert_called_once_with(
8282
self.ingress_list, self.test_vcon["uuid"]
8383
)
8484
mock_add_vcon_to_set.assert_called_once()
85-
mock_index_vcon.assert_called_once()
8685

8786
finally:
8887
# Clean up the global variable
@@ -100,12 +99,13 @@ def test_successful_submission_multiple_api_keys(
10099
self.ingress_list: ["partner-1-key", self.valid_api_key, "partner-3-key"]
101100
}
102101

103-
# Mock Redis client properly
102+
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
104103
mock_redis = MagicMock()
105104
mock_json = MagicMock()
106105
mock_json.set = AsyncMock()
107106
mock_redis.json.return_value = mock_json
108107
mock_redis.expire = AsyncMock()
108+
mock_redis.sadd = AsyncMock()
109109
mock_redis.rpush = AsyncMock()
110110

111111
# Set the global redis_async directly in the api module
@@ -233,6 +233,7 @@ def test_redis_failure_handling(self, mock_get_ingress_auth):
233233
mock_json.set = AsyncMock(side_effect=Exception("Redis connection failed"))
234234
mock_redis.json.return_value = mock_json
235235
mock_redis.expire = AsyncMock()
236+
mock_redis.sadd = AsyncMock()
236237
mock_redis.rpush = AsyncMock()
237238

238239
# Set the global redis_async directly in the api module
@@ -266,12 +267,13 @@ def test_multiple_ingress_lists_isolation(
266267
"shared_ingress": ["partner-a-key", "partner-b-key-1", "shared-key"],
267268
}
268269

269-
# Mock Redis client properly
270+
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
270271
mock_redis = MagicMock()
271272
mock_json = MagicMock()
272273
mock_json.set = AsyncMock()
273274
mock_redis.json.return_value = mock_json
274275
mock_redis.expire = AsyncMock()
276+
mock_redis.sadd = AsyncMock()
275277
mock_redis.rpush = AsyncMock()
276278

277279
# Set the global redis_async directly in the api module

0 commit comments

Comments
 (0)