Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -74,9 +76,8 @@ private static Row flattenAnalyticsRow(Row row) {

static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, Long>> {
@ProcessElement
public void processElement(ProcessContext c) {
Row row = c.element();
c.output(
public void processElement(@Element Row row, OutputReceiver<KV<String, Long>> receiver) {
receiver.output(
KV.of(
Preconditions.checkStateNotNull(row.getString("browser")),
Preconditions.checkStateNotNull(row.getInt64("transactions"))));
Expand All @@ -85,13 +86,13 @@ public void processElement(ProcessContext c) {

static class FormatCountsFn extends DoFn<KV<String, Long>, Row> {
@ProcessElement
public void processElement(ProcessContext c) {
public void processElement(@Element KV<String, Long> element, OutputReceiver<Row> receiver) {
Row row =
Row.withSchema(AGGREGATED_SCHEMA)
.withFieldValue("browser", c.element().getKey())
.withFieldValue("transaction_count", c.element().getValue())
.withFieldValue("browser", element.getKey())
.withFieldValue("transaction_count", element.getValue())
.build();
c.output(row);
receiver.output(row);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// description: Demonstration of Schema transform usage.
// multifile: false
// default_example: false
// context_line: 60
// context_line: 65
// categories:
// - Schemas
// - Combiners
Expand All @@ -42,6 +42,8 @@
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -101,9 +103,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// description: Demonstration of SQL transform usage.
// multifile: false
// default_example: false
// context_line: 60
// context_line: 62
// categories:
// - Beam SQL
// - Combiners
Expand All @@ -41,6 +41,8 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -95,9 +97,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.beam.sdk.transforms.ApproximateQuantiles;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
Expand All @@ -34,7 +36,7 @@
// description: Demonstration of ApproximateQuantiles transform usage.
// multifile: false
// default_example: false
// context_line: 46
// context_line: 48
// categories:
// - Core Transforms
// complexity: BASIC
Expand Down Expand Up @@ -70,9 +72,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// description: Demonstration of Composed Combine transform usage.
// multifile: false
// default_example: false
// context_line: 143
// context_line: 145
// categories:
// - Schemas
// - Combiners
Expand All @@ -46,6 +46,8 @@
import org.apache.beam.sdk.transforms.CombineFns;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -185,13 +187,16 @@ public Long apply(Long input) {
new DoFn<
KV<Long, CombineFns.CoCombineResult>, KV<Long, Iterable<KV<String, Long>>>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
CombineFns.CoCombineResult e = c.element().getValue();
public void processElement(
@Element KV<Long, CombineFns.CoCombineResult> element,
OutputReceiver<KV<Long, Iterable<KV<String, Long>>>> receiver)
throws Exception {
CombineFns.CoCombineResult e = element.getValue();
ArrayList<KV<String, Long>> o = new ArrayList<KV<String, Long>>();
o.add(KV.of(minTag.getId(), e.get(minTag)));
o.add(KV.of(maxTag.getId(), e.get(maxTag)));
o.add(KV.of(sumTag.getId(), e.get(sumTag)));
c.output(KV.of(c.element().getKey(), o));
receiver.output(KV.of(element.getKey(), o));
}
}));

Expand All @@ -210,9 +215,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
Expand All @@ -37,7 +39,7 @@
// description: Demonstration of CoGroupByKey transform usage.
// multifile: false
// default_example: false
// context_line: 54
// context_line: 56
// categories:
// - Core Transforms
// - Joins
Expand Down Expand Up @@ -84,9 +86,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -34,7 +36,7 @@
// description: Demonstration of Combine transform usage.
// multifile: false
// default_example: false
// context_line: 47
// context_line: 49
// categories:
// - Core Transforms
// - Combiners
Expand Down Expand Up @@ -68,9 +70,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
Expand All @@ -33,7 +35,7 @@
// description: Demonstration of Count transform usage.
// multifile: false
// default_example: false
// context_line: 45
// context_line: 47
// categories:
// - Core Transforms
// complexity: BASIC
Expand Down Expand Up @@ -63,9 +65,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -34,7 +36,7 @@
// description: Demonstration of Count.perKey transform usage.
// multifile: false
// default_example: false
// context_line: 47
// context_line: 49
// categories:
// - Core Transforms
// complexity: BASIC
Expand Down Expand Up @@ -67,9 +69,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -38,7 +40,7 @@
// description: Demonstration of Create transform usage.
// multifile: false
// default_example: false
// context_line: 51
// context_line: 53
// categories:
// - Core Transforms
// complexity: BASIC
Expand Down Expand Up @@ -79,9 +81,9 @@ public LogOutput(String prefix) {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("{}{}", prefix, c.element());
c.output(c.element());
public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
LOG.info("{}{}", prefix, element);
receiver.output(element);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
// filter("Flourish|stomach").
// multifile: false
// pipeline_options: --output output.txt
// context_line: 180
// context_line: 182
// categories:
// - Debugging
// - Filtering
Expand All @@ -46,6 +46,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -115,18 +117,19 @@ public FilterTextFn(String pattern) {
private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");

@ProcessElement
public void processElement(ProcessContext c) {
if (filter.matcher(c.element().getKey()).matches()) {
public void processElement(
@Element KV<String, Long> element, OutputReceiver<KV<String, Long>> receiver) {
if (filter.matcher(element.getKey()).matches()) {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
// these log lines will appear only if the log level is set to "DEBUG" or lower.
LOG.debug("Matched: {}", c.element().getKey());
LOG.debug("Matched: {}", element.getKey());
matchedWords.inc();
c.output(c.element());
receiver.output(element);
} else {
// Log at the "TRACE" level each element that is not matched. Different log levels
// can be used to control the verbosity of logging providing an effective mechanism
// to filter less important information.
LOG.trace("Did not match: {}", c.element().getKey());
LOG.trace("Did not match: {}", element.getKey());
unmatchedWords.inc();
}
}
Expand Down
Loading
Loading