欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:978
  2. 浏览总数:11,958,271
  3. 评论:3937
  4. 分类目录:106 个
  5. 注册用户数:6120
  6. 最后更新:2018年12月15日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Spark读取Hbase中的数据


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

  大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1)、调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:

JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));

Scala版本如下:

val myRDD= sc.parallelize(List(1,2,3))

  这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。Java版如下:

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 14-6-29
 Time: 23:59
 bolg: https://www.iteblog.com
 本文地址:https://www.iteblog.com/archives/1051
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addFile("wyp.data");
JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));

Scala版本如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
sc.addFile("spam.data")
val inFile = sc.textFile(SparkFiles.get("spam.data"))

  在实际情况下,我们需要的数据可能不是简单的存放在HDFS文本中,我们需要的数据可能就存放在Hbase中,那么我们如何用Spark来读取Hbase中的数据呢?本文的所有测试是基于Hadoop 2.2.0、Hbase 0.98.2、Spark 0.9.1,不同版本可能代码的编写有点不同。本文只是简单地用Spark来读取Hbase中的数据,如果需要对Hbase进行更强的操作,本文可能不能帮你。话不多说,Spark操作Hbase的核心的Java版本代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 14-6-29
 Time: 23:59
 bolg: https://www.iteblog.com
 本文地址:https://www.iteblog.com/archives/1051
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////

JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest",
                System.getenv("SPARK_HOME"), System.getenv("JARS"));

Configuration conf = HBaseConfiguration.create();
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf"));
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"));

try {
        String tableName = "flight_wap_order_log";
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String ScanToString = Base64.encodeBytes(proto.toByteArray());
        conf.set(TableInputFormat.SCAN, ScanToString);

        JavaPairRDD<ImmutableBytesWritable, Result> myRDD = 
                sc.newAPIHadoopRDD(conf,  TableInputFormat.class, 
                ImmutableBytesWritable.class, Result.class);

catch (Exception e) {
            e.printStackTrace();
}

这样本段代码段是从Hbase表名为flight_wap_order_log的数据库中读取cf列簇上的airName一列的数据,这样我们就可以对myRDD进行相应的操作:

System.out.println(myRDD.count());

本段代码需要在pom.xml文件加入以下依赖:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.1</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

Scala版如下:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 14-6-29
 Time: 23:59
 bolg: https://www.iteblog.com
 本文地址:https://www.iteblog.com/archives/1051
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////

object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }
}

我们需要在加入如下依赖:

libraryDependencies ++= Seq(
        "org.apache.spark" % "spark-core_2.10" % "0.9.1",
        "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
)

  在测试的时候,需要配置好Hbase、Hadoop环境,否则程序会出现问题,特别是让程序找到Hbase-site.xml配置文件。
