本文共 7912 字,大约阅读时间需要 26 分钟。
代码
package zhouls.bigdata.myMapReduce.weather;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class MyKey implements WritableComparable{ //WritableComparable,实现这个方法,要多很多 //readFields是读入,write是写出 private int year; private int month; private double hot; public int getYear() { return year;} public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public double getHot() { return hot; } public void setHot(double hot) { this.hot = hot; }//这一大段的get和set,可以右键,source,产生get和set,自动生成。 public void readFields(DataInput arg0) throws IOException { //反序列化 this.year=arg0.readInt(); this.month=arg0.readInt(); this.hot=arg0.readDouble(); } public void write(DataOutput arg0) throws IOException { //序列化 arg0.writeInt(year); arg0.writeInt(month); arg0.writeDouble(hot); } //判断对象是否是同一个对象,当该对象作为输出的key public int compareTo(MyKey o) { int r1 =Integer.compare(this.year, o.getYear());//比较当前的年和你传过来的年 if(r1==0){ int r2 =Integer.compare(this.month, o.getMonth()); if(r2==0){ return Double.compare(this.hot, o.getHot()); }else{ return r2; } }else{ return r1; } }}
package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class MyPartitioner extends HashPartitioner{ //这里就是洗牌 //执行时间越短越好 public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) { return (key.getYear()-1949)%numReduceTasks;//对于一个数据集,找到最小,1949 }}//1949-10-01 14:21:02 34c//1949-10-02 14:01:02 36c//1950-01-01 11:21:02 32c//1950-10-01 12:21:02 37c//1951-12-01 12:21:02 23c//1950-10-02 12:21:02 41c//1950-10-03 12:21:02 27c//1951-07-01 12:21:02 45c//1951-07-02 12:21:02 46c//1951-07-03 12:21:03 47c
package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class MySort extends WritableComparator{ public MySort(){ super(MyKey.class,true);//把MyKey传进了 } public int compare(WritableComparable a, WritableComparable b) { //这是排序的精髓 MyKey k1 =(MyKey) a; MyKey k2 =(MyKey) b; int r1 =Integer.compare(k1.getYear(), k2.getYear()); if(r1==0){ //年相同 int r2 =Integer.compare(k1.getMonth(), k2.getMonth()); if(r2==0){ //月相同 return -Double.compare(k1.getHot(), k2.getHot());//比较气温 }else{ return r2; } }else{ return r1; } }}
package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class MyGroup extends WritableComparator{ public MyGroup(){ super(MyKey.class,true);//把MyKey传进了} public int compare(WritableComparable a, WritableComparable b) { //这是分组的精髓 MyKey k1 =(MyKey) a; MyKey k2 =(MyKey) b; int r1 =Integer.compare(k1.getYear(), k2.getYear()); if(r1==0){ return Integer.compare(k1.getMonth(), k2.getMonth()); }else{ return r1; } }}
package zhouls.bigdata.myMapReduce.weather;import java.io.IOException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class RunJob {// 1949-10-01 14:21:02 34c WeatherMapper// 1949-10-02 14:01:02 36c// 1950-01-01 11:21:02 32c 分区在MyPartitioner.java // 1950-10-01 12:21:02 37c// 1951-12-01 12:21:02 23c 排序在MySort.java// 1950-10-02 12:21:02 41c// 1950-10-03 12:21:02 27c 分组在MyGroup.java// 1951-07-01 12:21:02 45c// 1951-07-02 12:21:02 46c 再,WeatherReducer// 1951-07-03 12:21:03 47c//key:每行第一个隔开符(制表符)左边为key,右边为value 自定义类型MyKey,洗牌, static class WeatherMapper extends Mapper{ SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); NullWritable v =NullWritable.get();// 1949-10-01 14:21:02是自定义类型MyKey,即key// 34c是DoubleWritable,即value protected void map(Text key, Text value,Context context) throws IOException, InterruptedException { try { Date date =sdf.parse(key.toString()); Calendar c =Calendar.getInstance(); //Calendar 类是一个抽象类,可以通过调用 getInstance() 静态方法获取一个 Calendar 对象, //此对象已由当前日期时间初始化,即默认代表当前时间,如 Calendar c = Calendar.getInstance(); c.setTime(date); int year =c.get(Calendar.YEAR); int month =c.get(Calendar.MONTH); double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c"))); MyKey k =new MyKey(); k.setYear(year); k.setMonth(month); k.setHot(hot); context.write(k, new DoubleWritable(hot)); } catch (Exception e) { e.printStackTrace(); } }} static class WeatherReducer extends Reducer { protected void reduce(MyKey arg0, Iterable arg1,Context arg2)throws IOException, InterruptedException { int i=0; for(DoubleWritable v :arg1){ i++; String msg =arg0.getYear()+"\t"+arg0.getMonth()+"\t"+v.get();//"\t"是制表符 arg2.write(new Text(msg), NullWritable.get()); if(i==3){ break; } } }}public static void main(String[] args) { Configuration config =new Configuration();// config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");// config.set("yarn.resourcemanager.hostname", "HadoopMaster");// config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");// config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"\t",这里自定义,如"," try { FileSystem fs =FileSystem.get(config); Job job =Job.getInstance(config); job.setJarByClass(RunJob.class); job.setJobName("weather"); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); job.setMapOutputKeyClass(MyKey.class); job.setMapOutputValueClass(DoubleWritable.class); job.setPartitionerClass(MyPartitioner.class); job.setSortComparatorClass(MySort.class); job.setGroupingComparatorClass(MyGroup.class); job.setNumReduceTasks(3); job.setInputFormatClass(KeyValueTextInputFormat.class);// FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));//输入路径,下有weather.txt// // Path outpath =new Path("hdfs://HadoopMaster:9000/out/weather"); FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));//输入路径,下有weather.txt Path outpath =new Path("./out/weather"); if(fs.exists(outpath)){ fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); boolean f= job.waitForCompletion(true); if(f){ } } catch (Exception e) { e.printStackTrace(); } }}
本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6164729.html,如需转载请自行联系原作者