[SPARK-56227][CORE] Fix GcmTransportCipher to correctly handle multiple messages per channel#55028
Draft
aajisaka wants to merge 1 commit intoapache:masterfrom
Draft
[SPARK-56227][CORE] Fix GcmTransportCipher to correctly handle multiple messages per channel#55028aajisaka wants to merge 1 commit intoapache:masterfrom
aajisaka wants to merge 1 commit intoapache:masterfrom
Conversation
This fixes two bugs in `GcmTransportCipher` introduced by SPARK-47172. Bug 1: `DecryptionHandler` silently drops every message after the first. `AesGcmHkdfStreaming` is a one-shot streaming primitive — each independently encrypted message carries its own random IV and requires a fresh `StreamSegmentDecrypter`. The `DecryptionHandler` never reset its per-message state (`completed`, `decrypterInit`, `expectedLength`, `segmentNumber`, etc.) nor replaced the single `final StreamSegmentDecrypter` instance between messages. After the first message was decoded, `completed = true` permanently, and all subsequent messages were silently dropped because both `initalizeExpectedLength()` and `initalizeDecrypter()` returned early as no-ops and the inner while loop never ran. Fix: add `resetForNextMessage()` which clears all per-message fields and allocates a new `StreamSegmentDecrypter`; call it after each fully decoded message. Bug 2: `DecryptionHandler` discards bytes from messages batched in the same `channelRead()` call. Under shuffle load, TCP coalesces multiple encrypted messages into a single `ByteBuf`. The original code exited the decryption loop as soon as one message completed and released the buffer — including any trailing bytes belonging to subsequent messages. The next `channelRead()` then received bytes starting mid-stream of the second message, interpreted them as an 8-byte length header, and threw: `IllegalStateException: Invalid expected ciphertext length.` Fix: wrap the decryption logic in an outer loop that continues consuming messages from the same buffer until either the buffer is exhausted or a partial message is encountered. `resetForNextMessage()` is called inside the loop immediately after each complete message while the buffer is still held. Bug 3 (minor): `EncryptionHandler` shares working buffers across concurrent `GcmEncryptedMessage` instances. `plaintextBuffer` and `ciphertextBuffer` were fields of `EncryptionHandler` passed into every `GcmEncryptedMessage`. The constructor's `ciphertextBuffer.limit(0)` call could corrupt an in-flight message's buffer state if Netty batched writes. Fix: move buffer ownership into `GcmEncryptedMessage` so each message allocates its own working buffers. Without these fixes, enabling `AES/GCM/NoPadding` RPC encryption causes YARN executor containers to fail: the auth handshake succeeds but all post-auth RPC messages are dropped or corrupted, leaving the channel hung until YARN kills the container. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Member
Author
|
Converted to draft. We are seeing job failures in benchmarking test. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This fixes two bugs in
GcmTransportCipherintroduced by SPARK-47172.Bug 1:
DecryptionHandlersilently drops every message after the first.AesGcmHkdfStreamingis a one-shot streaming primitive — each independently encrypted message carries its own random IV and requires a freshStreamSegmentDecrypter. TheDecryptionHandlernever reset its per-message state (completed,decrypterInit,expectedLength,segmentNumber, etc.) nor replaced the singlefinal StreamSegmentDecrypterinstance between messages. After the first message was decoded,completed = truepermanently, and all subsequent messages were silently dropped because bothinitalizeExpectedLength()andinitalizeDecrypter()returned early as no-ops and the inner while loop never ran.Fix: add
resetForNextMessage()which clears all per-message fields and allocates a newStreamSegmentDecrypter; call it after each fully decoded message.Bug 2:
DecryptionHandlerdiscards bytes from messages batched in the samechannelRead()call.Under shuffle load, TCP coalesces multiple encrypted messages into a single
ByteBuf. The original code exited the decryption loop as soon as one message completed and released the buffer — including any trailing bytes belonging to subsequent messages. The nextchannelRead()then received bytes starting mid-stream of the second message, interpreted them as an 8-byte length header, and threw:IllegalStateException: Invalid expected ciphertext length.Fix: wrap the decryption logic in an outer loop that continues consuming messages from the same buffer until either the buffer is exhausted or a partial message is encountered.
resetForNextMessage()is called inside the loop immediately after each complete message while the buffer is still held.Bug 3 (minor):
EncryptionHandlershares working buffers across concurrentGcmEncryptedMessageinstances.plaintextBufferandciphertextBufferwere fields ofEncryptionHandlerpassed into everyGcmEncryptedMessage. The constructor'sciphertextBuffer.limit(0)call could corrupt an in-flight message's buffer state if Netty batched writes. Fix: move buffer ownership intoGcmEncryptedMessageso each message allocates its own working buffers.Without these fixes, enabling
AES/GCM/NoPaddingRPC encryption causes YARN executor containers to fail: the auth handshake succeeds but all post-auth RPC messages are dropped or corrupted, leaving the channel hung until YARN kills the container.Why are the changes needed?
To successfully run Spark jobs on YARN with
spark.network.crypto.cipher="AES/GCM/NoPadding"Fixes #54999
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests:
testMultipleMessages: encrypts and decrypts two independent messages through the same handler pair with separatechannelRead()calls.testBatchedMessages: concatenates two ciphertexts into oneByteBufand delivers them in a singlechannelRead()call, verifying both are decoded correctly.Ported these changes to our Spark 3.4.x-based internal branch and ran multiple jobs in YARN cluster successfully.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.6)