Skip to content

Commit 6feae14

Browse files
committed
fix: fix handling recycled channels
1 parent 86c0cca commit 6feae14

2 files changed

Lines changed: 107 additions & 3 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,11 +262,16 @@ private void removeGroup(AfeChannelGroup group) {
262262
}
263263

264264
@GuardedBy("this")
265-
private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) {
265+
private void rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId) {
266+
// No need to rehome recycled channels.
267+
if (channelWrapper.channel.isShutdown()) {
268+
return;
269+
}
270+
266271
AfeChannelGroup origGroup = channelWrapper.group;
267272

268273
if (Objects.equals(origGroup.afeId, afeId)) {
269-
return origGroup;
274+
return;
270275
}
271276

272277
log(Level.FINE, "Rehoming channel from: %s to %s", origGroup.afeId, afeId);
@@ -291,7 +296,7 @@ private AfeChannelGroup rehomeChannel(ChannelWrapper channelWrapper, AfeId afeId
291296
newGroup.channels.add(channelWrapper);
292297
newGroup.numStreams += channelWrapper.numOutstanding;
293298

294-
return newGroup;
299+
return;
295300
}
296301

297302
// Update accounting when a stream is closed and releases its channel
@@ -322,6 +327,11 @@ private static boolean shouldRecycleChannel(Status status) {
322327

323328
@GuardedBy("this")
324329
private void recycleChannel(ChannelWrapper channelWrapper) {
330+
if (channelWrapper.channel.isShutdown()) {
331+
// Channel is already recycled.
332+
return;
333+
}
334+
325335
channelWrapper.group.channels.remove(channelWrapper);
326336
channelWrapper.channel.shutdown();
327337
// Checking for starting group because we don't want to delete the stating group.

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,4 +375,98 @@ void testRecycleChannelInGroupOnUnimplemented() {
375375

376376
pool.close();
377377
}
378+
379+
@Test
380+
void testDoubleRecycleCreatesExtraChannel() {
381+
when(channelSupplier.get()).thenReturn(channel);
382+
when(channel.newCall(any(), any())).thenReturn(clientCall);
383+
doNothing().when(clientCall).start(listener.capture(), any());
384+
385+
ChannelPoolDpImpl pool =
386+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
387+
388+
// Create 2 streams on the same channel
389+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
390+
.start(Mockito.mock(Listener.class), new Metadata());
391+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
392+
.start(Mockito.mock(Listener.class), new Metadata());
393+
394+
// Initially 1 channel
395+
verify(channelSupplier, times(1)).get();
396+
397+
// Trigger first recycle via first stream
398+
ClientCall.Listener<Object> listener1 = listener.getAllValues().get(0);
399+
listener1.onClose(Status.UNIMPLEMENTED, new Metadata());
400+
401+
// Channel should be recycled (shutdown + addChannel)
402+
verify(channel, times(1)).shutdown();
403+
// Now isShutdown returns true for the channel
404+
when(channel.isShutdown()).thenReturn(true);
405+
verify(channelSupplier, times(2)).get();
406+
407+
// Trigger second recycle via second stream on the SAME (already recycled) channel
408+
ClientCall.Listener<Object> listener2 = listener.getAllValues().get(1);
409+
listener2.onClose(Status.UNIMPLEMENTED, new Metadata());
410+
411+
// BUG: This should NOT cause another addChannel() or shutdown()
412+
// If it fails, times(3) will be true.
413+
verify(channelSupplier, times(2)).get();
414+
verify(channel, times(1)).shutdown();
415+
416+
pool.close();
417+
}
418+
419+
@Test
420+
void testRecycledChannelDoesNotRejoinPool() throws InterruptedException {
421+
when(channelSupplier.get()).thenReturn(channel);
422+
when(channel.newCall(any(), any())).thenReturn(clientCall);
423+
doNothing().when(clientCall).start(listener.capture(), any());
424+
doReturn(Attributes.EMPTY).when(clientCall).getAttributes();
425+
426+
ChannelPoolDpImpl pool =
427+
new ChannelPoolDpImpl(channelSupplier, defaultConfig, debugTagTracer, bgExecutor);
428+
429+
// 1. Create stream1 on channel1
430+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
431+
.start(Mockito.mock(Listener.class), new Metadata());
432+
433+
// 2. Create stream2 on channel1 to trigger recycle
434+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
435+
.start(Mockito.mock(Listener.class), new Metadata());
436+
437+
ClientCall.Listener<Object> listener1 = listener.getAllValues().get(0);
438+
ClientCall.Listener<Object> listener2 = listener.getAllValues().get(1);
439+
440+
// Prepare channel2 that will be picked up by addChannel during recycling
441+
ManagedChannel channel2 = Mockito.mock(ManagedChannel.class);
442+
when(channelSupplier.get()).thenReturn(channel2);
443+
when(channel2.newCall(any(), any())).thenReturn(clientCall);
444+
445+
// 3. Recycle channel1 via stream2
446+
listener2.onClose(Status.UNIMPLEMENTED, new Metadata());
447+
verify(channel, times(1)).shutdown();
448+
// Now isShutdown for the channel1 returns true
449+
when(channel.isShutdown()).thenReturn(true);
450+
451+
// 4. stream1 (on recycled channel1) receives headers with AFE ID
452+
// This triggers rehomeChannel
453+
PeerInfo peerInfo = PeerInfo.newBuilder().setApplicationFrontendId(555).build();
454+
Metadata headers = new Metadata();
455+
headers.put(
456+
SessionStreamImpl.PEER_INFO_KEY,
457+
Base64.getEncoder().encodeToString(peerInfo.toByteArray()));
458+
listener1.onHeaders(headers);
459+
460+
// 5. Try to create a new stream.
461+
// It should NOT pick channel1 because it's recycled/shutdown.
462+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT);
463+
464+
// BUG: If channel1 was re-added to a group, the picker might have picked it.
465+
// channel.newCall was called 2 times (steps 1 and 2). It should NOT be called a 3rd time.
466+
verify(channel, times(2)).newCall(any(), any());
467+
// Instead, it should be called on channel2
468+
verify(channel2, times(1)).newCall(any(), any());
469+
470+
pool.close();
471+
}
378472
}

0 commit comments

Comments
 (0)