Add cross-cluster streaming support to the remote cluster client#22359
Conversation
PR Reviewer Guide 🔍(Review updated until commit 0013a8c)Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Latest suggestions up to 0013a8c Explore these optional code suggestions:
Previous suggestionsSuggestions up to commit ac293d7
Suggestions up to commit 957720a
Suggestions up to commit 66f6911
Suggestions up to commit a423007
Suggestions up to commit aad809e
|
4e812ed to
df4bd45
Compare
|
Persistent review updated to latest commit df4bd45 |
df4bd45 to
2ccf667
Compare
|
Persistent review updated to latest commit 2ccf667 |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #22359 +/- ##
============================================
- Coverage 73.40% 73.35% -0.06%
+ Complexity 76146 76072 -74
============================================
Files 6076 6076
Lines 345628 345704 +76
Branches 49753 49758 +5
============================================
- Hits 253724 253579 -145
- Misses 71711 71878 +167
- Partials 20193 20247 +54 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
2ccf667 to
2c95f70
Compare
|
Persistent review updated to latest commit 2c95f70 |
2c95f70 to
e715658
Compare
|
Persistent review updated to latest commit e715658 |
e715658 to
c62541e
Compare
|
Persistent review updated to latest commit c62541e |
|
❌ Gradle check result for c62541e: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
c62541e to
b63aa16
Compare
|
Persistent review updated to latest commit b63aa16 |
b63aa16 to
c00ede5
Compare
|
Persistent review updated to latest commit c00ede5 |
c00ede5 to
ac75d5f
Compare
|
Persistent review updated to latest commit ac75d5f |
|
@rishabhmaurya Are the three things listed in the description here related? I don't understand what items 2 and 3 have to do with a stream-aware remote client. |
|
@andrross we can decouple them into separate PR. They are related here due to the nature of the long lived connections we will be creating in CCR. Synchronous send is important for integration with virtual threads and backpressure in CCR. Cancellation and keepalive are needed for long lived nature of stream and better control on it instead of relying on grpc defaults. |
ac75d5f to
aad809e
Compare
|
Persistent review updated to latest commit aad809e |
aad809e to
a423007
Compare
|
Persistent review updated to latest commit a423007 |
@rishabhmaurya Another way to solve this problem is to let the producer manage it's own permit system for how many in-flight sends it can allow, with something like: It looks like sendResponseBatch() doesn't have a callback to notify the caller that the batch has been pulled off of its executor queue and written to the outbound buffer. That would allow the caller to limit how many in-flight requests there are queued up. |
a423007 to
66f6911
Compare
|
Persistent review updated to latest commit 66f6911 |
66f6911 to
957720a
Compare
|
Persistent review updated to latest commit 957720a |
|
Persistent review updated to latest commit ac293d7 |
Enable a plugin to open an Arrow Flight stream to a *remote cluster* (mirroring the existing pull remote-client path), with producer-side backpressure and a poll-able cancellation signal so a long-lived producer does not leak when the consumer goes away. This is the transport infrastructure underneath cross-cluster replication's streaming GetChanges, but it carries no replication-specific code and is reusable by any streaming cross-cluster action. Stream-aware remote cluster client: - RemoteClusterAwareClient.sendStreamRequest(action, request, targetNode, handler): same ensureConnected + node selection as doExecute, but carries the request over StreamTransportService with Type.STREAM (connecting the target node on the stream transport first, since it has its own connection manager). No fallback to the regular transport — a missing stream transport throws. - RemoteClusterService.getRemoteClusterClient(threadPool, alias, streamTransportService) overload builds a stream-capable client; the existing 2-arg factory is unchanged. - StreamTransportService marked @experimentalapi (now referenced by the @publicapi RemoteClusterService) and exposes getStreamTransportReqTimeout(). Producer backpressure (inline send): - RequestHandlerRegistry carries a streamSendInline bit; StreamTransportService .registerStreamRequestHandler sets it so a streaming action can send response batches on the producer thread instead of an intermediate executor, making transport backpressure block the producer (bounding producer heap on a slow consumer). Contract: one stream is driven by a single producer thread. - arrow-flight-rpc FlightOutboundHandler/FlightTransportChannel/FlightMessageHandler honor the bit: inline send on the producer thread (rethrowing the failure so the producer loop can exit) vs the channel executor otherwise (search path unchanged). Cancellation + liveness: - StreamingTransportChannel.isCancelled() default method (fulfils the existing TODO) lets a parked producer poll for consumer cancellation instead of leaking until its next send throws; FlightServerChannel/FlightTransportChannel wire it to the gRPC cancel signal. - gRPC/HTTP-2 keepalive on the Flight server (flight.keepalive.time/timeout) frees a parked producer on a half-open connection within the keepalive window. Tests: FlightInlineSendBackpressureTests (inline vs executor dispatch), FlightOutboundHandlerTests and FlightTransportChannelTests updated for the new sendInline arg. Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
- Correct docs/comments: a sync send returns once the batch is pushed to gRPC's per-stream outbound buffer (not "on the wire"); a full buffer is throttled by the isReady() readiness gate. Stated positively (TransportChannel, FlightOutboundHandler, backpressure.md). - FlightTransport: use imported Consumer / NettyServerBuilder / TimeUnit instead of fully-qualified names in the keepalive transportHint. - FlightServerChannel: state the single-writer invariant explicitly — the channel is not thread-safe; all send-side mutation (root, terminalSent, serverStreamListener calls) must run on the single flight-executor thread, never concurrently, or Arrow refcounts race and buffers orphan/double-free. Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
ac293d7 to
0013a8c
Compare
|
Persistent review updated to latest commit 0013a8c |
Description
Adds cross-cluster streaming support to the remote cluster client: a plugin can open an Arrow Flight stream to a remote cluster (mirroring the existing regular remote-client request path), with a poll-able cancellation signal so a long-lived sender doesn't leak when the consumer goes away, plus an opt-in synchronous send for callers that need a hard bound on outstanding batches.
This is transport infrastructure underneath cross-cluster replication's streaming GetChanges, but it carries no replication-specific code and is reusable by any streaming cross-cluster action. Three pieces:
1. Stream-aware remote client
RemoteClusterAwareClient.sendStreamRequest(...)sends a streaming request to a remote cluster over the stream (Arrow Flight) transport, with the same connect + node-selection as the existingdoExecuterequest/response path;RemoteClusterService.getRemoteClusterClient(threadPool, alias, streamTransportService)builds such a client. No fallback to the regular transport — a missing stream transport throws. The class ispublic @ExperimentalApibecause streaming has no polymorphic home on theClientinterface (streaming is initiated onStreamTransportService, notClient), so a cross-module caller holds this concrete type to reachsendStreamRequest.2. Optional synchronous send (
sendResponseBatch(response, sync))By default a batch send is dispatched to the channel's send executor and the producer returns once it is queued. The new opt-in overload lets a caller instead block until the batch is on the wire:
sendResponseBatch(response)— async (default, unchanged).sendResponseBatch(response, true)— the send still runs on the channel's send executor, but the caller blocks until it completes, so it cannot queue the next batch ahead of this one. Outstanding batches are bounded to one (useful when batches hold large buffers that must not accumulate).The send always runs on the same single-threaded executor that the channel's
close()posts its stream-root free to, so a batch send and a concurrentclose()are FIFO-serialized on that executor and cannot race on the stream root. A caller that opts intosyncmust drive a single stream from a single thread and must not call from the send-executor thread.3. Cancellation + liveness
TransportChannel.isCancelled()(default method, forwarded byTaskTransportChannel) lets a parked sender poll for consumer cancellation instead of leaking until its next send throws; plus gRPC server keepalive on the Flight server (flight.keepalive.time/timeout, defaulting to gRPC's own 2h/20s so behaviour is unchanged out of the box) to free senders on half-open connections.Related Issues
N/A — extracted from internal cross-cluster-replication streaming work for upstream review.
Check List
TaskTransportChannelforwarding, keepalive defaults + validator + applied to the gRPC builder).--signoff.@ExperimentalApi).backpressure.md).By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.