Hadoop example: calculating the total amount of stock transactions

The optional final assignment for the Cloud Computing and Big Data Overview course is a stock case study with the following requirements:
Case: the attached file Data.txt (shown in Fig. 1) contains daily stock trading data from January 2011 to the present. The fields within each row are separated by spaces and appear in the following order:
Stock code, transaction date, daily opening price, daily closing price, daily highest price, daily lowest price, daily transaction amount (unit: yuan)
For example, the first line describes the stock with code 000001: on 2011-1-5 it opened at 15.99 yuan and closed at 15.93 yuan; the day's highest price was 16.13 yuan, its lowest 15.91 yuan, and its transaction amount 379869070.98 yuan.

Related concepts:
Opening price: the initial price when the stock opens at 9:30 am that day
Closing price: the price at the 15:00 close on the same day
Highest price, lowest price: a stock trades for four hours a day, and during that time its price swings through highs and lows (as shown in Figure 2); these form the highest and lowest prices of the day.
Daily rise: (closing price of the day − opening price of the day) / closing price of the previous day × 100%
For example, the rise on 2011-1-6 is (15.81 − 15.93) / 15.93 × 100% = −0.75% (a drop of 0.75%).

Requirements:
(1) Following the material above, use the Java API to write the attached file Data.txt into the /user/input directory of the HDFS file system, row by row, under the name stock.data, with "," (comma) as the delimiter between the fields of each row. (15 points)
(2) For the stock.data file from (1), use the Java API to write a MapReduce program that calculates, for each stock, the number of days per month on which it rose by more than 5%, together with the total transaction amount for that month. The final output is formatted as shown in Figure 3. (15 points)
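For example, given the first record described above, the first row of stock.data should look something like this (assuming the date keeps a slash-separated yyyy/MM/dd shape, which is what the mapper's SimpleDateFormat in step 13 expects):

000001,2011/1/5,15.99,15.93,16.13,15.91,379869070.98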

The steps are as follows:
(Some operations may differ slightly, since this experiment was performed on a lab machine of the Octopus big data platform.)
1. Start Hadoop
cd /data/hadoop/sbin    //switch to this directory
./start-all.sh          //start Hadoop
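If startup succeeded, the jps command should list the Hadoop daemons (for example NameNode, DataNode, SecondaryNameNode, ResourceManager, and NodeManager; the exact set depends on the installation):

jps    //list the running JVM processes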
2. Create /data/hadoop4 locally on Linux.
mkdir -p /data/hadoop4
3. Switch to the /data/hadoop4 directory, download the dependency package with the wget command, and unzip it into the current directory.
cd /data/hadoop4
wget http://192.168.1.100:60000/allfiles/hadoop4/hadoop2lib.tar.gz
tar zxvf hadoop2lib.tar.gz
4. Create a new Java project named hadoop4 and, under it, a new package named my.hdfs.

5. Under the hadoop4 project, create a new directory named hadoop4lib to store the jar packages the project depends on.


6. Copy the jar packages from the /data/hadoop4/hadoop2lib directory into the project's hadoop4lib directory, select all of them, and right-click → Build Path → Add to Build Path.


7. Under the my.hdfs package, create a new class named MakeDir, which creates a directory named /user/input under the root directory of HDFS. The code is as follows:

package my.hdfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class MakeDir {
	public static void main(String[] args) throws IOException, URISyntaxException {
		Configuration conf = new Configuration();

		// Address of the HDFS NameNode
		String hdfsPath = "hdfs://localhost:9000";
		FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf);

		// mkdirs also creates any missing parent directories
		String newDir = "/user/input";

		boolean result = hdfs.mkdirs(new Path(newDir));
		if (result) {
			System.out.println("Success!");
		}else {
			System.out.println("Failed!");
		}
	}
}


After writing the class, right-click → Run As → Run on Hadoop. Once it has run, you can verify at the terminal command line:
hadoop fs -ls -R /
to see the newly created directory.

8. Create a new Handle class that processes the data, separating the fields with commas as required by the assignment; see the sketch below.

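Here is a minimal sketch of what such a Handle class might look like, assuming the raw space-delimited file sits locally at src/data/Data.txt (a hypothetical path) and the comma-separated result is written to src/data/stock.txt, the path that step 9 uploads:

package my.hdfs;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
public class Handle {
	public static void main(String[] args) throws IOException {
		// Hypothetical local paths: raw space-delimited input, comma-separated output
		String from = "src/data/Data.txt";
		String to = "src/data/stock.txt";
		try (BufferedReader reader = new BufferedReader(new FileReader(from));
		     BufferedWriter writer = new BufferedWriter(new FileWriter(to))) {
			String line;
			while ((line = reader.readLine()) != null) {
				// Replace each run of whitespace between fields with a single comma
				writer.write(line.trim().replaceAll("\\s+", ","));
				writer.newLine();
			}
		}
		System.out.println("Finish!");
	}
}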

9. Create a new CopyLocalFile class that uploads the local comma-separated file (stock.txt) to the /user/input directory of the HDFS file system.

package my.hdfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CopyLocalFile {
	public static void main(String[] args) throws IOException, URISyntaxException {
		Configuration conf = new Configuration();
		String hdfsPath = "hdfs://localhost:9000";
		FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf);
		// Upload the local comma-separated file to the HDFS directory
		String from_Linux = "src/data/stock.txt";
		String to_HDFS = "/user/input";
		hdfs.copyFromLocalFile(new Path(from_Linux), new Path(to_HDFS));
		System.out.println("Finish!");
	}
}
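You can confirm the upload at the terminal command line:

hadoop fs -ls /user/input    //stock.txt should appear in the listing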


10. As required by the assignment, rename the uploaded stock.txt on HDFS to stock.data.

package my.hdfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CopyLocalFile {
	public static void main(String[] args) throws IOException, URISyntaxException {
		Configuration conf = new Configuration();
		String hdfsPath = "hdfs://localhost:9000";
		FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf);
		// Rename the uploaded file on HDFS, as the assignment requires
		hdfs.rename(new Path("/user/input/stock.txt"), new Path("/user/input/stock.data"));
		//String from_Linux = "src/data/stock.txt";
		//String to_HDFS = "/user/input";
		//hdfs.copyFromLocalFile(new Path(from_Linux), new Path(to_HDFS));
		//System.out.println("Finish!");
	}
}


(The code for this step is the same as CopyLocalFile except that the upload statements are commented out and a rename call is added; you can do it all in one class by finishing step 9 first and then changing a few lines directly. Note the difference between steps 9 and 10 and it should be easy to understand.)
11. Create a new Stock class that implements the WritableComparable interface.

package my.hdfs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Stock  implements WritableComparable<Stock>{
	private int id;
	private int year;
	private int month;
	private int day;
	private float open;
	private float close;
	private float highest;
	private float lowest;
	private double total;
	@Override
	public void readFields(DataInput in) throws IOException {
		// Must mirror write(): same field order, same types
		id = in.readInt();
		year = in.readInt();
		month = in.readInt();
		day = in.readInt();
		open = in.readFloat();
		close = in.readFloat();
		highest = in.readFloat();
		lowest = in.readFloat();
		total = in.readDouble();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(id);
		out.writeInt(year);
		out.writeInt(month);
		out.writeInt(day);
		out.writeFloat(open);
		out.writeFloat(close);
		out.writeFloat(highest);
		out.writeFloat(lowest);
		out.writeDouble(total);
	}

	@Override
	public int compareTo(Stock o) {
		// Order by stock id, then year, then month, then day
		int res = Integer.compare(id, o.getId());
		if (res != 0) {
			return res;
		}
		res = Integer.compare(year, o.getYear());
		if (res != 0) {
			return res;
		}
		res = Integer.compare(month, o.getMonth());
		if (res != 0) {
			return res;
		}
		return Integer.compare(day, o.getDay());
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public int getYear() {
		return year;
	}

	public void setYear(int year) {
		this.year = year;
	}

	public int getMonth() {
		return month;
	}

	public void setMonth(int month) {
		this.month = month;
	}

	public int getDay() {
		return day;
	}

	public void setDay(int day) {
		this.day = day;
	}

	public float getOpen() {
		return open;
	}

	public void setOpen(float open) {
		this.open = open;
	}

	public float getClose() {
		return close;
	}

	public void setClose(float close) {
		this.close = close;
	}

	public float getHighest() {
		return highest;
	}

	public void setHighest(float highest) {
		this.highest = highest;
	}

	public float getLowest() {
		return lowest;
	}

	public void setLowest(float lowest) {
		this.lowest = lowest;
	}

	public double getTotal() {
		return total;
	}

	public void setTotal(double total) {
		this.total = total;
	}

}
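Hadoop serializes keys with write() during the shuffle and rebuilds them with readFields() on the other side, so the two methods must mirror each other field-for-field and type-for-type. A quick standalone round-trip check (a minimal sketch, not part of the assignment itself):

package my.hdfs;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class StockRoundTrip {
	public static void main(String[] args) throws IOException {
		Stock in = new Stock();
		in.setId(1);
		in.setOpen(15.99f);
		in.setTotal(379869070.98);

		// Serialize with write(), then rebuild a second object with readFields()
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		in.write(new DataOutputStream(bytes));
		Stock out = new Stock();
		out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		// Should print exactly the values that were written
		System.out.println(out.getId() + " " + out.getOpen() + " " + out.getTotal());
	}
}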



12. Create KeySort, MyGroup, MyPartitioner, and uprate classes to handle the sorting, grouping, and partitioning of the MapReduce job.
KeySort class:

package my.hdfs;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class KeySort extends WritableComparator {
	public KeySort() {
		super(Stock.class, true);
	}
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		// Sort map-output keys by delegating to Stock.compareTo:
		// id, then year, then month, then day
		Stock s1 = (Stock) a;
		Stock s2 = (Stock) b;
		return s1.compareTo(s2);
	}
}

MyGroup class:

package my.hdfs;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator {
	public MyGroup() {
		super(Stock.class, true);
	}
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		// Keys that compare as equal here are fed to the same reduce() call.
		// Group by stock id, then year, then month, so each group holds one
		// stock's records for one month.
		Stock s1 = (Stock) a;
		Stock s2 = (Stock) b;
		int res = Integer.compare(s1.getId(), s2.getId());
		if (res != 0) {
			return res;
		}
		res = Integer.compare(s1.getYear(), s2.getYear());
		if (res != 0) {
			return res;
		}
		return Integer.compare(s1.getMonth(), s2.getMonth());
	}
}

MyPartitioner class:

package my.hdfs;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Stock, Text> {
	@Override
	public int getPartition(Stock key, Text value, int numReduceTasks) {
		// The type parameters must match what the mapper emits (Stock, Text).
		// Partition by year offset from 2011; the parentheses matter, because
		// % binds tighter than - and would otherwise apply to 2011 alone.
		return (key.getYear() - 2011) % numReduceTasks;
	}
}

uprate class:

package my.hdfs;

public class uprate {
	// Daily rise in percent, per the formula given in the problem statement:
	// (day's close - day's open) / previous day's close * 100
	public float uprate(Stock s1, Stock s2) {
		float result = 0;
		// s1 is the previous record, s2 the current one; the rise is only
		// meaningful when both records belong to the same stock
		if (s1.getId() == s2.getId()) {
			result = (s2.getClose() - s2.getOpen()) / s1.getClose() * 100;
		}
		return result;
	}
}
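A quick sanity check of uprate against the worked example in the problem statement (assuming the 2011-1-6 open equals the previous close of 15.93, which is what the example's numbers imply; s1 is the previous day, s2 the current day):

package my.hdfs;
public class UprateCheck {
	public static void main(String[] args) {
		Stock prev = new Stock();   // 2011-1-5: closed at 15.93
		prev.setId(1);
		prev.setClose(15.93f);
		Stock cur = new Stock();    // 2011-1-6: opened 15.93, closed 15.81
		cur.setId(1);
		cur.setOpen(15.93f);
		cur.setClose(15.81f);
		// (15.81 - 15.93) / 15.93 * 100 rounds to -0.75
		System.out.printf("%.2f%%%n", new uprate().uprate(prev, cur));
	}
}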

13. Create a new test class and complete the MapReduce job.

package my.hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class test {

	public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();

		String hdfsPath = "hdfs://localhost:9000";
		FileSystem hdfs = FileSystem.get(new URI(hdfsPath), conf);

		Job job = Job.getInstance();
		job.setJobName("StockCount");
		job.setJarByClass(test.class);
		job.setMapperClass(doMapper.class);
		// Secondary-sort plumbing: partition by year, sort by the full key,
		// group by stock id + year + month
		job.setPartitionerClass(MyPartitioner.class);
		job.setSortComparatorClass(KeySort.class);
		job.setGroupingComparatorClass(MyGroup.class);
		job.setMapOutputKeyClass(Stock.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(doReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Remove the output directory if it already exists, then run the job
		Path in = new Path("hdfs://localhost:9000/user/input/stock.data");
		Path out = new Path("hdfs://localhost:9000/user/input/out");
		if (hdfs.exists(out)) {
			hdfs.delete(out, true);
		}
		FileInputFormat.addInputPath(job, in);
		FileOutputFormat.setOutputPath(job, out);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	public static class doMapper extends Mapper<LongWritable, Text, Stock, Text> {
		public SimpleDateFormat simpleDate = new SimpleDateFormat("yyyy/MM/dd");
		public Calendar c = Calendar.getInstance();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// Split the comma-separated row: id, date, open, close, highest, lowest, total
			String[] arr = line.split(",");
			if (arr.length == 7) {
				String id = arr[0];
				String riqi = arr[1]; // transaction date
				String open = arr[2];
				String close = arr[3];
				String highest = arr[4];
				String lowest = arr[5];
				String total = arr[6];

				Stock stock = new Stock();
				try {
					stock.setId(Integer.parseInt(id));
					Date date = simpleDate.parse(riqi);
					c.setTime(date);
					int year = c.get(Calendar.YEAR);
					int month = c.get(Calendar.MONTH) + 1; // Calendar months are 0-based
					int day = c.get(Calendar.DATE);
					stock.setYear(year);
					stock.setMonth(month);
					stock.setDay(day);
					stock.setOpen(Float.parseFloat(open));
					stock.setClose(Float.parseFloat(close));
					stock.setHighest(Float.parseFloat(highest));
					stock.setLowest(Float.parseFloat(lowest));
					stock.setTotal(Double.parseDouble(total));
					// Re-emit the parsed fields as the value so the reducer
					// can rebuild each record
					String s = id + "," + year + "," + month + "," + day + "," + open
							+ "," + close + "," + highest + "," + lowest + "," + total;
					context.write(stock, new Text(s));
				} catch (ParseException e) {
					e.printStackTrace();
				}
			}
		}
	}
	 
	public static class doReducer extends Reducer<Stock, Text, Text, Text> {
		// prestock carries the previous record across rows so uprate can use
		// the previous day's closing price
		public Stock prestock = new Stock();
		public uprate up = new uprate();
		public int count = 0;
		public double all = 0;

		@Override
		public void reduce(Stock key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// One group per stock per month; the key describes the group
			String val = key.getId() + " " + key.getYear() + "-" + key.getMonth();

			for (Text t : values) {
				String[] arr = t.toString().split(",");
				// Rebuild the record into a fresh Stock on each iteration;
				// reusing one object would make prestock alias the current row
				Stock stock = new Stock();
				stock.setId(Integer.parseInt(arr[0]));
				stock.setYear(Integer.parseInt(arr[1]));
				stock.setMonth(Integer.parseInt(arr[2]));
				stock.setDay(Integer.parseInt(arr[3]));
				stock.setOpen(Float.parseFloat(arr[4]));
				stock.setClose(Float.parseFloat(arr[5]));
				stock.setHighest(Float.parseFloat(arr[6]));
				stock.setLowest(Float.parseFloat(arr[7]));
				stock.setTotal(Double.parseDouble(arr[8]));

				float result = up.uprate(prestock, stock);
				all = all + stock.getTotal();
				if (result > 5) {
					count++;
				}
				prestock = stock;
			}
			// Output: "<id> <year>-<month>" -> "<days with rise > 5%> <total amount>"
			context.write(new Text(val), new Text(count + " " + all));
			count = 0;
			all = 0;
		}
	}
}

14. Enter a command to view the output. Each line holds the stock id and year-month, followed by the number of days that month with a rise above 5% and the month's total transaction amount:
hadoop fs -cat /user/input/out/part-r-00000

15. Enter a command to export the results to the local Linux directory /data/hadoop4:
hadoop fs -get /user/input/out/part-r-00000 /data/hadoop4/
The results are now exported and can be viewed in the /data/hadoop4 directory.

A simple example for your reference. Students whose computers are not powerful enough but who want to learn Hadoop can use the Octopus big data platform, which has detailed instructions for the practical operations and is well suited to getting started with big data.
