Hadoop Series - Distributed Computing Framework MapReduce

1. Overview of MapReduce

Hadoop MapReduce is a distributed computing framework for writing batch applications. Programs written with it can be submitted to a Hadoop cluster for parallel processing of large datasets.

A MapReduce job splits the input dataset into independent splits, which are processed by map tasks in parallel; the framework sorts the map outputs and then feeds them into reduce. The MapReduce framework operates exclusively on <key, value> pairs: it views the input to a job as a set of <key, value> pairs and produces a set of <key, value> pairs as output. Both the input and output key and value types must implement the Writable interface.

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

2. Brief description of MapReduce programming model

Taking word frequency statistics as an example, MapReduce processes the data as follows:

  1. Input: read the text file;
  2. Splitting: split the file into lines, where K1 is the line offset and V1 is the text content of the corresponding line;
  3. Mapping: produce a List(K2, V2), where K2 is each word and V2 is 1, since what we are counting is word frequency;
  4. Shuffling: because the Mapping operations may run in parallel on different machines, data with the same key must be distributed to the same node through shuffling before it can be merged and the final result counted; K2 is each word, List(V2) is an iterable collection, and each V2 is the V2 produced by Mapping;
  5. Reducing: the goal here is to count the total number of occurrences of each word, so Reducing sums List(V2) and outputs the result.

In the MapReduce programming model, splitting and shuffling are implemented by the framework; mapping and reducing are the only parts we need to program ourselves, which is where the name MapReduce comes from.
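
Taking the single line "Hadoop Spark Hadoop" as input, the stages above produce the following (a minimal illustration; keys and values are shown informally):

(input)     <0, "Hadoop Spark Hadoop">
(mapping)   <Hadoop, 1>, <Spark, 1>, <Hadoop, 1>
(shuffling) <Hadoop, (1, 1)>, <Spark, (1)>
(reducing)  <Hadoop, 2>, <Spark, 1>   (output)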

3. Combiner & Partitioner

3.1 InputFormat & RecordReaders

InputFormat splits the input file into multiple InputSplits, and RecordReaders convert each InputSplit into standard <key, value> pairs as input for map. The significance of this step is that only after this logical splitting and conversion to a standard key-value format can multiple map tasks process the data in parallel.
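
As a concrete example, the default TextInputFormat splits a text file by line, and its RecordReader emits each line's byte offset as the key and the line's content as the value. The word count job in section 4 relies on this default, but it could also be set explicitly (a minimal sketch, assuming the Job instance created later in WordCountApp):

// TextInputFormat is the default InputFormat: each InputSplit is read line by line
// and handed to map as <LongWritable offset, Text line> pairs
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);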

3.2 Combiner

The combiner is an optional operation that runs after map. It is essentially a localized reduce: after map has produced its intermediate output, the combiner merges records that share the same key. Take word frequency statistics as an example:

Map writes <hadoop, 1> every time it encounters the word hadoop, but hadoop may occur n times in the file, so the map output is highly redundant. Merging records with the same key before the reduce calculation reduces the amount of data that needs to be transferred and so improves transfer efficiency.

However, the combiner is not suitable for every scenario. The guiding principle is that adding a combiner must not change the final result of the reduce calculation: a combiner works for totals, maximums, and minimums, but not for averages.
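
A quick numeric check shows why averages break: suppose one map task emits the values 1 and 2 for a key and another map task emits 3. A combiner that averages locally yields avg(avg(1, 2), avg(3)) = avg(1.5, 3) = 2.25, while the true average is avg(1, 2, 3) = 2. Summing, by contrast, gives (1 + 2) + 3 = 6 either way, which is why a reducer that sums can safely double as a combiner (as done in section 5).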

Without combiner:

<div align="center"> <img width="600px" src="https://raw.githubusercontent.com/heibaiying/BigData-Notes/master/pictures/mapreduce-without-combiners.png"/&gt; </div>
When using combiner:

<div align="center"> <img width="600px" src="https://raw.githubusercontent.com/heibaiying/BigData-Notes/master/pictures/mapreduce-with-combiners.png"/&gt; </div>

You can see that with the combiner, the data that needs to be transferred to the reducers drops from 12 keys to 10 keys. The magnitude of the reduction depends on how often your keys repeat; the word frequency case below shows the combiner cutting the transferred records by several hundred times.

3.3 Partitioner

The partitioner can be thought of as a classifier: it assigns each record output by map to the corresponding reducer according to its key. It supports custom implementations, as demonstrated in the example below.

4. MapReduce Word Frequency Statistics Cases

4.1 Project Introduction

Here's a classic case of word frequency statistics: counting the number of occurrences of each word in the following sample data.

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive

To facilitate development, I have placed a tool class, WordCountDataUtils, in the project source code to generate sample data for word frequency statistics; the resulting files can be written to the local filesystem or directly to HDFS.
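
For reference, such a generator might look like the following minimal sketch. This is an illustrative assumption rather than the project's actual WordCountDataUtils: the class name, line count, and output path are made up, and only the word list matches the sample above.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class WordCountDataGenerator {

    // The six words that appear in this article's sample data
    private static final List<String> WORD_LIST =
            Arrays.asList("Spark", "HBase", "Hive", "Flink", "Storm", "Hadoop");

    public static void main(String[] args) throws IOException {
        Random random = new Random();
        // Generate 1000 lines, each containing 1 to 4 random words separated by tabs
        List<String> lines = random.ints(1000, 1, 5)
                .mapToObj(n -> random.ints(n, 0, WORD_LIST.size())
                        .mapToObj(WORD_LIST::get)
                        .collect(Collectors.joining("\t")))
                .collect(Collectors.toList());
        Files.write(Paths.get("input.txt"), lines, StandardCharsets.UTF_8);
    }
}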

Project code download address: hadoop-word-count

4.2 Project Dependency

To write MapReduce programs, you need to add the hadoop-client dependency:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

4.3 WordCountMapper

Split each line of data according to the specified delimiter. Note that Hadoop-defined types must be used in MapReduce, because Hadoop's built-in types are serializable and comparable, all implementing the WritableComparable interface.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException,
                                                                      InterruptedException {
        // Split the line on tabs and emit <word, 1> for each word
        String[] words = value.toString().split("\t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }

}

WordCountMapper corresponds to the Mapping operation in the flow described in section 2.

WordCountMapper inherits from the Mapper class, which is generic and is defined as follows:

WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
   ......
}
  • KEYIN: the type of the map input key, i.e. the offset of each line (the position of its first character within the whole file); it is a long, corresponding to Hadoop's LongWritable type;
  • VALUEIN: the type of the map input value, i.e. each line of text; it is a String, corresponding to Hadoop's Text type;
  • KEYOUT: the type of the map output key, i.e. each word; it is a String, corresponding to Hadoop's Text type;
  • VALUEOUT: the type of the map output value, i.e. the number of occurrences of each word; here it is an int, corresponding to the IntWritable type.

4.4 WordCountReducer

Count the occurrences of each word in Reduce:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
                                                                                  InterruptedException {
        // Sum all counts for this word and emit <word, total>
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

The output of shuffling is the input to reduce: key is each word, and values is an iterable collection of counts, such as (1, 1, 1, ...).

4.5 WordCountApp

Assemble the MapReduce job and submit it to the server to run with the following code:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Assemble the job and submit it to the cluster to run
 */
public class WordCountApp {

    // Parameters are hard-coded here for readability; in real development they can be passed in externally
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        //  File input and output paths are specified by external parameters
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // You need to specify the hadoop user name, otherwise an insufficient-permissions exception may be thrown when creating directories on HDFS
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // Indicate the address of the HDFS
        configuration.set("fs.defaultFS", HDFS_URL);

        // Create a Job
        Job job = Job.getInstance(configuration);

        // Set Running Main Class
        job.setJarByClass(WordCountApp.class);

        // Setting up Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set the type of Mapper output key and value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the type of Reducer output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // If the output directory already exists, it must be deleted or an exception will be thrown when the program is run repeatedly
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Set the path of job input and output files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job to the cluster and wait for it to complete; passing true prints the job's progress
        boolean result = job.waitForCompletion(true);

        // Close the FileSystem created earlier
        fileSystem.close();

        // Terminate the currently running Java virtual machine and exit the program based on the job results
        System.exit(result ? 0 : -1);

    }
}

Note that if the output types of the Mapper are not set, they default to the same types as the output of the Reducer.
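
In other words, because the Mapper here outputs the same types as the Reducer, the two setMapOutput* calls in WordCountApp could be omitted (a minimal sketch of the equivalent configuration):

// The map output key/value classes default to the job output classes,
// so these two calls alone are enough when the Mapper and Reducer output types match
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);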

4.6 Submit to the Server to Run

In actual development, you can set up a Hadoop development environment locally and test directly in the IDE. Here we focus on packaging the project and submitting it to the server. Since this project has no third-party dependencies other than Hadoop, it can be packaged directly:

# mvn clean package

Submit the job using the following command:

hadoop jar /usr/appjar/hadoop-word-count-1.0.jar \
com.heibaiying.WordCountApp \
/wordcount/input.txt /wordcount/output/WordCountApp

View the generated directory on HDFS when the job is finished:

# View Directory
hadoop fs -ls /wordcount/output/WordCountApp

# View statistics
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000

5. Word Frequency Statistics Advanced Case: Combiner

5.1 Code implementation

To use the combiner function, simply add the following line of code when assembling the job:

// Set Combiner
job.setCombinerClass(WordCountReducer.class);

5.2 Execution Results

The statistical results will not change when the combiner is added, but the effect of the combiner can be seen from the printed log:

Print log without the combiner:

Print log after adding the combiner:

Here we have only one input file and it is smaller than 128 MB, so there is only one map task processing it. You can see that with the combiner, the number of records drops from 3519 to 6 (there are only six distinct words in the sample); in this case the combiner dramatically reduces the amount of data that needs to be transferred.

6. Word Frequency Statistics Advanced Case: Partitioner

6.1 Default Partitioner

Suppose we need to output the statistics for different words to different files. This requirement is actually quite common; for example, when counting product sales you may need to split the results by product type. Doing so requires a custom Partitioner.

First, here is MapReduce's default partitioning rule: when building a job, if no partitioner is specified, the default is HashPartitioner, which hashes the key and takes the remainder modulo numReduceTasks. Its implementation is as follows:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

6.2 Custom Partitioner

Here we extend Partitioner to define a custom partitioning rule that partitions by word:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
    }
}

When building the job, specify our custom partitioning rule and set the number of reduce tasks:

// Set up custom partition rules
job.setPartitionerClass(CustomPartitioner.class);
// Set the number of reduce tasks
job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());

6.3 Execution Results

The results are as follows: six files are generated, each containing the statistics for the corresponding word:


For more articles in the big data series, see the GitHub open source project: Starter's Guide to Big Data
