Skip to content

Commit b6d4fdc

Browse files
authored
Merge pull request #35177: Introduce WindowedValue receivers and consolidate runner code to them
Introduce WindowedValue receivers and consolidate runner code to them
2 parents 74512d9 + b84cf00 commit b6d4fdc

66 files changed

Lines changed: 409 additions & 711 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/trigger_files/beam_PostCommit_Java_DataflowV1.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
2+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
23
"comment": "Modify this file in a trivial way to cause this test suite to run",
34
"modification": 1,
45
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"

.github/trigger_files/beam_PostCommit_Java_DataflowV2.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
2+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
23
"comment": "Modify this file in a trivial way to cause this test suite to run",
34
"modification": 3,
45
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Direct.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
2+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
23
"comment": "Modify this file in a trivial way to cause this test suite to run",
34
"https://github.com/apache/beam/pull/31761": "noting that PR #31761 should run this test",
45
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
2+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
23
"comment": "Modify this file in a trivial way to cause this test suite to run",
34
"https://github.com/apache/beam/pull/32440": "testing datastream optimizations",
45
"runFor": "#33606",

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
2+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
23
"comment": "Modify this file in a trivial way to cause this test suite to run",
34
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
45
"https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute",

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Spark.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
2+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
23
"comment": "Modify this file in a trivial way to cause this test suite to run",
34
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
45
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
2+
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
23
"comment": "Modify this file in a trivial way to cause this test suite to run",
34
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
45
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
3030
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
3131
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
32+
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
3233
import org.apache.beam.sdk.values.KV;
3334
import org.apache.beam.sdk.values.PCollectionView;
3435
import org.apache.beam.sdk.values.TupleTag;
@@ -41,12 +42,6 @@
4142
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
4243
})
4344
public class DoFnRunners {
44-
/** Information about how to create output receivers and output to them. */
45-
public interface OutputManager {
46-
/** Outputs a single element to the receiver indicated by the given {@link TupleTag}. */
47-
<T> void output(TupleTag<T> tag, WindowedValue<T> output);
48-
}
49-
5045
/**
5146
* Returns an implementation of {@link DoFnRunner} for a {@link DoFn}.
5247
*
@@ -58,7 +53,7 @@ public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
5853
PipelineOptions options,
5954
DoFn<InputT, OutputT> fn,
6055
SideInputReader sideInputReader,
61-
OutputManager outputManager,
56+
WindowedValueMultiReceiver outputManager,
6257
TupleTag<OutputT> mainOutputTag,
6358
List<TupleTag<?>> additionalOutputTags,
6459
StepContext stepContext,
@@ -168,7 +163,7 @@ ProcessFnRunner<InputT, OutputT, RestrictionT> newProcessFnRunner(
168163
PipelineOptions options,
169164
Collection<PCollectionView<?>> views,
170165
ReadyCheckingSideInputReader sideInputReader,
171-
OutputManager outputManager,
166+
WindowedValueMultiReceiver outputManager,
172167
TupleTag<OutputT> mainOutputTag,
173168
List<TupleTag<?>> additionalOutputTags,
174169
StepContext stepContext,

runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717
*/
1818
package org.apache.beam.runners.core;
1919

20-
import java.util.Collection;
2120
import org.apache.beam.model.pipeline.v1.RunnerApi;
2221
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
2322
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
2423
import org.apache.beam.sdk.transforms.DoFn;
2524
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
26-
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
2725
import org.apache.beam.sdk.util.SystemDoFnInternal;
26+
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
2827
import org.apache.beam.sdk.util.construction.TriggerTranslation;
2928
import org.apache.beam.sdk.values.KV;
3029
import org.apache.beam.sdk.values.TupleTag;
31-
import org.apache.beam.sdk.values.WindowedValues;
3230
import org.apache.beam.sdk.values.WindowingStrategy;
33-
import org.joda.time.Instant;
3431

3532
/**
3633
* A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the {@link
@@ -51,7 +48,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
5148
TimerInternalsFactory<K> timerInternalsFactory,
5249
SideInputReader sideInputReader,
5350
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
54-
DoFnRunners.OutputManager outputManager,
51+
WindowedValueMultiReceiver outputManager,
5552
TupleTag<KV<K, OutputT>> mainTag) {
5653
return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
5754
strategy,
@@ -68,7 +65,7 @@ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
6865
private transient StateInternalsFactory<K> stateInternalsFactory;
6966
private transient TimerInternalsFactory<K> timerInternalsFactory;
7067
private transient SideInputReader sideInputReader;
71-
private transient DoFnRunners.OutputManager outputManager;
68+
private transient WindowedValueMultiReceiver outputManager;
7269
private TupleTag<KV<K, OutputT>> mainTag;
7370

7471
public GroupAlsoByWindowViaWindowSetNewDoFn(
@@ -77,7 +74,7 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
7774
TimerInternalsFactory<K> timerInternalsFactory,
7875
SideInputReader sideInputReader,
7976
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
80-
DoFnRunners.OutputManager outputManager,
77+
WindowedValueMultiReceiver outputManager,
8178
TupleTag<KV<K, OutputT>> mainTag) {
8279
this.timerInternalsFactory = timerInternalsFactory;
8380
this.sideInputReader = sideInputReader;
@@ -91,29 +88,6 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
9188
this.triggerProto = TriggerTranslation.toProto(windowingStrategy.getTrigger());
9289
}
9390

94-
private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
95-
return new OutputWindowedValue<KV<K, OutputT>>() {
96-
@Override
97-
public void outputWindowedValue(
98-
KV<K, OutputT> output,
99-
Instant timestamp,
100-
Collection<? extends BoundedWindow> windows,
101-
PaneInfo pane) {
102-
outputManager.output(mainTag, WindowedValues.of(output, timestamp, windows, pane));
103-
}
104-
105-
@Override
106-
public <AdditionalOutputT> void outputWindowedValue(
107-
TupleTag<AdditionalOutputT> tag,
108-
AdditionalOutputT output,
109-
Instant timestamp,
110-
Collection<? extends BoundedWindow> windows,
111-
PaneInfo pane) {
112-
outputManager.output(tag, WindowedValues.of(output, timestamp, windows, pane));
113-
}
114-
};
115-
}
116-
11791
@ProcessElement
11892
public void processElement(ProcessContext c) throws Exception {
11993
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
@@ -130,7 +104,7 @@ public void processElement(ProcessContext c) throws Exception {
130104
TriggerStateMachines.stateMachineForTrigger(triggerProto)),
131105
stateInternals,
132106
timerInternals,
133-
outputWindowedValue(),
107+
windowedValue -> outputManager.output(mainTag, windowedValue),
134108
sideInputReader,
135109
reduceFn,
136110
c.getPipelineOptions());

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@
4545
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
4646
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4747
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
48+
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
4849
import org.apache.beam.sdk.values.KV;
4950
import org.apache.beam.sdk.values.PCollectionView;
5051
import org.apache.beam.sdk.values.Row;
5152
import org.apache.beam.sdk.values.TupleTag;
5253
import org.apache.beam.sdk.values.WindowedValue;
54+
import org.apache.beam.sdk.values.WindowedValues;
5355
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
5456
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
5557
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -72,19 +74,20 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
7274
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> {
7375
private final DoFn<InputT, OutputT> fn;
7476
private final PipelineOptions pipelineOptions;
75-
private final OutputWindowedValue<OutputT> output;
77+
private final WindowedValueMultiReceiver outputReceiver;
7678
private final SideInputReader sideInputReader;
7779
private final ScheduledExecutorService executor;
7880
private final int maxNumOutputs;
7981
private final Duration maxDuration;
8082
private final Supplier<BundleFinalizer> bundleFinalizer;
83+
private final TupleTag<OutputT> mainOutputTag;
8184

8285
/**
8386
* Creates a new invoker from components.
8487
*
8588
* @param fn The original {@link DoFn}.
8689
* @param pipelineOptions {@link PipelineOptions} to include in the {@link DoFn.ProcessContext}.
87-
* @param output Hook for outputting from the {@link DoFn.ProcessElement} method.
90+
* @param outputReceiver Hook for outputting from the {@link DoFn.ProcessElement} method.
8891
* @param sideInputReader Hook for accessing side inputs.
8992
* @param executor Executor on which a checkpoint will be scheduled after the given duration.
9093
* @param maxNumOutputs Maximum number of outputs, in total over all output tags, after which a
@@ -98,15 +101,17 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
98101
public OutputAndTimeBoundedSplittableProcessElementInvoker(
99102
DoFn<InputT, OutputT> fn,
100103
PipelineOptions pipelineOptions,
101-
OutputWindowedValue<OutputT> output,
104+
WindowedValueMultiReceiver outputReceiver,
105+
TupleTag<OutputT> mainOutputTag,
102106
SideInputReader sideInputReader,
103107
ScheduledExecutorService executor,
104108
int maxNumOutputs,
105109
Duration maxDuration,
106110
Supplier<BundleFinalizer> bundleFinalizer) {
107111
this.fn = fn;
108112
this.pipelineOptions = pipelineOptions;
109-
this.output = output;
113+
this.outputReceiver = outputReceiver;
114+
this.mainOutputTag = mainOutputTag;
110115
this.sideInputReader = sideInputReader;
111116
this.executor = executor;
112117
this.maxNumOutputs = maxNumOutputs;
@@ -403,7 +408,7 @@ public void outputWindowedValue(
403408
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
404409
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
405410
}
406-
output.outputWindowedValue(value, timestamp, windows, paneInfo);
411+
outputReceiver.output(mainOutputTag, WindowedValues.of(value, timestamp, windows, paneInfo));
407412
}
408413

409414
@Override
@@ -413,7 +418,8 @@ public <T> void output(TupleTag<T> tag, T value) {
413418

414419
@Override
415420
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
416-
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
421+
outputReceiver.output(
422+
tag, WindowedValues.of(value, timestamp, element.getWindows(), element.getPane()));
417423
}
418424

419425
@Override
@@ -427,7 +433,7 @@ public <T> void outputWindowedValue(
427433
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
428434
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
429435
}
430-
output.outputWindowedValue(tag, value, timestamp, windows, paneInfo);
436+
outputReceiver.output(tag, WindowedValues.of(value, timestamp, windows, paneInfo));
431437
}
432438

433439
private void noteOutput() {

0 commit comments

Comments
 (0)