欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:961
  2. 浏览总数:11,499,573
  3. 评论:3873
  4. 分类目录:103 个
  5. 注册用户数:5847
  6. 最后更新:2018年10月17日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Hadoop&Spark解决二次排序问题(Hadoop篇)

问题描述

  二次排序就是key之间有序,而且每个Key对应的value也是有序的;也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):

[root@iteblog.com /tmp]# vim data.txt 
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们期望的输出结果是

2014-2	64
2015-1	3,4,21,24
2015-2	-43,0,35
2015-3	46,56
2015-4	5

  但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:

2014-2	64
2015-1	21,4,3,24
2015-2	0,35,-43
2015-3	46,56
2015-4	5

解决方案

  针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
  1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key;
  2、我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v)
  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
  4、自定义分组函数,将k相同的键值对当作一个分组。
  文字比较枯燥,我们来看看下面实例:
  1、原始数据是

[root@iteblog.com /tmp]# vim data.txt 
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们将年、月组成key(natural key),总数作为value,结果变成:

(2015-1,24)
(2015-3,56)
(2015-1,3)
(2015-2,-43)
(2015-4,5)
(2015-3,46)
(2014-2,64)
(2015-1,4)
(2015-1,21)
(2015-2,35)
(2015-2,0)

  2、将value和key(natural key)组成新的key(composite key),如下:

((2015-1,24),24)
((2015-3,56),56)
((2015-1,3),3)
((2015-2,-43),-43)
((2015-4,5),5)
((2015-3,46),46)
((2014-2,64),64)
((2015-1,4),4)
((2015-1,21),21)
((2015-2,35),35)
((2015-2,0),0)

  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:

[((2014-2,64),64)]
[((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,56),56),((2015-3,46),46)]
[((2015-4,5),5)]

  4、自定义组排序函数,结果如下:

[((2014-2,64),64)]
[((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,46),46),((2015-3,56),56)]
[((2015-4,5),5)]

  5、自定义分组函数,结果如下:

((2014-2,64),(64))
((2015-1,24),(3,4,21,24))
((2015-2,35),(-43,0,35))
((2015-3,56),(46,56))
((2015-4,5),(5))

  6、最后输出的结果就是我们要的:

2014-2	64
2015-1	3,4,21,24
2015-2	-43,0,35
2015-3	46,56
2015-4	5

代码实例

  下面将贴出使用MapReduce解决这个问题的代码:

package com.iteblog;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * User: 过往记忆
 * Date: 2015-08-05
 * Time: 下午23:49
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1415
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
public class Entry implements WritableComparable<Entry> {
    private String yearMonth;
    private int count;

    public Entry() {
    }

    @Override
    public int compareTo(Entry entry) {
        int result = this.yearMonth.compareTo(entry.getYearMonth());
        if (result == 0) {
            result = compare(count, entry.getCount());
        }
        return result;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(yearMonth);
        dataOutput.writeInt(count);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.yearMonth = dataInput.readUTF();
        this.count = dataInput.readInt();
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String yearMonth) {
        this.yearMonth = yearMonth;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public static int compare(int a, int b) {
        return a < b ? -1 : (a > b ? 1 : 0);
    }

    @Override
    public String toString() {
        return yearMonth;
    }
}

  上面就是将旧的Key(natural key)和Value组合成新的Key(composite key)的代码,接下来看下自定义的分区类:

package com.iteblog;

import org.apache.hadoop.mapreduce.Partitioner;

public class EntryPartitioner extends Partitioner<Entry, Integer> {

    @Override
    public int getPartition(Entry entry, Integer integer, int numberPartitions) {
        return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
    }
}

  这个类使得natural key相同的数据分派到同一个Reduce中。然后看下自定义分组类:

package com.iteblog;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * User: 过往记忆
 * Date: 2015-08-05
 * Time: 下午23:49
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1415
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
public class EntryGroupingComparator extends WritableComparator {
    public EntryGroupingComparator() {
        super(Entry.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Entry a1 = (Entry) a;
        Entry b1 = (Entry) b;
        return a1.getYearMonth().compareTo(b1.getYearMonth());
    }
}

  只要是natural key相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类

public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> {

    private Entry entry = new Entry();
    private Text value = new Text();

    @Override
    protected void map(LongWritable key, Text lines, Context context)
            throws IOException, InterruptedException {
        String line = lines.toString();
        String[] tokens = line.split(",");
        // YYYY = tokens[0]
        // MM = tokens[1]
        // count = tokens[2]
        String yearMonth = tokens[0] + "-" + tokens[1];
        int count = Integer.parseInt(tokens[2]);

        entry.setYearMonth(yearMonth);
        entry.setCount(count);
        value.set(tokens[2]);

        context.write(entry, value);
    }
}

  其实就是解析每一行的数据,然后将旧的Key(natural key)和Value组合成新的Key(composite key)。接下来看下Reduce类实现

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {
    @Override
    protected void reduce(Entry key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder builder = new StringBuilder();
        for (Text value : values) {
            builder.append(value.toString());
            builder.append(",");
        }
        context.write(key, new Text(builder.toString()));
    }
}

builder存储的就是排序好的Value序列,最后来看看启动程序的使用:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Iteblog.class);
job.setJobName("SecondarySort");

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setOutputKeyClass(Entry.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setPartitionerClass(EntryPartitioner.class);
job.setGroupingComparatorClass(EntryGroupingComparator.class);

关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:

[root@iteblog.com /hadoop]# bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar  
    com.iteblog.Main  /iteblog/data.txt /iteblog/output

[root@iteblog.com /hadoop]# bin/hadoop fs -cat /iteblog/output/pa*
2014-2	64,
2015-1	3,4,21,24,
2015-2	-43,0,35,
2015-3	46,56,
2015-4	5,
本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Hadoop&Spark解决二次排序问题(Hadoop篇)】(https://www.iteblog.com/archives/1415.html)
喜欢 (26)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(2)个小伙伴在吐槽
  1. 如果有多个reduce同时计算像这样的二次排序请问有没有好的方案
    天仙宝宝2017-05-06 11:31 回复
    • 上文其实在多个Reduce的情况下也是有效的。当然,这个不是唯一的方案。
      w3970907702017-05-09 22:47 回复