欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:978
  2. 浏览总数:11,981,412
  3. 评论:3939
  4. 分类目录:106 个
  5. 注册用户数:6130
  6. 最后更新:2018年12月15日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Apache Kafka编程入门指南:设置分区数和复制因子

  在前面的例子(《Apache Kafka编程入门指南:Producer篇》)中,我们学习了如何编写简单的Kafka Producer程序。在那个例子中,在如果需要发送的topic不存在,Producer将会创建它。我们都知道(假设你知道),每个topic都是有分区数和复制因子的,但是我们无法通过Producer相关的API设定分区数和复制因子的,因为Producer相关API创建topic的是通过读取server.properties文件中的num.partitionsdefault.replication.factor的。那么是否就意味着咱们无法在程序里面定义topic的分区数和复制因子呢?答案是否,否则我也没必要写这篇文章了!

我们可以通过Kafka提供的AdminUtils.createTopic函数来创建topic,它的函数原型如下:

def createTopic(zkClient: ZkClient, 
      topic: String,
      partitions: Int,   
      replicationFactor: Int,  
      topicConfig: Properties = new Properties)

  这个函数是没有返回值的。从上面的参数列表我们可以看出,partitions和replicationFactor参数就是上面说到的分区数和复制因子,所以我们可以通过这个参数来创建topic。在使用createTopic函数之前,我们需要创建zkClient对象,它里面封装了操作Zookeeper的相关API。而这个API不是Kafka内置的,所以我们需要先引入这个依赖:

<dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.3</version>
</dependency>

然后我们就可以创建ZkClient对象了:

val zk = "www.iteblog.com:2181"
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000

val zkClient = new ZkClient(zk, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)

需要特别主要的是,我们必须指定ZKStringSerializer对象,否则运行完代码之后,你可以看到zookeeper里面已经创建了相关topic,而且你list的时候也可以看到你创建的topic,但是当你往这个topic里面发送消息的是,你会得到以下的异常:

[2016-02-05 16:45:52,335] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: iteblog (kafka.producer.async.DefaultEventHandler)
[2016-02-05 16:45:52,441] WARN Error while fetching metadata [{TopicMetadata for topic iteblog -> 
No partition metadata for topic flight due to kafka.common.LeaderNotAvailableException}] for topic [flight]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-02-05 16:45:52,441] ERROR Failed to send requests for topics flight with correlation ids in [41,48] (kafka.producer.async.DefaultEventHandler)
[2016-02-05 16:45:52,441] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
	at scala.collection.immutable.Stream.foreach(Stream.scala:547)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

找到topic的Leader,所以无法发送消息。这是因为如果你不使用ZKStringSerializer对象,那么只会在Zookeeper里面创建topic的相关信息,但是kafka并没有创建这个主题!现在我们就可以使用AdminUtils.createTopic参见topic了:

val topic = "iteblog"
val replicationFactor = 1
val numPartitions = 2

AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)

如果topic存在,那么程序将会报错:

Exception in thread "main" kafka.common.TopicExistsException: Topic "iteblog" already exists.
	at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
	at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
	at com.iteblog.kafka.IteblogProducerV3$.main(IteblogProducerV3.scala:46)
	at com.iteblog.kafka.IteblogProducerV3.main(IteblogProducerV3.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

否则什么都没输出则代表你的topic创建成功了!完整代码片段如下:

package com.iteblog.kafka

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

/**
 * User: 过往记忆
 * Date: 2016-02-06
 * Time: 下午04:21
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1581
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

object CreateTopic {
  def main(args: Array[String]) {
    val zk = "www.iteblog.com:2181"
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000

    val zkClient = new ZkClient(zk, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)

    val topic = "iteblog"
    val replicationFactor = 1
    val numPartitions = 2

    AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
  }
}
  除了使用上面AdminUtils.createTopic在创建主题的时候设置复制因子和分区数,我们还可以使用kafka.admin.TopicCommand来实现同样的功能,如下:

val arguments = Array("--create", "--zookeeper", zk, "--replication-factor", "2", "--partition", "2", "--topic", "iteblog")
TopicCommand.main(arguments)
本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Apache Kafka编程入门指南:设置分区数和复制因子】(https://www.iteblog.com/archives/1581.html)
喜欢 (6)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!