欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:961
  2. 浏览总数:11,509,444
  3. 评论:3873
  4. 分类目录:103 个
  5. 注册用户数:5853
  6. 最后更新:2018年10月17日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Spark多文件输出(MultipleOutputFormat)

  在本博客的《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(一)》《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(二)》两篇文章中我介绍了如何在Hadoop中根据Key或者Value的不同将属于不同的类型记录写到不同的文件中。在里面用到了MultipleOutputFormat这个类。
  因为Spark内部写文件方式其实调用的都是Hadoop那一套东西,所以我们也可以通过Spark实现多文件输出。不过遗憾的是,Spark内部没有多文件输出的函数供大家直接调用,值得欣慰的是,我们自己实现这个功能也是很简单的。我们可以通过调用saveAsHadoopFile函数并自定义一个OutputFormat类即可,代码如下:

/**
 * User: 过往记忆
 * Date: 15-03-11
 * Time: 上午08:24
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1281
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._


import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SplitTest")
    val sc = new SparkContext(conf)
    sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))
      .map(value => (value._1, value._2 + "Test"))
      .partitionBy(new HashPartitioner(3))
      .saveAsHadoopFile("/iteblog", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

  RDDMultipleTextOutputFormat类中的generateFileNameForKeyValue函数有三个参数,key和value就是我们RDD的Key和Value,而name参数是每个Reduce的编号。本例中没有使用该参数,而是直接将同一个Key的数据输出到同一个文件中。执行:

bin/spark-submit --master yarn-cluster 
--class Split ./iteblog-1.0-SNAPSHOT.jar

  然后我们可以看到在HDFS上输出的文件列表如下:

[iteblog@master ]$ bin/hadoop fs -ls /iteblog
Found 4 items
-rw-r--r--   3 iteblog hadoop2          0 2015-03-09 11:26 /iteblog/_SUCCESS
-rw-r--r--   3 iteblog hadoop2         11 2015-03-09 11:26 /iteblog/b
-rw-r--r--   3 iteblog hadoop2         10 2015-03-09 11:26 /iteblog/c
-rw-r--r--   3 iteblog hadoop2         19 2015-03-09 11:26 /iteblog/w

[iteblog@master ]$ bin/hadoop fs -cat /iteblog/w
w	btTest
w	wwwTest

  从上面的输出可以看出key为w的记录全部输出到文件名为w的文件中去了。

  不过社区已经有人建议开发出saveAsTextFileByKey函数来实现该功能(SPARK-3533,https://github.com/apache/spark/pull/4895),很有可能会在Spark 1.4.0版本添加。
本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark多文件输出(MultipleOutputFormat)】(https://www.iteblog.com/archives/1281.html)
喜欢 (15)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(19)个小伙伴在吐槽
  1. 文件的数量好像不能太多,否则好像会占用很多内存,导致outofmemoryerror
    2016-11-12 15:14 回复
  2. 我采用了你的方法去保存数据,saveAsHadoopFile但是每次执行都会覆盖上一次的,我希望把同一天的数据追加到一个文件中怎么做呢?
    Mini_Night_Cat2016-08-29 09:21 回复
    • 内置的无法做到文件追加,需要你自己实现。可以做到同一天的数据放到同一个目录下,但是不是追加的形式。
      w3970907702016-08-29 09:26 回复
      • 访问日志需要按照/日期/访问IP.log形式汇总,提供给客户下载,不知道怎么办好了
        Mini_Night_Cat2016-08-29 09:48 回复
        • 这个也是可以实现的,但是需要你自己编写一个类似于HadoopWriter,然后按天切割。
          w3970907702016-08-29 12:53 回复
          • 需要怎么做呢? @Override protected RecordWriter<String, String> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { if (theTextOutputFormat == null) { theTextOutputFormat = new MyTextOutputFormat<String, String>(); } return theTextOutputFormat.getRecordWriter(fs, job, name, arg3); } private class MyTextOutputFormat<K,V> extends TextOutputFormat<K,V>{ @Override public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); if (!isCompressed) { Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file,false,fs.getConf().getInt("io.file.buffer.size", 4096),progress); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); // create the named codec CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); // build the filename including the extension Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file,false,fs.getConf().getInt("io.file.buffer.size", 4096),progress); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } } }我覆盖了 MultipleTextOutputFormat的getBaseRecordWriter方法依然不行
            Mini_Night_Cat2016-08-29 16:56
  3. 😈 xxxasfasdfg 艾丝凡
    w3970907702016-06-01 13:58 回复
  4. 最后输出我希望是这样的[iteblog@master ]$ bin/hadoop fs -ls /iteblogFound 4 items-rw-r--r-- 3 iteblog hadoop2 0 2015-03-09 11:26 /iteblog/_SUCCESS-rw-r--r-- 3 iteblog hadoop2 11 2015-03-09 11:26 /iteblog/b-rw-r--r-- 3 iteblog hadoop2 10 2015-03-09 11:26 /iteblog/c-rw-r--r-- 3 iteblog hadoop2 19 2015-03-09 11:26 /iteblog/w[iteblog@master ]$ bin/hadoop fs -cat /iteblog/wbtTestwwwTest
    南风不竞2016-05-31 10:51 回复
    • 很好办,把上面的RDDMultipleTextOutputFormat类定义成下面即可:class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: K, value: V): K = NullWritable.get().asInstanceOf[K] override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]}
      w3970907702016-05-31 11:47 回复
      • 问题解决了 十分感谢博主! :mrgreen: 请问如何学习spark和hadoop相关比较好 有什么可以推荐的书么?
        南风不竞2016-05-31 13:54 回复
      • 博主 按照这篇文章提供的方式处理一个100G的数据文件时,发现输出的多个文件总记录书远远小于原文件,我猜测可能是spark在分块(128M)处理时,默认不是追加写文件的模式,应该是覆盖的方式,请问这个能够在哪里修改?
        南风不竞2016-05-31 15:17 回复
        • 是因为漏掉了partitionBy这个步骤导致的,添加后没问题了。再次感谢博主:)
          南风不竞2016-05-31 16:05 回复
          • 为啥漏掉 partitionBy结果不对呢,麻烦解答下哈,谢谢
            等待是我的宿命2018-04-04 16:49
      • 为啥漏掉 partitionBy结果不对呢,麻烦解答下哈,谢谢
        等待是我的宿命2018-04-04 16:49 回复
  5. 您好! 如果我只想在多输出文件中输入value,不输出key,如何实现呢? 本人刚刚接触spark,请多指教。
    南风不竞2016-05-31 10:50 回复
  6. 这个例子里输出文件的分隔符是\t,我怎么样可以换位:。比如w btTest唤作w:btTest
    HFheck2015-03-16 22:22 回复
    • 如果你是Hadoop 1.x版本,可以设置mapred.textoutputformat.separator;如果你是Hadoop 2.x可以设置mapreduce.output.textoutputformat.separator。这个值的默认值就是制表符(\t)。
      w3970907702015-03-16 22:34 回复
      • conf.set("mapreduce.textoutputformat.separator", ";");在hadoop下可以实现。也可在hdfs的配置文件里配置,但是现在在scala写的spark代码里怎么写呢?我用了sparkconf去set,好像不管用,不知道怎么弄?
        HFheck2015-03-17 16:39 回复
        • 直接写到hdfs的配置文件里应该也可以实现吧?
          w3970907702015-03-17 19:23 回复