我们在《Apache CarbonData快速入门编程指南》文章中介绍了如何快速使用Apache CarbonData,为了简单起见,我们展示了如何在单机模式下使用Apache CarbonData。但是生产环境下一般都是使用集群模式,本文主要介绍如何在集群模式下使用Apache CarbonData。
启动Spark shell
这里以Spark shell模式进行介绍,master为yarn-client,启动Spark shell如下:
[iteblog@www.iteblog.com ~]$ cd ${SPARK_HOME}
[iteblog@www.iteblog.com ~]$ carbondata_jar=./lib/$(ls -1 lib |grep "^carbondata_.*\.jar$")
[iteblog@www.iteblog.com ~]$ mysql_jar=./lib/$(ls -1 lib |grep "^mysql.*\.jar$")
[iteblog@www.iteblog.com ~]$ ./bin/spark-shell --master yarn-client \
--jars ${carbondata_jar},${mysql_jar} \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 5G \
--queue iteblog
上面命令将会以Client模式启动shell。
创建CarbonContext实例
启动完Spark Shell之后,接下来就是来初始化CarbonContext实例了,这个和《Apache CarbonData快速入门编程指南》里面类似:
/**
* User: 过往记忆
* Date: 2016年07月07日
* Time: 下午20:49
* bolg:
* 本文地址:/archives/1703
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
import org.apache.spark.sql.CarbonContext
import org.apache.hadoop.hive.conf.HiveConf
val storePath = "/user/iteblog/store/"
val cc = new CarbonContext(sc, storePath)
cc.setConf("carbon.kettle.home","./carbondata/carbonplugins")
cc.setConf("hive.metastore.warehouse.dir", "/user/iteblog/metadata/")
cc.setConf(HiveConf.ConfVars.HIVECHECKFILEFORMAT.varname, "false")
创建表
现在我们已经创建好CarbonContext实例了,可以使用它创建表:
cc.sql("create table if not exists iteblog (id string, hash string) STORED BY 'org.apache.carbondata.format'")
加载数据
/**
* User: 过往记忆
* Date: 2016年07月07日
* Time: 下午20:49
* bolg:
* 本文地址:/archives/1703
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
*/
cc.sql(s"load data inpath 'hdfs:///tmp/iteblog.csv' into table iteblog options('DELIMITER'='\t')")
我们发现数据加载出错了,可以到其中一个节点的stderr日志里面看到如下的异常信息:
16/07/07 20:38:18 ERROR graphgenerator.GraphGenerator: [Executor task launch worker-0][partitionID:default_iteblog_ace3f131-836f-4b27-b198-f636fbc4e53b] org.pentaho.di.core.exception.KettleException: Unable to read file './carbondata/carbonplugins/.kettle/kettle.properties' ./carbondata/carbonplugins/.kettle/kettle.properties (No such file or directory) at org.pentaho.di.core.util.EnvUtil.readProperties(EnvUtil.java:65) at org.pentaho.di.core.util.EnvUtil.environmentInit(EnvUtil.java:95) at org.carbondata.processing.graphgenerator.GraphGenerator. validateAndInitialiseKettelEngine(GraphGenerator.java:302) at org.carbondata.processing.graphgenerator.GraphGenerator.generateGraph(GraphGenerator.java:277) at org.carbondata.spark.load.CarbonLoaderUtil.generateGraph(CarbonLoaderUtil.java:130) at org.carbondata.spark.load.CarbonLoaderUtil.executeGraph(CarbonLoaderUtil.java:186) at org.carbondata.spark.rdd.CarbonDataLoadRDD$$anon$1.<init>(CarbonDataLoadRDD.scala:189) at org.carbondata.spark.rdd.CarbonDataLoadRDD.compute(CarbonDataLoadRDD.scala:148) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.io.FileNotFoundException: ./carbondata/carbonplugins/.kettle/kettle.properties (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.<init>(FileInputStream.java:146) at java.io.FileInputStream.<init>(FileInputStream.java:101) at org.pentaho.di.core.util.EnvUtil.readProperties(EnvUtil.java:60) ... 15 more
很明显是没有找到./carbondata/carbonplugins/.kettle/kettle.properties文件,因为我们目前只在启动Spark Shell的那台机器上部署好了Carbondata,而Carbondata的计算依赖于kettle,所以我们需要把kettle相关的依赖加载到所有参与计算的节点。这里有以下两种方法可以解决这个问题。
1、我们很容易想到的就是将./carbondata/carbonplugins/文件里面的所有内容全部复制到Hadoop集群的各个节点的某一目录下(比如/user/iteblog/carbondata/carbonplugins),然后修改carbon.kettle.home如下:
cc.setConf("carbon.kettle.home","/user/iteblog/carbondata/carbonplugins")
其余代码不变,这个问题即可解决。
2、但是如果我们没有Hadoop集群各个节点的登陆权限,也就是说我们无法手动到各个节点部署好carbonplugins,这咋办呢?我们可以在启动Spark Shell的时候加载carbonplugins插件,如下:
[iteblog@www.iteblog.com ~]$ ./bin/spark-shell --master yarn-client \
--jars ${carbondata_jar},${mysql_jar},carbondata.tar.gz \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 5G \
--queue iteblog
carbondata.tar.gz里面已经打包好了所有的插件信息。然后我们上面的代码不需要改变,这个问题也可以解决。
查数
数据已经加载进iteblog表里面了,现在我们可以查询里面的数据了,如下:
scala> cc.sql("select * from iteblog").show
+----------+--------------------+
| id| hash|
+----------+--------------------+
|1761060630| 1507780651275746626|
|1777010203|-6420079594732250962|
|1777080884|-3720484624594970761|
|1777080885| 6953598621328551083|
|1794379845| 4443483533807209950|
|1794419628|-3898139641996026768|
|1794522657| 5721419051907524948|
|1796358316|-3848539843796297096|
|1796361951| 2673643446784761880|
|1796363022| 7081835511530066760|
|1797689090| 7687516489507000693|
|1798032763| 8729543868018607114|
|1798032765|-2073004072970288002|
|1798933651| 4359618602910211713|
|1799173523| 3862252443910602052|
|1799555536|-2751011281327328990|
|1799569121| 1024477364392595538|
|1799608637| 4403346642796222437|
|1799745227|-2719506638749624471|
|1799859723| 5552725300397463239|
+----------+--------------------+
only showing top 20 rows
scala> cc.sql("select count(*) from iteblog").show
+-------+
| _c0|
+-------+
|7230338|
+-------+
scala> cc.sql("select count(distinct id) from iteblog").show
+-------+
| _c0|
+-------+
|6031231|
+-------+
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache CarbonData集群模式使用指南】(https://www.iteblog.com/archives/1703.html)


fasfsdf sd十多个