点击进入下载

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Spark读取Hbase中的数据】(https://www.iteblog.com/archives/1051.html)
喜欢 (53)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(47)个小伙伴在吐槽
  1. 忘记说了,我的邮箱是xuezhekim@gmail.com
    。。2016-06-02 11:03 回复
  2. 麻烦lz 发下代码,学习学习;本人学生,学得一头雾水
    。。2016-06-02 11:02 回复
  3. 我做过实时的关系型大数据架构和优化,正在研究用spark操作hbase的代码,敬请楼主发一份,多谢,jiaxinlin@live.com
    大数据系统应用2016-03-24 00:01 回复
    • 我这里也在做一个spark批量导入hbase、读hbase的lib, 项目地址在这里: https://github.com/TopSpoofer/hbrdd现在没有安装文档,使用文档,等项目差不多了,就会开始编写了!希望能帮到您。
      spoofer2016-03-29 22:26 回复
  4. 正在研究用spark操作hbase的代码,麻烦楼主发一份,多谢,592327707@qq.com
    lixiang4022015-03-20 08:03 回复
    • 已经发到github里面去了,到这里下载http://t.cn/RA7ZZLx
      w3970907702015-03-20 17:31 回复
  5. 楼主好,最近在看spark 读写hbase,看了你得文章深受启发,麻烦楼主发一份完整的java 和 scala 版本的代码,邮箱:xiaogang0903@gmail.com ,谢谢
    小石头2015-03-12 19:12 回复
    • 已经发到github里面去了,到这里下载http://t.cn/RA7ZZLx
      w3970907702015-03-20 17:32 回复
  6. 正在研究Spark读写HBase,希望可以学习下你的代码,麻烦楼主发送到邮箱:elite808@163.com,谢谢~
    yoyo2015-03-09 18:01 回复
    • 已经发到github里面去了,到这里下载http://t.cn/RA7ZZLx
      w3970907702015-03-20 17:32 回复
  7. 邮箱:529564995@qq.com 麻烦楼主发一份spark 操作hbase java 和scala 两个版本的代码,不胜感激~
    jsonchen2015-02-28 13:37 回复
  8. 邮箱:529564995@qq.com 麻烦楼主发一份spark 操作hbase的完整代码,真心感谢~
    jsonchen2015-02-28 13:33 回复
  9. 514608402@qq.com 麻烦楼主发一份!
    demon2015-02-03 22:53 回复
  10. 正在研究Spark HBase,希望可以学习下你的代码,谢谢
    aaronhadoop2014-12-11 22:53 回复
  11. 请问博主 那个jars 指的是哪些jar System.getenv("JARS"));需要在哪里进行配置? 谢谢ps:能把代码共享到云端 这样更加方便 。。再次感谢
    thinkml2014-12-04 13:21 回复
  12. 751238116@qq.com, 发我一份 谢谢
    zz2014-11-25 10:53 回复
  13. 发一份我邮箱;497657574@qq.com
    夏季的风2014-11-08 22:07 回复
  14. 还有一个是groubykey 或者 combineBykey ,reducebykey这几个操作的时候 一直卡在shuffle运行好久,以前用的是mapreduce来实现没问题,同样的yarn模式运行,我的key大概是几千万,这样的操作如何优化。。。
    源远流长2014-11-07 10:39 回复
  15. 另外,楼主,本人菜鸟一枚,烦请问下如何让程序找到Hbase-site.xml配置文件,谢谢。87017679@qq.com 楼主求代码
    nickccpy2014-11-06 14:11 回复
  16. 317631346@qq.com 楼主求代码 谢谢
    What2014-11-05 17:52 回复
  17. 读取hbase的时候,发现guava依赖版本不一致,有冲突,楼主有无遇到过
    源远流长2014-10-27 18:11 回复
    • 统一guava版本就行了啊
      w3970907702014-10-27 19:33 回复
      • 我是将hadoop classpath ,hbase classpath加进来。。那就会将hbase的12版本搞进来
        源远流长2014-10-29 11:03 回复
        • 没关系啊,你直接把hadoop,hbase中的Guava版本统一不就可以?
          w3970907702014-10-29 13:00 回复
          • 哈哈哈
            w3970907702014-10-29 13:08
  18. kwlcx@sina.com楼主务必发一个给我, :mrgreen:
    tclcx1112014-09-26 10:47 回复
  19. oni2014-09-25 16:06 回复
  20. 谢谢分享!麻烦传一份代码,邮箱 feitian295030@126.com
    飞天2014-09-24 15:33 回复
  21. 楼主可以发一份java代码吗?zglin1988@163.com
    zhanggl2014-09-18 23:22 回复
  22. 674175742@qq.com LZ发份java的代码,谢谢啦
    oni2014-09-16 14:01 回复
  23. 345885864@qq.com LZ发份java的代码,谢谢啦
    xxx2014-09-15 17:24 回复
  24. 345885864@qq.com谢谢求代码
    xxx2014-09-15 17:08 回复
  25. 725755364@qq.com 谢谢了
    oneayfly2014-09-12 10:16 回复
  26. 邮箱: bluelf77@163.com
    bluelf772014-09-09 02:40 回复
    • 已经发送到你邮箱,请查收。
      w3970907702014-09-09 09:47 回复
  27. 也发一份spark查询Hbase的代码参考给我,多谢了
    bluelf772014-09-09 02:39 回复
  28. 想测试一下spark查询Hbase的速度有多快,能否给份代码。环境配置好了。谢谢!xiangyuwen@139.com
    xyw2014-09-01 15:42 回复
    • 已经发送到你邮箱,请查收。
      w3970907702014-09-09 09:47 回复
  29. 最近在研究spark 读写 hbase ,能给份scala代码吗?
    zhumf2014-08-29 10:15 回复
  30. 博主你好,最近在研究Spark对Hbase的操作,在Spark向Hbase中存数据时发生无法找到HMaster的异常,能给份代码参考下吗,多谢!邮箱:kylewu@163.com
    Kyle可一2014-08-13 22:43 回复
    • 已经发送到你邮箱,请查收。
      w3970907702014-09-09 09:47 回复
  31. 正在调研spark对habse的操作,能给份scala代码吗
    zhang2014-07-16 14:37 回复
    • 留下联系方式吧,否则我发不了。
      w3970907702014-08-29 12:56 回复
      • 大侠 请发 Java 和 Scala 的 代码 给我 学习学习 leeworld88@yahoo.com
        Mike2014-10-18 12:56 回复
    • http://blog.csdn.net/yuanbingze/article/details/51891222 这篇文章里边有代码,可以看看
      seagle2016-07-15 16:16 回复