Big data case - MapReduce traffic statistics case - partition

Code download address: https://github.com/tazhigang/big-data-github.git

I. Requirement: output the statistical results to different files (partitions) according to the province each mobile phone number belongs to

II. Data preparation

  1. Data preparation: phoneData.txt in case 2
  2. Partition according to the first three digits of the phone number

III. create maven project

  1. Project structure
  2. Since this case builds on case 2, only the newly added FlowPartition.java and the modified FlowDriverV2.java are shown here. To view the complete code for this case, please download it from the repository above.
  3. Code display
  • FlowPartition.java
package com.ittzg.hadoop.flowv2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @email: tazhigang095@163.com
 * @author: ittzg
 * @date: 2019/7/6 14:19
 */
public class FlowPartition extends Partitioner<Text,FlowBean> {
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phoneNum = text.toString().substring(0,3);
        int partitons = 3;
        if("135".equals(phoneNum)){
            partitons = 0;
        }else if("136".equals(phoneNum)){
            partitons = 1;
        }else if("137".equals(phoneNum)){
            partitons = 2;
        }
        return partitons;
    }
}
  • FlowDriverV2.java
package com.ittzg.hadoop.flowv2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @email: tazhigang095@163.com
 * @author: ittzg
 * @date: 2019/7/1 22:49
 * @describe:
 */
public class FlowDriverV2 {
    public static class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Get a row of data
            String line = value.toString();
            // Interception field
            String[] split = line.split("\t");
            long upFlow = Long.parseLong(split[split.length-3]);
            long downFlow = Long.parseLong(split[split.length-2]);
            // Construct FlowBean
            FlowBean flowBean = new FlowBean(upFlow, downFlow);
            context.write(new Text(split[1]),flowBean);
        }
    }

    public static class FlowReduce extends Reducer<Text,FlowBean,Text,FlowBean>{
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long upFlow = 0;
            long downFlow =  0;
            for (FlowBean flowBean : values) {
                upFlow += flowBean.getUpFlow();
                downFlow +=flowBean.getDownFlow();
            }

            FlowBean flowBean = new FlowBean(upFlow, downFlow);
            context.write(new Text(key),flowBean);
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        // Set I / O path
        String input = "hdfs://hadoop-ip-101:9000/user/hadoop/flow/input";
        String output = "hdfs://hadoop-ip-101:9000/user/hadoop/flow/v2output";
        Configuration conf = new Configuration();
        conf.set("mapreduce.app-submission.cross-platform","true");
        Job job = Job.getInstance(conf);
        // 
        job.setJar("/big-data-github/hadoop-parent/hadoop-flowcount-v2/target/hadoop-flowcount-v2-1.0-SNAPSHOT.jar");

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop-ip-101:9000"),conf,"hadoop");
        Path outPath = new Path(output);
        if(fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        // Set partition
        job.setPartitionerClass(FlowPartition.class);
        // Set the number of reduceTask to add
        job.setNumReduceTasks(4);

        FileInputFormat.addInputPath(job,new Path(input));
        FileOutputFormat.setOutputPath(job,outPath);

        boolean bool = job.waitForCompletion(true);
        System.exit(bool?0:1);
    }

}

IV. final results

  1. Web browsing
  2. Downloaded files
  3. Document content
  • part-r-00000 and part-r-00002
  • part-r-00001 and part-r-00003

Tags: Big Data Hadoop Apache Java github

Posted on Fri, 01 Nov 2019 16:50:43 -0700 by biz0r