昨天我提到了如何在《Flink Streaming中实现多路文件输出(MultipleTextOutputFormat)》,里面我们实现了一个MultipleTextOutputFormatSinkFunction类,其中封装了mutable.Map[String, TextOutputFormat[String]],然后根据key的不一样选择不同的TextOutputFormat从而实现了文件的多路输出。本文将介绍如何在Flink batch模式下实现文件的多路输出,这种模式下比较简单,因为Flink内部提供了相应的API支持。
首先我们先实现一个自定义的IteblogMultipleTextOutputFormat类,具体实现如下:
class IteblogMultipleTextOutputFormat[K, V]
extends MultipleTextOutputFormat[K, V] {
override def generateActualKey(key: K, value: V): K =
NullWritable.get().asInstanceOf[K]
override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
key.asInstanceOf[String]
}
我们将Key作为文件的名称,然后我们可以将这个类对象封装到HadoopOutputFormat中,如下:
val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]() val jc = new JobConf() val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)
在DataSet类中有个output的方法,它可以将数据写入到实现了org.apache.flink.api.common.io.OutputFormat接口的地方,而HadoopOutputFormat类就是实现了这个接口,所以我们可以将HadoopOutputFormat对象传进output方法中,如下:
batch.output(format)
最后我们别忘记指定存放到HDFS的什么路径:
FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))
完整代码:
package com.iteblog
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
/**
* Created by on 2016/5/11.
*/
object FlinkBatch {
class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
override def generateActualKey(key: K, value: V): K =
NullWritable.get().asInstanceOf[K]
override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
key.asInstanceOf[String]
}
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]()
val jc = new JobConf()
FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))
val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)
val batch = env.fromCollection(List(("A", "1"), ("A", "2"), ("A", "3"),
("B", "1"), ("B", "2"), ("C", "1"), ("D", "2")))
batch.output(format)
env.execute("MultipleTextOutputFormat")
}
}
运行这个程序,我们可以在hdfs:///user/iteblog/路径下看到如下的输出结果:
[iteblog@www.iteblog.com ~]$ hadoop fs -ls /user/iteblog/ Found 5 items -rw-r--r-- 3 iteblog supergroup 6 2016-05-11 19:05 /user/iteblog/A -rw-r--r-- 3 iteblog supergroup 4 2016-05-11 19:05 /user/iteblog/B -rw-r--r-- 3 iteblog supergroup 2 2016-05-11 19:05 /user/iteblog/C -rw-r--r-- 3 iteblog supergroup 2 2016-05-11 19:05 /user/iteblog/D -rw-r--r-- 3 iteblog supergroup 0 2016-05-11 19:05 /user/iteblog/_SUCCESS
可以看出已经根据key的不一样将数据输入到相应的文件了。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Flink batch模式多路文件输出(MultipleTextOutputFormat)】(https://www.iteblog.com/archives/1667.html)


楼主问下,MultipleTextOutputFormat可以用新版本中的MultipleOutputs去实现吗?能的话怎么做呢?求解谢谢
这个不是用新版的 MultipleOutputs 实现的,你可以试试,应该也是可以实现的。
不过社区好像已经有人要做这个了 :https://issues.apache.org/jira/browse/FLINK-11737
嗷嗷 好的吧~ 感谢大佬