
Kafka in Action: 7 Steps to Real-Time Streaming from an RDBMS to Hadoop

  For enterprises that want to get data into their Hadoop clusters quickly, Kafka is a very good fit. I won't explain what Kafka is here; see my earlier post 《Apache kafka入门篇:工作原理简介》.

  This article is written for a technical audience. It describes how I used Kafka to stream data from a relational database management system (RDBMS) into Hive in real time, which makes real-time analytics use cases possible. For reference, the component versions used in this article are Hive 1.2.1, Flume 1.6, and Kafka 0.9.

Overall Solution Architecture

The overall solution combines Kafka, Flume, and Hive's transaction feature to write transactional data from the RDBMS into Hive tables. In outline: RDBMS transaction log → Kafka topic → Flume agent (Kafka source → memory channel → Hive sink) → transactional Hive table.



7 Steps to Stream RDBMS Data into Hadoop in Real Time

Now let's dive into the details of the solution. I'll show how you can stream data into Hadoop in just a few steps.

1. Extract data from the RDBMS

  Every relational database has a log file that records the most recent transactions. The first step of our streaming solution is to capture these transactions in a format that Hadoop can parse. (The original author does not explain how these transaction logs are parsed, possibly because it involves proprietary details.)
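The original post skips the extraction step itself. As a rough placeholder, here is a minimal sketch of one common approach, assuming a MySQL source, the third-party python-mysql-replication and kafka-python libraries, and placeholder connection details; off-the-shelf tools such as Debezium or Maxwell solve the same problem without custom code.

# Minimal sketch (not from the original article): tail the MySQL binlog
# and publish each inserted row to the Kafka topic created in step 2.
from kafka import KafkaProducer
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent

# Broker address matches the console producer used in step 6.
producer = KafkaProducer(bootstrap_servers="sandbox.hortonworks.com:6667")

# Connection settings and server_id are placeholders for illustration.
stream = BinLogStreamReader(
    connection_settings={"host": "sandbox.hortonworks.com", "port": 3306,
                         "user": "repl", "passwd": "secret"},
    server_id=100,
    only_events=[WriteRowsEvent],  # inserts only, to keep the sketch short
    blocking=True,
)

for event in stream:
    for row in event.rows:
        # Serialize the row in the comma-delimited field order expected by
        # the Flume Hive sink configured in step 4 (quoting/escaping omitted).
        values = row["values"]
        line = ",".join(str(values[c]) for c in
                        ("id", "name", "email", "street_address", "company"))
        producer.send("SalesDBTransactions", line.encode("utf-8"))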

2. Start the Kafka producer

  A process that publishes messages to a Kafka topic is called a producer. A topic groups messages of the same kind together in Kafka. The transactions from our RDBMS will be published to a Kafka topic. In our example, we have a sales-team database whose transactions are published to a Kafka topic. The following steps create the topic we need before starting the Kafka producer:

$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-topics.sh --create --zookeeper www.iteblog.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
Created topic "SalesDBTransactions".
$ bin/kafka-topics.sh --list --zookeeper www.iteblog.com:2181
SalesDBTransactions
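Optionally (this check is not in the original article), you can confirm that messages reach the topic with the console consumer that ships with Kafka 0.9, pointing at the same ZooKeeper address:

$ bin/kafka-console-consumer.sh --zookeeper www.iteblog.com:2181 --topic SalesDBTransactions --from-beginning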

3. Set up Hive

We create a table in Hive to receive the sales-team database's transactions. In this example we create a table named customers:

[iteblog@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj;
create table customers (id string, name string, email string, street_address string, company string)
partitioned by (time string)
clustered by (id) into 5 buckets stored as orc
location '/user/iteblog/salescust'
TBLPROPERTIES ('transactional'='true');

To enable transactions in Hive, the following configuration is required:

hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
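In practice (the original article only lists the setting above), Hive's ACID and streaming-ingest support usually needs a few more settings; the values below are the commonly documented ones for Hive 1.x and are an assumption about your environment rather than part of the original walkthrough:

hive.support.concurrency = true
hive.enforce.bucketing = true
hive.exec.dynamic.partition.mode = nonstrict
hive.compactor.initiator.on = true
hive.compactor.worker.threads = 1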

4. Start a Flume agent to write Kafka data to Hive

Next we create a Flume agent that will send data from the Kafka topic to the corresponding Hive table. Set up the environment as follows before starting the Flume agent:

$ pwd
/home/iteblog/streamingdemo
$ mkdir flume/checkpoint
$ mkdir flume/data
$ chmod 777 -R flume
$ export HIVE_HOME=/usr/hdp/current/hive-server2
$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
 
$ pwd
/home/iteblog/streamingdemo/flume
$ mkdir logs

Then create a log4j properties file:

[iteblog@sandbox conf]$ vi log4j.properties

flume.root.logger=INFO,LOGFILE
flume.log.dir=/home/iteblog/streamingdemo/flume/logs
flume.log.file=flume.log

Finally, the Flume agent configuration looks like this:

$ vi flumetohive.conf
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupId = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.kafka.consumer.timeout.ms = 1000
 
# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
flumeagent1.sinks.hive_sink.hive.database = raj
flumeagent1.sinks.hive_sink.hive.table = customers
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
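# The %-escapes in hive.partition below are filled in from the event
# timestamp header added by the 'timestamp' interceptor on the Kafka source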
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel

5. Start the Flume agent

Start the Flume agent with the following command:

$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf
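To confirm the agent started cleanly (not shown in the original article), you can watch the log file configured in step 4:

$ tail -f /home/iteblog/streamingdemo/flume/logs/flume.log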

6. Start the Kafka stream

As an example, here are simulated transaction messages sent by hand; in a real system they would be generated by the database:

$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"

7. Receive the data in Hive

With all of the steps above in place, as you send data to Kafka you will see it arrive in Hive within a few seconds.
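A quick way to verify this (not part of the original article) is to query the table created in step 3 from Beeline:

[iteblog@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> select * from raj.customers;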

This article is translated from: https://dzone.com/articles/kafka-in-action-7-steps-to-real-time-streaming-fro

Comments (6)
  1. If data that has already been loaded from the RDBMS gets updated, can that still be parsed and handled?

    舒克丶开飞机 2017-05-11 16:29
    • Inserts, updates, and deletes can all be parsed.

      w397090770 2017-05-12 13:35
      • OK, so it is all done by parsing the WAL? I'll give it a try.

        舒克丶开飞机 2017-05-12 15:57
        • The binlog contains that information; take a look.

          w397090770 2017-05-12 16:10
  2. 😮 How should updates be handled in this setup?

    舒克丶开飞机 2017-05-11 16:28
  3. The most critical step isn't explained... how does the binlog get written to Kafka in real time?

    IvanLeung 2016-10-12 10:48