Spark from zero to Spark API in Java 8


1. map, flatMap

map is easy to understand: it passes each element of the source JavaRDD into the call method one by one, and the values returned after the computation form a new JavaRDD.


  • Sample Code
List<Integer> list = Arrays.asList(1, 2, 3);
JavaRDD<Integer> listRDD = sc.parallelize(list);

JavaRDD<Integer> squareRDD = listRDD.map(n -> n * n);
squareRDD.foreach(f -> System.out.println("Square of n = " + f));
  • Run Results
Square of n = 1
Square of n = 4
Square of n = 9

As you can see, the map operator computes a result for every element of the source JavaRDD. Because elements are passed in one at a time, the operation is ordered: the element order of the new RDD matches that of the source RDD. flatMap, covered next, builds on this same one-at-a-time behavior.
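As a side note (not Spark code), the same one-in-one-out, order-preserving behavior can be sketched with plain Java 8 streams, which makes a handy local mental model for map:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapSketch {
    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        // Like JavaRDD.map: each element is transformed one by one,
        // and the result keeps the order of the source.
        List<Integer> squares = source.stream()
                .map(n -> n * n)
                .collect(Collectors.toList());
        System.out.println(squares); // prints [1, 4, 9]
    }
}
```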



flatMap, like map, passes the elements of the RDD into its call method one by one. The difference is that flatMap can produce any number of output elements for each input element, because the call method returns an iterator rather than a single value. This, too, relies on the elements being passed in sequence.

  • Sample Code
 List<String> list = Arrays.asList("ZhangWuji ZhaoMin", "SongQingshu ZhouZhiruo");
        JavaRDD<String> listRDD = sc.parallelize(list);

        JavaRDD<String> nameRDD = listRDD
                .flatMap(new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                });
        nameRDD.foreach(s -> System.out.println("Hello " + s));
  • Run Results
Hello ZhangWuji
Hello ZhaoMin
Hello SongQingshu
Hello ZhouZhiruo
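The map/flatMap difference also shows up in plain Java 8 streams (a local sketch, no Spark needed): one input line fans out into several output elements:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapSketch {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("ZhangWuji ZhaoMin", "SongQingshu ZhouZhiruo");
        // Like JavaRDD.flatMap: each input element may expand into many output elements.
        List<String> names = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
        System.out.println(names); // prints [ZhangWuji, ZhaoMin, SongQingshu, ZhouZhiruo]
    }
}
```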


2. reduce, reduceByKey


reduce merges all the elements of an RDD into one. Each time the call method runs, two elements are passed in and merged into a single return value. That return value then takes the place of those two elements and is passed into the call method again together with the next element, and the merging continues until only one value remains.

  • Sample Code
 List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> listRDD = sc.parallelize(list);

        Integer result = listRDD.reduce((x, y) -> x + y);
        System.out.println("result = " + result);
  • Run Results
result = 21

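For intuition, the pairwise merging that reduce performs can be sketched with plain Java 8 streams (a local analogue, not Spark's implementation):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Like JavaRDD.reduce: merge two elements at a time
        // until a single value remains.
        int result = list.stream().reduce((x, y) -> x + y).get();
        System.out.println("result = " + result); // prints result = 21
    }
}
```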

reduceByKey works on pair RDDs (key-value elements): it merges the values of all elements that share the same key, using the same pairwise merging as reduce.

  • Sample Code
 List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<String, Integer>("Wudang", 99),
                new Tuple2<String, Integer>("Shaolin Temple", 97),
                new Tuple2<String, Integer>("Wudang", 89),
                new Tuple2<String, Integer>("Shaolin Temple", 77));
        JavaPairRDD<String, Integer> listRDD = sc.parallelizePairs(list);
        // When reduceByKey runs, values with the same key are grouped together
        // and merged in the call method
        JavaPairRDD<String, Integer> result = listRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });
        result.foreach(tuple -> System.out.println("Genre: " + tuple._1 + " -> " + tuple._2));
  • Run Results
Genre: Shaolin Temple -> 174
Genre: Wudang -> 188
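A plain-Java analogue of reduceByKey (not Spark code) is Map.merge, which applies the same merge function whenever two values collide on a key:

```java
import java.util.HashMap;
import java.util.Map;

public class ReduceByKeySketch {
    public static void main(String[] args) {
        Map<String, Integer> totals = new HashMap<>();
        // Like reduceByKey: values that share a key are merged with the given function.
        totals.merge("Wudang", 99, Integer::sum);
        totals.merge("Shaolin Temple", 97, Integer::sum);
        totals.merge("Wudang", 89, Integer::sum);
        totals.merge("Shaolin Temple", 77, Integer::sum);
        System.out.println("Wudang -> " + totals.get("Wudang"));                 // 188
        System.out.println("Shaolin Temple -> " + totals.get("Shaolin Temple")); // 174
    }
}
```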


3. union, join and groupByKey


When you want to merge two RDDs, you use union or join. union simply concatenates the two RDDs, much like List's addAll method. And, as with Lists, union requires the element types (generics) of the two RDDs to be the same; for join, it is the key types that must match.

  • Sample Code
 final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
        final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
        final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        final JavaRDD<Integer> rdd2 = sc.parallelize(list2);

        rdd1.union(rdd2).foreach(num -> System.out.println(num));
  • Run Results
1
2
3
4
3
4
5
6

(union keeps the duplicates 3 and 4; with more than one partition the print order may vary)
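Since union behaves like List.addAll, that comparison can be spelled out in plain Java (duplicates are kept, just as union keeps them):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionSketch {
    public static void main(String[] args) {
        List<Integer> merged = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        // Like JavaRDD.union: simply append the second collection;
        // the duplicates 3 and 4 are NOT removed.
        merged.addAll(Arrays.asList(3, 4, 5, 6));
        System.out.println(merged); // prints [1, 2, 3, 4, 3, 4, 5, 6]
    }
}
```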


union simply concatenates the two RDDs, but join is different. join is similar to the combine step in Hadoop MapReduce, just without the sorting. Before join we need to talk about groupByKey, because join can be understood as union combined with groupByKey. groupByKey, as its name implies, groups together the elements of a pair RDD that share the same key, producing a pair RDD whose values are Iterables. That is:

  • Sample Code
 List<Tuple2<String, String>> list = Arrays.asList(
                new Tuple2<>("Wudang", "Zhang Sanfeng"),
                new Tuple2<>("Mount Emei", "Abbess Miejue"),
                new Tuple2<>("Wudang", "Song Qingshu"),
                new Tuple2<>("Mount Emei", "Zhou Zhiruo"));
        JavaPairRDD<String, String> listRDD = sc.parallelizePairs(list);

        JavaPairRDD<String, Iterable<String>> groupByKeyRDD = listRDD.groupByKey();
        groupByKeyRDD.foreach(tuple -> {
            String menpai = tuple._1;
            Iterator<String> iterator = tuple._2.iterator();
            String people = "";
            while (iterator.hasNext()) {
                people = people + iterator.next() + " ";
            }
            System.out.println("Genre: " + menpai + " Personnel: " + people);
        });
  • Run Results
Genre: Mount Emei Personnel: Abbess Miejue Zhou Zhiruo
Genre: Wudang Personnel: Zhang Sanfeng Song Qingshu
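A local sketch of groupByKey's semantics using Java 8's Collectors.groupingBy (an analogue, not Spark's implementation):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByKeySketch {
    public static void main(String[] args) {
        List<String[]> pairs = Arrays.asList(
                new String[]{"Wudang", "Zhang Sanfeng"},
                new String[]{"Mount Emei", "Abbess Miejue"},
                new String[]{"Wudang", "Song Qingshu"},
                new String[]{"Mount Emei", "Zhou Zhiruo"});
        // Like groupByKey: gather every value that shares a key into one group.
        Map<String, List<String>> groups = pairs.stream()
                .collect(Collectors.groupingBy(p -> p[0],
                        Collectors.mapping(p -> p[1], Collectors.toList())));
        System.out.println(groups.get("Wudang"));     // [Zhang Sanfeng, Song Qingshu]
        System.out.println(groups.get("Mount Emei")); // [Abbess Miejue, Zhou Zhiruo]
    }
}
```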



join combines two pair RDDs, pairing up the elements that have the same key. It can be understood as a combination of union and groupByKey:

  • Sample Code
 final List<Tuple2<Integer, String>> names = Arrays.asList(
                new Tuple2<Integer, String>(1, "Dongfang Bubai"),
                new Tuple2<Integer, String>(2, "Linghu Chong"),
                new Tuple2<Integer, String>(3, "Lin Pingzhi"));
        final List<Tuple2<Integer, Integer>> scores = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 99),
                new Tuple2<Integer, Integer>(2, 98),
                new Tuple2<Integer, Integer>(3, 97));

        final JavaPairRDD<Integer, String> namesRDD = sc.parallelizePairs(names);
        final JavaPairRDD<Integer, Integer> scoresRDD = sc.parallelizePairs(scores);

        final JavaPairRDD<Integer, Tuple2<String, Integer>> joinRDD = namesRDD.join(scoresRDD);
        joinRDD.foreach(tuple -> System.out.println("Number: " + tuple._1 + " Name: " + tuple._2._1 + " Score: " + tuple._2._2));
  • Run Results
Number: 1 Name: Dongfang Bubai Score: 99
Number: 3 Name: Lin Pingzhi Score: 97
Number: 2 Name: Linghu Chong Score: 98
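The "same key on both sides" pairing that join performs can be sketched with two plain HashMaps (a local analogue, not Spark code):

```java
import java.util.HashMap;
import java.util.Map;

public class JoinSketch {
    public static void main(String[] args) {
        Map<Integer, String> names = new HashMap<>();
        names.put(1, "Dongfang Bubai");
        names.put(2, "Linghu Chong");
        names.put(3, "Lin Pingzhi");

        Map<Integer, Integer> scores = new HashMap<>();
        scores.put(1, 99);
        scores.put(2, 98);
        scores.put(3, 97);

        // Like JavaPairRDD.join: keep only keys present on both sides,
        // and pair up the two values for each such key.
        for (Map.Entry<Integer, String> e : names.entrySet()) {
            if (scores.containsKey(e.getKey())) {
                System.out.println("Number: " + e.getKey()
                        + " Name: " + e.getValue()
                        + " Score: " + scores.get(e.getKey()));
            }
        }
    }
}
```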

4. filter, distinct


filter keeps only the elements for which the given predicate returns true, producing a new RDD; distinct removes duplicate elements.
  • Sample Code
 List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> listRDD = sc.parallelize(list);
        JavaRDD<Integer> filterRDD = listRDD.filter(num -> num % 2 == 0);
        filterRDD.foreach(num -> System.out.println("num==" + num));
  • Run Results
num==2
num==4
num==6
num==8
num==10

(print order may vary across partitions)


  • Sample Code
 List<Integer> list = Arrays.asList(1, 1, 2, 2, 3, 3, 4, 5);
        JavaRDD<Integer> listRDD  = (JavaRDD<Integer>) sc.parallelize(list);
        listRDD.distinct().foreach(num -> System.out.println(num));
  • Run Results

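Both operators have direct Java 8 stream counterparts, which make their semantics easy to check locally (a sketch, not Spark code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterDistinctSketch {
    public static void main(String[] args) {
        // Like JavaRDD.filter: keep only the elements matching the predicate.
        List<Integer> evens = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).stream()
                .filter(num -> num % 2 == 0)
                .collect(Collectors.toList());
        System.out.println(evens); // prints [2, 4, 6, 8, 10]

        // Like JavaRDD.distinct: drop duplicate elements.
        List<Integer> unique = Arrays.asList(1, 1, 2, 2, 3, 3, 4, 5).stream()
                .distinct()
                .collect(Collectors.toList());
        System.out.println(unique); // prints [1, 2, 3, 4, 5]
    }
}
```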
Tags: Spark, Apache Hadoop

Posted on Mon, 02 Sep 2019 20:01:48 -0700 by dubhcat