欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Spark多文件输出(MultipleOutputFormat)

  在本博客的《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(一)》《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(二)》两篇文章中我介绍了如何在Hadoop中根据Key或者Value的不同将属于不同的类型记录写到不同的文件中。在里面用到了MultipleOutputFormat这个类。
  因为Spark内部写文件方式其实调用的都是Hadoop那一套东西,所以我们也可以通过Spark实现多文件输出。不过遗憾的是,Spark内部没有多文件输出的函数供大家直接调用,值得欣慰的是,我们自己实现这个功能也是很简单的。我们可以通过调用saveAsHadoopFile函数并自定义一个OutputFormat类即可,代码如下:

/**
 * User: 过往记忆
 * Date: 15-03-11
 * Time: 上午08:24
 * bolg: 
 * 本文地址:/archives/1281
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._


import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SplitTest")
    val sc = new SparkContext(conf)
    sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))
      .map(value => (value._1, value._2 + "Test"))
      .partitionBy(new HashPartitioner(3))
      .saveAsHadoopFile("/iteblog", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

  RDDMultipleTextOutputFormat类中的generateFileNameForKeyValue函数有三个参数,key和value就是我们RDD的Key和Value,而name参数是每个Reduce的编号。本例中没有使用该参数,而是直接将同一个Key的数据输出到同一个文件中。执行:

bin/spark-submit --master yarn-cluster 
--class Split ./iteblog-1.0-SNAPSHOT.jar

  然后我们可以看到在HDFS上输出的文件列表如下:

[iteblog@master ]$ bin/hadoop fs -ls /iteblog
Found 4 items
-rw-r--r--   3 iteblog hadoop2          0 2015-03-09 11:26 /iteblog/_SUCCESS
-rw-r--r--   3 iteblog hadoop2         11 2015-03-09 11:26 /iteblog/b
-rw-r--r--   3 iteblog hadoop2         10 2015-03-09 11:26 /iteblog/c
-rw-r--r--   3 iteblog hadoop2         19 2015-03-09 11:26 /iteblog/w

[iteblog@master ]$ bin/hadoop fs -cat /iteblog/w
w	btTest
w	wwwTest

  从上面的输出可以看出key为w的记录全部输出到文件名为w的文件中去了。

  不过社区已经有人建议开发出saveAsTextFileByKey函数来实现该功能(SPARK-3533,https://github.com/apache/spark/pull/4895),很有可能会在Spark 1.4.0版本添加。
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark多文件输出(MultipleOutputFormat)】(https://www.iteblog.com/archives/1281.html)
喜欢 (17)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(19)个小伙伴在吐槽
  1. 文件的数量好像不能太多,否则好像会占用很多内存,导致outofmemoryerror

    2016-11-12 15:14 回复
  2. 我采用了你的方法去保存数据,saveAsHadoopFile但是每次执行都会覆盖上一次的,我希望把同一天的数据追加到一个文件中怎么做呢?

    Mini_Night_Cat2016-08-29 09:21 回复
    • 内置的无法做到文件追加,需要你自己实现。可以做到同一天的数据放到同一个目录下,但是不是追加的形式。

      w3970907702016-08-29 09:26 回复
      • 访问日志需要按照/日期/访问IP.log形式汇总,提供给客户下载,不知道怎么办好了

        Mini_Night_Cat2016-08-29 09:48 回复
        • 这个也是可以实现的,但是需要你自己编写一个类似于HadoopWriter,然后按天切割。

          w3970907702016-08-29 12:53 回复
          • 需要怎么做呢?
            @Override
            protected RecordWriter<String, String> getBaseRecordWriter(FileSystem fs,
            JobConf job, String name, Progressable arg3) throws IOException {
            if (theTextOutputFormat == null) {
            theTextOutputFormat = new MyTextOutputFormat<String, String>();
            }
            return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
            }

            private class MyTextOutputFormat<K,V> extends TextOutputFormat<K,V>{

            @Override
            public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
            JobConf job, String name, Progressable progress)
            throws IOException {
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
            "\t");
            if (!isCompressed) {
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file,false,fs.getConf().getInt("io.file.buffer.size", 4096),progress);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            } else {
            Class<? extends CompressionCodec> codecClass =
            getOutputCompressorClass(job, GzipCodec.class);
            // create the named codec
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            // build the filename including the extension
            Path file =
            FileOutputFormat.getTaskOutputPath(job,
            name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file,false,fs.getConf().getInt("io.file.buffer.size", 4096),progress);
            return new LineRecordWriter<K, V>(new DataOutputStream
            (codec.createOutputStream(fileOut)),
            keyValueSeparator);
            }
            }
            }
            我覆盖了 MultipleTextOutputFormat的getBaseRecordWriter方法依然不行

            Mini_Night_Cat2016-08-29 16:56
  3. 😈 xxxasfasdfg 艾丝凡

    w3970907702016-06-01 13:58 回复
  4. 最后输出我希望是这样的
    [iteblog@master ]$ bin/hadoop fs -ls /iteblog
    Found 4 items
    -rw-r--r-- 3 iteblog hadoop2 0 2015-03-09 11:26 /iteblog/_SUCCESS
    -rw-r--r-- 3 iteblog hadoop2 11 2015-03-09 11:26 /iteblog/b
    -rw-r--r-- 3 iteblog hadoop2 10 2015-03-09 11:26 /iteblog/c
    -rw-r--r-- 3 iteblog hadoop2 19 2015-03-09 11:26 /iteblog/w

    [iteblog@master ]$ bin/hadoop fs -cat /iteblog/w
    btTest
    wwwTest

    南风不竞2016-05-31 10:51 回复
    • 很好办,把上面的RDDMultipleTextOutputFormat类定义成下面即可:

      class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateActualKey(key: K, value: V): K =
      NullWritable.get().asInstanceOf[K]

      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
      key.asInstanceOf[String]
      }

      w3970907702016-05-31 11:47 回复
      • 问题解决了 十分感谢博主! :mrgreen: 请问如何学习spark和hadoop相关比较好 有什么可以推荐的书么?

        南风不竞2016-05-31 13:54 回复
      • 博主 按照这篇文章提供的方式处理一个100G的数据文件时,发现输出的多个文件总记录书远远小于原文件,我猜测可能是spark在分块(128M)处理时,默认不是追加写文件的模式,应该是覆盖的方式,请问这个能够在哪里修改?

        南风不竞2016-05-31 15:17 回复
        • 是因为漏掉了partitionBy这个步骤导致的,添加后没问题了。再次感谢博主:)

          南风不竞2016-05-31 16:05 回复
          • 为啥漏掉 partitionBy结果不对呢,麻烦解答下哈,谢谢

            等待是我的宿命2018-04-04 16:49
      • 为啥漏掉 partitionBy结果不对呢,麻烦解答下哈,谢谢

        等待是我的宿命2018-04-04 16:49 回复
  5. 您好! 如果我只想在多输出文件中输入value,不输出key,如何实现呢? 本人刚刚接触spark,请多指教。

    南风不竞2016-05-31 10:50 回复
  6. 这个例子里输出文件的分隔符是\t,我怎么样可以换位:。比如w btTest
    唤作w:btTest

    HFheck2015-03-16 22:22 回复
    • 如果你是Hadoop 1.x版本,可以设置mapred.textoutputformat.separator;如果你是Hadoop 2.x可以设置mapreduce.output.textoutputformat.separator。这个值的默认值就是制表符(\t)。

      w3970907702015-03-16 22:34 回复
      • conf.set("mapreduce.textoutputformat.separator", ";");在hadoop下可以实现。也可在hdfs的配置文件里配置,但是现在在scala写的spark代码里怎么写呢?我用了sparkconf去set,好像不管用,不知道怎么弄?

        HFheck2015-03-17 16:39 回复
        • 直接写到hdfs的配置文件里应该也可以实现吧?

          w3970907702015-03-17 19:23 回复