Analyzing U.S. average temperatures: a project for mastering MapReduce programming

Importing the dataset into HDFS
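Before the dataset can be listed, it has to be uploaded. A sketch of the upload commands, using the same prompt as the rest of this post (the local source directory `weather_data` is a placeholder, not from the original post):

```
[hadoop@master hadoop-2.6.0]$ bin/hdfs dfs -mkdir -p /weather/
[hadoop@master hadoop-2.6.0]$ bin/hdfs dfs -put weather_data/* /weather/
```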

From the command line, verify the dataset just uploaded to HDFS:

[hadoop@master hadoop-2.6.0]$ bin/hdfs dfs -ls /weather/


Writing, compiling, and running the MapReduce program:

Step 1: in the Map stage, extract the weather station ID and the temperature reading.

public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		/**
		 * @function Mapper that parses weather station records
		 * @input key = byte offset, value = one weather station record
		 * @output key = weatherStationId, value = temperature
		 */
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String line = value.toString(); // Read one record
			int temperature = Integer.parseInt(line.substring(14, 19).trim()); // Air temperature field

			if (temperature != -9999) { // Filter invalid readings
				FileSplit fileSplit = (FileSplit) context.getInputSplit();
				String weatherStationId = fileSplit.getPath().getName().substring(5, 10); // Station id taken from the file name
				context.write(new Text(weatherStationId), new IntWritable(temperature));
			}
		}
	}

Step 2: in the Reduce stage, compute the average temperature of each weather station.

/**
	 * @function Reducer that computes the average temperature of each weather station
	 * @input key = weatherStationId, value = temperature
	 * @output key = weatherStationId, value = average(temperature)
	 */
	public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			int count = 0;
			for (IntWritable val : values) {
				sum += val.get();
				count++;
			}
			result.set(sum / count); // Integer average (truncates toward zero)
			context.write(key, result);
		}
	}
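The jar executed later in Step 5 names com.hadoop.base.Temperature as its main class, which otherwise never appears in this post. A minimal driver sketch, assuming the standard Hadoop 2.x Job API and the package and class names used in Step 5 (the Mapper and Reducer above would be nested inside this class; this sketch is not compiled here, since it needs the Hadoop client libraries):

```java
package com.hadoop.base;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Temperature {

	// TemperatureMapper and TemperatureReducer (Steps 1 and 2) go here.

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "average temperature");
		job.setJarByClass(Temperature.class);
		job.setMapperClass(TemperatureMapper.class);
		job.setReducerClass(TemperatureReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. /weather/
		FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. /weather/out/
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
```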

Step 3: unit test and debug the code.

Mapper unit test

The Mapper's logic is to extract the temperature value from each weather station record it reads. For example, given the line "1985 07 31 02   200    94 10137   220    26     1     0 -9999", the characters at (0-based) positions 14 through 18 are extracted and trimmed, yielding the temperature value 200.
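This fixed-width extraction can be checked in isolation with plain Java, before involving MRUnit at all (a standalone sketch; the class and method names here are ours, not from the original project):

```java
public class ColumnParseDemo {

	// Same extraction as TemperatureMapper: characters 14..18 (substring end index 19), trimmed
	static int parseTemperature(String line) {
		return Integer.parseInt(line.substring(14, 19).trim());
	}

	public static void main(String[] args) {
		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
		System.out.println(parseTemperature(line)); // prints 200
	}
}
```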

/**
 * Unit test for the Mapper
 */
@SuppressWarnings("all")
public class TemperatureMapperTest {
	private Mapper mapper;//Define a Mapper object
	private MapDriver driver;//Define a MapDriver object

	@Before
	public void init() {
		mapper = new Temperature.TemperatureMapper();//Instantiate a TemperatureMapper object in Temperature
		driver = new MapDriver(mapper);//Instantiate MapDriver object
	}

	@Test
	public void test() throws IOException {
		//Enter a row of test data
		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
		driver.withInput(new LongWritable(), new Text(line))//Consistent with TemperatureMapper input type
				.withOutput(new Text("weatherStationId"), new IntWritable(200))//Consistent with TemperatureMapper output type
				.runTest();
	}
}

Reducer unit test

The Reducer's logic is to sum the values that share the same key and take their average. The Reducer unit test:

/**
 * Reducer unit testing
 */
@SuppressWarnings("all")
public class TemperatureReduceTest {
	private Reducer reducer;//Define a Reducer object	
	private ReduceDriver driver;//Define a ReduceDriver object

	@Before
	public void init() {
		reducer = new Temperature.TemperatureReducer();//Instantiate a TemperatureReducer object in Temperature
		driver = new ReduceDriver(reducer);//Instantiate ReduceDriver object
	}

	@Test
	public void test() throws IOException {
String key = "weatherStationId";//Declare a key
		List<IntWritable> values = new ArrayList<IntWritable>();
		values.add(new IntWritable(200));//Add first value
		values.add(new IntWritable(100));//Add second value
		driver.withInput(new Text(key), values)//Consistent with TemperatureReducer input types
			  .withOutput(new Text(key), new IntWritable(150))//Consistent with TemperatureReducer output types
			  .runTest();
	}
}
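The averaging step itself can also be verified without MRUnit. A plain-Java sketch of the same logic (names are ours): note that, like the Reducer, it uses integer division, so the average truncates toward zero.

```java
import java.util.Arrays;
import java.util.List;

public class AverageDemo {

	// Mirrors TemperatureReducer: integer sum divided by count (truncating division)
	static int average(List<Integer> temps) {
		int sum = 0;
		for (int t : temps) {
			sum += t;
		}
		return sum / temps.size();
	}

	public static void main(String[] args) {
		System.out.println(average(Arrays.asList(200, 100))); // prints 150
	}
}
```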

Mapper and Reducer integration test

/**
 * Integration test of the Mapper and Reducer
 */
@SuppressWarnings("all")
public class TemperatureTest {
	private Mapper mapper;//Define a Mapper object
	private Reducer reducer;//Define a Reducer object	
	private MapReduceDriver driver;//Define a MapReduceDriver object

	@Before
	public void init() {
		mapper = new Temperature.TemperatureMapper();//Instantiate a TemperatureMapper object in Temperature
		reducer = new Temperature.TemperatureReducer();//Instantiate a TemperatureReducer object in Temperature
		driver = new MapReduceDriver(mapper, reducer);//Instantiate MapReduceDriver object
	}

	@Test
	public void test() throws RuntimeException, IOException {
		//Enter two lines of test data
		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
		String line2 = "1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999";
		driver.withInput(new LongWritable(), new Text(line))//Consistent with TemperatureMapper input type
			  .withInput(new LongWritable(), new Text(line2))
			  .withOutput(new Text("weatherStationId"), new IntWritable(150))//Consistent with TemperatureReducer output type
			  .runTest();
	}
}

Step 4: compile and package the project as Temperature.jar, and upload Temperature.jar from the client to the /home/hadoop/Temp directory on the Hadoop master.

Step 5: switch to that directory with cd /home/hadoop/Temp, then launch the job from the command line with hadoop jar Temperature.jar com.hadoop.base.Temperature /weather/ /weather/out/.

Step 6: the job writes its final result to HDFS; view it with the hadoop fs -cat /weather/out/part-r-00000 command.


Posted on Mon, 04 Nov 2019 07:53:28 -0800 by cyberdesi