欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Apache CarbonData快速入门编程指南

  CarbonData是由华为开发、开源并支持Apache Hadoop的列式存储文件格式,支持索引、压缩以及解编码等,其目的是为了实现同一份数据达到多种需求,而且能够实现更快的交互查询。目前该项目正处于Apache孵化过程中。详情参见《CarbonData:华为开发并支持Hadoop的列式文件格式》,本文是单机模式下使用CarbonData的,如果你需要集群模式下使用请参见《Apache CarbonData集群模式使用指南》

编译CarbonData

  编译CarbonData之前,我们必须确保好实现安装好了thrift(如何安装看这里:CentOS上编译安装Apache Thrift),否则会出现以下的异常:

[ERROR] thrift failed output: 
[ERROR] thrift failed error: /bin/sh: thrift: command not found

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] carbondata ......................................... SUCCESS [01:59 min]
[INFO] carbon-common ...................................... SUCCESS [ 41.227 s]
[INFO] carbon-format ...................................... FAILURE [  8.875 s]
[INFO] carbon-core ........................................ SKIPPED
[INFO] carbon-processing .................................. SKIPPED
[INFO] carbon-hadoop ...................................... SKIPPED
[INFO] carbon-spark ....................................... SKIPPED
[INFO] carbon-assembly .................................... SKIPPED
[INFO] carbon-examples .................................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:50 min
[INFO] Finished at: 2016-06-30T17:30:43+08:00
[INFO] Final Memory: 25M/224M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.thrift.tools:maven-thrift-plugin:0.1.11:
compile (generate-thrift-java) on project carbon-format: thrift did not exit cleanly. 
Review output for more information. -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.

默认情况下CarbonData是使用Spark 1.5.2,Hadoop 2.2.0的,最新版的CarbonData支持Spark 1.6.1,这里以Spark 1.6.1版本为例进行介绍。我们依次进行如下操作:

$ git clone https://github.com/apache/incubator-carbondata.git carbondata
$ cd carbondata
$ mvn -Pspark-1.6.1 clean install -DskipTests
$ cp assembly/target/scala-2.10/carbondata_*.jar ${SPARK_HOME}/lib
$ mkdir ${SPARK_HOME}/carbondata
$ cp -r processing/carbonplugins ${SPARK_HOME}/carbondata

到这里我们已经编译好CarbonData了,而且把编译好的相关lib包添加到${SPARK_HOME}/lib中。然后我们就可以在Spark中(不可以直接在Hive中使用吗?)使用CarbonData了。

启动Spark shell

$ cd ${SPARK_HOME}
$ carbondata_jar=./lib/$(ls -1 lib |grep "^carbondata_.*\.jar$")
$ mysql_jar=./lib/$(ls -1 lib |grep "^mysql.*\.jar$")
$ ./bin/spark-shell --master local --jars ${carbondata_jar},${mysql_jar}

创建CarbonContext实例

