Flink's Data Types for busy people

1, Data Type composition in Flink

  • Basic data type: basic data types in java 8 plus their respective packing types, plus void, string, date, BigDecimal, BigInteger

  • Data of basic data type and array of Object type

  • Composite type
    1.Flink Java Tuples
    2. scala case classes
    3. Row
    4. POJOs: if you want to be recognized by Flink and can also be referenced by name, you need to comply with certain rules (otherwise, it will be treated as generic)
    1) . this class is pulic and has no non static inner class.
    2) . you have to have a pulic constructor with no parameters
    3) . all non static non transient properties (including all parent classes) must be pulic or getter setter methods that conform to the java beans naming specification.

  • Auxiliary types (set class, Option, Either, etc.)

  • Generics: not serialized by Flink's own serializer, but by Kryo

2, How does Flink handle Data Type
First, Flink will serialize according to its own serializer. If not, it will default back to Kryo serializer for serialization.
The possible problems are as follows:

  • Registering subtypes
    If the signature of a method is a parent class and the return or use of a child class is a so-called covariant return type On covariant return types . Let Flink know that all subclasses can improve performance to a certain extent.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • Registering custom serializers
    Even if Flink can't serialize by itself, it will give Kryo, but Kryo can't handle all types well, so it's time to customize the serializer.
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
  • Adding Type Hints
    Flink may not be able to infer generic types, which are only necessary in Java Api.
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(new TypeHint<SomeType.class});
  • Manually creating a TypeInformation
    When Flink may not be able to infer generic types
TypeInformation<String> info = TypeInformation.of(String.class);

TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});

3, Common use of returns

.returns(new TypeHint<Tuple2<String, String>>(){})
.returns(TypeInformation.of(new TypeHint<Tuple2<ConsumerRecord, String>>() {}))
109 original articles published, 51 praised, 140000 visitors+
Private letter follow

Tags: Java Scala

Posted on Mon, 16 Mar 2020 07:32:17 -0700 by Misticx