@@ -130,6 +130,7 @@ public CachingStream(OOCStream<IndexedMatrixValue> source, long streamId) {
130130 boolean ownsEntry = true ;
131131 if (tmp instanceof OOCCacheManager .CachedGroupCallback <?> cachedGroup ) {
132132 baseKey = cachedGroup .getBlockKey ();
133+ ensureReferencedOrRematerialize (baseKey , cachedGroup );
133134 ownsEntry = false ;
134135 if (mSubscribers != null && mSubscribers .length > 0 )
135136 mCallback = tmp .keepOpen ();
@@ -183,12 +184,14 @@ public CachingStream(OOCStream<IndexedMatrixValue> source, long streamId) {
183184
184185 if (tmp instanceof OOCCacheManager .CachedQueueCallback <?> cachedQueue ) {
185186 blockKey = cachedQueue .getBlockKey ();
187+ ensureReferencedOrRematerialize (blockKey , task );
186188 ownsEntry = false ;
187189 if (mSubscribers != null && mSubscribers .length > 0 )
188190 mCallback = tmp .keepOpen ();
189191 }
190192 else if (tmp instanceof OOCCacheManager .CachedSubCallback <?> cachedSub ) {
191193 BlockKey parent = cachedSub .getParent ().getBlockKey ();
194+ ensureReferencedOrRematerialize (parent , cachedSub .getParent ());
192195 blockKey = new GroupedBlockKey (parent .getStreamId (), (int ) parent .getSequenceNumber (),
193196 cachedSub .getGroupIndex ());
194197 ownsEntry = false ;
@@ -297,6 +300,49 @@ else if(tmp instanceof OOCCacheManager.CachedSubCallback<?> cachedSub) {
297300 });
298301 }
299302
303+
304+ private void ensureReferencedOrRematerialize (BlockKey key , IndexedMatrixValue value ) {
305+ try {
306+ OOCCacheManager .getCache ().addReference (key );
307+ }
308+ catch (IllegalArgumentException ex ) {
309+ try {
310+ OOCCacheManager .putRaw (key , value , ((MatrixBlock ) value .getValue ()).getExactSerializedSize ());
311+ }
312+ catch (IllegalStateException putEx ) {
313+ // Another downstream stream may have re-materialized the same entry first.
314+ OOCCacheManager .getCache ().addReference (key );
315+ }
316+ }
317+ }
318+
319+ private void ensureReferencedOrRematerialize (BlockKey key , OOCCacheManager .CachedGroupCallback <?> group ) {
320+ try {
321+ OOCCacheManager .getCache ().addReference (key );
322+ }
323+ catch (IllegalArgumentException ex ) {
324+ try {
325+ List <IndexedMatrixValue > values = new ArrayList <>(group .size ());
326+ long totalSize = 0 ;
327+ for (int gi = 0 ; gi < group .size (); gi ++) {
328+ @ SuppressWarnings ("unchecked" )
329+ OOCStream .QueueCallback <IndexedMatrixValue > sub =
330+ (OOCStream .QueueCallback <IndexedMatrixValue >) group .getCallback (gi );
331+ try (sub ) {
332+ IndexedMatrixValue imv = sub .get ();
333+ values .add (imv );
334+ totalSize += ((MatrixBlock ) imv .getValue ()).getExactSerializedSize ();
335+ }
336+ }
337+ OOCCacheManager .putRaw (key , values , totalSize );
338+ }
339+ catch (IllegalStateException putEx ) {
340+ // Another downstream stream may have re-materialized the same entry first.
341+ OOCCacheManager .getCache ().addReference (key );
342+ }
343+ }
344+ }
345+
300346 private String getCtxMsg () {
301347 StackTraceElement [] st = new Exception ().getStackTrace ();
302348 // Skip the first few frames (constructor, createWritableStream, etc.)
@@ -687,7 +733,7 @@ public void setSubscriber(Consumer<OOCStream.QueueCallback<IndexedMatrixValue>>
687733 if (groupIdx > 0 )
688734 continue ; // only replay grouped blocks once at the base index
689735
690- BlockKey replayKey = (groupSize > 1 && groupIdx == 0 ) ? new BlockKey ( _streamId , idx ) : getBlockKey (i );
736+ BlockKey replayKey = (groupSize > 1 && groupIdx == 0 ) ? getEntryBlockKey ( idx ) : getBlockKey (i );
691737 OOCCacheManager .requestBlock (replayKey ).whenComplete ((cb , r ) -> {
692738 if (r != null ) {
693739 subscriber .accept (OOCStream .eos (DMLRuntimeException .of (r )));
@@ -697,7 +743,6 @@ public void setSubscriber(Consumer<OOCStream.QueueCallback<IndexedMatrixValue>>
697743 synchronized (CachingStream .this ) {
698744 if (_index != null ) {
699745 if (cb instanceof OOCStream .GroupQueueCallback <?> && groupSize > 1 ) {
700- @ SuppressWarnings ("unchecked" )
701746 OOCStream .GroupQueueCallback <IndexedMatrixValue > group =
702747 (OOCStream .GroupQueueCallback <IndexedMatrixValue >) cb ;
703748 for (int gi = 0 ; gi < groupSize ; gi ++) {
0 commit comments