Skip to content

Commit 45905a0

Browse files
authored
fix: cancel even when vrpc is not started yet and fix newRealCall (#2897)
* fix: notify listener for unstarted vRPCs. Fix newRealCall * convert state transition to precondition check Change-Id: I81320830192df0cd16f405487c25b5dcb28985fe * remove onVRpcStarted Change-Id: I1064ff6be95987c994360119c832e9e55abe60e5 * remove full import path from test Change-Id: I65308166955609dfc08ee0fec13987b469683dd0
1 parent 8220283 commit 45905a0

5 files changed

Lines changed: 243 additions & 12 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ VRpc<ReqT, RespT> newCall(VRpcDescriptor<OpenReqT, ReqT, RespT> descriptor) {
324324

325325
long rpcId = nextRpcId;
326326
nextRpcId = Math.incrementExact(nextRpcId);
327-
return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo());
327+
return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo(), debugTagTracer);
328328
}
329329
}
330330

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public Optional<SessionHandle> checkoutSession(AfeHandle afeHandle) {
111111
handle -> {
112112
poolStats.readyCount--;
113113
poolStats.inUseCount++;
114-
114+
inUseSessions.add(handle);
115115
if (handle.afe.get().sessions.isEmpty()) {
116116
afesWithReadySessions.remove(afeHandle);
117117
}
@@ -194,11 +194,6 @@ void onSessionStarted() {
194194
poolStats.readyCount++;
195195
}
196196

197-
void onVRpcStarted() {
198-
// Pool stats and AFE list are updated in SessionList#checkoutSession
199-
inUseSessions.add(this);
200-
}
201-
202197
/**
203198
* The session is returned to the pool after use. This undoes what SessionList#checkoutSession
204199
*/

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,6 @@ private void tryDrainPendingRpcs() {
534534
if (!handle.isPresent()) {
535535
break;
536536
}
537-
handle.get().onVRpcStarted();
538537
PendingVRpc<?, ?> rpc = pendingRpcs.removeFirst();
539538
rpc.drainTo(handle.get());
540539
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.bigtable.v2.VirtualRpcRequest;
2121
import com.google.bigtable.v2.VirtualRpcRequest.Metadata;
2222
import com.google.bigtable.v2.VirtualRpcResponse;
23+
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DebugTagTracer;
2324
import com.google.cloud.bigtable.data.v2.internal.middleware.VRpc;
2425
import com.google.protobuf.Message;
2526
import com.google.protobuf.MessageLite;
@@ -72,16 +73,20 @@ private enum State {
7273

7374
private AtomicReference<State> state;
7475

76+
private final DebugTagTracer debugTagTracer;
77+
7578
public VRpcImpl(
7679
VRpcSessionApi session,
7780
VRpcDescriptor<OpenReqT, ReqT, RespT> desc,
7881
long rpcId,
79-
PeerInfo peerInfo) {
82+
PeerInfo peerInfo,
83+
DebugTagTracer debugTagTracer) {
8084
this.session = session;
8185
this.desc = desc;
8286
this.rpcId = rpcId;
8387
this.state = new AtomicReference<>(State.NEW);
8488
this.peerInfo = peerInfo;
89+
this.debugTagTracer = debugTagTracer;
8590
}
8691

8792
@Override
@@ -96,6 +101,8 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
96101
retryable = false;
97102
} else if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS)
98103
< TimeUnit.MILLISECONDS.toMicros(1)) {
104+
// transitioning to the close state is handled below
105+
state.set(State.STARTED);
99106
// Don't send RPCs that don't have any hope of succeeding
100107
status =
101108
Status.DEADLINE_EXCEEDED.withDescription("Remaining deadline is too short to send RPC");
@@ -124,9 +131,11 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
124131
}
125132

126133
if (!status.isOk()) {
127-
if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
128-
return;
129-
}
134+
debugTagTracer.checkPrecondition(
135+
state.compareAndSet(State.STARTED, State.CLOSED),
136+
"vrpc_incorrect_start_state",
137+
"VRpc has incorrect state. Expected to be started but was %s",
138+
state);
130139
// TODO: loop through the session executor
131140
if (retryable) {
132141
listener.onClose(VRpcResult.createUncommitedError(status));
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub;
17+
18+
import static com.google.common.truth.Truth.assertThat;
19+
import static org.junit.Assert.fail;
20+
21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.gax.core.NoCredentialsProvider;
23+
import com.google.bigtable.v2.BigtableGrpc;
24+
import com.google.bigtable.v2.ClientConfiguration;
25+
import com.google.bigtable.v2.GetClientConfigurationRequest;
26+
import com.google.bigtable.v2.OpenSessionResponse;
27+
import com.google.bigtable.v2.PeerInfo;
28+
import com.google.bigtable.v2.SessionRequest;
29+
import com.google.bigtable.v2.SessionResponse;
30+
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
31+
import com.google.cloud.bigtable.data.v2.models.Query;
32+
import com.google.cloud.bigtable.data.v2.models.Row;
33+
import com.google.cloud.bigtable.data.v2.models.TableId;
34+
import io.grpc.Context;
35+
import io.grpc.ForwardingServerCall;
36+
import io.grpc.Metadata;
37+
import io.grpc.Server;
38+
import io.grpc.ServerCall;
39+
import io.grpc.ServerCallHandler;
40+
import io.grpc.ServerInterceptor;
41+
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
42+
import io.grpc.stub.StreamObserver;
43+
import java.io.IOException;
44+
import java.util.Base64;
45+
import java.util.concurrent.ExecutionException;
46+
import java.util.concurrent.Executors;
47+
import java.util.concurrent.ScheduledExecutorService;
48+
import java.util.concurrent.TimeUnit;
49+
import org.junit.After;
50+
import org.junit.Before;
51+
import org.junit.Test;
52+
53+
public class SessionDeadlineTest {
54+
55+
private Server server;
56+
private EnhancedBigtableStubSettings defaultSettings;
57+
private FakeDataService fakeDataService;
58+
59+
@Before
60+
public void setUp() throws IOException {
61+
fakeDataService = new FakeDataService();
62+
server =
63+
NettyServerBuilder.forPort(0)
64+
.addService(fakeDataService)
65+
.intercept(new ResponseHeaderInterceptor())
66+
.build()
67+
.start();
68+
69+
defaultSettings =
70+
BigtableDataSettings.newBuilderForEmulator(server.getPort())
71+
.setProjectId("fake-project")
72+
.setInstanceId("fake-instance")
73+
.setAppProfileId("fake-app-profile")
74+
.setCredentialsProvider(NoCredentialsProvider.create())
75+
.build()
76+
.getStubSettings();
77+
}
78+
79+
@After
80+
public void tearDown() throws InterruptedException {
81+
if (fakeDataService != null) {
82+
fakeDataService.shutdown();
83+
}
84+
if (server != null) {
85+
server.shutdownNow();
86+
server.awaitTermination();
87+
}
88+
}
89+
90+
@Test(timeout = 1000)
91+
public void testShortDeadlineCancellation() throws Exception {
92+
EnhancedBigtableStubSettings settings =
93+
defaultSettings.toBuilder().setSessionsEnabled(true).build();
94+
95+
try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) {
96+
Query request = Query.create(TableId.of("fake-table")).rowKey("row-key");
97+
98+
try (io.grpc.Context.CancellableContext ctx =
99+
io.grpc.Context.current()
100+
.withDeadlineAfter(
101+
5,
102+
TimeUnit.MILLISECONDS,
103+
settings.getBackgroundExecutorProvider().getExecutor())) {
104+
105+
ctx.run(
106+
() -> {
107+
ApiFuture<Row> future = stub.readRowCallable().futureCall(request);
108+
try {
109+
future.get();
110+
fail("Should throw exception");
111+
} catch (ExecutionException e) {
112+
assertThat(e).hasMessageThat().contains("DEADLINE_EXCEEDED");
113+
} catch (InterruptedException e) {
114+
fail("Should not throw interrupted exception");
115+
}
116+
});
117+
}
118+
}
119+
}
120+
121+
@Test(timeout = 10000)
122+
public void testMissedHeartbeat() throws Exception {
123+
EnhancedBigtableStubSettings settings =
124+
defaultSettings.toBuilder().setSessionsEnabled(true).build();
125+
126+
try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) {
127+
Query request = Query.create(TableId.of("fake-table")).rowKey("row-key");
128+
129+
try (Context.CancellableContext ctx =
130+
Context.current()
131+
.withDeadlineAfter(
132+
1, TimeUnit.SECONDS, settings.getBackgroundExecutorProvider().getExecutor())) {
133+
ctx.run(
134+
() -> {
135+
ApiFuture<Row> future = stub.readRowCallable().futureCall(request);
136+
try {
137+
future.get();
138+
fail("Should throw exception");
139+
} catch (ExecutionException e) {
140+
assertThat(e).hasMessageThat().contains("missed heartbeat");
141+
} catch (InterruptedException e) {
142+
fail("Should not throw interrupted exception");
143+
}
144+
});
145+
}
146+
}
147+
}
148+
149+
private static class FakeDataService extends BigtableGrpc.BigtableImplBase {
150+
private final ScheduledExecutorService serverExecutor = Executors.newScheduledThreadPool(4);
151+
152+
public void shutdown() {
153+
serverExecutor.shutdownNow();
154+
}
155+
156+
@Override
157+
public void getClientConfiguration(
158+
GetClientConfigurationRequest request,
159+
StreamObserver<ClientConfiguration> responseObserver) {
160+
responseObserver.onNext(
161+
ClientConfiguration.newBuilder()
162+
.setSessionConfiguration(
163+
com.google.bigtable.v2.SessionClientConfiguration.newBuilder()
164+
.setSessionLoad(1)
165+
.build())
166+
.build());
167+
responseObserver.onCompleted();
168+
}
169+
170+
@Override
171+
public StreamObserver<SessionRequest> openTable(
172+
StreamObserver<SessionResponse> responseObserver) {
173+
return new StreamObserver<SessionRequest>() {
174+
@Override
175+
public void onNext(SessionRequest sessionRequest) {
176+
if (sessionRequest.hasOpenSession()) {
177+
responseObserver.onNext(
178+
SessionResponse.newBuilder()
179+
.setOpenSession(OpenSessionResponse.getDefaultInstance())
180+
.build());
181+
} else if (sessionRequest.hasVirtualRpc()) {
182+
// Server hangs
183+
}
184+
}
185+
186+
@Override
187+
public void onError(Throwable t) {}
188+
189+
@Override
190+
public void onCompleted() {
191+
responseObserver.onCompleted();
192+
}
193+
};
194+
}
195+
}
196+
197+
private static class ResponseHeaderInterceptor implements ServerInterceptor {
198+
@Override
199+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
200+
ServerCall<ReqT, RespT> serverCall,
201+
Metadata metadata,
202+
ServerCallHandler<ReqT, RespT> serverCallHandler) {
203+
return serverCallHandler.startCall(
204+
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
205+
@Override
206+
public void sendHeaders(Metadata headers) {
207+
Metadata.Key<String> peerInfoKey =
208+
Metadata.Key.of("bigtable-peer-info", Metadata.ASCII_STRING_MARSHALLER);
209+
String encoded =
210+
Base64.getUrlEncoder()
211+
.encodeToString(
212+
PeerInfo.newBuilder()
213+
.setApplicationFrontendRegion("us-east1")
214+
.build()
215+
.toByteArray());
216+
headers.put(peerInfoKey, encoded);
217+
super.sendHeaders(headers);
218+
}
219+
220+
@Override
221+
public void close(io.grpc.Status status, Metadata trailers) {
222+
super.close(status, trailers);
223+
}
224+
},
225+
metadata);
226+
}
227+
}
228+
}

0 commit comments

Comments
 (0)