1919package org .apache .flink .fs .s3native ;
2020
2121import org .apache .flink .annotation .Internal ;
22+ import org .apache .flink .annotation .VisibleForTesting ;
2223import org .apache .flink .core .fs .ICloseableRegistry ;
2324import org .apache .flink .core .fs .PathsCopyingFileSystem ;
2425
3738import java .util .concurrent .CompletableFuture ;
3839import java .util .concurrent .ExecutionException ;
3940
41+ import static org .apache .flink .util .Preconditions .checkArgument ;
42+
4043/**
4144 * Helper class for performing bulk S3 to local file system copies using S3TransferManager.
4245 *
4346 * <p><b>Concurrency Model:</b> Uses batch-based concurrency control with {@code
44- * maxConcurrentCopies} to limit parallel downloads. The current implementation waits for each batch
45- * to complete before starting the next batch. A future enhancement could use a bounded thread pool
46- * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow continuous submission
47- * of new downloads as slots become available, which would provide better throughput by avoiding the
48- * "slowest task in batch" bottleneck.
47+ * maxConcurrentCopies} to limit parallel downloads. The effective concurrency is clamped to the
48+ * HTTP connection pool size ({@code maxConnections}) to prevent connection pool exhaustion. The
49+ * current implementation waits for each batch to complete before starting the next batch. A future
50+ * enhancement could use a bounded thread pool (e.g., {@link java.util.concurrent.Semaphore} or
51+ * bounded executor) to allow continuous submission of new downloads as slots become available,
52+ * which would provide better throughput by avoiding the "slowest task in batch" bottleneck.
4953 *
5054 * <p><b>Retry Handling:</b> Relies on the S3TransferManager's built-in retry mechanism for
5155 * transient failures. If a download fails after retries:
@@ -70,10 +74,39 @@ class NativeS3BulkCopyHelper {
7074
7175 private final S3TransferManager transferManager ;
7276 private final int maxConcurrentCopies ;
77+ private final int maxConnections ;
7378
74- public NativeS3BulkCopyHelper (S3TransferManager transferManager , int maxConcurrentCopies ) {
79+ /**
80+ * Creates a new bulk copy helper.
81+ *
82+ * @param transferManager the S3 transfer manager for async downloads
83+ * @param maxConcurrentCopies the requested maximum number of concurrent copy operations
84+ * @param maxConnections the HTTP connection pool size; if {@code maxConcurrentCopies} exceeds
85+ * this value, it is clamped down to prevent connection pool exhaustion
86+ */
87+ NativeS3BulkCopyHelper (
88+ S3TransferManager transferManager , int maxConcurrentCopies , int maxConnections ) {
89+ checkArgument (maxConcurrentCopies > 0 , "maxConcurrentCopies must be positive" );
90+ checkArgument (maxConnections > 0 , "maxConnections must be positive" );
7591 this .transferManager = transferManager ;
76- this .maxConcurrentCopies = maxConcurrentCopies ;
92+ this .maxConnections = maxConnections ;
93+ if (maxConcurrentCopies > maxConnections ) {
94+ LOG .warn (
95+ "{} ({}) exceeds {} ({}). "
96+ + "Clamping concurrent copies to {} to prevent connection pool exhaustion." ,
97+ NativeS3FileSystemFactory .BULK_COPY_MAX_CONCURRENT .key (),
98+ maxConcurrentCopies ,
99+ NativeS3FileSystemFactory .MAX_CONNECTIONS .key (),
100+ maxConnections ,
101+ maxConnections );
102+ this .maxConcurrentCopies = maxConnections ;
103+ } else {
104+ this .maxConcurrentCopies = maxConcurrentCopies ;
105+ }
106+ }
107+
108+ int getMaxConcurrentCopies () {
109+ return maxConcurrentCopies ;
77110 }
78111
79112 /**
@@ -97,9 +130,17 @@ public void copyFiles(
97130 return ;
98131 }
99132
100- LOG .info ("Starting bulk copy of {} files using S3TransferManager" , requests .size ());
133+ int totalFiles = requests .size ();
134+ int totalBatches = (totalFiles + maxConcurrentCopies - 1 ) / maxConcurrentCopies ;
135+ LOG .info (
136+ "Starting bulk copy of {} files using S3TransferManager "
137+ + "(batch size: {}, total batches: {})" ,
138+ totalFiles ,
139+ maxConcurrentCopies ,
140+ totalBatches );
101141
102142 List <CompletableFuture <CompletedCopy >> copyFutures = new ArrayList <>();
143+ int batchNumber = 0 ;
103144
104145 try {
105146 for (int i = 0 ; i < requests .size (); i ++) {
@@ -113,12 +154,18 @@ public void copyFiles(
113154 }
114155
115156 if (copyFutures .size () >= maxConcurrentCopies || i == requests .size () - 1 ) {
157+ batchNumber ++;
158+ LOG .debug (
159+ "Waiting for batch {}/{} ({} files)" ,
160+ batchNumber ,
161+ totalBatches ,
162+ copyFutures .size ());
116163 waitForCopies (copyFutures );
117164 copyFutures .clear ();
118165 }
119166 }
120167
121- LOG .info ("Completed bulk copy of {} files" , requests . size () );
168+ LOG .info ("Completed bulk copy of {} files" , totalFiles );
122169 } catch (Exception e ) {
123170 if (!copyFutures .isEmpty ()) {
124171 LOG .warn (
@@ -181,8 +228,42 @@ private void waitForCopies(List<CompletableFuture<CompletedCopy>> futures) throw
181228 Thread .currentThread ().interrupt ();
182229 throw new IOException ("Bulk copy interrupted" , e );
183230 } catch (ExecutionException e ) {
184- throw new IOException ("Bulk copy failed" , e .getCause ());
231+ Throwable cause = e .getCause ();
232+ if (isConnectionPoolExhausted (cause )) {
233+ throw new IOException (
234+ String .format (
235+ "S3 connection pool exhausted during bulk copy. "
236+ + "The configured connection pool size (%d) could not serve "
237+ + "the concurrent download requests (%d). "
238+ + "Consider reducing '%s' or increasing '%s'." ,
239+ maxConnections ,
240+ maxConcurrentCopies ,
241+ NativeS3FileSystemFactory .BULK_COPY_MAX_CONCURRENT .key (),
242+ NativeS3FileSystemFactory .MAX_CONNECTIONS .key ()),
243+ cause );
244+ }
245+ throw new IOException ("Bulk copy failed" , cause );
246+ }
247+ }
248+
249+ /**
250+ * Checks whether a failure was caused by HTTP connection pool exhaustion.
251+ *
252+ * <p>Walks the causal chain looking for the SDK's characteristic message about connection
253+ * acquire timeouts. This detection is deliberately broad (substring match on the message) to
254+ * remain resilient to minor SDK wording changes across versions.
255+ */
256+ @ VisibleForTesting
257+ static boolean isConnectionPoolExhausted (Throwable throwable ) {
258+ Throwable current = throwable ;
259+ while (current != null ) {
260+ String message = current .getMessage ();
261+ if (message != null && message .contains ("Acquire operation took longer than" )) {
262+ return true ;
263+ }
264+ current = current .getCause ();
185265 }
266+ return false ;
186267 }
187268
188269 /**
0 commit comments