欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

使用Spark读取HBase中的数据

  在《Spark读取Hbase中的数据》文章中我介绍了如何在Spark中读取Hbase中的数据,并提供了Java和Scala两个版本的实现,本文将接着上文介绍如何通过Spark将计算好的数据存储到Hbase中。

  Spark中内置提供了两个方法可以将数据写入到Hbase:(1)、saveAsHadoopDataset;(2)、saveAsNewAPIHadoopDataset,它们的官方介绍分别如下:
  saveAsHadoopDataset: Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
  saveAsNewAPIHadoopDataset: Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop Configuration object for that storage system. The Conf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.

可以看出这两个API分别是针对mapredmapreduce实现的,本文将提供这两个版本的实现实例代码。在编写代码之前我们先在pom.xml文件中引入一下的依赖:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.1</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

saveAsHadoopDataset

package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 2016-11-29
 Time: 22:59
 bolg: https://www.iteblog.com
 本文地址:https://www.iteblog.com/archives/1892
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////

object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)

    val input = sc.textFile(args(0))

    //创建HBase配置
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    //创建JobConf,设置输出格式和表名
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    //保存到HBase表
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}

我们输入的数据格式是:

0015A49A8F2A60DACEE0160545C58F94    1234
0152C9666B5F3426DDDB5FB74BDBCE4F    4366
0160D90AC268AEB595208E8448C7F8B8    6577
0225A39EB29BDB582CA58BE86791ACBC    1234
02462ACDF7232C49890B07D63B50C5E1    4366
030730EBE05740C992840525E35BC8AD    7577
038A459BC05F3B655F5655C810E76352    7577
0417D3FD71458C4BAD1E5AFDE7259930    7577
042CD42B657C46D0D4E5CC69AFDD7E54    7577
051069378849ACF97BFAD09D3A9C7702    7577
05E833C9C763A98323E0328DA0A31039    7577
060E206514A24D944305D370F615F8E9    7577
087E8488796C29E1C8239565666CE2D7    7577
09A425F1DD240A7150ECEFAA0BFF25FA    7577
0B27E3CB5F3F32EB3715DB8E2D333BED    7577
0B27E82A4CEE73BBB98438DFB0DB2FFE    7577
0BAEEB7A12DCEF20EE26D7A030164DFF    7577
0C5BFC45F64907A61ECB1C892F98525C    7577
0C74F2FFD1BB3598BC8DB10C37DBA6B4    7577
0C9CEE40DDD961C7D2BBE0491FDF92A8    7577
0CC578371622F932287EB81065F81F5F    7577
0D6B03EFDAE7165A0F7CC79EABEAC0D3    7577
0DF7B014187A9AB2F1049781592CC053    7577
0E67D8ABDB3749D58207A7B45FEA7F12    7577
0E866677E79A7843E0EDCF2BE0141911    7577
0EAF4A69BA3BF05E8EA75CC1287304A3    7577
0EE2969AE674DF5F8944B5EA2E97DBEC    7577
0FAA253D53BC6D831CF6E742147C3BED    7577
0FB92AC3DE664BFF40D334DA8EE97B85    7577

第一列将作为HBase的Rowkey存储,第二列就是info的值。

saveAsNewAPIHadoopDataset

package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 2016-11-29
 Time: 22:59
 bolg: https://www.iteblog.com
 本文地址:https://www.iteblog.com/archives/1892
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////
object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
    //设置job的输出格式
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    //保存到HBase表
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}

这个方法和第一种几乎一样,大家可以根据自己的情况选择使用其中一个。不过上面将Spark中的数据写入到Hbase还是有点啰嗦,后面我将单独再介绍如何将RDD中的数据直接写入到hbase中,类似于saveToHbase,欢迎大家关注。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【使用Spark读取HBase中的数据】(https://www.iteblog.com/archives/1892.html)
喜欢 (29)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(1)个小伙伴在吐槽
  1. https://github.com/TopSpoofer/hbrdd 这个封装了

    spoofer2016-12-06 09:05 回复