Lesson 04: Flink's common DataSet and DataStream APIs

In this lesson we introduce Flink's DataSet and DataStream APIs, simulate a real-time computing scenario, and explain the commonly used DataStream APIs in detail.

The so-called "stream-batch unification"

Current status

As mentioned in the previous lesson, one of Flink's most important features is "stream-batch unification". In fact, Flink has not yet fully achieved it, in the sense of writing one set of code that covers both stream processing and batch processing scenarios. As of version 1.10, Flink still exposes two APIs, DataSet and DataStream, for the two kinds of workloads.

Differences and connections between DataSet and DataStream

On the official website and elsewhere you can find that Flink currently ships two sets of APIs and their typical application scenarios, but most of that material skips the question of "why".

The design philosophy of Apache Flink from the beginning was to support multiple forms of computation with the same engine, including batch processing, stream processing and machine learning. In particular, stream and batch processing are unified at the level of the execution engine. For ordinary developers, however, using native Flink still means writing two sets of code.

The overall structure is as follows:

In Flink's source code, the core classes of the DataSet API live in the flink-java module, while the core implementation classes of DataStream are in the flink-streaming-java module.

Opening the DataSet and DataStream classes in these two modules, we can see that the APIs they support are very rich and very similar, including common transformations such as map, filter and join.

We covered Flink's programming model in the previous lesson. For a DataSet, the Source is typically a file, a table or a Java collection, while the Source of a DataStream is usually message-oriented middleware such as Kafka.

Because the DataSet and DataStream APIs are highly similar, and Flink is most widely used in real-time computing, the rest of this lesson focuses on the DataStream API.
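Before diving in, here is a minimal side-by-side sketch (not taken from the course code; the socket host and port are placeholders) that runs the same map on a DataSet and on a DataStream, to show how close the two APIs look:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchVsStreamDemo {
    public static void main(String[] args) throws Exception {
        //Batch: a DataSet built from a Java collection; print() triggers execution
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> words = batchEnv.fromElements("flink", "spark", "storm");
        words.map(String::toUpperCase).print();

        //Streaming: a DataStream built from a socket; the job needs an explicit execute()
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = streamEnv.socketTextStream("localhost", 9000);
        lines.map(String::toUpperCase).print();
        streamEnv.execute("stream demo");
    }
}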

DataStream

Let's first review Flink's programming model. As mentioned in the previous lesson, the basic building blocks of a Flink program are Streams and Transformations. Each data stream starts from one or more Sources and ends at one or more Sinks; the data flow resembles a directed acyclic graph (DAG).

In lesson 02, we simulated a streaming environment by listening on a local Socket port and using Flink's tumbling window to print the computed results every 5 seconds.

Custom real-time data source

In this lesson, we use Flink's SourceFunction interface to implement a custom real-time data source. The implementation is as follows:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class MyStreamingSource implements SourceFunction<MyStreamingSource.Item> {

    private volatile boolean isRunning = true;

    /**
     * Override the run method to keep producing data until the source is cancelled
     * @param ctx the context used to emit elements
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            Item item = generateItem();
            ctx.collect(item);

            //Emit one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    //Randomly generate one product record
    private Item generateItem() {
        int i = new Random().nextInt(100);

        Item item = new Item();
        item.setName("name" + i);
        item.setId(i);
        return item;
    }

    //Declared public and static with public getters/setters so Flink can treat it as a POJO
    public static class Item {
        private String name;
        private Integer id;

        public Item() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "Item{" +
                    "name='" + name + '\'' +
                    ", id=" + id +
                    '}';
        }
    }
}


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class StreamingDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Get the data source
        //Note: the parallelism is set to 1; we will explain this in detail in a later lesson
        DataStreamSource<MyStreamingSource.Item> text =
                env.addSource(new MyStreamingSource()).setParallelism(1);
        DataStream<MyStreamingSource.Item> item = text.map(
                (MapFunction<MyStreamingSource.Item, MyStreamingSource.Item>) value -> value);

        //Print the results
        item.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}

The custom data source implements Flink's SourceFunction interface and overrides its run method, in which a randomly generated Item is emitted every second.

You can run the main method directly to test it.

In the console you can see a steady stream of data being output. Next, let's use this custom real-time data source to demonstrate the DataStream API.

Map

Map takes one element as input and produces output according to the logic defined by the developer.

class StreamingDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Get data source
        DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1); 
        //Map
        SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() {
            @Override
            public Object map(MyStreamingSource.Item item) throws Exception {
                return item.getName();
            }
        });
        //Print results
        mapItems.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}

We only take the name field of each Item and print it.

Note that map is one of the most commonly used operators. As the official documentation puts it, it maps one DataStream to another: the MapFunction is called once for every element, and the transformation from the source DataStream to the target DataStream returns a SingleOutputStreamOperator. Of course, we can also write the map function as a lambda expression.

SingleOutputStreamOperator<Object> mapItems = items.map(
      item -> item.getName()
);

You can also define your own map function by implementing MapFunction or extending RichMapFunction.

class StreamingDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Get data source
        DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1);
        SingleOutputStreamOperator<String> mapItems = items.map(new MyMapFunction());
        //Print results
        mapItems.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }

    static class MyMapFunction extends RichMapFunction<MyStreamingSource.Item,String> {

        @Override
        public String map(MyStreamingSource.Item item) throws Exception {
            return item.getName();
        }
    }
}

In addition, RichMapFunction provides methods such as open and close. Overriding them makes more complex behavior possible, for example accessing accumulators and counters.
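For reference, here is a small sketch (not part of the lesson's source code; the accumulator name "itemCount" is arbitrary) of a RichMapFunction that overrides open and close and registers an accumulator:

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CountingMapFunction extends RichMapFunction<MyStreamingSource.Item, String> {

    private final IntCounter itemCount = new IntCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        //Called once per parallel instance before any record is processed,
        //a good place to register accumulators or open external connections
        getRuntimeContext().addAccumulator("itemCount", itemCount);
    }

    @Override
    public String map(MyStreamingSource.Item item) throws Exception {
        itemCount.add(1);
        return item.getName();
    }

    @Override
    public void close() throws Exception {
        //Called once when the task shuts down, e.g. to release resources
    }
}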

FlatMap

FlatMap takes one element and returns zero or more elements. FlatMap is similar to Map, but when the return value is a list, FlatMap "flattens" the list, that is, it outputs its entries as individual elements.

SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {
    @Override
    public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
        String name = item.getName();
        collector.collect(name);
    }
});

The above program outputs the names one by one. We can also implement more complex logic in FlatMap, such as filtering out data we don't need, as sketched below.
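Here is a sketch of that idea (reusing the items stream from above; the threshold 50 is arbitrary). The FlatMap emits a name only when the id is greater than 50, so some input records produce no output at all:

SingleOutputStreamOperator<String> bigItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, String>() {
    @Override
    public void flatMap(MyStreamingSource.Item item, Collector<String> collector) throws Exception {
        //Only forward items whose id is greater than 50; everything else is dropped
        if (item.getId() > 50) {
            collector.collect(item.getName());
        }
    }
});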

Filter

As the name implies, Filter filters out unwanted data. Each element is passed to the filter function; if the function returns true the element is kept, otherwise it is discarded.

For example, we keep only the items with an even id.

SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {
    @Override
    public boolean filter(MyStreamingSource.Item item) throws Exception {

        return item.getId() % 2 == 0;
    }
});

Of course, we can also use lambda expressions in filter:

SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter( 
    item -> item.getId() % 2 == 0
);

KeyBy

Before introducing the KeyBy function, you need to understand the concept of a KeyedStream. In real business scenarios we often need to group data by some attribute or field and then process each group differently. For example, to build a user profile we weight the user's different behavioral events; to monitor the Double 11 marketplace we group goods by category and compute sales per category.

When we apply the KeyBy function, the DataStream is converted into a KeyedStream. KeyedStream in fact extends DataStream, and its elements are grouped according to the key the user passes in.

The WordCount program we explained in lesson 02 used KeyBy:

// Split, group, window and aggregate the received data
DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5), Time.seconds(...))
        // ...

Be careful when using KeyBy in a production environment! KeyBy groups the data by the key specified by the user, and all records of the same group are routed to one subtask for processing. When the data volume is large and the key distribution is skewed, data skew and backpressure can easily occur and cause the job to fail.

A common solution is to add a random prefix to the keys, which we will explain in depth in later lessons; a small preview follows.
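As a rough preview (a simplified sketch only, not the full two-phase solution), the idea is to salt the key with a small random prefix so that records sharing one hot key are spread over several subtasks, pre-aggregated there, and only then combined on the original key:

//First phase of the sketch: salt the key with a random prefix in [0, 10), e.g. "7_name42"
SingleOutputStreamOperator<Tuple2<String, Long>> salted = items.map(
        new MapFunction<MyStreamingSource.Item, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(MyStreamingSource.Item item) throws Exception {
                int salt = new Random().nextInt(10);
                return new Tuple2<>(salt + "_" + item.getName(), 1L);
            }
        });
//Pre-aggregate on the salted key; a second phase would strip the prefix and aggregate again
salted.keyBy(0).sum(1);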

Aggregations

Aggregations is the umbrella term for the built-in aggregation functions, which include sum, max, min and others. Aggregations also operate on a keyed stream. The official documentation gives several common examples:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Of the functions above, max, min and sum return the running maximum, minimum and sum respectively, while minBy and maxBy return the entire element that contains the minimum or maximum value.

Let's take max and maxBy as examples:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Build the data source
List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));

DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);
items.keyBy(0).max(2).printToErr();

//Print the results
String jobName = "user defined streaming source";
env.execute(jobName);

When we run the program, something odd shows up in the output:

We grouped by the first element of the Tuple3 and asked for the maximum of the third element. As expected, the printed results do increase with the third element, but the output contains the element (0, 1, 2), which does not exist in our source data.

The Flink documentation explains this as follows:

The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

In other words: the difference between min and minBy is that min returns the minimum value of the specified field, while minBy returns the element that contains that minimum value (the same applies to max and maxBy).

A lot of material on the Internet claims that min returns the minimum value while minBy returns the key of the minimum value; strictly speaking, that is not correct.

Both min and minBy return a whole element. However, min only tracks the minimum of the field specified by the user and writes it into that position; the values of the other fields are not guaranteed to come from the same record. max and maxBy behave the same way.
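To see the difference on the same data, you can replace max(2) with maxBy(2) (a sketch; the expected output is written as comments):

//maxBy returns the actual input record that currently holds the maximum of field 2
items.keyBy(0).maxBy(2).printToErr();
//For key 0 this prints (0,1,0), (0,1,1), (0,2,2), (0,1,3) - every line is a real input record.
//max(2), by contrast, produced (0,1,2): field 2 holds the running maximum,
//while the other fields are simply carried over from the first record of the group.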

In fact, for the Aggregations functions, Flink keeps the aggregation state for us and never cleans it up, so in production you should avoid using Aggregations on an unbounded stream. Furthermore, an aggregation function should only be called once on the same KeyedStream.

Reduce

The Reduce function operates on each group of a KeyedStream and aggregates the records of a group pairwise according to user-defined logic.

For example:

List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));

DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
//items.keyBy(0).max(2).printToErr();

SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> reduce = items.keyBy(0).reduce(new ReduceFunction<Tuple3<Integer, Integer, Integer>>() {
    @Override
    public Tuple3<Integer,Integer,Integer> reduce(Tuple3<Integer, Integer, Integer> t1, Tuple3<Integer, Integer, Integer> t2) throws Exception {
        Tuple3<Integer,Integer,Integer> newTuple = new Tuple3<>();

        newTuple.setFields(0,0,(Integer)t1.getField(2) + (Integer) t2.getField(2));
        return newTuple;
    }
});

reduce.printToErr().setParallelism(1);

The elements below are grouped by their first field, the third field is summed within each group, and the first and second fields of the result are set to 0:

data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));

Then we will finally get: (0,0,6) and (0,0,38).
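For reference, the same reduce can also be written as a lambda, accessing the tuple fields f0/f1/f2 directly (a sketch over the same items stream):

SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> reduceLambda = items
        .keyBy(0)
        .reduce((t1, t2) -> new Tuple3<>(0, 0, t1.f2 + t2.f2));

reduceLambda.printToErr().setParallelism(1);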

Summary

This lesson introduced the commonly used API operations. In fact, the DataStream API offers far more than this; when reading the official documentation, run the examples and verify them yourself. More advanced APIs will be emphasized in the hands-on lessons.

