Flink processing function Practice II: KeyedProcessFunction class

This is the second and last article in the Flink series Flink: ProcessFunction class I learned the simplest ProcessFunction class, KeyedProcessFunction I want to know today, and some features brought by this class;

About KeyedProcessFunction

By comparing the class diagram, it can be determined that KeyedProcessFunction and ProcessFunction are not directly related:

KeyedProcessFunction is used to process the data set of KeyedStream. Compared with ProcessFunction class, KeyedProcessFunction has more features. The official document is shown in the red box below. State processing and timer functions are unique to KeyedProcessFunction:

After the introduction, let's learn through examples;

Version information

  1. Development environment operating system: MacBook Pro 13 ", macOS Catalina 10.15.3
  2. Development tool: idea ultimate March 2018
  3. JDK: 1.8.0_211
  4. Maven: 3.6.0
  5. Flink: 1.9.2

Source download

If you don't want to write code, the source code of the whole series can be downloaded from GitHub, and the address and link information are shown in the following table( https://github.com/zq2599/blog_demos):

name link remarks
Project Home https://github.com/zq2599/blog_demos The project's home page on GitHub
git warehouse address (https) https://github.com/zq2599/blog_demos.git The warehouse address of the source code of the project, https protocol
git warehouse address (ssh) git@github.com:zq2599/blog_demos.git The warehouse address of the source code of the project, ssh protocol

There are multiple folders in this git project. The application of this chapter is under the flinkstudy folder, as shown in the red box below:

Introduction to actual combat

The objective of this actual combat is to learn KeyedProcessFunction, as follows:

  1. Monitor the 9999 port of the machine to obtain the string;
  2. Separate each string with a space and turn it into Tuple2 instance. f0 is the separated word, f1 equals to 1;
  3. The above Tuple2 instance uses f0 field partition to get KeyedStream;
  4. KeyedSteam is transferred to user-defined KeyedProcessFunction for processing;
  5. The function of customizing KeyedProcessFunction is to record the latest occurrence time of each word, and then build a timer of 10 seconds. If the word does not reappear after 10 seconds, send the word and its total occurrence times to the downstream operator;


  1. Continue to use Flink: ProcessFunction class The project flinkstudy created in this paper;
  2. Create the bean class CountWithTimestamp, which has three fields. For convenience, set it to public:
package com.bolingcavalry.keyedprocessfunction;

public class CountWithTimestamp {
    public String key;

    public long count;

    public long lastModified;
  1. Create the implementation class Splitter of FlatMapFunction to split the string to generate multiple Tuple2 instances. f0 is the separated word, f1 equals to 1:
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

