We previously analyzed the topic-creation path in [TopicCommand: source code analysis of topic creation]().

Because that article was already quite long, the question of partition (replica) assignment is covered separately in this one.
## Source Code Analysis
**The source-code entry point for creating a topic is `AdminManager.createTopics()`.**

Only the code related to partition assignment is shown below; everything else is omitted.
```scala
def createTopics(timeout: Int,
                 validateOnly: Boolean,
                 toCreate: Map[String, CreatableTopic],
                 includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
                 responseCallback: Map[String, ApiError] => Unit): Unit = {

  // 1. map over topics creating assignment and calling zookeeper
  val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }

  val metadata = toCreate.values.map(topic =>
    try {
      // ... validation and resolution of resolvedNumPartitions / resolvedReplicationFactor omitted ...

      val assignments = if (topic.assignments().isEmpty) {
        // No assignment supplied by the caller: let Kafka compute one
        AdminUtils.assignReplicasToBrokers(
          brokers, resolvedNumPartitions, resolvedReplicationFactor)
      } else {
        // The caller supplied an assignment: copy it into a map as-is
        val assignments = new mutable.HashMap[Int, Seq[Int]]
        // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
        // this follows the existing logic in TopicCommand
        topic.assignments.asScala.foreach {
          case assignment => assignments(assignment.partitionIndex()) =
            assignment.brokerIds().asScala.map(a => a: Int)
        }
        assignments
      }
      trace(s"Assignments for topic $topic are $assignments ")

      // ... writing the topic and its assignment to ZooKeeper omitted ...
    }
    // ... catch clauses and the rest of the method omitted ...
  )
}
```
The code above shows two paths: one where we did not specify a partition assignment ourselves (i.e. the `--replica-assignment` option was not used), and one where we supplied the assignment explicitly. A client-side sketch of how each path is triggered follows.
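As a rough illustration of how a client ends up on each path, here is a sketch using the public Java `AdminClient` API (the broker address and topic names are placeholders; this is not part of the broker code analyzed here):

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
    val admin = AdminClient.create(props)
    try {
      // Path 1: only partitions + replication factor -> the broker computes the assignment
      // via AdminUtils.assignReplicasToBrokers (here: 3 partitions, replication factor 2).
      val autoAssigned = new NewTopic("demo-auto", 3, 2.toShort)

      // Path 2: an explicit assignment, equivalent to --replica-assignment:
      // partition 0 -> brokers 1,2 ; partition 1 -> brokers 2,3
      val manualAssignment = Map(
        Int.box(0) -> List(Int.box(1), Int.box(2)).asJava,
        Int.box(1) -> List(Int.box(2), Int.box(3)).asJava
      ).asJava
      val manuallyAssigned = new NewTopic("demo-manual", manualAssignment)

      admin.createTopics(List(autoAssigned, manuallyAssigned).asJava).all().get()
    } finally admin.close()
  }
}
```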
### 1. User-specified assignment rules

As the source shows, the rules we specify are simply wrapped into a map of partition id to replica broker ids. **Note that it does not check whether the brokers you specified actually exist** (a minimal sketch of this follows below).
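A minimal sketch of what that wrapping amounts to, assuming a hypothetical `parseReplicaAssignment` helper that mimics the `--replica-assignment` format (illustrative code, not Kafka's):

```scala
object ManualAssignmentSketch {
  // Hypothetical helper mimicking the --replica-assignment format:
  // commas separate partitions, colons separate the replica brokers of one partition.
  def parseReplicaAssignment(spec: String): Map[Int, Seq[Int]] =
    spec.split(",").zipWithIndex.map { case (brokers, partitionId) =>
      partitionId -> brokers.split(":").map(_.trim.toInt).toList
    }.toMap

  def main(args: Array[String]): Unit = {
    // partition 0 -> brokers 1,2 ; partition 1 -> brokers 2,99.
    // Broker 99 does not exist, and nothing at this stage rejects it,
    // mirroring the behaviour described above.
    println(parseReplicaAssignment("1:2,2:99")) // Map(0 -> List(1, 2), 1 -> List(2, 99))
  }
}
```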
### 2. Automatic assignment: AdminUtils.assignReplicasToBrokers

|
|
1. Parameter checks: number of partitions > 0; replication factor > 0; replication factor <= number of brokers (if you did not set these yourself, the broker's default configuration values are used).
2. The assignment is computed differently depending on whether rack information is present (see the outline after this list).
3. Either every broker in the cluster has rack information or none of them does; otherwise an exception is thrown.
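An outline of `AdminUtils.assignReplicasToBrokers`, paraphrased from memory of the Kafka source (exception types and messages may differ slightly between versions), showing the checks above and the rack-aware/rack-unaware dispatch:

```scala
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(
      s"Replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
  if (brokerMetadatas.forall(_.rack.isEmpty))
    // no broker has rack information -> rack-unaware assignment
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id),
      fixedStartIndex, startPartitionId)
  else {
    // mixed rack information is rejected outright
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas,
      fixedStartIndex, startPartitionId)
  }
}
```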
#### Rack-unaware assignment

`AdminUtils.assignReplicasToBrokersRackUnaware`
```scala
/**
 * There are three goals for replica assignment:
 * 1. Spread the replicas evenly across all brokers;
 * 2. Place the replicas of a partition on different brokers;
 * 3. If all brokers have rack information, place the replicas of a partition on different racks.
 *
 * To achieve this without rack awareness, replicas are assigned by:
 * 1. Picking a random broker from the broker list as the starting point and assigning the
 *    first replica of each partition round-robin;
 * 2. Assigning the remaining replicas of each partition with an increasing broker-index shift.
 */
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  // The ids of all alive brokers, passed in by the caller
  val brokerArray = brokerList.toArray
  // By default start from a random broker index
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // By default start from partition 0
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```
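The loop above calls a `replicaIndex` helper that is not shown. In `AdminUtils` it is essentially the following (quoted from memory, so treat it as a close paraphrase); it places every follower at least one broker away from the first replica and spreads them further as `nextReplicaShift` grows:

```scala
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift is always in [1, nBrokers - 1], so a follower never lands on the first replica's broker
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

// Worked example with 5 brokers (ids 0..4), replication factor 3,
// assuming startIndex = 0 and nextReplicaShift starts at 0:
//   partition 0 -> replicas on brokers 0, 1, 2
//   partition 1 -> replicas on brokers 1, 2, 3
//   ...
//   partition 5 -> nextReplicaShift becomes 1, so replicas land on brokers 0, 2, 3
```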
#### Rack-aware assignment
```scala
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                             replicationFactor: Int,
                                             brokerMetadatas: Seq[BrokerMetadata],
                                             fixedStartIndex: Int,
                                             startPartitionId: Int): Map[Int, Seq[Int]] = {
  val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
    id -> rack
  }.toMap
  val numRacks = brokerRackMap.values.toSet.size
  val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
  val numBrokers = arrangedBrokerList.size
  val ret = mutable.Map[Int, Seq[Int]]()
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    for (_ <- 0 until replicationFactor - 1) {
      var done = false
      while (!done) {
        val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
        val rack = brokerRackMap(broker)
        // Skip this broker if
        // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks
        //    that do not have any replica, or
        // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned
        if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
            && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
          replicaBuffer += broker
          racksWithReplicas += rack
          brokersWithReplicas += broker
          done = true
        }
        k += 1
      }
    }
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}
```
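The rack-aware variant relies on `getRackAlternatedBrokerList` (called near the top of the method) to interleave brokers across racks, so that walking the list round-robin also rotates through racks. Below is a simplified standalone sketch of that idea, for illustration only and not the Kafka implementation:

```scala
import scala.collection.mutable

object RackAlternationDemo {
  def rackAlternatedBrokerList(brokerRackMap: Map[Int, String]): Seq[Int] = {
    // Group broker ids by rack, keeping each rack's brokers in ascending id order.
    val brokersByRack = brokerRackMap.groupBy(_._2).map { case (rack, m) => rack -> m.keys.toSeq.sorted }
    // Take one broker from each rack in turn until all racks are exhausted,
    // so consecutive entries in the list come from different racks.
    val iterators = brokersByRack.toSeq.sortBy(_._1).map(_._2.iterator)
    val result = mutable.ArrayBuffer[Int]()
    while (iterators.exists(_.hasNext))
      iterators.foreach(it => if (it.hasNext) result += it.next())
    result.toSeq
  }

  def main(args: Array[String]): Unit = {
    val brokerRackMap = Map(
      0 -> "rack1", 1 -> "rack1", 2 -> "rack1",
      3 -> "rack2", 4 -> "rack2", 5 -> "rack2",
      6 -> "rack3", 7 -> "rack3", 8 -> "rack3")
    // prints: 0, 3, 6, 1, 4, 7, 2, 5, 8
    println(rackAlternatedBrokerList(brokerRackMap).mkString(", "))
  }
}
```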
## Source Code Summary

- `AdminManager.createTopics()` either wraps a user-specified assignment as-is (without checking that the listed brokers exist) or delegates to `AdminUtils.assignReplicasToBrokers`.
- Automatic assignment first validates the partition count, the replication factor, and that the replication factor does not exceed the number of brokers, then chooses the rack-unaware or rack-aware algorithm depending on whether all brokers carry rack information.
- Both algorithms pick a random starting broker, assign the first replica of each partition round-robin, and place the remaining replicas with an increasing shift; the rack-aware variant additionally walks a rack-alternated broker list and skips brokers so that replicas are spread across racks (and brokers) first.