Given the following data:
```
cookieId  time      url
2         12:12:34  2_hao123
3         09:10:34  3_baidu
1         15:02:41  1_google
3         22:11:34  3_sougou
1         19:10:34  1_baidu
2         15:02:41  2_google
1         12:12:34  1_hao123
3         23:10:34  3_soso
2         05:02:41  2_google
```
Suppose the requirement is to sort first by cookieId and then by time, so that the log can be split by session. The sorted result should look like this:
```
---------------------------------
1  12:12:34  1_hao123
1  15:02:41  1_google
1  19:10:34  1_baidu
---------------------------------
2  05:02:41  2_google
2  12:12:34  2_hao123
2  15:02:41  2_google
---------------------------------
3  09:10:34  3_baidu
3  22:11:34  3_sougou
3  23:10:34  3_soso
```
The task is to implement this with a MapReduce program.
The overall design is as follows:

- Map function: parse each line into an AccessLogWritable, so that the Map output can be sorted on the two fields inside the AccessLogWritable object, which implements the secondary sort required above. In other words, sorting still relies on the sort applied to the Map output, but the comparison rules are the ones we define in AccessLogWritable.
- Reduce function: the data arriving at the Reducer after the shuffle is already sorted, so it only needs to be written out.
So, to compare on multiple fields, we need to define a custom key type to use as the Map output key.
The sorting approach is explained in the code comments below. Note that the driver program is built with the Job utility class developed in an earlier article (a sketch of that class is given after the driver code below).
```java
package com.uplooking.bigdata.mr.secondsort;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * MapReduce sorting: secondary sort
 */
public class SecondSortJob {

    /**
     * Driver program, built with the Job utility class
     * @param args
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usages: <inputpath> <outputpath>");
            System.exit(-1);
        }

        Job job = MapReduceJobUtil.buildJob(new Configuration(), SecondSortJob.class, args[0],
                TextInputFormat.class, SecondSortMapper.class, AccessLogWritable.class,
                NullWritable.class, new Path(args[1]), TextOutputFormat.class,
                SecondSortReducer.class, AccessLogWritable.class, NullWritable.class);

        // The number of reduce tasks must be set to 1; otherwise the output
        // would only be sorted within each part file, not globally
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }

    /**
     * Map function: parses each line into an AccessLogWritable, so that the Map output
     * can be sorted on the two fields inside the AccessLogWritable object, implementing
     * the secondary sort required above. That is, sorting still relies on the sort of
     * the Map output, but the rules are the ones we define in AccessLogWritable.
     */
    public static class SecondSortMapper extends Mapper<LongWritable, Text, AccessLogWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Parse the line
            String[] fields = value.toString().split("\t");
            if (fields == null || fields.length < 3) {
                return;
            }
            String cookieId = fields[0];
            String time = fields[1];
            String url = fields[2];
            // Build the AccessLogWritable object
            AccessLogWritable logLine = new AccessLogWritable(cookieId, time, url);
            // Write it out to the context
            context.write(logLine, NullWritable.get());
        }
    }

    /**
     * The data arriving at the Reducer after the shuffle is already sorted,
     * so it can be written out directly.
     */
    public static class SecondSortReducer extends Reducer<AccessLogWritable, NullWritable, AccessLogWritable, NullWritable> {
        @Override
        protected void reduce(AccessLogWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
```
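The driver depends on MapReduceJobUtil.buildJob, which was developed in an earlier article and is not reproduced here. For readers who do not have it, the following is a minimal sketch whose parameter order is inferred from the call site above; treat it as an assumption about the utility class, not the original implementation:

```java
package com.uplooking.bigdata.common.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical reconstruction of the utility class from the earlier article,
// matching the buildJob(...) call made by SecondSortJob above
public class MapReduceJobUtil {

    public static Job buildJob(Configuration conf,
                               Class<?> jobClass,
                               String inputPath,
                               Class<? extends InputFormat> inputFormatClass,
                               Class<? extends Mapper> mapperClass,
                               Class<?> mapOutputKeyClass,
                               Class<?> mapOutputValueClass,
                               Path outputPath,
                               Class<? extends OutputFormat> outputFormatClass,
                               Class<? extends Reducer> reducerClass,
                               Class<?> outputKeyClass,
                               Class<?> outputValueClass) throws Exception {
        Job job = Job.getInstance(conf, jobClass.getSimpleName());
        job.setJarByClass(jobClass);

        // Input side: format, path, mapper, and map output types
        job.setInputFormatClass(inputFormatClass);
        FileInputFormat.setInputPaths(job, inputPath);
        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(mapOutputKeyClass);
        job.setMapOutputValueClass(mapOutputValueClass);

        // Output side: format, path, reducer, and final output types
        job.setOutputFormatClass(outputFormatClass);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setReducerClass(reducerClass);
        job.setOutputKeyClass(outputKeyClass);
        job.setOutputValueClass(outputValueClass);
        return job;
    }
}
```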
```java
package com.uplooking.bigdata.mr.secondsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom Hadoop data type. To be used as a key it must implement the WritableComparable
 * interface. The objects compared during the map-side sort are AccessLogWritable
 * instances, so the type parameter is AccessLogWritable.
 */
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {

    private String cookieId;
    private String time;
    private String url;

    /**
     * The no-argument constructor is required; without it Hadoop fails with:
     *   Caused by: java.lang.NoSuchMethodException:
     *       com.uplooking.bigdata.mr.secondsort.AccessLogWritable.<init>()
     *     at java.lang.Class.getConstructor0(Class.java:3082)
     *     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
     *     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
     *     ... 16 more
     */
    public AccessLogWritable() {
    }

    public AccessLogWritable(String cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

    /**
     * Comparison method. The rule:
     * sort by cookieId first, then by time
     * @param o
     * @return
     */
    public int compareTo(AccessLogWritable o) {
        int ret = this.cookieId.compareTo(o.cookieId);
        // If the cookieIds are equal, compare time
        if (ret == 0) {
            ret = this.time.compareTo(o.time);
        }
        return ret;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(cookieId);
        out.writeUTF(time);
        out.writeUTF(url);
    }

    public void readFields(DataInput in) throws IOException {
        this.cookieId = in.readUTF();
        this.time = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}
```
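To see the comparison rule in action outside of MapReduce, here is a tiny sanity check (not part of the original article; the class name is mine) that exercises compareTo directly:

```java
package com.uplooking.bigdata.mr.secondsort;

// Hypothetical sanity check for AccessLogWritable.compareTo; not from the original post
public class CompareToCheck {
    public static void main(String[] args) {
        AccessLogWritable a = new AccessLogWritable("1", "12:12:34", "1_hao123");
        AccessLogWritable b = new AccessLogWritable("1", "15:02:41", "1_google");
        AccessLogWritable c = new AccessLogWritable("2", "05:02:41", "2_google");

        // Same cookieId: the earlier time sorts first
        System.out.println(a.compareTo(b) < 0); // true
        // Different cookieIds: "1" sorts before "2" regardless of time
        System.out.println(b.compareTo(c) < 0); // true
    }
}
```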
Here the MapReduce program is run in the local environment, with the following input arguments:
```
/Users/yeyonghao/data/input/secondsort /Users/yeyonghao/data/output/mr/secondsort
```
Alternatively, you can package it into a jar and upload it to a Hadoop environment to run.
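In that case the invocation would look roughly like the following; the jar name and HDFS paths here are illustrative, not from the original article:

```
# jar name and paths are illustrative
hadoop jar secondsort.jar com.uplooking.bigdata.mr.secondsort.SecondSortJob \
    /data/input/secondsort /data/output/mr/secondsort
```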
After running the program, the output is as follows:
```
yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/secondsort$ cat part-r-00000
1  12:12:34  1_hao123
1  15:02:41  1_google
1  19:10:34  1_baidu
2  05:02:41  2_google
2  12:12:34  2_hao123
2  15:02:41  2_google
3  09:10:34  3_baidu
3  22:11:34  3_sougou
3  23:10:34  3_soso
```
As you can see, by using a custom key, our MapReduce program accomplishes the secondary sort.
In fact, once the program above is clearly understood, the idea of sorting on more fields follows naturally: the comparison rules are defined in the key, and Map sorts by key, so an n-level sort only requires implementing those rules in the custom key's compareTo method. Interested readers can write such a program themselves; a brief sketch of the idea is shown below.
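As an illustration, compareTo could be extended to a third level like this. It is a drop-in replacement for the compareTo method in AccessLogWritable above; the choice of url as the third sort field is hypothetical, not from the original article:

```java
// Illustrative three-level comparison: cookieId, then time, then url
// (url as the third field is an arbitrary choice for demonstration)
public int compareTo(AccessLogWritable o) {
    int ret = this.cookieId.compareTo(o.cookieId);
    if (ret == 0) {
        ret = this.time.compareTo(o.time); // second level: time
    }
    if (ret == 0) {
        ret = this.url.compareTo(o.url);   // third level: url
    }
    return ret;
}
```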
Reprinted from: http://nyutl.baihongyu.com/