有时候我们需要根据记录的类别分别写到不同的文件中去,正如本博客的 《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(一)》《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(二)》以及《Spark多文件输出(MultipleOutputFormat)》等文章提到的类似。那么如何在Flink Streaming实现类似于《Spark多文件输出(MultipleOutputFormat)》文章中提到的功能呢?很遗憾,Flink内置并不提供相应的API接口来实现这种功能,我们需要自己实现多路文件输出。
在Flink Streaming的DataStream类中可以发现,DataStream的print函数、printToErr函数以及writeAsText函数都是封装了一个称为Sink的对象;而这些Sink都是实现了org.apache.flink.streaming.api.functions.sink.RichSinkFunction或者org.apache.flink.streaming.api.functions.sink.SinkFunction接口的,所以我们也可以自己实现上述两个接口从而达到文件的多路输出功能。
我们还发现,Flink中有一个org.apache.flink.api.java.io.TextOutputFormat类,此类通过调用FSDataOutputStream对象将记录写入到HDFS(当然也可以是其他Hadoop支持的文件系统),所以我们可以封装TextOutputFormat,然后根据record类别的不一样创建不一样的TextOutputFormat对象,从而实现文件的多路输出,根据上面的思路我实现了一个名为MultipleTextOutputFormatSinkFunction类,具体实现如下:
package com.iteblog
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.slf4j.LoggerFactory
import scala.collection.mutable
/**
* Created by on 2016/5/9.
*/
class MultipleTextOutputFormatSinkFunction[IN](descPath: String) extends RichSinkFunction[IN] {
val map = mutable.Map[String, TextOutputFormat[String]]()
var cleanupCalled = false
val LOG = LoggerFactory.getLogger(classOf[MultipleTextOutputFormatSinkFunction[_]])
var parameters: Configuration = null;
override def open(parameters: Configuration) {
this.parameters = parameters
}
override def invoke(item: IN): Unit = {
val tuple = item.asInstanceOf[(String, String)]
val key = tuple._1
val value = tuple._2
val result = map.get(key)
val format = if (result.isDefined) {
result.get
} else {
val textOutputFormat = new TextOutputFormat[String](new Path(descPath, key))
textOutputFormat.configure(parameters)
val context: RuntimeContext = getRuntimeContext
val indexInSubtaskGroup: Int = context.getIndexOfThisSubtask
val currentNumberOfSubtasks: Int = context.getNumberOfParallelSubtasks
textOutputFormat.open(indexInSubtaskGroup, currentNumberOfSubtasks)
map.put(key, textOutputFormat)
textOutputFormat
}
try {
format.writeRecord(value)
}
catch {
case ex: Exception => {
cleanup()
throw ex
}
}
}
override def close() {
try {
map.foreach(_._2.close())
} catch {
case ex: Exception => {
cleanup()
throw ex
}
} finally {
map.clear()
}
}
private def cleanup() {
try {
if (!cleanupCalled) {
cleanupCalled = true
map.foreach(item => item._2.asInstanceOf[CleanupWhenUnsuccessful].tryCleanupOnError())
}
}
catch {
case t: Throwable => {
LOG.error("Cleanup on error failed.", t)
}
}
}
}
MultipleTextOutputFormatSinkFunction类实现了org.apache.flink.streaming.api.functions.sink.RichSinkFunction接口,并实现了def invoke(item: IN): Unit方法;在里面我们根据记录的key值创建不同的TextOutputFormat,然后缓存到mutable.Map[String, TextOutputFormat[String]]中,以便下次可以直接根据key值获取。那么如何在Flink Streaming中使用呢?如下操作:
val stream = env.addSource(new FlinkKafkaConsumer08[String]("iteblog", new SimpleStringSchema(), properties))
stream.map{
//做一些业务逻辑操作
}.addSink(new MultipleTextOutputFormatSinkFunction[(String, String)]("hdfs:///user/iteblog/outputs/"))
env.execute("FlinkKafkaStreaming")
运行这个Streaming程序,我们可以在hdfs:///user/iteblog/outputs/路径下看到产生了很多文件,如下:
[iteblog@www.iteblog.com ~]$ hadoop fs -ls -h /user/iteblog/outputs/ -rw-r--r-- 3 iteblog supergroup 1.7 M 2016-05-10 14:57 /user/iteblog/outputs/A -rw-r--r-- 3 iteblog supergroup 2.5 M 2016-05-10 14:57 /user/iteblog/outputs/B -rw-r--r-- 3 iteblog supergroup 1.9 M 2016-05-10 14:57 /user/iteblog/outputs/C -rw-r--r-- 3 iteblog supergroup 3.1 M 2016-05-10 14:57 /user/iteblog/outputs/D
可以看到,已经根据记录的类型写入到不同的文件中了。
但是有几点需要注意:
1、这个MultipleTextOutputFormatSinkFunction只有当文件的大小达到了HDFS的块大小才能看到文件的大小,否则你看到的文件大小会一直为0,这是因为TextOutputFormat类就是这么实现的(我们应该可以对其进行扩展,当写到一定batch数量时,对文件进行刷新);有人可能会问,为什么你上面的文件不是块大小就显示了?那是因为我已经关掉了这个Flink Streaming程序,所以记录都写入到各个文件中了;
2、更好的做法应该是扩展org.apache.flink.api.common.io.FileOutputFormat类(TextOutputFormat类就是扩展这个类的),然后可以这么使用:
val mtof = new MultipleTextOutputFormat[(String,String)](new Path(bashPath)) stream.writeUsingOutputFormat(mtof)
上面的做法很类似于Hadoop中的MultipleTextOutputFormat,由于时间和精力有限,所以就不介绍如何在Flink中实现MultipleTextOutputFormat了;
3、如果你使用Flink Batch模式,实现一个MultipleTextOutputFormat应该很容易,因为我们可以直接使用Hadoop中的MultipleTextOutputFormat,具体如何使用我将在后面的文章中进行介绍;
4、如果你有更好的想法,欢迎分享。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Flink Streaming中实现多路文件输出(MultipleTextOutputFormat)】(https://www.iteblog.com/archives/1662.html)


我使用stream.addSink(new MultipleTextOutputFormatSinkFunction[(String, String)]("/user/lian"))时提示类型不匹配是什么情况?大神
你的stream是啥类型的?一定要是存储(String, String)类型的
已解决,谢谢!我也是今天早上发现的这个问题。 🙂
还有一个问题就是在写入的时候报错 路径已存在:
textOutputFormat.open(indexInSubtaskGroup, currentNumberOfSubtasks)