欢迎关注Hadoop、Spark、Flink、Hive、Hbase、Flume等大数据资料分享微信公共账号:iteblog_hadoop
  1. 文章总数:978
  2. 浏览总数:11,953,008
  3. 评论:3936
  4. 分类目录:106 个
  5. 注册用户数:6116
  6. 最后更新:2018年12月15日
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
iteblog_hadoop
大数据技术博客公众号bigdata_ai
大数据猿:
bigdata_ai

Kafka新建的分区会在哪个目录下创建

我们在《Kafka创建Topic时如何将分区放置到不同的Broker中》文章中已经学习到创建 Topic 的时候分区是如何分配到各个 Broker 中的。今天我们来介绍分区分配到 Broker 中之后,会再哪个目录下创建文件夹。

我们知道,在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。

如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。

但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。

代码实现逻辑如下;

private val logs = new Pool[TopicAndPartition, Log]()

/**
 * Create a log for the given topic and the given partition
 * If the log already exists, just return a copy of the existing log
 */
def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
  logCreationOrDeletionLock synchronized {
    var log = logs.get(topicAndPartition)
    
    // check if the log has already been created in another thread
    if(log != null)
      return log
    
    // if not, create it
    val dataDir = nextLogDir()
    val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
    dir.mkdirs()
    log = new Log(dir, 
                  config,
                  recoveryPoint = 0L,
                  scheduler,
                  time)
    logs.put(topicAndPartition, log)
    info("Created log for partition [%s,%d] in %s with properties {%s}."
         .format(topicAndPartition.topic, 
                 topicAndPartition.partition, 
                 dataDir.getAbsolutePath,
                 {import JavaConversions._; config.toProps.mkString(", ")}))
    log
  }
}

/**
 * Choose the next directory in which to create a log. Currently this is done
 * by calculating the number of partitions in each directory and then choosing the
 * data directory with the fewest partitions.
 */
private def nextLogDir(): File = {
  if(logDirs.size == 1) {
    logDirs(0)
  } else {
    // count the number of logs in each parent directory (including 0 for empty directories
    val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
    val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
    var dirCounts = (zeros ++ logCounts).toBuffer
  
    // choose the directory with the least logs in it
    val leastLoaded = dirCounts.sortBy(_._2).head
    new File(leastLoaded._1)
  }
}

从上面代码可以清楚看出,需要创建新的分区时,Kafka先从 logs 存储池中获取当前分区对应的 Log 对象。如果获取到了,说明不是新的分区,这时候直接返回 Log 实例;如果这个分区是新建的,肯定是获取不到,这时候需要调用 nextLogDir 函数获取再哪个目录上创建分区目录。其核心思想就是找到分区数最少的目录来创建新的分区。

当然,这种实现上会有几个问题:

  • 分区数最少的目录未必是数据量最少的目录,如果分区数最少的目录恰恰是数据量最多的目录这样会导致磁盘使用不均衡;
  • 这种实现也没有考虑到磁盘的读写负载。
    本博客文章除特别声明,全部都是原创!
    转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
    本文链接: 【Kafka新建的分区会在哪个目录下创建】(https://www.iteblog.com/archives/2231.html)
    喜欢 (13)
    分享 (0)
    发表我的评论
    取消评论

    表情
    本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!