Add cross-cluster streaming support to the remote cluster client #22359

Merged: andrross merged 2 commits into opensearch-project:main from rishabhmaurya:streaming-transport-cross-cluster on Jul 2, 2026

@rishabhmaurya rishabhmaurya commented Jun 30, 2026

Contributor

Description

Adds cross-cluster streaming support to the remote cluster client. A plugin can open an Arrow Flight stream to a remote cluster, mirroring the existing regular remote-client request path. The change also adds a pollable cancellation signal, so a long-lived sender doesn't leak when the consumer goes away, and an opt-in synchronous send for callers that need a hard bound on outstanding batches.

This is transport infrastructure underneath cross-cluster replication's streaming GetChanges, but it carries no replication-specific code and is reusable by any streaming cross-cluster action. Three pieces:

1. Stream-aware remote client

RemoteClusterAwareClient.sendStreamRequest(...) sends a streaming request to a remote cluster over the stream (Arrow Flight) transport, using the same connect and node-selection logic as the existing doExecute request/response path. RemoteClusterService.getRemoteClusterClient(threadPool, alias, streamTransportService) builds such a client. There is no fallback to the regular transport: a missing stream transport throws. The class is public and annotated @ExperimentalApi because streaming has no polymorphic home on the Client interface (streaming is initiated on StreamTransportService, not Client), so a cross-module caller must hold this concrete type to reach sendStreamRequest.

2. Optional synchronous send (sendResponseBatch(response, sync))

By default a batch send is dispatched to the channel's send executor and the producer returns once it is queued. The new opt-in overload lets a caller instead block until the batch is on the wire:

  • sendResponseBatch(response) — async (default, unchanged).
  • sendResponseBatch(response, true) — the send still runs on the channel's send executor, but the caller blocks until it completes, so it cannot queue the next batch ahead of this one. Outstanding batches are bounded to one (useful when batches hold large buffers that must not accumulate).

The send always runs on the same single-threaded executor to which the channel's close() posts its stream-root free, so a batch send and a concurrent close() are FIFO-serialized on that executor and cannot race on the stream root. A caller that opts into sync must drive a single stream from a single thread and must not call sendResponseBatch from the send-executor thread (it would block waiting on a task queued behind itself).
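The ordering argument above can be illustrated with plain java.util.concurrent types. This is a minimal sketch, not the PR's implementation: SyncSendSketch, sendBatch, close and the recorded "wire" list are hypothetical stand-ins for the Flight channel, and the only assumption carried over from the description is that sends and the close-time free share one single-threaded executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class SyncSendSketch {
    // Hypothetical stand-in for the channel's single-threaded send executor.
    private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
    // Records the order in which work actually ran on the executor.
    private final List<String> wire = Collections.synchronizedList(new ArrayList<>());

    /** Async: returns once queued. Sync: blocks until the send has run. */
    void sendBatch(String batch, boolean sync) {
        Runnable send = () -> wire.add(batch);
        if (sync) {
            Future<?> future = sendExecutor.submit(send);
            await(future); // caller cannot queue another batch ahead of this one
        } else {
            sendExecutor.execute(send);
        }
    }

    /** Close posts its cleanup to the same executor, so it runs after queued sends. */
    void close() {
        sendExecutor.execute(() -> wire.add("close"));
    }

    private static void await(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for send", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    /** Runs one async send, one sync send, then close, and returns the observed order. */
    static List<String> demo() {
        SyncSendSketch channel = new SyncSendSketch();
        channel.sendBatch("b1", false);
        channel.sendBatch("b2", true);
        channel.close();
        channel.sendExecutor.shutdown(); // already-queued tasks still run
        try {
            channel.sendExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(channel.wire);
    }
}
```

Because every task goes through one FIFO queue, the close task can never run before a send that was queued earlier, which is the property the description relies on.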

3. Cancellation + liveness

TransportChannel.isCancelled() (a default method, forwarded by TaskTransportChannel) lets a parked sender poll for consumer cancellation instead of leaking until its next send throws. In addition, gRPC server keepalive is now configurable on the Flight server (flight.keepalive.time and flight.keepalive.timeout, defaulting to gRPC's own 2h/20s so behaviour is unchanged out of the box) to free senders on half-open connections.
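A parked sender might use such a flag roughly as follows. This is a sketch under assumptions: CancellableChannel is a hypothetical one-method stand-in for a channel exposing isCancelled(), and awaitDataOrCancel and its poll interval are illustrative names, not part of the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class CancellationPollSketch {
    /** Hypothetical stand-in for a channel exposing a pollable cancellation flag. */
    interface CancellableChannel {
        boolean isCancelled();
    }

    /**
     * Waits until data is ready, giving up as soon as the consumer has cancelled.
     * Returns true if data became ready, false if the wait ended early.
     */
    static boolean awaitDataOrCancel(CancellableChannel channel, BooleanSupplier dataReady, long pollMillis) {
        while (!dataReady.getAsBoolean()) {
            if (channel.isCancelled()) {
                return false; // consumer is gone: stop waiting and let the caller clean up
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Consumer already cancelled and no data will ever arrive: the wait ends immediately. */
    static boolean demoCancelled() {
        AtomicBoolean cancelled = new AtomicBoolean(true);
        return awaitDataOrCancel(cancelled::get, () -> false, 1);
    }

    /** Data is ready at once: the cancellation flag is never consulted. */
    static boolean demoReady() {
        return awaitDataOrCancel(() -> false, () -> true, 1);
    }
}
```

Without the poll, a sender in this position would only learn about the cancellation when its next send failed, which may never happen if no new data arrives.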

Related Issues

N/A — extracted from internal cross-cluster-replication streaming work for upstream review.

Check List

  • Functionality includes testing (stream client connect/route + no-fallback contract, sync send runs on the executor and blocks, sync failure surfaces as StreamException, async dispatch, TaskTransportChannel forwarding, keepalive defaults + validator + applied to the gRPC builder).
  • Commits are signed per the DCO using --signoff.
  • Public APIs annotated (@ExperimentalApi).
  • Docs updated (backpressure.md).

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

github-actions Bot commented Jun 30, 2026

Contributor

PR Reviewer Guide 🔍

(Review updated until commit 0013a8c)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add gRPC keepalive settings for Flight server

Relevant files:

  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java
  • plugins/arrow-flight-rpc/src/test/java/org/apache/arrow/flight/OSFlightServerKeepAliveTests.java
  • plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/bootstrap/ServerConfigTests.java

Sub-PR theme: Add sync sendResponseBatch and isCancelled to streaming channel

Relevant files:

  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportChannel.java
  • plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java
  • server/src/main/java/org/opensearch/transport/TaskTransportChannel.java
  • server/src/main/java/org/opensearch/transport/TransportChannel.java
  • server/src/main/java/org/opensearch/transport/stream/StreamingTransportChannel.java
  • plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightOutboundHandlerTests.java
  • plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportChannelTests.java
  • server/src/test/java/org/opensearch/transport/TaskTransportChannelTests.java
  • plugins/arrow-flight-rpc/docs/backpressure.md

Sub-PR theme: Add cross-cluster streaming support to RemoteClusterAwareClient

Relevant files:

  • server/src/main/java/org/opensearch/transport/RemoteClusterAwareClient.java
  • server/src/main/java/org/opensearch/transport/RemoteClusterService.java
  • server/src/main/java/org/opensearch/transport/StreamTransportService.java
  • server/src/test/java/org/opensearch/transport/RemoteClusterAwareClientTests.java

⚡ Recommended focus areas for review

Potential Double Free

In the sync path, handedOff = true is set only after flightChannel.getExecutor().submit(sendBatch) returns. If submit() throws (e.g. RejectedExecutionException when the executor is shutting down), the task was never accepted, so the finally block correctly calls releaseUnsent. The risk is the opposite case: submit() returns successfully but the task never runs far enough to consume the source (e.g. the executor is shut down between submit and run). The source would then leak. Verify that every code path where submit() returns successfully guarantees the Runnable runs to the point of consuming the source.

if (sync) {
    Future<?> future = flightChannel.getExecutor().submit(sendBatch);
    handedOff = true; // task accepted; it now owns (and will free) the source
    awaitSend(future);
} else {
    flightChannel.getExecutor().execute(sendBatch);
    handedOff = true;
}
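The ownership rule in the snippet above (whoever holds the source when the executor refuses the task must free it, otherwise the task frees it) can be sketched in isolation. HandoffSketch, Source and the release counter are hypothetical illustrations; only the handedOff/finally shape is taken from the reviewed code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class HandoffSketch {
    /** Hypothetical buffer that counts how many times it was released. */
    static final class Source {
        final AtomicInteger releases = new AtomicInteger();

        void release() {
            releases.incrementAndGet();
        }
    }

    /** Hands the source to the executor; frees it locally only if the handoff failed. */
    static void send(ExecutorService executor, Source source) {
        boolean handedOff = false;
        try {
            executor.execute(source::release); // the task frees the source when it runs
            handedOff = true;                  // reached only if the executor accepted the task
        } finally {
            if (!handedOff) {
                source.release();              // rejected: ownership never moved, free it here
            }
        }
    }

    /** Accepted task: the only release comes from the task itself. */
    static int releasesWhenAccepted() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Source source = new Source();
        send(executor, source);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return source.releases.get();
    }

    /** Rejected task: the only release comes from the finally block. */
    static int releasesWhenRejected() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // subsequent execute() calls are rejected
        Source source = new Source();
        try {
            send(executor, source);
        } catch (RejectedExecutionException expected) {
            // rejection still propagates to the caller after the finally block ran
        }
        return source.releases.get();
    }
}
```

In both scenarios the source is released exactly once. The sketch does not cover the case the reviewer raises, where a task is accepted but never executed.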

Missing Null Check

selectTargetNode calls remoteClusterService.getConnection(clusterAlias).getNode() when the request is not a RemoteClusterAwareRequest, but the fallback path in sendStreamRequest only reaches this when targetNode == null. If getConnection returns a connection whose node is on the regular transport but not reachable on the stream transport, connectToNode will fail. The wrapping into ConnectTransportException handles this, but callers relying on a specific node may be surprised — consider documenting that the auto-selected node must be reachable on both transports.

private DiscoveryNode selectTargetNode(TransportRequest request) {
    if (request instanceof RemoteClusterAwareRequest remoteClusterAwareRequest) {
        return remoteClusterAwareRequest.getPreferredTargetNode();
    }
    return remoteClusterService.getConnection(clusterAlias).getNode();
}

github-actions Bot commented Jun 30, 2026

Contributor

PR Code Suggestions ✨

Latest suggestions up to 0013a8c

Explore these optional code suggestions:

Category | Suggestion | Impact
General
Route node-selection failures to handler

selectTargetNode may throw (e.g. NoSuchRemoteClusterException from getConnection)
when no preferred target is provided, which would propagate synchronously out of
sendStreamRequest instead of being routed to the handler. Wrap the node resolution
in try/catch and route failures via
handler.handleException(wrapAsTransportException(...)) for consistent error
delivery.

server/src/main/java/org/opensearch/transport/RemoteClusterAwareClient.java [130-134]

 remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
-    final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
-    // The stream transport has its own connection manager, so connect the node there separately
-    // (the ensureConnected above only covers the regular transport).
+    final DiscoveryNode node;
+    try {
+        node = targetNode != null ? targetNode : selectTargetNode(request);
+    } catch (Exception ex) {
+        handler.handleException(wrapAsTransportException(null, ex));
+        return;
+    }
     streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {

Suggestion importance [1-10]: 6

Why: Valid concern: selectTargetNode calls remoteClusterService.getConnection(clusterAlias) which can throw, and that exception would propagate synchronously rather than being routed to the handler, breaking the consistent error-delivery contract.

Low
Handle executor submit rejection explicitly

submit() can throw RejectedExecutionException (e.g. on executor shutdown), in which
case handedOff is never set and the finally block correctly runs releaseUnsent.
Setting handedOff = true only after submit() returns is correct, provided the
executor's submit contract guarantees that the task either runs or the exception
is thrown before submit() returns. If the executor could accept the task and then
reject execution asynchronously, the source would leak; consider documenting or
verifying this invariant for the executor used.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [176-180]

 if (sync) {
-    Future<?> future = flightChannel.getExecutor().submit(sendBatch);
+    Future<?> future;
+    try {
+        future = flightChannel.getExecutor().submit(sendBatch);
+    } catch (RejectedExecutionException ree) {
+        throw new StreamException(StreamErrorCode.UNAVAILABLE, "Flight executor rejected batch", ree);
+    }
     handedOff = true; // task accepted; it now owns (and will free) the source
     awaitSend(future);
 } else {

Suggestion importance [1-10]: 3

Why: The existing code already handles rejection correctly via the finally block releasing the source when handedOff is false. The suggestion adds marginal value by converting to a StreamException but the current behavior is functionally acceptable.

Low

Previous suggestions

Suggestions up to commit ac293d7
Category | Suggestion | Impact
General
Route synchronous errors through the handler

The inner selectTargetNode / sendRequest block can throw synchronously (e.g.
NoSuchRemoteClusterException from getConnection, or any runtime error from
sendRequest), which would propagate to the caller instead of being routed to the
handler — violating the handler-based error contract. Wrap the body in try/catch and
route failures via handler.handleException.

server/src/main/java/org/opensearch/transport/RemoteClusterAwareClient.java [130-141]

 remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
-    final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
-    // The stream transport has its own connection manager, so connect the node there separately
-    // (the ensureConnected above only covers the regular transport).
-    streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {
-        final TransportRequestOptions options = TransportRequestOptions.builder()
-            .withType(TransportRequestOptions.Type.STREAM)
-            .withTimeout(streamTransportService.getStreamTransportReqTimeout())
-            .build();
-        streamTransportService.sendRequest(node, action, request, options, handler);
-    }, e -> handler.handleException(wrapAsTransportException(node, e))));
+    try {
+        final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
+        // The stream transport has its own connection manager, so connect the node there separately
+        // (the ensureConnected above only covers the regular transport).
+        streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {
+            try {
+                final TransportRequestOptions options = TransportRequestOptions.builder()
+                    .withType(TransportRequestOptions.Type.STREAM)
+                    .withTimeout(streamTransportService.getStreamTransportReqTimeout())
+                    .build();
+                streamTransportService.sendRequest(node, action, request, options, handler);
+            } catch (Exception ex) {
+                handler.handleException(wrapAsTransportException(node, ex));
+            }
+        }, e -> handler.handleException(wrapAsTransportException(node, e))));
+    } catch (Exception ex) {
+        handler.handleException(wrapAsTransportException(null, ex));
+    }
 }, e -> handler.handleException(wrapAsTransportException(null, e))));

Suggestion importance [1-10]: 6

Why: Wrapping the inner block in try/catch prevents synchronous exceptions (e.g. from selectTargetNode or sendRequest) from bypassing the handler contract, which is a reasonable correctness improvement.

Low
Handle CancellationException in sync send

The Future.get() can also throw CancellationException (unchecked), which would
propagate raw to the caller and bypass the StreamException wrapping contract, and
could also leave the caller unable to distinguish this from other failures. Catch it
explicitly and wrap it as a StreamException so the sync path always surfaces
failures uniformly.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [205-213]

 } catch (ExecutionException e) {
     // The work body catches its own exceptions and routes them to the listener, so this is not
     // expected; surface it rather than swallow if it ever happens.
     Throwable cause = e.getCause() != null ? e.getCause() : e;
     if (cause instanceof StreamException se) {
         throw se;
     }
     throw new StreamException(StreamErrorCode.INTERNAL, "Error sending batch synchronously", cause);
+} catch (java.util.concurrent.CancellationException e) {
+    throw new StreamException(StreamErrorCode.CANCELLED, "Batch send was cancelled", e);
 }

Suggestion importance [1-10]: 4

Why: Handling CancellationException explicitly is a minor robustness improvement, but in practice the submitted task is unlikely to be cancelled in this flow, so impact is limited.

Low
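The behaviour this suggestion relies on is standard java.util.concurrent semantics: Future.get() throws the unchecked CancellationException when the task was cancelled. A minimal sketch follows; CancelledFutureSketch and awaitOutcome are hypothetical names, and the gate task exists only to keep the second task queued so that cancelling it is deterministic.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancelledFutureSketch {
    /** Cancels a queued task and reports how Future.get() surfaces that. */
    static String awaitOutcome() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch gate = new CountDownLatch(1);
            // Occupy the only worker thread so the next task stays in the queue.
            executor.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Future<?> queued = executor.submit(() -> {});
            queued.cancel(false); // task has not started, so cancellation succeeds
            gate.countDown();
            try {
                queued.get();
                return "completed";
            } catch (CancellationException e) {
                return "cancelled"; // unchecked: easy to miss next to the checked exceptions
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            } catch (ExecutionException e) {
                return "failed";
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A caller that only catches InterruptedException and ExecutionException would let the CancellationException escape unwrapped, which is the gap the suggestion describes.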
Clarify sync submit rejection handling

If submit() throws (e.g. RejectedExecutionException when the executor is shutting
down), handedOff remains false and releaseUnsent is called. That is correct: the
task was never accepted and will not run, so ownership was never transferred. The
same holds for execute(); keep both calls inside the try so that handedOff = true
is set only after successful acceptance. The current code already does this, but
consider noting explicitly that submit's failure path must not run awaitSend (it
currently cannot, because the exception propagates before awaitSend is reached).
Verify that the RejectedExecutionException test still exercises submit() for the
sync path.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [176-183]

 if (sync) {
-    Future<?> future = flightChannel.getExecutor().submit(sendBatch);
+    Future<?> future;
+    try {
+        future = flightChannel.getExecutor().submit(sendBatch);
+    } catch (RejectedExecutionException rex) {
+        throw rex; // handedOff still false -> finally releases source
+    }
     handedOff = true; // task accepted; it now owns (and will free) the source
     awaitSend(future);
 } else {
     flightChannel.getExecutor().execute(sendBatch);
     handedOff = true;
 }

Suggestion importance [1-10]: 2

Why: The suggestion acknowledges the existing code is already correct and only asks to verify test coverage; the proposed change adds no functional improvement.

Low
Suggestions up to commit 957720a
Category | Suggestion | Impact
General
Route synchronous failures to the handler

selectTargetNode may throw synchronously (e.g. getConnection throws
NoSuchRemoteClusterException when no connected node exists), which would propagate
to the caller of sendStreamRequest instead of being routed to
handler.handleException. Wrap the node-selection and subsequent call in a try/catch
so all failures are consistently delivered to the handler.

server/src/main/java/org/opensearch/transport/RemoteClusterAwareClient.java [130-141]

 remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
-    final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
+    final DiscoveryNode node;
+    try {
+        node = targetNode != null ? targetNode : selectTargetNode(request);
+    } catch (Exception ex) {
+        handler.handleException(wrapAsTransportException(null, ex));
+        return;
+    }
     // The stream transport has its own connection manager, so connect the node there separately
     // (the ensureConnected above only covers the regular transport).
     streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {
         final TransportRequestOptions options = TransportRequestOptions.builder()
             .withType(TransportRequestOptions.Type.STREAM)
             .withTimeout(streamTransportService.getStreamTransportReqTimeout())
             .build();
         streamTransportService.sendRequest(node, action, request, options, handler);
     }, e -> handler.handleException(wrapAsTransportException(node, e))));
 }, e -> handler.handleException(wrapAsTransportException(null, e))));

Suggestion importance [1-10]: 6

Why: Valid concern: selectTargetNode can throw NoSuchRemoteClusterException synchronously inside the ensureConnected callback, which would not be routed to the handler consistently. Wrapping it improves error-handling symmetry, though impact is moderate.

Low
Wrap sync executor rejection as StreamException

submit(sendBatch) may throw RejectedExecutionException (e.g. executor shutdown), in
which case handedOff remains false and releaseUnsent is correctly invoked — but the
exception then escapes as a raw RejectedExecutionException rather than being wrapped
as StreamException, unlike the async path already covered by tests. Consider
catching rejection here and translating it to a StreamException for consistent error
semantics on the sync path.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [175-182]

 if (sync) {
-    Future<?> future = flightChannel.getExecutor().submit(sendBatch);
+    final Future<?> future;
+    try {
+        future = flightChannel.getExecutor().submit(sendBatch);
+    } catch (RejectedExecutionException ree) {
+        throw new StreamException(StreamErrorCode.UNAVAILABLE, "Executor rejected batch send", ree);
+    }
     handedOff = true; // task accepted; it now owns (and will free) the source
     awaitSend(future);
 } else {
     flightChannel.getExecutor().execute(sendBatch);
     handedOff = true;
 }

Suggestion importance [1-10]: 5

Why: Reasonable consistency improvement: RejectedExecutionException on the sync path currently escapes unwrapped, while the async tests expect release-on-rejection semantics. Wrapping as StreamException gives uniform error semantics, but is a minor issue.

Low
Suggestions up to commit 66f6911
Category | Suggestion | Impact
Possible issue
Prevent double-release on sync send failure

When sync == true and runOnExecutorAndWait throws a StreamException, handedOff
remains false and the finally block calls flightChannel.releaseUnsent(response), but
the submitted BatchTask already closed the response via try-with-resources on the
executor thread (double release). Set handedOff = true before invoking the sync
path, or reorganize so the release only happens if the task never ran.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [173-178]

 if (sync) {
+    handedOff = true;
     runOnExecutorAndWait(flightChannel, sendBatch);
 } else {
     flightChannel.getExecutor().execute(sendBatch);
+    handedOff = true;
 }
-handedOff = true;

Suggestion importance [1-10]: 7

Why: Valid concern: in the sync path, the submitted BatchTask uses try-with-resources to close the response on the executor thread. If runOnExecutorAndWait throws after the task ran, handedOff stays false and releaseUnsent would be called, causing a potential double-release. However, the exact behavior depends on releaseUnsent/close idempotency.

Medium
General
Route node-selection failures to handler

selectTargetNode may throw (e.g. getConnection throws NoSuchRemoteClusterException
when no connected node is available), but this runs inside the ensureConnected
success callback so the exception will propagate to the listener framework rather
than to the handler. Wrap the body so any synchronous failure is routed to
handler.handleException for consistent error delivery.

server/src/main/java/org/opensearch/transport/RemoteClusterAwareClient.java [130-134]

 remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
-    final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
-    // The stream transport has its own connection manager, so connect the node there separately
-    // (the ensureConnected above only covers the regular transport).
+    final DiscoveryNode node;
+    try {
+        node = targetNode != null ? targetNode : selectTargetNode(request);
+    } catch (Exception ex) {
+        handler.handleException(wrapAsTransportException(null, ex));
+        return;
+    }
     streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {

Suggestion importance [1-10]: 5

Why: Reasonable robustness improvement: selectTargetNode can throw synchronously (e.g. NoSuchRemoteClusterException), and wrapping it ensures the handler receives the failure consistently rather than propagating through the listener chain.

Low
Log keepalive configuration for observability

The transport hint API is documented as best-effort ("not guaranteed to have any
effect"); if a future Arrow upgrade silently drops this hook, keepalive becomes a
no-op with no runtime signal. Consider logging at INFO/DEBUG when the consumer is
registered (or asserting it in a startup check) so operators can detect the
regression without relying solely on the unit test.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java [260-263]

+logger.debug("Configuring Flight server gRPC keepalive: time={}ms, timeout={}ms", keepAliveTimeMs, keepAliveTimeoutMs);
 builder.transportHint("grpc.builderConsumer", (java.util.function.Consumer<io.grpc.netty.NettyServerBuilder>) b -> {
     b.keepAliveTime(keepAliveTimeMs, java.util.concurrent.TimeUnit.MILLISECONDS);
     b.keepAliveTimeout(keepAliveTimeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS);
 });

Suggestion importance [1-10]: 3

Why: Minor observability improvement; adding a debug log is helpful but low impact and does not address the underlying best-effort concern of the hint API.

Low
Suggestions up to commit a423007
Category | Suggestion | Impact
General
Route synchronous errors to the handler

The selectTargetNode call and subsequent operations may throw synchronously (e.g.
NoSuchRemoteClusterException or if no connected node exists), but they run inside
the ensureConnected success callback where the exception won't be routed to the
handler. Wrap the callback body in a try/catch that invokes
handler.handleException(wrapAsTransportException(...)) to preserve the same
error-delivery contract as the failure branch.

server/src/main/java/org/opensearch/transport/RemoteClusterAwareClient.java [130-141]

 remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
-    final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
-    // The stream transport has its own connection manager, so connect the node there separately
-    // (the ensureConnected above only covers the regular transport).
-    streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {
+    try {
+        final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
+        // The stream transport has its own connection manager, so connect the node there separately
+        // (the ensureConnected above only covers the regular transport).
+        streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {

Suggestion importance [1-10]: 6

Why: Valid concern: selectTargetNode can throw (e.g. NoSuchRemoteClusterException) inside the ensureConnected success callback, and that exception would not be routed to the handler. Wrapping in try/catch improves error-delivery consistency.

Low
Latch sync lane monotonically

syncSend is latched unconditionally on every batch, so a later batch with sync=false
on a non-Arrow response could downgrade the lane after an earlier batch latched it
to true, breaking the "same lane for completion" invariant if the caller mixes sync
flags. Once latched to true, keep it true for the lifetime of the stream.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportChannel.java [111-113]

 // Latch the lane for this stream so completeStream/sendResponse(Exception) reuse it.
 final boolean resolvedSync = resolveSync(response, sync);
-this.syncSend = resolvedSync;
+if (resolvedSync) {
+    this.syncSend = true;
+}

Suggestion importance [1-10]: 6

Why: Reasonable observation: if a caller mixes sync flags across batches on the same stream, the lane could downgrade, potentially violating the "same lane for completion" invariant. Making it monotonic strengthens the invariant.

Low
Enforce keepalive validator at init

FLIGHT_KEEPALIVE_TIMEOUT.get(settings) does not run the cross-setting validator
(which requires the settings map); if a user sets flight.keepalive.timeout >=
flight.keepalive.time in opensearch.yml, node init will still accept it here and
gRPC will receive an invalid pair. Explicitly validate through the ClusterSettings
path or invoke the validator with both values before applying them.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java [286-287]

 grpcKeepAliveTime = FLIGHT_KEEPALIVE_TIME.get(settings);
 grpcKeepAliveTimeout = FLIGHT_KEEPALIVE_TIMEOUT.get(settings);
+new KeepAliveTimeoutValidator().validate(grpcKeepAliveTimeout, Map.of(FLIGHT_KEEPALIVE_TIME, grpcKeepAliveTime));

Suggestion importance [1-10]: 5

Why: Valid point that Setting.get(settings) doesn't invoke cross-setting validators; explicitly running the validator at init would catch misconfigurations in opensearch.yml earlier. Moderate impact since ClusterSettings would still validate on cluster-level updates.

Low
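The cross-setting rule described in this suggestion (the keepalive timeout must be strictly smaller than the keepalive time) can be expressed as a standalone check. This is a sketch with hypothetical names, KeepAliveCheckSketch, isValid and validate; it does not use the OpenSearch Setting or validator APIs, and the error wording is illustrative.

```java
import java.time.Duration;

final class KeepAliveCheckSketch {
    /** Returns true when the pair is acceptable: timeout strictly below time. */
    static boolean isValid(Duration keepAliveTime, Duration keepAliveTimeout) {
        return keepAliveTimeout.compareTo(keepAliveTime) < 0;
    }

    /** Fails fast, e.g. at node init, instead of passing an invalid pair on to gRPC. */
    static void validate(Duration keepAliveTime, Duration keepAliveTimeout) {
        if (!isValid(keepAliveTime, keepAliveTimeout)) {
            throw new IllegalArgumentException(
                "flight.keepalive.timeout [" + keepAliveTimeout + "] must be less than flight.keepalive.time ["
                    + keepAliveTime + "]"
            );
        }
    }
}
```

With the defaults quoted in the PR description (2h and 20s) the check passes, while an equal or larger timeout is rejected.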
Possible issue
Prevent double-release on sync send failure

In the sync path, handedOff = true is set after processBatchTask runs, but if
processBatchTask throws it is caught locally and the finally block will still see
handedOff == false, causing flightChannel.releaseUnsent(response) to attempt a
double-free (the source was already released inside processBatchTask's
try-with-resources on BatchTask). Set handedOff = true before executing the sync
task, mirroring the async path where handoff is claimed once the executor accepts
the runnable.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java [161-176]

 if (sync) {
+    handedOff = true;
     try (BatchTask ignored = task) {
         processBatchTask(task);
     } catch (Exception e) {
         messageListener.onResponseSent(requestId, action, e);
     }
 } else {
     flightChannel.getExecutor().execute(threadPool.getThreadContext().preserveContext(() -> {
         try (BatchTask ignored = task) {
             processBatchTask(task);
         } catch (Exception e) {
             messageListener.onResponseSent(requestId, action, e);
         }
     }));
+    handedOff = true;
 }
-handedOff = true;

Suggestion importance [1-10]: 3

Why: The suggestion's premise appears incorrect: exceptions inside processBatchTask are caught locally, so control falls through to handedOff = true normally. The double-free concern is not clearly demonstrated by the diff.

Low
Suggestions up to commit aad809e
Category | Suggestion | Impact
Possible issue
Latch send lane only once per stream

The syncSend lane is latched on every batch call, so if a caller sends the first
batch with sync=false and a subsequent batch with sync=true (or vice versa), the
latched lane changes mid-stream. This defeats the stated invariant that terminal
messages cannot be reordered ahead of an in-flight send. Latch only once (on the
first batch) to enforce the documented lane-consistency guarantee.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportChannel.java [112-114]

-// Latch the lane for this stream so completeStream/sendResponse(Exception) reuse it.
+// Latch the lane for this stream on the first batch so completeStream/sendResponse(Exception)
+// reuse it and cannot be reordered ahead of an in-flight send.
 final boolean resolvedSync = resolveSync(response, sync);
-this.syncSend = resolvedSync;
+if (laneLatched.compareAndSet(false, true)) {
+    this.syncSend = resolvedSync;
+} else if (this.syncSend != resolvedSync) {
+    throw new StreamException(
+        StreamErrorCode.INTERNAL,
+        "sendResponseBatch lane changed mid-stream for requestId [" + requestId + "]"
+    );
+}

Suggestion importance [1-10]: 6

Why: Valid concern: re-latching syncSend on every batch could allow the lane to change mid-stream, weakening the ordering guarantee documented in the code. Latching only on the first batch better enforces the stated invariant.

Low
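The latch-once behaviour proposed here can be tried out in isolation. This is a sketch, assuming a single thread drives the stream (as the PR description requires for sync senders); LaneLatchSketch and its methods are hypothetical, and IllegalStateException stands in for the StreamException used in the suggestion.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class LaneLatchSketch {
    private final AtomicBoolean laneLatched = new AtomicBoolean(false);
    private volatile boolean syncSend;

    /** The first call fixes the lane; later calls must request the same lane. */
    void latch(boolean requestedSync) {
        if (laneLatched.compareAndSet(false, true)) {
            syncSend = requestedSync;
        } else if (syncSend != requestedSync) {
            throw new IllegalStateException("send lane changed mid-stream");
        }
    }

    boolean isSync() {
        return syncSend;
    }

    /** Returns true if a lane switch after the first batch is rejected. */
    static boolean demoRejectsSwitch() {
        LaneLatchSketch stream = new LaneLatchSketch();
        stream.latch(true);  // first batch picks the sync lane
        stream.latch(true);  // same lane again is fine
        try {
            stream.latch(false); // attempt to switch lanes mid-stream
            return false;
        } catch (IllegalStateException expected) {
            return stream.isSync(); // lane is still the one chosen first
        }
    }
}
```

Failing loudly on a mismatch, rather than silently keeping the first lane, makes a caller that mixes sync flags visible during testing.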
General
Route synchronous send errors to handler

The sendStreamRequest method should protect against exceptions thrown synchronously
from streamTransportService.sendRequest (e.g. serialization or connection lookup
failures). Currently such exceptions would propagate to the caller instead of being
routed to handler.handleException, which is inconsistent with the async error paths
in the same method and with doExecute's contract.

server/src/main/java/org/opensearch/transport/RemoteClusterAwareClient.java [126-137]

 remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
     final DiscoveryNode node = targetNode != null ? targetNode : selectTargetNode(request);
     // The stream transport has its own connection manager, so connect the node there separately
     // (the ensureConnected above only covers the regular transport).
     streamTransportService.connectToNode(node, null, ActionListener.wrap(c -> {
-        final TransportRequestOptions options = TransportRequestOptions.builder()
-            .withType(TransportRequestOptions.Type.STREAM)
-            .withTimeout(streamTransportService.getStreamTransportReqTimeout())
-            .build();
-        streamTransportService.sendRequest(node, action, request, options, handler);
+        try {
+            final TransportRequestOptions options = TransportRequestOptions.builder()
+                .withType(TransportRequestOptions.Type.STREAM)
+                .withTimeout(streamTransportService.getStreamTransportReqTimeout())
+                .build();
+            streamTransportService.sendRequest(node, action, request, options, handler);
+        } catch (Exception ex) {
+            handler.handleException(wrapAsTransportException(node, ex));
+        }
     }, e -> handler.handleException(wrapAsTransportException(node, e))));
 }, e -> handler.handleException(wrapAsTransportException(null, e))));
Suggestion importance[1-10]: 5


Why: Wrapping the synchronous call to sendRequest in a try/catch ensures consistent error routing to the handler, though TransportService.sendRequest typically catches internally and routes to the handler already, so the practical impact is modest.

Low
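
The error-routing idea in this suggestion can be shown without any OpenSearch types. In the sketch below, `FailureHandler` and `sendGuarded` are hypothetical names standing in for the response handler and the guarded call to `sendRequest`; the point is only that an exception thrown synchronously by the send reaches the handler rather than the caller.

```java
/** Hypothetical helper illustrating "route synchronous failures to the handler". */
final class HandlerRouting {
    /** Minimal stand-in for the failure side of a transport response handler. */
    interface FailureHandler {
        void handleException(Exception e);
    }

    /** Runs the send; anything it throws synchronously goes to the handler, not the caller. */
    static void sendGuarded(Runnable send, FailureHandler handler) {
        try {
            send.run();
        } catch (Exception ex) {
            handler.handleException(ex);
        }
    }
}
```

A caller then observes failures through one path only, whether they occur while the request is being built or after it has been dispatched.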

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from 4e812ed to df4bd45 Compare June 30, 2026 20:39
@github-actions

Contributor

Persistent review updated to latest commit df4bd45

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from df4bd45 to 2ccf667 Compare June 30, 2026 20:47
@github-actions

Contributor

Persistent review updated to latest commit 2ccf667

@github-actions

Contributor

✅ Gradle check result for 2ccf667: SUCCESS

@codecov

codecov Bot commented Jun 30, 2026


Codecov Report

❌ Patch coverage is 70.37037% with 24 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.35%. Comparing base (3845aac) to head (0013a8c).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...opensearch/transport/RemoteClusterAwareClient.java | 45.45% | 9 Missing and 3 partials ⚠️ |
| .../arrow/flight/transport/FlightOutboundHandler.java | 70.00% | 4 Missing and 2 partials ⚠️ |
| ...ava/org/opensearch/transport/TransportChannel.java | 0.00% | 3 Missing ⚠️ |
| ...ch/arrow/flight/transport/FlightServerChannel.java | 0.00% | 1 Missing ⚠️ |
| ...arrow/flight/transport/FlightTransportChannel.java | 66.66% | 1 Missing ⚠️ |
| ...g/opensearch/transport/StreamTransportService.java | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #22359      +/-   ##
============================================
- Coverage     73.40%   73.35%   -0.06%     
+ Complexity    76146    76072      -74     
============================================
  Files          6076     6076              
  Lines        345628   345704      +76     
  Branches      49753    49758       +5     
============================================
- Hits         253724   253579     -145     
- Misses        71711    71878     +167     
- Partials      20193    20247      +54     


@github-actions

Contributor

Persistent review updated to latest commit 2c95f70

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from 2c95f70 to e715658 Compare June 30, 2026 23:40
@github-actions

Contributor

Persistent review updated to latest commit e715658

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from e715658 to c62541e Compare July 1, 2026 00:07
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit c62541e

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

❌ Gradle check result for c62541e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from c62541e to b63aa16 Compare July 1, 2026 00:15
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit b63aa16

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from b63aa16 to c00ede5 Compare July 1, 2026 00:17
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit c00ede5

@rishabhmaurya rishabhmaurya marked this pull request as ready for review July 1, 2026 00:20
@rishabhmaurya rishabhmaurya requested review from a team and peternied as code owners July 1, 2026 00:20
@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from c00ede5 to ac75d5f Compare July 1, 2026 00:20
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit ac75d5f

@andrross

andrross commented Jul 1, 2026

Member

@rishabhmaurya Are the three things listed in the description here related? I don't understand what items 2 and 3 have to do with a stream-aware remote client.

@rishabhmaurya

Contributor Author

@andrross We can split them into separate PRs. They are grouped here because of the long-lived connections CCR will create. Synchronous send matters for integrating with virtual threads and for backpressure in CCR. Cancellation and keepalive are needed because the stream is long-lived, and they give better control over it than relying on gRPC defaults.

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from ac75d5f to aad809e Compare July 1, 2026 01:06
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit aad809e

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from aad809e to a423007 Compare July 1, 2026 01:40
@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

Persistent review updated to latest commit a423007

@github-actions

github-actions Bot commented Jul 1, 2026

Contributor

✅ Gradle check result for a423007: SUCCESS

@andrross

andrross commented Jul 1, 2026

Member

> Synchronous send is important for integration with virtual threads

@rishabhmaurya Another way to solve this problem is to let the producer manage its own permit system for how many in-flight sends it allows, with something like:

semaphore.acquire();
channel.sendResponseBatch(batch, ActionListener.wrap(semaphore::release, e -> { semaphore.release(); fail(e); }));

It looks like sendResponseBatch() doesn't have a callback to notify the caller that the batch has been pulled off of its executor queue and written to the outbound buffer. That would allow the caller to limit how many in-flight requests there are queued up.
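
A self-contained version of the permit idea above might look like the following sketch. It assumes a callback-taking send, which (as the comment notes) `sendResponseBatch()` does not currently offer, so `BatchChannel` is a hypothetical interface. It further assumes the callback fires exactly once per batch, on success or failure, and only when the call itself returned normally.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical producer-side limiter: at most maxOutstanding batches are in flight at once. */
final class PermitBoundedProducer {
    /** Assumed channel shape with a completion callback; not an existing OpenSearch interface. */
    interface BatchChannel {
        void sendResponseBatch(String batch, Runnable onDone);
    }

    private final Semaphore permits;

    PermitBoundedProducer(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    /** Blocks the producer while maxOutstanding batches are still queued or being written. */
    void send(BatchChannel channel, String batch) {
        try {
            permits.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a send permit", e);
        }
        try {
            // The permit is returned when the channel reports the batch as done.
            channel.sendResponseBatch(batch, permits::release);
        } catch (RuntimeException e) {
            // The send never started, so the callback will not run: return the permit here.
            permits.release();
            throw e;
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Blocking in `acquire()` rather than inside the transport keeps the bound under the producer's control, which is the trade-off this comment contrasts with a synchronous send.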

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from a423007 to 66f6911 Compare July 2, 2026 03:23
@github-actions

github-actions Bot commented Jul 2, 2026

Contributor

Persistent review updated to latest commit 66f6911

@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from 66f6911 to 957720a Compare July 2, 2026 03:36
@github-actions

github-actions Bot commented Jul 2, 2026

Contributor

Persistent review updated to latest commit 957720a

@github-actions

github-actions Bot commented Jul 2, 2026

Contributor

✅ Gradle check result for 957720a: SUCCESS

Comment thread plugins/arrow-flight-rpc/docs/backpressure.md Outdated
@github-actions

github-actions Bot commented Jul 2, 2026

Contributor

Persistent review updated to latest commit ac293d7

Enable a plugin to open an Arrow Flight stream to a *remote cluster* (mirroring
the existing pull remote-client path), with producer-side backpressure and a
poll-able cancellation signal so a long-lived producer does not leak when the
consumer goes away. This is the transport infrastructure underneath
cross-cluster replication's streaming GetChanges, but it carries no
replication-specific code and is reusable by any streaming cross-cluster action.

Stream-aware remote cluster client:
- RemoteClusterAwareClient.sendStreamRequest(action, request, targetNode, handler):
  same ensureConnected + node selection as doExecute, but carries the request over
  StreamTransportService with Type.STREAM (connecting the target node on the stream
  transport first, since it has its own connection manager). No fallback to the
  regular transport — a missing stream transport throws.
- RemoteClusterService.getRemoteClusterClient(threadPool, alias, streamTransportService)
  overload builds a stream-capable client; the existing 2-arg factory is unchanged.
- StreamTransportService marked @ExperimentalApi (now referenced by the @PublicApi
  RemoteClusterService) and exposes getStreamTransportReqTimeout().

Producer backpressure (inline send):
- RequestHandlerRegistry carries a streamSendInline bit; StreamTransportService
  .registerStreamRequestHandler sets it so a streaming action can send response
  batches on the producer thread instead of an intermediate executor, making
  transport backpressure block the producer (bounding producer heap on a slow
  consumer). Contract: one stream is driven by a single producer thread.
- arrow-flight-rpc FlightOutboundHandler/FlightTransportChannel/FlightMessageHandler
  honor the bit: inline send on the producer thread (rethrowing the failure so the
  producer loop can exit) vs the channel executor otherwise (search path unchanged).

Cancellation + liveness:
- StreamingTransportChannel.isCancelled() default method (fulfils the existing TODO)
  lets a parked producer poll for consumer cancellation instead of leaking until its
  next send throws; FlightServerChannel/FlightTransportChannel wire it to the gRPC
  cancel signal.
- gRPC/HTTP-2 keepalive on the Flight server (flight.keepalive.time/timeout) frees a
  parked producer on a half-open connection within the keepalive window.
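
As an illustration of the polling pattern described for isCancelled(), here is a minimal sketch of a parked producer. Only the isCancelled() method name comes from the text above; `CancellableChannel`, `awaitWorkOrCancel`, and the poll interval are hypothetical, and treating a thread interrupt as cancellation is a choice made for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: a producer waits for work but gives up once the consumer cancels. */
final class CancellablePark {
    /** Stand-in for the streaming channel; only isCancelled() is taken from the commit message. */
    interface CancellableChannel {
        boolean isCancelled();
    }

    /**
     * Returns true once work is ready, or false if the consumer cancelled
     * (or this thread was interrupted) before any work arrived.
     */
    static boolean awaitWorkOrCancel(CancellableChannel channel, BooleanSupplier workReady, long pollMillis) {
        while (!workReady.getAsBoolean()) {
            if (channel.isCancelled()) {
                return false; // consumer went away: stop instead of waiting for the next send to throw
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(pollMillis));
            if (Thread.currentThread().isInterrupted()) {
                return false; // leave the interrupt flag set for the caller
            }
        }
        return true;
    }
}
```

A producer loop would call this between batches and release its resources when it returns false.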

Tests: FlightInlineSendBackpressureTests (inline vs executor dispatch),
FlightOutboundHandlerTests and FlightTransportChannelTests updated for the new
sendInline arg.

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
- Correct docs/comments: a sync send returns once the batch is pushed to gRPC's
  per-stream outbound buffer (not "on the wire"); a full buffer is throttled by
  the isReady() readiness gate. Stated positively (TransportChannel,
  FlightOutboundHandler, backpressure.md).
- FlightTransport: use imported Consumer / NettyServerBuilder / TimeUnit instead
  of fully-qualified names in the keepalive transportHint.
- FlightServerChannel: state the single-writer invariant explicitly — the channel
  is not thread-safe; all send-side mutation (root, terminalSent, serverStreamListener
  calls) must run on the single flight-executor thread, never concurrently, or Arrow
  refcounts race and buffers orphan/double-free.

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
@rishabhmaurya rishabhmaurya force-pushed the streaming-transport-cross-cluster branch from ac293d7 to 0013a8c Compare July 2, 2026 18:32
@github-actions

github-actions Bot commented Jul 2, 2026

Contributor

Persistent review updated to latest commit 0013a8c

@github-actions

github-actions Bot commented Jul 2, 2026

Contributor

✅ Gradle check result for 0013a8c: SUCCESS

@andrross andrross merged commit ca7a121 into opensearch-project:main Jul 2, 2026
14 of 15 checks passed