博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)
阅读量:6073 次
发布时间:2019-06-20

本文共 7912 字,大约阅读时间需要 26 分钟。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

package zhouls.bigdata.myMapReduce.weather;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class MyKey implements WritableComparable
{ //WritableComparable,实现这个方法,要多很多 //readFields是读入,write是写出 private int year; private int month; private double hot; public int getYear() { return year;} public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public double getHot() { return hot; } public void setHot(double hot) { this.hot = hot; }//这一大段的get和set,可以右键,source,产生get和set,自动生成。 public void readFields(DataInput arg0) throws IOException { //反序列化 this.year=arg0.readInt(); this.month=arg0.readInt(); this.hot=arg0.readDouble(); } public void write(DataOutput arg0) throws IOException { //序列化 arg0.writeInt(year); arg0.writeInt(month); arg0.writeDouble(hot); } //判断对象是否是同一个对象,当该对象作为输出的key public int compareTo(MyKey o) { int r1 =Integer.compare(this.year, o.getYear());//比较当前的年和你传过来的年 if(r1==0){ int r2 =Integer.compare(this.month, o.getMonth()); if(r2==0){ return Double.compare(this.hot, o.getHot()); }else{ return r2; } }else{ return r1; } }}

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class MyPartitioner extends HashPartitioner
{
//这里就是洗牌 //执行时间越短越好 public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) { return (key.getYear()-1949)%numReduceTasks;//对于一个数据集,找到最小,1949 }}//1949-10-01 14:21:02 34c//1949-10-02 14:01:02 36c//1950-01-01 11:21:02 32c//1950-10-01 12:21:02 37c//1951-12-01 12:21:02 23c//1950-10-02 12:21:02 41c//1950-10-03 12:21:02 27c//1951-07-01 12:21:02 45c//1951-07-02 12:21:02 46c//1951-07-03 12:21:03 47c

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class MySort extends WritableComparator{    public MySort(){        super(MyKey.class,true);//把MyKey传进了    }    public int compare(WritableComparable a, WritableComparable b) {
//这是排序的精髓 MyKey k1 =(MyKey) a; MyKey k2 =(MyKey) b; int r1 =Integer.compare(k1.getYear(), k2.getYear()); if(r1==0){
//年相同 int r2 =Integer.compare(k1.getMonth(), k2.getMonth()); if(r2==0){
//月相同 return -Double.compare(k1.getHot(), k2.getHot());//比较气温 }else{ return r2; } }else{ return r1; } }}

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class MyGroup extends WritableComparator{    public MyGroup(){        super(MyKey.class,true);//把MyKey传进了}    public int compare(WritableComparable a, WritableComparable b) {
//这是分组的精髓 MyKey k1 =(MyKey) a; MyKey k2 =(MyKey) b; int r1 =Integer.compare(k1.getYear(), k2.getYear()); if(r1==0){ return Integer.compare(k1.getMonth(), k2.getMonth()); }else{ return r1; } }}

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import java.io.IOException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class RunJob {//    1949-10-01 14:21:02    34c WeatherMapper//    1949-10-02 14:01:02    36c//    1950-01-01 11:21:02    32c    分区在MyPartitioner.java //    1950-10-01 12:21:02    37c//    1951-12-01 12:21:02    23c    排序在MySort.java//    1950-10-02 12:21:02    41c//    1950-10-03 12:21:02    27c    分组在MyGroup.java//    1951-07-01 12:21:02    45c//    1951-07-02 12:21:02    46c    再,WeatherReducer//    1951-07-03 12:21:03    47c//key:每行第一个隔开符(制表符)左边为key,右边为value    自定义类型MyKey,洗牌,        static class WeatherMapper extends Mapper
{ SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); NullWritable v =NullWritable.get();// 1949-10-01 14:21:02是自定义类型MyKey,即key// 34c是DoubleWritable,即value protected void map(Text key, Text value,Context context) throws IOException, InterruptedException { try { Date date =sdf.parse(key.toString()); Calendar c =Calendar.getInstance(); //Calendar 类是一个抽象类,可以通过调用 getInstance() 静态方法获取一个 Calendar 对象, //此对象已由当前日期时间初始化,即默认代表当前时间,如 Calendar c = Calendar.getInstance(); c.setTime(date); int year =c.get(Calendar.YEAR); int month =c.get(Calendar.MONTH); double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c"))); MyKey k =new MyKey(); k.setYear(year); k.setMonth(month); k.setHot(hot); context.write(k, new DoubleWritable(hot)); } catch (Exception e) { e.printStackTrace(); } }} static class WeatherReducer extends Reducer
{ protected void reduce(MyKey arg0, Iterable
arg1,Context arg2)throws IOException, InterruptedException { int i=0; for(DoubleWritable v :arg1){ i++; String msg =arg0.getYear()+"\t"+arg0.getMonth()+"\t"+v.get();//"\t"是制表符 arg2.write(new Text(msg), NullWritable.get()); if(i==3){ break; } } }}public static void main(String[] args) { Configuration config =new Configuration();// config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");// config.set("yarn.resourcemanager.hostname", "HadoopMaster");// config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");// config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"\t",这里自定义,如"," try { FileSystem fs =FileSystem.get(config); Job job =Job.getInstance(config); job.setJarByClass(RunJob.class); job.setJobName("weather"); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); job.setMapOutputKeyClass(MyKey.class); job.setMapOutputValueClass(DoubleWritable.class); job.setPartitionerClass(MyPartitioner.class); job.setSortComparatorClass(MySort.class); job.setGroupingComparatorClass(MyGroup.class); job.setNumReduceTasks(3); job.setInputFormatClass(KeyValueTextInputFormat.class);// FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));//输入路径,下有weather.txt// // Path outpath =new Path("hdfs://HadoopMaster:9000/out/weather"); FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));//输入路径,下有weather.txt Path outpath =new Path("./out/weather"); if(fs.exists(outpath)){ fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); boolean f= job.waitForCompletion(true); if(f){ } } catch (Exception e) { e.printStackTrace(); } }}

 

 

本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6164729.html,如需转载请自行联系原作者

你可能感兴趣的文章
理解B树索引
查看>>
vi编辑器的命令集合
查看>>
Mysql利用binlog恢复数据
查看>>
解决 Windows启动时要求验证
查看>>
我的友情链接
查看>>
用yum安装mariadb
查看>>
一点IT"边缘化"的人的思考
查看>>
Gallery循环滑动
查看>>
Sql与C#中日期格式转换总结
查看>>
iOS开发流程总结
查看>>
hadoop datanode 启动出错
查看>>
js颜色拾取器
查看>>
IDEA使用(1)intellIJ idea 配置 svn
查看>>
Thread Safety in Java(java中的线程安全)
查看>>
WPF 降低.net framework到4.0
查看>>
数据管理DMS 全量SQL诊断:你的SQL是健康的蓝色,还是危险的红色?
查看>>
搭建一个通用的脚手架
查看>>
开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
查看>>
开源磁盘加密软件VeraCrypt教程
查看>>
本地vs云:大数据厮杀的最终幸存者会是谁?
查看>>