欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:961
  2. 浏览总数:11,484,721
  3. 评论:3873
  4. 分类目录:103 个
  5. 注册用户数:5843
  6. 最后更新:2018年10月17日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

通过Flink将数据以压缩的格式写入HDFS

Flink中我们可以很容易的使用内置的API来读取HDFS上的压缩文件,内置支持的压缩格式包括.deflate,.gz, .gzip,.bz2以及.xz等。


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

但是如果我们想使用Flink内置sink API将数据以压缩的格式写入到HDFS上,好像并没有找到有API直接支持(如果不是这样的,欢迎留言纠正)。本文将介绍如何将数据以gz压缩格式将处理后的数据写入到HDFS上。主要实现代码如下:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
imoprt org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat

val benv = ExecutionEnvironment.getExecutionEnvironment
val input = benv.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)

val text = input map { _._2.toString }
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }

val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable](
  new TextOutputFormat[Text, LongWritable], new JobConf)
val c = classOf[org.apache.hadoop.io.compress.GzipCodec]
hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ")
hadoopOutputFormat.getJobConf.setCompressMapOutput(true)
hadoopOutputFormat.getJobConf.set("mapred.output.compress", "true")
hadoopOutputFormat.getJobConf.setMapOutputCompressorClass(c)
hadoopOutputFormat.getJobConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopOutputFormat.getJobConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

FileOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf, new Path("/tmp/iteblog/"))

words.output(hadoopOutputFormat)
benv.execute("Hadoop Compat WordCount")

关键就是上面高亮的几行代码,然后运行上面的程序,即可以gz压缩方式将最后处理完的数据写入到HDFS上。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【通过Flink将数据以压缩的格式写入HDFS】(https://www.iteblog.com/archives/1875.html)
喜欢 (3)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!