diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ecc231ab825e..299e7fa21ed1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1244,8 +1244,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded // to Runner v2. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { - List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); - if (!experiments.contains("use_runner_v2")) { + if (!useUnifiedWorker(options)) { + List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); LOG.info( "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" + " transforms or pipeline needed a transform upgrade."); @@ -1256,7 +1256,9 @@ public DataflowPipelineJob run(Pipeline pipeline) { if (useUnifiedWorker(options)) { if (hasExperiment(options, "disable_runner_v2") || hasExperiment(options, "disable_runner_v2_until_2023") - || hasExperiment(options, "disable_prime_runner_v2")) { + || hasExperiment(options, "disable_prime_runner_v2") + || hasExperiment(options, "disable_portable_runner") + || hasExperiment(options, "enable_streaming_java_runner")) { throw new IllegalArgumentException( "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission', 'enable_portable_runner'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2', 'disable_portable_runner', 'enable_streaming_java_runner'] is set."); } @@ -2729,7 +2731,8 @@ static boolean useUnifiedWorker(DataflowPipelineOptions options) { return 
hasExperiment(options, "beam_fn_api") || hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker") - || hasExperiment(options, "use_portable_job_submission"); + || hasExperiment(options, "use_portable_job_submission") + || hasExperiment(options, "enable_portable_runner"); } static void verifyDoFnSupported( diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 8c33123be6d5..ab3b62a0aa1b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1783,7 +1783,11 @@ public void testSdkHarnessConfigurationPrime() throws IOException { public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception { for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + "beam_fn_api", + "use_runner_v2", + "use_unified_worker", + "use_portable_job_submission", + "enable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); ExperimentalOptions.addExperiment(options, experiment); Pipeline p = Pipeline.create(options); @@ -1798,7 +1802,11 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + "beam_fn_api", + "use_runner_v2", + "use_unified_worker", + "use_portable_job_submission", + "enable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); options.setStreaming(true); ExperimentalOptions.addExperiment(options, experiment); @@ -1822,10 +1830,18 @@ public void 
testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception public void testSettingConflictingEnableAndDisableExperimentsThrowsException() throws Exception { for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + "beam_fn_api", + "use_runner_v2", + "use_unified_worker", + "use_portable_job_submission", + "enable_portable_runner")) { for (String disabledExperiment : ImmutableList.of( - "disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2")) { + "disable_runner_v2", + "disable_runner_v2_until_2023", + "disable_prime_runner_v2", + "enable_streaming_java_runner", + "disable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); ExperimentalOptions.addExperiment(options, experiment); ExperimentalOptions.addExperiment(options, disabledExperiment); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java index efbb03519789..bba10c84fab6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java @@ -625,7 +625,7 @@ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception { LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool); // Ideally we would never create more than 100, however with contention it is still possible // some extra threads will be created. - assertTrue(largestPool <= 110); + assertTrue(largestPool <= 120); executorService.shutdown(); } }