        if(StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");

        for(String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
  1. Finally, the main body of the whole logic function: ProcessTime.java , which includes the user-defined KeyedProcessFunction subclass and the main method of the program entry. After the code is listed below, the key parts will be introduced:
package com.bolingcavalry.keyedprocessfunction;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

 * @author will
 * @email zq2599@gmail.com
 * @date 2020-05-17 13:43
 * @description Experience KeyedProcessFunction class (time type is processing time)
public class ProcessTime {

     * KeyedProcessFunction The function of the subclass is to record the latest occurrence time of each word to backend and create a timer,
     * When the timer is triggered, check whether the word has reached 10 seconds since it last appeared. If so, send it to the downstream operator
    static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {

        // Custom status
        private ValueState<CountWithTimestamp> state;

        public void open(Configuration parameters) throws Exception {
            // Initialization state, name is myState
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));

        public void processElement(
                Tuple2<String, Integer> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // Get the current word
            Tuple currentKey = ctx.getCurrentKey();

            // Get the myState state of the current word from backend
            CountWithTimestamp current = state.value();

            // If myState has never been assigned, initialize here
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;

            // Number of words plus one

            // Take the timestamp of the current element as the last occurrence time of the word
            current.lastModified = ctx.timestamp();

            // Resave to backend, including the number of occurrences of the word and the last occurrence

            // Create a timer for the current word, which will be triggered in ten seconds
            long timer = current.lastModified + 10000;


            // Print all information to verify data correctness
            System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",


         * How to execute after timer triggering
         * @param timestamp This time stamp represents the trigger time of the timer
         * @param ctx
         * @param out
         * @throws Exception
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // Get current word
            Tuple currentKey = ctx.getCurrentKey();

            // Get the myState state of the word
            CountWithTimestamp result = state.value();

            // Flag whether the current element has not appeared for 10 seconds
            boolean isTimeout = false;

            // timestamp is the trigger time of timer. If it is equal to the last update time + 10 seconds, it means that the word has been received in these 10 seconds,
            // This element, which does not appear in ten seconds, is sent to the downstream operator
            if (timestamp == result.lastModified + 10000) {
                // send out
                out.collect(new Tuple2<String, Long>(result.key, result.count));

                isTimeout = true;

            // Print data to verify compliance with expectations
            System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism 1

        // processing time 

        // Listen to the local 9999 port and read the string
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // If all the entered words do not reappear in more than 10 seconds, you can get them through CountWithTimeoutFunction
        DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
                // Split the received string with spaces to get multiple words
                .flatMap(new Splitter())
                // Set timestamp allocator with current time as timestamp
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        // Use current system time as timestamp
                        return System.currentTimeMillis();

                    public Watermark getCurrentWatermark() {
                        // In this case, watermark is not required, and null is returned
                        return null;
                // Partition words as key
                // The data partitioned by words is handed to the user-defined KeyedProcessFunction for processing
                .process(new CountWithTimeoutFunction());

        // All the entered words, if they do not reappear in more than 10 seconds, will be printed here

        env.execute("ProcessFunction demo : KeyedProcessFunction");

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));

There are several points to focus on in the above code:

  1. When the timestamps are set through assignTimestampsAndWatermarks, getCurrentWatermark returns null because watermark cannot be used;
  2. In the processElement method, state.value() can get the status of the current word, state.update(current) you can set the status of the current word. For details of this function, please refer to In depth understanding the status operation of ProcessFunction (Flink-1.10)
  3. The registerProcessingTimeTimer method sets the trigger time of the timer. Note that the timer here is based on processTime, which is different from the eventTime in the official demo;
  4. After the timer is triggered, the onTimer method is executed, which contains all the information of the timer, especially the input parameter timestamp, which is the trigger time of the timer originally set;


  1. Execute the command NC-L 9999 on the console, so that you can send strings from the console to the 9999 port of the machine;
  2. Directly execute the main method of ProcessTime class on the IDEA, and the program will start to listen to the 9999 port of the machine when it runs;
  3. Input aaa in the front console, then enter, wait for ten seconds, and IEDA's console outputs the following information, which can be seen from the results that meet the expectations:

  4. Continue to input aaa and enter again, two times in a row, with an interval of no more than 10 seconds. The result is shown in the figure below. It can be seen that each Tuple2 element has a timer, but before the second input of aaa, the latest appearance time of aaa is updated by the third input operation, so the comparison operation in the timer of the second input of aaa finds that it is the closest to aaa at this time The last occurrence (i.e. the third occurrence) has not reached 10 seconds, so the second element will not be emitted to the downstream operator:

  5. All timeout messages received by downstream operators will be printed out, as shown in the red box below. Only records equal to 1 and 3 will be printed. When the number is equal to 2, aaa will be input again within 10 seconds, so there is no timeout reception and will not be printed downstream:

    At this point, the learning of KeyedProcessFunction processing function is completed, its state reading and writing and timer operation are very practical capabilities, I hope this article can provide you with reference;

Welcome to my official account: programmer Xin Chen.

Tags: Apache github git Java

Posted on Sun, 24 May 2020 00:57:29 -0700 by cayman_d