Introduction to Apache Beam Using Java

Key Takeaways

  • Apache Beam is a powerful open source project for batch and streaming processing
  • Its portability allows running pipelines on different backends, from Apache Spark to Google Cloud Dataflow
  • Beam is extensible, meaning you can write and share new SDKs, IO connectors, and transforms
  • Beam currently supports Python, Java, and Go
  • By using its Java SDK you can take advantage of all the benefits of the JVM

In this article, we are going to introduce Apache Beam, a powerful open source project for batch and streaming processing, used by companies like eBay to integrate its streaming pipelines and by Mozilla to move data safely between its systems.

Overview

Apache Beam is a programming model for processing data, supporting batch and streaming.

Using the provided SDKs for Java, Python and Go, you can develop pipelines and then choose a backend that will run the pipeline.

Advantages of Apache Beam

Beam Model (Frances Perry & Tyler Akidau)

  • Built-in I/O Connectors
    • Apache Beam connectors allow extracting and loading data easily from several types of storage
    • The main connector types are:
      • File-based (ex.: Apache Parquet, Apache Thrift)
      • File System (ex.: Hadoop, Google Cloud Storage, Amazon S3)
      • Messaging (ex.: Apache Kafka, Google Pub/Sub, Amazon SQS)
      • Database (ex.: Apache Cassandra, Elasticsearch, MongoDB)
    • As an OSS project, the support for new connectors is growing (ex.: InfluxDB, Neo4j)
  • Portability:
    • Beam provides several runners to run the pipelines, letting you choose the best for each use case and avoid vendor lock-in.
    • Distributed processing backends like Apache Flink, Apache Spark or Google Cloud Dataflow can be used as runners.
  • Distributed Parallel processing:
    • Each item in the dataset is handled independently by default, so its processing can be optimized by running in parallel.
    • Developers don't need to manually distribute the load between workers as Beam provides an abstraction for it.

The Beam Model

The key concepts in the Beam programming model are:

  • PCollection: represents a collection of data, e.g., an array of numbers or words extracted from a text.
  • PTransform: a transforming function that receives and returns a PCollection, e.g., summing all numbers.
  • Pipeline: manages the interactions between PTransforms and PCollections.
  • PipelineRunner: specifies where and how the pipeline should execute.
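
Before moving to concrete examples, the following minimal sketch (variable names are illustrative) shows how these four concepts relate: a Pipeline is created, an in-memory PCollection is built, a PTransform is applied, and the PipelineRunner selected through the pipeline options executes everything when run() is called.

// Minimal illustration of the Beam model; variable names are illustrative.
Pipeline pipeline = Pipeline.create();          // Pipeline: holds the whole processing graph
                                                // (the PipelineRunner is selected via PipelineOptions,
                                                // defaulting to the local Direct Runner)

PCollection<String> words =                     // PCollection: an immutable, distributed dataset
      pipeline.apply(Create.of("hello", "beam"));

PCollection<Integer> lengths = words.apply(     // PTransform: a processing step from one PCollection to another
      MapElements.into(TypeDescriptors.integers())
            .via((String word) -> word.length()));

pipeline.run();                                 // the configured runner executes the pipeline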

Quick Start

A basic pipeline operation consists of 3 steps: reading, processing and writing the transformation result. Each one of those steps is defined programmatically using one of the Apache Beam SDKs. 

In this section, we will create pipelines using the Java SDK. You can choose between creating a local application (using Gradle or Maven) or you can use the Online Playground. The examples will use the local runner as it will be easier to verify the result using JUnit Assertions.

Java Local Dependencies

  • beam-sdks-java-core: contains all Beam Model classes.
  • beam-runners-direct-java: by default the Apache Beam SDK uses the Direct Runner, which means the pipeline will run on your local machine.

Multiply by 2

In this first example, the pipeline will receive an array of numbers and will map each element to its value multiplied by 2.

The first step is creating the pipeline instance that will receive the input array and run the transform function. As we're using JUnit to run Apache Beam, we can easily create a TestPipeline as a test class attribute. If you prefer running it from your main application instead, you'll need to set the pipeline configuration options yourself (see the sketch below).

@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
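
For a standalone application rather than a JUnit test, a minimal setup would look something like the following sketch, using the standard PipelineOptionsFactory; command-line flags such as --runner are parsed from the arguments, and with no arguments the Direct Runner is used.

// A minimal sketch for a standalone application (not a JUnit test).
public static void main(String[] args) {
      PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
      Pipeline pipeline = Pipeline.create(options);

      // ... apply the same transforms shown below, then execute:
      pipeline.run().waitUntilFinish();
}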

Now we can create the PCollection that will be used as input to the pipeline. It'll be an array instantiated directly from memory but it could be read from anywhere supported by Apache Beam:

PCollection<Integer> numbers =
      pipeline.apply(Create.of(1, 2, 3, 4, 5));

Then we apply our transform function that will multiply each dataset element by two:

PCollection<Integer> output = numbers.apply(
      MapElements.into(TypeDescriptors.integers())
            .via((Integer number) -> number * 2)
);

To verify the results we can write an assertion:

PAssert.that(output)
      .containsInAnyOrder(2, 4, 6, 8, 10);

Note that the results are not expected to arrive in the same order as the input, because Apache Beam processes each item independently and in parallel.

The test at this point is done, and we run the pipeline by calling:

pipeline.run();
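
For reference, the snippets above assembled into a single JUnit test might look like the following; the class name and package layout are illustrative.

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class MultiplyByTwoTest {

      @Rule
      public final transient TestPipeline pipeline = TestPipeline.create();

      @Test
      public void testMultiplyByTwo() {
            // Input PCollection created directly from memory
            PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5));

            // Map each element to its value multiplied by 2
            PCollection<Integer> output = numbers.apply(
                  MapElements.into(TypeDescriptors.integers())
                        .via((Integer number) -> number * 2));

            // Order is not guaranteed, so assert on the unordered contents
            PAssert.that(output).containsInAnyOrder(2, 4, 6, 8, 10);

            pipeline.run();
      }
}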

Reduce operation

The reduce operation is the combination of multiple input elements that results in a smaller collection, usually containing a single element.

MapReduce (Frances Perry & Tyler Akidau)

Now let's extend the example above to sum up all the items multiplied by two, resulting in a MapReduce transform.

Each PCollection transform results in a new PCollection instance, which means we can chain transformations using the apply method. In this case, the Sum operation will be used after multiplying each input by 2:

PCollection<Integer> numbers =
            pipeline.apply(Create.of(1, 2, 3, 4, 5));

PCollection<Integer> output = numbers
      .apply(
            MapElements.into(TypeDescriptors.integers())
                  .via((Integer number) -> number * 2))
      .apply(Sum.integersGlobally());

PAssert.that(output)
             .containsInAnyOrder(30);

pipeline.run();

FlatMap operation

FlatMap first applies a map function to each input element, where the function usually returns a new collection, producing a collection of collections. A flatten step is then applied to merge all the nested collections into a single one.

The next example will transform an array of sentences into a single flat array containing each word.

First, we declare our list of words that will be used as the pipeline input:

final String[] WORDS_ARRAY = new String[] {
        	"hi bob", "hello alice", "hi sue"};

final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

Then we create the input PCollection using the list above:

PCollection<String> input = pipeline.apply(Create.of(WORDS));

Now we apply the flatmap transformation, which will split each sentence into words and merge the results into a single list:

PCollection<String> output = input.apply(
      FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split(" ")))
);

PAssert.that(output)
      .containsInAnyOrder("hi", "bob", "hello", "alice", "hi", "sue");

pipeline.run();

Group operation

A common job in data processing is aggregating or counting by a specific key. We'll demonstrate it by counting the number of occurrences of each word from the previous example.

After having the flat array of strings, we can chain another PTransform:

PCollection<KV<String, Long>> output = input
            .apply(
                  FlatMapElements.into(TypeDescriptors.strings())
                    	.via((String line) -> Arrays.asList(line.split(" ")))
            )
            .apply(Count.<String>perElement());

Resulting in:

PAssert.that(output)
      .containsInAnyOrder(
            KV.of("hi", 2L),
            KV.of("hello", 1L),
            KV.of("alice", 1L),
            KV.of("sue", 1L),
            KV.of("bob", 1L));

pipeline.run();

Reading from a file

One of the principles of Apache Beam is reading data from anywhere, so let's see in practice how to use a text file as a data source.

The following example will read the content of a "words.txt" file containing the single sentence "An advanced unified programming model". The transform function will then return a PCollection containing each word from the text.

PCollection<String> input =
      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

PCollection<String> output = input.apply(
      FlatMapElements.into(TypeDescriptors.strings())
      .via((String line) -> Arrays.asList(line.split(" ")))
);

PAssert.that(output)
      .containsInAnyOrder("An", "advanced", "unified", "programming", "model");

pipeline.run();

Writing output to a file

As with the input in the previous example, Apache Beam has multiple built-in output connectors. In the following example, we will count the occurrences of each word present in the text file "words.txt", which contains only a single sentence ("An advanced unified programming model"), and the output will be persisted in text file format.

PCollection<String> input =
      pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

PCollection<KV<String, Long>> output = input
      .apply(
            FlatMapElements.into(TypeDescriptors.strings())
                  .via((String line) -> Arrays.asList(line.split(" ")))
      )
      .apply(Count.<String>perElement());

PAssert.that(output)
      .containsInAnyOrder(
            KV.of("An", 1L),
            KV.of("advanced", 1L),
            KV.of("unified", 1L),
            KV.of("programming", 1L),
            KV.of("model", 1L)
      );

output
      .apply(
            MapElements.into(TypeDescriptors.strings())
                  .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
      .apply(TextIO.write().to("./src/main/resources/wordscount"));

pipeline.run();

File writing is also optimized for parallelism by default, which means Beam determines the best number of shards (files) in which to persist the result. The files will be located in the folder src/main/resources and will have the prefix "wordscount", the shard number, and the total number of shards, as defined in the last output transformation.

When running it on my laptop, four shards were generated:

First shard (file name: wordscount-00001-of-00003):

An 1
advanced 1

Second shard (file name: wordscount-00002-of-00003):

unified 1
model 1

Third shard (file name: wordscount-00003-of-00003):

programming 1

The last shard was created but ended up empty, because all of the words had already been processed.
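
If a fixed number of output files is preferable to Beam's automatic sharding, TextIO lets you set the shard count explicitly. The sketch below forces a single shard and adds a file suffix; note that fixing the shard count limits write parallelism.

output
      .apply(MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
      .apply(TextIO.write()
            .to("./src/main/resources/wordscount")
            .withSuffix(".txt")   // optional suffix for the generated files
            .withNumShards(1));   // force a single output file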

Extending Apache Beam

We can take advantage of Beam's extensibility by writing a custom transform. A custom transform improves code maintainability by removing duplication.

Basically, we need to create a subclass of PTransform, stating the input and output types as Java generics. Then we override the expand method and place the duplicated logic inside it: in this case, logic that receives a PCollection of lines and returns a PCollection containing each word.

public class WordsFileParser extends PTransform<PCollection<String>, PCollection<String>> {

      @Override
      public PCollection<String> expand(PCollection<String> input) {
            return input
                  .apply(FlatMapElements.into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split(" ")))
                  );
      }
}

The test scenario, refactored to use WordsFileParser, now becomes:

public class FileIOTest {

      @Rule
      public final transient TestPipeline pipeline = TestPipeline.create();

      @Test
      public void testReadInputFromFile() {
            PCollection<String> input =
                  pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

            PCollection<String> output = input.apply(new WordsFileParser());

            PAssert.that(output)
                  .containsInAnyOrder("An", "advanced", "unified", "programming", "model");

            pipeline.run();
      }

      @Test
      public void testWriteOutputToFile() {
            PCollection<String> input =
                  pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

            PCollection<KV<String, Long>> output = input
                  .apply(new WordsFileParser())
                  .apply(Count.<String>perElement());

            PAssert.that(output)
                  .containsInAnyOrder(
                        KV.of("An", 1L),
                        KV.of("advanced", 1L),
                        KV.of("unified", 1L),
                        KV.of("programming", 1L),
                        KV.of("model", 1L)
                  );

            output
                  .apply(
                        MapElements.into(TypeDescriptors.strings())
                              .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
                  .apply(TextIO.write().to("./src/main/resources/wordscount"));

            pipeline.run();
      }
}

The result is a clearer and more modular pipeline.

Windowing

Windowing in Apache Beam (Frances Perry & Tyler Akidau)

A common problem in streaming processing is grouping the incoming data by a certain time interval, especially when handling large amounts of data. In this case, analyzing the aggregated data per hour or per day is more relevant than analyzing each element of the dataset.

In the following example, let's suppose we work at a fintech company and receive transaction events containing an amount and the instant the transaction happened, and we want to retrieve the total amount transacted per day.

Beam provides a way to decorate each PCollection element with a timestamp. We can use this to create a PCollection representing 5 money transactions:

  • Amounts 10 and 20 were transferred on 2022-02-01
  • Amounts 30, 40 and 50 were transferred on 2022-02-05

PCollection<Integer> transactions =
      pipeline.apply(
            Create.timestamped(
                  TimestampedValue.of(10, Instant.parse("2022-02-01T00:00:00+00:00")),
                  TimestampedValue.of(20, Instant.parse("2022-02-01T00:00:00+00:00")),
                  TimestampedValue.of(30, Instant.parse("2022-02-05T00:00:00+00:00")),
                  TimestampedValue.of(40, Instant.parse("2022-02-05T00:00:00+00:00")),
                  TimestampedValue.of(50, Instant.parse("2022-02-05T00:00:00+00:00"))
               )
       );

Next, we'll apply two transform functions:

  1. Group the transactions using a one-day window
  2. Sum the amounts in each group

PCollection<Integer> output =
      transactions
            .apply(Window.into(FixedWindows.of(Duration.standardDays(1))))
            .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());

In the first window (2022-02-01) the expected total amount is 30 (10+20), while in the second window (2022-02-05) we should see a total of 120 (30+40+50).

PAssert.that(output)
      .inWindow(new IntervalWindow(
            Instant.parse("2022-02-01T00:00:00+00:00"),
            Instant.parse("2022-02-02T00:00:00+00:00")))
      .containsInAnyOrder(30);

PAssert.that(output)
      .inWindow(new IntervalWindow(
            Instant.parse("2022-02-05T00:00:00+00:00"),
            Instant.parse("2022-02-06T00:00:00+00:00")))
      .containsInAnyOrder(120);

pipeline.run();

Each IntervalWindow instance needs to match the exact beginning and end timestamps of the chosen duration, so the chosen time has to be "00:00:00".

Summary

Apache Beam is a powerful, battle-tested data framework that supports both batch and streaming processing. We used the Java SDK to build map, reduce, group, windowing, and other operations.

Apache Beam is well suited for developers who work on embarrassingly parallel tasks and want to simplify the mechanics of large-scale data processing.

Its connectors, SDKs, and support for various runners bring flexibility; by choosing a cloud-native runner like Google Cloud Dataflow, you also get automated management of computational resources.
