Implementation, cluster submission, and operation of WordCount in MapReduce

This example counts the number of occurrences of each word across all files in a directory in HDFS.

Three Java classes are used:

WordcountMapper is responsible for the map task.

WordcountReducer is responsible for the reduce task.

WordcountDriver is responsible for submitting the job to YARN.
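
For example, given an input file containing the two lines "hello world" and "hello hadoop", the final output file (part-r-00000) contains one word and its count per line, separated by a tab:

hadoop	1
hello	2
world	1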

 

Related code

Mapper

package com.roadom;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * This class implements the map task. The four generic type parameters in the class declaration mean:
 *
 * KEYIN:    By default, the starting offset of the line of text read by the MapReduce framework. It would be a Long,
 *           but Hadoop has its own more compact serialization interface, so LongWritable is used instead of Long.
 * VALUEIN:  By default, the content of the line of text read by the framework; Text is used instead of String, as above.
 * KEYOUT:   The key of the output data produced by the user-defined logic. Here it is the word; Text instead of String, as above.
 * VALUEOUT: The value of the output data produced by the user-defined logic. Here it is the word count; IntWritable instead of Integer, as above.
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	/**
	 * The business logic of the map phase lives in this overridden map() method. The map task calls map() once for every line of input data.
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Convert the Text passed in by the map task to a String first
		String line = value.toString();
		// Split the line into words on spaces
		String[] words = line.split(" ");

		// Emit each word as <word, 1>
		for (String word : words) {
			// Use the word as the key and the count 1 as the value. The output is partitioned by key, so all occurrences of the same word go to the same reduce task.
			context.write(new Text(word), new IntWritable(1));
		}
	}

}
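
The mapper above allocates a new Text and a new IntWritable for every word it emits. This works, but a common refinement (not part of the original code; a minimal sketch, assuming the same package and job wiring) is to reuse the output objects, which is safe because the framework serializes the pair inside each write() call:

package com.roadom;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical variant of WordcountMapper that reuses its output key/value objects.
public class ReusingWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private final Text outKey = new Text();
	private final IntWritable one = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Same splitting logic as the original mapper
		for (String word : value.toString().split(" ")) {
			outKey.set(word);            // reuse the same Text instance for every word
			context.write(outKey, one);  // the pair is serialized immediately, so reuse is safe
		}
	}
}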

 

Reducer

package com.roadom;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Like the Mapper, the class declaration specifies four generic type parameters.
 * KEYIN, VALUEIN correspond to the Mapper's output KEYOUT and VALUEOUT types respectively.
 * KEYOUT, VALUEOUT are the output types of the custom reduce logic. Here KEYOUT is a single word and VALUEOUT is its total count.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	/**
	 * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>
	 * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
	 * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
	 * reduce() is called once per key: the framework groups all kv pairs that share the same word and passes their values in as one Iterable.
	 */
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int count=0;
		for(IntWritable value : values){
			count += value.get();
		}
		
		context.write(key, new IntWritable(count));		//Output the number of occurrences of each word
	}
	/*
	 * Inherited from Reducer; runs once per reduce task, after all calls to reduce() have finished.
	 */
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		// NOTHING
	}
}
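
The empty cleanup() above only shows where the hook sits. Purely as an illustration (not part of the original code), cleanup() could be used to emit one extra summary record per reduce task once all reduce() calls have finished; the "__TOTAL__" key below is a hypothetical name:

package com.roadom;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer variant that also emits a per-task grand total from cleanup().
public class TotallingWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	private long total = 0;

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int count = 0;
		for (IntWritable value : values) {
			count += value.get();
		}
		total += count;                              // accumulate across all keys of this task
		context.write(key, new IntWritable(count));  // normal per-word output
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		// Runs once after every reduce() call of this task has finished
		context.write(new Text("__TOTAL__"), new IntWritable((int) total));
	}
}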

 

Driver. Responsible for submitting this MapReduce job to YARN.

package com.roadom;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Equivalent to a client of the YARN cluster.
 * It encapsulates the run parameters of our MapReduce program, specifies the jar package,
 * and finally submits everything to YARN.
 */
public class WordcountDriver {
	
	public static void main(String[] args) throws Exception {
		if (args == null || args.length == 0) {
			return;
		}
		
		// This object reads the Hadoop configuration from the environment by default; values can also be overridden via its set() methods
		Configuration conf = new Configuration();
		
		// A Job is the abstraction of a task submitted to YARN
		Job job = Job.getInstance(conf);
		
		/*job.setJar("/home/hadoop/wc.jar");*/
		// Specify the jar package of this program, located via one of its classes
		job.setJarByClass(WordcountDriver.class);
		
		// Specify the Mapper/Reducer business classes used by this job
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);
		
		// Specify the key/value types of the Mapper output; these must match the generic types declared in the Mapper
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// Specify the key/value types of the final output, i.e. the Reducer's output key and value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// Specify the directory of the job's input files
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// Specify the directory where the job's output will be written
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// Submit the job configuration and the jar containing the job's classes to YARN for execution
		/*job.submit();*/
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
	}
}
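
Two optional settings the original driver does not use are a combiner and an explicit number of reduce tasks. Because word counting is a plain sum, the reducer class itself can serve as the combiner. The lines below are only a fragment (not a complete class), sketched under the assumption that they would sit in main() after job.setReducerClass(...):

// Fragment only; assumed placement is inside main(), after job.setReducerClass(WordcountReducer.class).
job.setCombinerClass(WordcountReducer.class); // pre-aggregate <word,1> pairs on the map side before the shuffle
job.setNumReduceTasks(1);                     // explicit reduce task count; the run log below shows 1 reduce task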

 

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.5</version>
    </dependency>
</dependencies>

<build>
    <finalName>testWordcount</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <!-- Specify the main class; this setting ends up in the jar's MANIFEST.MF -->
                        <mainClass>com.roadom.WordcountDriver</mainClass>
                    </manifest>
                </archive>
                <classesDirectory>
                </classesDirectory>
            </configuration>
        </plugin>
    </plugins>
</build>

 

Packaging and uploading

Run Maven's package phase (mvn package); the jar will be generated in the project's target directory.

Upload it to any machine in the cluster.

 

Execution steps

1. Start HDFS, create the input directory, and upload files

# Start HDFS
start-dfs.sh
#...
# Create a multi-level directory
hadoop fs -mkdir -p /wordcount/input
# Upload the local file a.txt to the directory; multiple files can be uploaded as well
hadoop fs -put a.txt /wordcount/input

 

2. Start YARN

start-yarn.sh
#...

 

3. Run the jar

# General form: hadoop jar <jar path> <fully qualified main class> [arguments for the main class]
# Because the jar's manifest already specifies a main class, the main class name can be omitted here
hadoop jar testWordcount.jar /wordcount/input /wordcount/output

Running this way is not essentially different from the traditional java -cp form, except that the hadoop command puts all of Hadoop's dependency jars on the classpath for you, so they do not have to be listed after -cp.

However, if the program uses third-party jars other than Hadoop's, those still have to be listed after -cp:

java -cp <jar containing the main class>:<paths of all dependency jars> <fully qualified main class> [arguments for the main class]

Alternatively, package all third-party dependencies into the main jar (a runnable "fat" jar) and run it directly with java -jar:

java -jar wc.jar /wordcount/input /wordcount/output

 

This approach has two extra requirements:

1. The cluster's custom Hadoop configuration files need to be copied into the jar.

2. The path where wc.jar will reside must be specified in the main method:

// job.setJarByClass(WordcountDriver.class);  // this no longer works to locate the jar
job.setJar("/home/hadoop/wc.jar");

 

Run log

18/04/24 01:24:00 INFO client.RMProxy: Connecting to ResourceManager at centos00/192.168.2.100:8032
18/04/24 01:24:01 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/24 01:24:01 INFO input.FileInputFormat: Total input paths to process : 3
18/04/24 01:24:02 INFO mapreduce.JobSubmitter: number of splits:3
18/04/24 01:24:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1524496863055_0002
18/04/24 01:24:02 INFO impl.YarnClientImpl: Submitted application application_1524496863055_0002
18/04/24 01:24:02 INFO mapreduce.Job: The url to track the job: http://centos00:8088/proxy/application_1524496863055_0002/
18/04/24 01:24:02 INFO mapreduce.Job: Running job: job_1524496863055_0002
18/04/24 01:24:15 INFO mapreduce.Job: Job job_1524496863055_0002 running in uber mode : false
18/04/24 01:24:15 INFO mapreduce.Job:  map 0% reduce 0%
18/04/24 01:24:41 INFO mapreduce.Job:  map 100% reduce 0%
18/04/24 01:24:52 INFO mapreduce.Job:  map 100% reduce 100%
18/04/24 01:24:53 INFO mapreduce.Job: Job job_1524496863055_0002 completed successfully
18/04/24 01:24:53 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=201730
		FILE: Number of bytes written=832609
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=101534
		HDFS: Number of bytes written=30057
		HDFS: Number of read operations=12
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=3
		Launched reduce tasks=1
		Data-local map tasks=3
		Total time spent by all maps in occupied slots (ms)=68494
		Total time spent by all reduces in occupied slots (ms)=8225
		Total time spent by all map tasks (ms)=68494
		Total time spent by all reduce tasks (ms)=8225
		Total vcore-milliseconds taken by all map tasks=68494
		Total vcore-milliseconds taken by all reduce tasks=8225
		Total megabyte-milliseconds taken by all map tasks=70137856
		Total megabyte-milliseconds taken by all reduce tasks=8422400
	Map-Reduce Framework
		Map input records=2030
		Map output records=16756
		Map output bytes=168212
		Map output materialized bytes=201742
		Input split bytes=337
		Combine input records=0
		Combine output records=0
		Reduce input groups=2401
		Reduce shuffle bytes=201742
		Reduce input records=16756
		Reduce output records=2401
		Spilled Records=33512
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=1219
		CPU time spent (ms)=3760
		Physical memory (bytes) snapshot=498151424
		Virtual memory (bytes) snapshot=8303079424
		Total committed heap usage (bytes)=375455744
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=101197
	File Output Format Counters
		Bytes Written=30057
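
The WARN line near the top of the log suggests implementing the Tool interface and launching through ToolRunner, so that generic Hadoop command-line options (for example -D settings) are parsed for you. A minimal sketch of how the driver could be adapted (not part of the original code; the class name WordcountTool is made up here):

package com.roadom;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical Tool-based driver; getConf() already contains any options parsed by ToolRunner.
public class WordcountTool extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf());
		job.setJarByClass(WordcountTool.class);
		job.setMapperClass(WordcountMapper.class);
		job.setReducerClass(WordcountReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		// ToolRunner strips generic options from args before calling run()
		System.exit(ToolRunner.run(new Configuration(), new WordcountTool(), args));
	}
}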

 

4. View run results

hadoop fs -cat /wordcount/output/part-r-00000