/**
 * User: 过往记忆
 * Date: 2016年7月01日
 * Time: 下午21:16
 * bolg: 
 * 本文地址:/archives/1698
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

import org.apache.spark.sql.CarbonContext
import java.io.File
import org.apache.hadoop.hive.conf.HiveConf
val storePath = "/user/iteblog/store/"
val cc = new CarbonContext(sc, storePath)
cc.setConf("carbon.kettle.home","./carbondata/carbonplugins")
cc.setConf("hive.metastore.warehouse.dir", "/user/iteblog/metadata/")
cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")

CarbonContext的创建接收两个参数,SparkContext以及storePathstorePath参数用于指定创建好的表数据存放的目录,此目录可以是本地或者HDFS上的目录。


如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

创建表

现在我们已经创建好CarbonContext实例了,可以使用它创建表:

cc.sql("create table if not exists iteblog (id string, hash string) STORED BY 'org.apache.carbondata.format'")

运行完上面语句之后,我们可以看到在Hive的default库里面有了一个表,我们来看看他的建表语句:

hive> show create table iteblog;
OK
CREATE EXTERNAL TABLE `iteblog`(
  `col` array<string> COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe' 
WITH SERDEPROPERTIES ( 
  'tableName'='default.iteblog', 
  'tablePath'='hdfs:///user/iteblog/store/default/iteblog') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.SequenceFileInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
  'hdfs://itebloghadoop/user/iteblog/hive/warehouse/iteblog'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='false', 
  'numFiles'='0', 
  'numRows'='-1', 
  'rawDataSize'='-1', 
  'spark.sql.sources.provider'='org.apache.spark.sql.CarbonSource', 
  'totalSize'='0', 
  'transient_lastDdlTime'='1467361064')
Time taken: 0.05 seconds, Fetched: 21 row(s)

不科学啊,怎么只有col的字段呢?我们明明是创建了两个字段id和hash,这两个字段哪里去了?别急,上面不是说了创建CarbonContext实例的时候需要指定storePath参数吗?这里面就存放了刚刚创建好表的元数据信息:

drwxr-xr-x   - iteblog iteblog   0 2016-07-01 21:00 /user/iteblog/store/default
drwxr-xr-x   - iteblog iteblog   0 2016-07-01 21:00 /user/iteblog/store/default/iteblog
drwxr-xr-x   - iteblog iteblog   0 2016-07-01 21:00 /user/iteblog/store/default/iteblog/Metadata
-rw-r--r--   3 iteblog iteblog 597 2016-07-01 21:00 /user/iteblog/store/default/iteblog/Metadata/schema

表的元信息是存放在/user/iteblog/store/default/iteblog/Metadata/schema文件里面的。

加载数据

好了,表创建好了让我们来load点数据进去吧,我准备好了类似于以下的数据(名称为iteblog):

1802202095      -9223347229018688133
1805433788      -9223224306642795473
1807808238      -9223191974382569971
1803505412      -9222950928798855459
1803603535      -9222783416682807621
1808506900      -9222758602401798041
1805531330      -9222636742915245241
1807853373      -9222324670859328253

第一列对应id字段;第二列对应hash字段;他们之间是使用tab分割的。来把数据load到iteblog里面去:

cc.sql(s"load data  inpath '/tmp/iteblog' into table iteblog")
org.carbondata.processing.etl.DataLoadingException: please check your input 
path and make sure that files end with '.csv' and content is not empty.

遗憾的是,居然说文件名不是.csv后缀的。所以CarbonData需要你加载进去的文件名后缀为.csv(这个为什么?)。我们把文件名修改成iteblog.csv,然后再load:

cc.sql(s"load data  inpath '/tmp/iteblog.csv' into table iteblog")
org.carbondata.processing.etl.DataLoadingException: CSV File provided is not proper. 
Column names in schema and csv header are not same. CSVFile Name : iteblog.csv
  at org.carbondata.processing.csvload.DataGraphExecuter.validateCSV(DataGraphExecuter.java:147)
  at org.carbondata.processing.csvload.DataGraphExecuter.validateCSVFiles(DataGraphExecuter.java:552)
  at org.carbondata.processing.csvload.DataGraphExecuter.executeGraph(DataGraphExecuter.java:166)
  at org.carbondata.spark.load.CarbonLoaderUtil.executeGraph(CarbonLoaderUtil.java:189)
  at org.carbondata.spark.rdd.CarbonDataLoadRDD$$anon$1.<init>(CarbonDataLoadRDD.scala:189)
  at org.carbondata.spark.rdd.CarbonDataLoadRDD.compute(CarbonDataLoadRDD.scala:148)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:744)

居然又出错了!!!意思大概就是你文件里面没有指定csv头信息。这里有两种方法来解决这个问题。

1、直接在数据文件里面加上

id      hash
1802202095      -9223347229018688133
1805433788      -9223224306642795473
1807808238      -9223191974382569971
1803505412      -9222950928798855459
1803603535      -9222783416682807621
1808506900      -9222758602401798041
1805531330      -9222636742915245241
1807853373      -9222324670859328253

2、我们在load语句里面加上header信息

/**
 * User: 过往记忆
 * Date: 2016年7月01日
 * Time: 下午21:16
 * bolg: 
 * 本文地址:/archives/1698
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
 
cc.sql(s"load data local inpath '/tmp/iteblog.csv' into table iteblog  options('FILEHEADER'='id,hash')")

我们再运行,发现还是出错!这是因为CarbonData加载文件的时候字段之间的分隔符默认是英文逗号(,),而我们这里字段字段之间是制表符分割的,当然不行了,我们可以在load的时候加上DELIMITER属性,如下:

cc.sql(s"load data local inpath '/tmp/iteblog.csv' into table iteblog  
options('DELIMITER'='\t', 'FILEHEADER'='id,hash')")

这次终于没问题了!运行完上面的加载命令之后,我们可以看到/user/iteblog/store目录下产生了如下的数据文件:

drwxr-xr-x  /user/iteblog/store/default/iteblog
drwxr-xr-x  /user/iteblog/store/default/iteblog/Fact
drwxr-xr-x  /user/iteblog/store/default/iteblog/Fact/Part0
drwxr-xr-x  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/0-1467362068000.carbonindex
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/1-1467362068000.carbonindex
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/2-1467362068000.carbonindex
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/3-1467362068000.carbonindex
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/4-1467362068000.carbonindex
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-0-1467362068000.carbondata
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-1-1467362068000.carbondata
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-2-1467362068000.carbondata
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-3-1467362068000.carbondata
-rw-r--r--  /user/iteblog/store/default/iteblog/Fact/Part0/Segment_0/part-0-4-1467362068000.carbondata

其中*.carbonindex文件是CarbonData文件的索引;而*.carbondata才是真正的数据,当然里面的数据肯定已经编码好了。

  需要注意的是,目前CarbonData虽然也支持load data local inpath 'xxx' into table;语法,但是其含义和load data inpath 'xxx' into table;一致,其目的是为了和Hive语法相兼容。

查数

数据已经加载进iteblog表里面了,现在我们可以查询里面的数据了,如下:

scala> cc.sql("select * from iteblog").show
+----------+--------------------+
|        id|                hash|
+----------+--------------------+
|1761060630| 1507780651275746626|
|1777010203|-6420079594732250962|
|1777080884|-3720484624594970761|
|1777080885| 6953598621328551083|
|1794379845| 4443483533807209950|
|1794419628|-3898139641996026768|
|1794522657| 5721419051907524948|
|1796358316|-3848539843796297096|
|1796361951| 2673643446784761880|
|1796363022| 7081835511530066760|
|1797689090| 7687516489507000693|
|1798032763| 8729543868018607114|
|1798032765|-2073004072970288002|
|1798933651| 4359618602910211713|
|1799173523| 3862252443910602052|
|1799555536|-2751011281327328990|
|1799569121| 1024477364392595538|
|1799608637| 4403346642796222437|
|1799745227|-2719506638749624471|
|1799859723| 5552725300397463239|
+----------+--------------------+
only showing top 20 rows

scala> cc.sql("select count(*) from iteblog").show
+-------+
|    _c0|
+-------+
|7230338|
+-------+

scala> cc.sql("select count(distinct id) from iteblog").show
+-------+
|    _c0|
+-------+
|6031231|
+-------+

更多高级的功能请参见Carbondata官方文档:https://github.com/apache/incubator-carbondata/blob/master/docs/Quick-Start.md

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache CarbonData快速入门编程指南】(https://www.iteblog.com/archives/1698.html)
喜欢 (6)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(3)个小伙伴在吐槽
  1. 博主 我按照你上面的步骤操作最后的dataFrame里是空的 也不报错 本地路径下也有文件 里面也有内容 请问是什么原因?
    以下是我操作的代码
    val path="/home/wg/store"
    val cc = new CarbonContext(sc,path)
    cc.setConf("carbon.kettle.home","./carbondata/carbonplugins")
    cc.setConf("hive.metastore.warehouse.dir", "/home/hive/metadata/")
    cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")
    cc.sql("create table if not exists iteblog1 (id string, hash string) STORED BY 'org.apache.carbondata.format'")
    val df=cc.sql(s"load data inpath '/home/wg/data.csv' into table iteblog1 options('DELIMITER'=',', 'FILEHEADER'='id,hash')")
    hive下的表路径
    'tableName'='default.iteblog1',
    'tablePath'='/home/wg/store/default/iteblog1

    清云2016-09-29 09:32 回复
    • 你执行load data inpath这句的时候在/home/wg/store/defalut/iteblog1目录下看到有数据吗?

      w3970907702016-09-29 09:43 回复
      • 有的 生成了一个 0-1475113051000.carbonindex和一个part-0-0-1475113051000.carbondata文件 大小几KB

        清云2016-09-29 10:07 回复