Apache Flink from Scratch: Introduction to the Flink Table API & SQL

What is the Flink relational API?

Flink already supports the DataSet and DataStream APIs, but is there a better way to program, one that does not require you to care about the specific API or to understand the details of the Java and Scala implementations?

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

The lowest layer is stateful event-driven processing. Developing at this layer is very laborious.

Although a great deal can be accomplished with the DataSet and DataStream APIs, using them means becoming familiar with both APIs as well as with Java or Scala. A framework that cannot be driven by SQL is severely limited: developers may cope with that, but ordinary users cannot work with it. This is why SQL remains so popular.

Just as MapReduce has Hive SQL and Spark has Spark SQL, Flink has Flink SQL.

Flink supports both batch and stream processing, but how can the two be unified at the API level?

This is why the Table API and SQL came into being.

Together they form a relational API that is as easy to work with as MySQL.

Apache Flink features two relational APIs, the Table API and SQL, for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way.

Using the Table API and SQL requires additional dependencies.
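As a rough sketch, assuming a Maven build, Scala 2.11 binaries, and a `flink.version` property defined elsewhere in the POM (none of which this article specifies), the dependencies for the Flink release line whose imports appear in the code below would look something like this. Treat the artifact suffix and the version as assumptions and check them against the Flink release you actually use.

```xml
<!-- Assumption: Scala 2.11 builds and a flink.version property defined in <properties> -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Needed for the Java example (org.apache.flink.table.api.java.BatchTableEnvironment) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Needed for the Scala example (org.apache.flink.table.api.scala.BatchTableEnvironment) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```

Only one of the two bridge modules is required, depending on whether you write the job in Java or in Scala.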





Programming with the Table API and SQL

First add the dependencies above, then read the sales.csv file, whose contents are as follows:



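import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment

object TableSQLAPI {

  // One record per data line of sales.csv
  case class SalesLog(transactionId: String, customerId: String, itemId: String, amountPaid: Double)

  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val bTableEnv = BatchTableEnvironment.create(bEnv)
    val filePath = "E:/test/sales.csv"
    // Read the CSV file into a DataSet[SalesLog], skipping the header line
    val csv = bEnv.readCsvFile[SalesLog](filePath, ignoreFirstLine = true)
    // The remaining steps (DataSet => Table, register, query) go here
  }
}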

First you obtain a DataSet, then you convert the DataSet into a Table, and then you can run SQL against it.

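    // DataSet => Table
    val salesTable = bTableEnv.fromDataSet(csv)
    // Register the Table under the name "sales" so that SQL can refer to it
    bTableEnv.registerTable("sales", salesTable)
    // Total amount paid per customer
    val resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
    // Table => DataSet, then print the rows (needs import org.apache.flink.types.Row)
    bTableEnv.toDataSet[Row](resultTable).print()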

The output is as follows:


This way, a single SQL statement does what previously required a hand-written MapReduce job, which greatly simplifies development.
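To make concrete what that one SQL statement replaces, here is a minimal sketch in plain Java (no Flink involved) of the same per-customer sum written by hand. The column order follows the SalesLog fields used in this article; the class name, method name, and sample rows are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ManualAggregation {

    // Hand-written equivalent of:
    //   select customerId, sum(amountPaid) money from sales group by customerId
    // Expects lines of the form transactionId,customerId,itemId,amountPaid
    // and treats the first line as a header to skip.
    public static Map<String, Double> totalByCustomer(List<String> csvLines) {
        Map<String, Double> totals = new TreeMap<>();
        for (int i = 1; i < csvLines.size(); i++) {
            String[] fields = csvLines.get(i).split(",");
            String customerId = fields[1];
            double amountPaid = Double.parseDouble(fields[3]);
            // Add this row's amount to the running total for its customer
            totals.merge(customerId, amountPaid, Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical rows, invented for illustration (not the article's sales.csv)
        List<String> lines = Arrays.asList(
                "transactionId,customerId,itemId,amountPaid",
                "t1,c1,i1,10.0",
                "t2,c2,i2,5.5",
                "t3,c1,i3,2.5");
        totalByCustomer(lines).forEach(
                (customerId, money) -> System.out.println(customerId + "," + money));
    }
}
```

Even this small version has to deal with parsing, header handling, and grouping by hand, and it runs on a single machine only. With the relational API the query states the intent and Flink takes care of executing it.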


package com.vincent.course06;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class JavaTableSQLAPI {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
        DataSource<Sales> salesDataSource = bEnv.readCsvFile("E:/test/sales.csv").ignoreFirstLine().
                pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");
        Table sales = bTableEnv.fromDataSet(salesDataSource);
        bTableEnv.registerTable("sales", sales);
        Table resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");
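        DataSet<Row> rowDataSet = bTableEnv.toDataSet(resultTable, Row.class);
        // print() triggers execution of the batch job and writes each result row to stdout
        rowDataSet.print();
    }

    // POJO for one line of sales.csv; public fields let Flink map the CSV columns by name
    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;

        @Override
        public String toString() {
            return "Sales{" +
                    "transactionId='" + transactionId + '\'' +
                    ", customerId='" + customerId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", amountPaid=" + amountPaid +
                    '}';
        }
    }
}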


Posted on Wed, 11 Sep 2019 02:54:21 -0700 by navtheace