问题描述
二次排序就是key之间有序,而且每个Key对应的value也是有序的;也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):
[root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0
我们期望的输出结果是
2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5
但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:
2014-2 64 2015-1 21,4,3,24 2015-2 0,35,-43 2015-3 46,56 2015-4 5
解决方案
针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key;
2、我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v)
3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
4、自定义分组函数,将k相同的键值对当作一个分组。
文字比较枯燥,我们来看看下面实例:
1、原始数据是
[root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0
我们将年、月组成key(natural key),总数作为value,结果变成:
(2015-1,24) (2015-3,56) (2015-1,3) (2015-2,-43) (2015-4,5) (2015-3,46) (2014-2,64) (2015-1,4) (2015-1,21) (2015-2,35) (2015-2,0)
2、将value和key(natural key)组成新的key(composite key),如下:
((2015-1,24),24) ((2015-3,56),56) ((2015-1,3),3) ((2015-2,-43),-43) ((2015-4,5),5) ((2015-3,46),46) ((2014-2,64),64) ((2015-1,4),4) ((2015-1,21),21) ((2015-2,35),35) ((2015-2,0),0)
3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:
[((2014-2,64),64)] [((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,56),56),((2015-3,46),46)] [((2015-4,5),5)]
4、自定义组排序函数,结果如下:
[((2014-2,64),64)] [((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,46),46),((2015-3,56),56)] [((2015-4,5),5)]
5、自定义分组函数,结果如下:
((2014-2,64),(64)) ((2015-1,24),(3,4,21,24)) ((2015-2,35),(-43,0,35)) ((2015-3,56),(46,56)) ((2015-4,5),(5))
6、最后输出的结果就是我们要的:
2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5
代码实例
下面将贴出使用MapReduce解决这个问题的代码:
package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* User: 过往记忆
* Date: 2015-08-05
* Time: 下午23:49
* bolg:
* 本文地址:/archives/1415.html
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
public class Entry implements WritableComparable<Entry> {
private String yearMonth;
private int count;
public Entry() {
}
@Override
public int compareTo(Entry entry) {
int result = this.yearMonth.compareTo(entry.getYearMonth());
if (result == 0) {
result = compare(count, entry.getCount());
}
return result;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(yearMonth);
dataOutput.writeInt(count);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.yearMonth = dataInput.readUTF();
this.count = dataInput.readInt();
}
public String getYearMonth() {
return yearMonth;
}
public void setYearMonth(String yearMonth) {
this.yearMonth = yearMonth;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public static int compare(int a, int b) {
return a < b ? -1 : (a > b ? 1 : 0);
}
@Override
public String toString() {
return yearMonth;
}
}
上面就是将旧的Key(natural key)和Value组合成新的Key(composite key)的代码,接下来看下自定义的分区类:
package com.iteblog;
import org.apache.hadoop.mapreduce.Partitioner;
public class EntryPartitioner extends Partitioner<Entry, Integer> {
@Override
public int getPartition(Entry entry, Integer integer, int numberPartitions) {
return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
}
}
这个类使得natural key相同的数据分派到同一个Reduce中。然后看下自定义分组类:
package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* User: 过往记忆
* Date: 2015-08-05
* Time: 下午23:49
* bolg:
* 本文地址:/archives/1415.html
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
public class EntryGroupingComparator extends WritableComparator {
public EntryGroupingComparator() {
super(Entry.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Entry a1 = (Entry) a;
Entry b1 = (Entry) b;
return a1.getYearMonth().compareTo(b1.getYearMonth());
}
}
只要是natural key相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类
public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> {
private Entry entry = new Entry();
private Text value = new Text();
@Override
protected void map(LongWritable key, Text lines, Context context)
throws IOException, InterruptedException {
String line = lines.toString();
String[] tokens = line.split(",");
// YYYY = tokens[0]
// MM = tokens[1]
// count = tokens[2]
String yearMonth = tokens[0] + "-" + tokens[1];
int count = Integer.parseInt(tokens[2]);
entry.setYearMonth(yearMonth);
entry.setCount(count);
value.set(tokens[2]);
context.write(entry, value);
}
}
其实就是解析每一行的数据,然后将旧的Key(natural key)和Value组合成新的Key(composite key)。接下来看下Reduce类实现
public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {
@Override
protected void reduce(Entry key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuilder builder = new StringBuilder();
for (Text value : values) {
builder.append(value.toString());
builder.append(",");
}
context.write(key, new Text(builder.toString()));
}
}
builder存储的就是排序好的Value序列,最后来看看启动程序的使用:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Iteblog.class);
job.setJobName("SecondarySort");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputKeyClass(Entry.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setPartitionerClass(EntryPartitioner.class);
job.setGroupingComparatorClass(EntryGroupingComparator.class);
关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:
[root@iteblog.com /hadoop]# bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar
com.iteblog.Main /iteblog/data.txt /iteblog/output
[root@iteblog.com /hadoop]# bin/hadoop fs -cat /iteblog/output/pa*
2014-2 64,
2015-1 3,4,21,24,
2015-2 -43,0,35,
2015-3 46,56,
2015-4 5,
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Hadoop&Spark解决二次排序问题(Hadoop篇)】(https://www.iteblog.com/archives/1415.html)


如果有多个reducer的,如何保证不同分区间是有序的,有可能出现key为6经hash后散列到分区1,而key为5经hash后散列到分区2,是不是有这种情况?
二次排序就是key之间有序,而且每个Key对应的value也是有序的
还是不太懂,我的理解是将原来的key-value作为新的key再自定义比较规则可以实现二次排序,这样在同一reduce分区中是好理解的,但是如果保证经分区函数分到不同分区中的key是相对有序的。。
你可以将上面的程序测试一下呢
如果有多个reduce同时计算像这样的二次排序请问有没有好的方案
上文其实在多个Reduce的情况下也是有效的。当然,这个不是唯一的方案。