欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:961
  2. 浏览总数:11,499,637
  3. 评论:3873
  4. 分类目录:103 个
  5. 注册用户数:5847
  6. 最后更新:2018年10月17日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Flink batch模式多路文件输出(MultipleTextOutputFormat)

  昨天我提到了如何在《Flink Streaming中实现多路文件输出(MultipleTextOutputFormat)》,里面我们实现了一个MultipleTextOutputFormatSinkFunction类,其中封装了mutable.Map[String, TextOutputFormat[String]],然后根据key的不一样选择不同的TextOutputFormat从而实现了文件的多路输出。本文将介绍如何在Flink batch模式下实现文件的多路输出,这种模式下比较简单,因为Flink内部提供了相应的API支持。

首先我们先实现一个自定义的IteblogMultipleTextOutputFormat类,具体实现如下:

class IteblogMultipleTextOutputFormat[K, V] 
    extends MultipleTextOutputFormat[K, V] {
  override def generateActualKey(key: K, value: V): K =
    NullWritable.get().asInstanceOf[K]

  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    key.asInstanceOf[String]
}

我们将Key作为文件的名称,然后我们可以将这个类对象封装到HadoopOutputFormat中,如下:

val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]()
val jc = new JobConf()
val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)

DataSet类中有个output的方法,它可以将数据写入到实现了org.apache.flink.api.common.io.OutputFormat接口的地方,而HadoopOutputFormat类就是实现了这个接口,所以我们可以将HadoopOutputFormat对象传进output方法中,如下:

batch.output(format)

最后我们别忘记指定存放到HDFS的什么路径:

FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))

完整代码:

package com.iteblog

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

/**
  * Created by https://www.iteblog.com on 2016/5/11.
  */
object FlinkBatch {
  class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
    override def generateActualKey(key: K, value: V): K =
      NullWritable.get().asInstanceOf[K]

    override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
      key.asInstanceOf[String]
  }

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]()
    val jc = new JobConf()
    FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))
    val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)
    val batch = env.fromCollection(List(("A", "1"), ("A", "2"), ("A", "3"),
      ("B", "1"), ("B", "2"), ("C", "1"), ("D", "2")))
    batch.output(format)
    env.execute("MultipleTextOutputFormat")
  }
}

运行这个程序,我们可以在hdfs:///user/iteblog/路径下看到如下的输出结果:

[iteblog@www.iteblog.com ~]$ hadoop fs -ls /user/iteblog/
Found 5 items
-rw-r--r--   3 iteblog supergroup          6 2016-05-11 19:05 /user/iteblog/A
-rw-r--r--   3 iteblog supergroup          4 2016-05-11 19:05 /user/iteblog/B
-rw-r--r--   3 iteblog supergroup          2 2016-05-11 19:05 /user/iteblog/C
-rw-r--r--   3 iteblog supergroup          2 2016-05-11 19:05 /user/iteblog/D
-rw-r--r--   3 iteblog supergroup          0 2016-05-11 19:05 /user/iteblog/_SUCCESS

可以看出已经根据key的不一样将数据输入到相应的文件了。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Flink batch模式多路文件输出(MultipleTextOutputFormat)】(https://www.iteblog.com/archives/1667.html)
喜欢 (3)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!