之前的文章中,我们有讲解
Kafka中的数据采集和统计机制
分区副本限流机制三部曲(源码篇)
如果你都仔细研读过这两篇文章,那么会更容易理解本篇文章
想要把限流讲好, 我们分下面几个方面讲
如何配置副本限流配置
如何记录并统计Follower副本Fetch到的流量
如何判断Follower副本是否限流,并进行限流
如何记录并统计Leader副本Fetch到的流量
手动写入动态限流配置来进行限流
分区副本重分配的限流机制规则
如何配置副本限流配置
如果你对我之前写的关于 分区副本重分配源码解析 还有印象, 那么你肯定记得这么几个配置leader.replication.throttled.rate
控制leader副本端处理FETCH请求的速率follower.replication.throttled.rate
控制follower副本发送FETCH请求的速率replica.alter.log.dirs.io.max.bytes.per.second
broker内部目录之间迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上限leader.replication.throttled.replicas
: leader端的副本限流follower.replication.throttled.replicas
: follower端的副本限流
就算你不记得也没有关系,那我们今天来好好讲一讲这一块的内容
如何使某个副本需要被限流 首先我们看下判断Follower是否应该被限流的判断逻辑
1 2 3 4 5 6 7 8 private def shouldFollowerThrottle (quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition) : Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded }
被限流的条件如下
副本不在ISR列表中
该副本在限流配置列表中
超过限流阈值了
这里我们主要分析一下,如何才能在限流配置列表中
1 2 3 4 private def shouldFollowerThrottle (quota: ReplicaQuota , fetchState: PartitionFetchState , topicPartition: TopicPartition ): Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded }
1 2 3 4 5 6 7 override def isThrottled (topicPartition: TopicPartition ): Boolean = { val partitions = throttledPartitions.get(topicPartition.topic) if (partitions != null ) (partitions eq AllReplicas ) || partitions.contains(topicPartition.partition) else false }
上面的ConcurrentHashMap throttledPartitions
就是所有需要被限流的副本列表,那么是在哪里被赋值的呢?
ConfigHandler#processConfigChanges
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def processConfigChanges (topic: String , topicConfig: Properties ): Unit = { val configNamesToExclude = excludedConfigs(topic, topicConfig) updateLogConfig(topic, topicConfig, configNamesToExclude) def updateThrottledList (prop: String , quotaManager: ReplicationQuotaManager ) = { if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0 ) { val partitions = parseThrottledPartitions(topicConfig, kafkaConfig.brokerId, prop) println(prop+"本机:" +kafkaConfig.brokerId+" Topic" +topic+"; 需要限流的分区有:" +partitions.mkString(" " )) quotaManager.markThrottled(topic, partitions) debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for topic: $topic and partitions $partitions " ) } else { quotaManager.removeThrottle(topic) debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for topic $topic " ) } } updateThrottledList(LogConfig .LeaderReplicationThrottledReplicasProp , quotas.leader) updateThrottledList(LogConfig .FollowerReplicationThrottledReplicasProp , quotas.follower) if (Try (topicConfig.getProperty(KafkaConfig .UncleanLeaderElectionEnableProp ).toBoolean).getOrElse(false )) { kafkaController.enableTopicUncleanLeaderElection(topic) } }
这段代码是在修改了动态配置中的topic节点 的时候会被触发的,关于动态配置可以看我之前的文章 Kafka中的动态配置源码分析
这里的代码简单描述下
检查 动态配置 中是否存在配置 leader.replication.throttled.replicas
和 follower.replication.throttled.replicas
如果存在,则需要解析一下配置Value,解析得到的Value就是需要被限流的副本列表 ,将它写到内存中。
如果不存在,则需要把当前的限流副本列表 清空。
具体的解析逻辑如下
ConfigHandler#parseThrottledPartitions
1 2 3 4 5 6 7 8 9 10 11 12 13 def parseThrottledPartitions (topicConfig: Properties , brokerId: Int , prop: String ): Seq [Int ] = { val configValue = topicConfig.get(prop).toString.trim ThrottledReplicaListValidator .ensureValidString(prop, configValue) configValue match { case "" => Seq () case "*" => AllReplicas case _ => configValue.trim .split("," ) .map(_.split(":" )) .filter(_ (1 ).toInt == brokerId) .map(_ (0 ).toInt).toSeq } }
如果为空就没有需要限流的
如果是*
表示所有副本都需要限流
配置值的格式为:分区号:副本所在BrokerId,分区号:副本所在BrokerId
并过滤一下副本所在BrokerId
=自己的BrokerId
例如: 1:102,2:0,3:0
在当前 BrokerID=102 的集群上最终解析出来需要限流的副本为 1 , 只需要解析跟自己相关的副本就行了。
具体限流需要限制在多少
上面只是讲了怎么将分区副本设置为需要限流, 但是并没有设置限流多少鸭!就算上面你设置了,这里没有设置限流多少,那么默认的限流值就是 Long.MAXVALUE
约等于没有限流。
那么如何设置限流的流速呢? 请看下面代码, 这个是 在修改了Broker的动态配置之后就会调用的方法, 关于动态配置可以看我之前的文章 Kafka中的动态配置源码分析
BrokerConfigHandler#processConfigChanges
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class BrokerConfigHandler (private val brokerConfig: KafkaConfig , private val quotaManagers: QuotaManagers ) extends ConfigHandler with Logging { def processConfigChanges (brokerId: String , properties: Properties ): Unit = { def getOrDefault (prop: String ): Long = { if (properties.containsKey(prop)) properties.getProperty(prop).toLong else DefaultReplicationThrottledRate } if (brokerId == ConfigEntityName .Default ) brokerConfig.dynamicConfig.updateDefaultConfig(properties) else if (brokerConfig.brokerId == brokerId.trim.toInt) { brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties) quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp ))) quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp ))) quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp ))) } } }
判断是否对Follower进行限流
副本不在ISR中&&副本在限流副本列表中&&超出了限流阈值
1 2 3 4 5 private def shouldFollowerThrottle (quota: ReplicaQuota , fetchState: PartitionFetchState , topicPartition: TopicPartition ): Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded }
判断的时机在 副本发起 Fetch请求的时候。
遍历所有分区, 构造Fetch请求。如果被限流了,则会被过滤。这一次不会请求对应分区副本的数据。
1 2 3 4 5 6 partitionMap.foreach { case (topicPartition, fetchState) => if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) { } }
判断是否对Leader进行限流
分区副本必须在ISR中&&分区副本在限流副本列表中 && 超过了限流阈值
1 2 3 4 def shouldLeaderThrottle (quota: ReplicaQuota , topicPartition: TopicPartition , replicaId: Int ): Boolean = { val isReplicaInSync = nonOfflinePartition(topicPartition).exists(_.inSyncReplicaIds.contains(replicaId)) !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded }
调用时机
在读取副本本地Log数据的时候, ReplicaManager#readFromLocalLog
, 读取完了然后判断这个分区副本是否已经被限流了,如果被限流了则返回的数据为Empty。
如何设置限流值 虽然上面设置了哪些分区需要被限流,但是没有设置具体限流多少,就算上面设置了,如果没有设置限流值那么默认的限流值就是 Long.MaxValue
, 也就是相当于不限流。
上面设置了3个跟限流相关的配置
leader.replication.throttled.rate
Leader副本的限流阈值follower.replication.throttled.rate
Follower副本的限流阈值replica.alter.log.dirs.io.max.bytes.per.second
同一个Broker多个目录直接的副本同步限流阈值
可以看到上面得到的阈值分别被保存在对象 Quota
中.Quota
: 限流对象, 他可以设置是 上限 还是 下限 , 并保存着流量阈值
特别需要注意的是: 只要跟当前这台BrokerId相关的才会被加载
1 2 3 4 5 private final boolean upper;private final double bound;
判断是否超过阈值
1 2 3 4 public boolean acceptable (double value) { return (upper && value <= bound) || (!upper && value >= bound); }
如何记录流量
想要限流,则需要计算准备的值 才能做好限流
如何计算准备的值,我在 Kafka中的数据采集和统计机制 里面已经说的非常清楚了, 所以这里我们看看 记录数据的地方在哪里
Follower记录流量
这里是所有需要被限流的副本的列表的所有流量数据, 统计在一起的。对应的配置 follower.replication.throttled.replicas
ReplicaFetcherThread#processPartitionData
具体的记录的地方是, ReplicalFetcherThread
线程在Fetch数据之后进行处理的时候,进行记录 当然这里会先判断是否需要限流, 如果不需要限流那么记录也没有意义.
Leader记录流量
这记录的也是 需要被限流的所有副本的所有流量。对应的配置是 leader.replication.throttled.replicas
KafkaApis#processResponseCallback
统计所有需要被限流的副本的总数据大小(将不需要限流的副本过滤掉了)
记录总数据大小
调用时机, 上面这个方法是一个 回调函数, 在读取本地副本数据完成的时候就会调用这个数据, 然后记录。
LogDir记录流量 ReplicaAlterLogDirsThread#processPartitionData
1 2 3 4 5 6 7 8 9 10 11 12 override def processPartitionData (topicPartition: TopicPartition , fetchOffset: Long , partitionData: PartitionData [Records ]): Option [LogAppendInfo ] = { quota.record(records.sizeInBytes) println(new SimpleDateFormat ("yyyy-MM-dd-HH:mm:ss" ).format(new Date ())+"LogDir 记录Follower Fetch 流量 size: " + records.sizeInBytes()) logAppendInfo }
限流测试用例 Leader 限流 Follower不限流 Topic1 单分区 2 副本 1 2 3 { "version" : 2 , "partitions" : { "0" : [ 0 , 1 ] } , "adding_replicas" : { } , "removing_replicas" : { } }
把Follower副本先停机 , Follower副本在 Broker-1 上, 我们停机避免一会他会同步数据
给Topic1 发送100M的数据
1 2 3 sh bin/kafka-producer-perf-test.sh --topic Topic1 --num-records 1024 --throughput 100000 --producer-props bootstrap.servers=xxxx:9090 --record-size 102400
设置Leader端的限流值为1M/s ; 0号分区 Leader 在Broker-0 上,则最终的配置应该是leader.replication.throttled.replicas:
0:0 leader.replication.throttled.rate:
524288 (512kb)
1 2 3 4 5 6 7 sh bin/kafka-configs.sh --bootstrap-server xxxx:9092 --alter --entity-type topics --entity-name Topic1 --add-config leader.replication.throttled.replicas=0:0 sh bin/kafka-configs.sh --bootstrap-server xxxx:9092 --alter --entity-type brokers --entity-name 2 --add-config leader.replication.throttled.rate=524288
启动Broker-1 , 让它自动拉取Leader的数据, 看看多久同步完成, 预计应该是200s左右
测试结果:
先说是否符合预期: 符合
我们先看看整个同步过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 本次最大Fetch数据大小:1048576 20211119162219 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false 本次最大Fetch数据大小:1048576 20211119162219 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true 20211119162219 这次Fetch获取的需要限流的数据大小为:18 本次最大Fetch数据大小:1048576 20211119162220 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true 20211119162220 这次Fetch获取的需要限流的数据大小为:18 本次最大Fetch数据大小:1048576 20211119162220 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true 20211119162220 这次Fetch获取的需要限流的数据大小为:18 本次最大Fetch数据大小:1048576 20211119162221 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false 20211119162221 这次Fetch获取的需要限流的数据大小为:1048648 ...... 省略..... 完成的时候 20211119162528 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false 20211119162528 这次Fetch获取的需要限流的数据大小为:409960 本次最大Fetch数据大小:1048576 20211119162528 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = true [2021-11-19 16:25:28,220][INFO][kafka-dp-request-handler-5]: [Partition Topic1-0 broker=0] Expanding ISR from 0 to 0,1 [2021-11-19 16:25:28,264][INFO][kafka-dp-request-handler-5]: [Partition Topic1-0 broker=0] ISR updated to [0,1] and zkVersion updated to [6] 本次最大Fetch数据大小:1048576 20211119162528 TP=Topic1-0 isReplicaInSync=true isThrottled=true isQuotaExceeded = true 20211119162528 这次Fetch获取的需要限流的数据大小为:18
上面是同步过程我加的一些日志, 我们来分析一下
启动Broker-1的时候开始第一次同步, 这个时候 Leader去读取本地Log日志文件,第一次读取肯定不会被限流,但是一次读取多少数据是根据 Fetch请求还有一个fetch.max.bytes
配置来确定,比如我们这里一次最大Fetch的数据是1048576
b (1M), 我们限流是阈值是 512kb
, 完事之后就会被采集到限流的SampleStat
中
第二次Fetch请求过来的时候,去获取数据才发现超出限流阈值了, 那么这个时候就不读取数据了,但是可以看到还是有18b
的数据,平时没有数据同步的时候这个18b
都是固定的。是一些元数据的大小。可忽略
经过时间的流逝,流量计算出来就变低了(可以看我之前写的数据采集和统计 ),那么又可以开始处理下一个Fetch请求了, 当然还是尽量一次读到最多的数据返回。
等到最后同步成功,赶上了ISR了, 就不会再限流了。当然你也可以看到最后一次获取的数据大小1048576
,之所以不是最大的1048648
, 因为Log已经没有那么多数据了。1 2 3 本次最大Fetch数据大小:1048576 20211119162219 TP=Topic1-0 isReplicaInSync=false isThrottled=true isQuotaExceeded = false 20211119162219 这次Fetch获取的需要限流的数据大小为:1048648
从开始到结束 :2021-11-19-16:25:28 - 2021-11-19-16:22:19 = 3分钟左右 约等于 200s
Topic1 单分区 3 副本
为了测试 多个副本同时同步会不会造成更久的同步时间, 我们在上面的基础上,再加一个副本, 也就是同时有两个Follower副本 去跟Leader副本同步数据 那么 是同步完成时间跟上面差不多呢? 还是会用double的时间呢?
按照上面的测试方法同步再测试一次发现 最终的结果是
2021-11-19-17:23:09 - 2021-11-19-17:16:46 = 6分钟左右 基本上就是 上面的一倍左右。
结论: 如果有多个副本进行同步的话,会同步占用Leader的限流阈值。
Leader 限流 Follower不限流 结论
Leader端的限流只会计算需要被限流的分区流量值。
如果多个副本向Leader端Fetch数据,那么都会被算进限流阈值, 基本上多一个副本就多一倍的时间。
如下图所示
如果有多个Leader分区都限流呢?
按照最终有多少个副本在Fetch数据.
经过理论上的值和 测试结果 2021-11-19-19:01:49 - 2021-11-19 18: 55: 27 = 6分钟左右 相符
Leader不限流 Follower限流
对应的配置有 follower.replication.throttled.replicas
:Follower分区副本的限流配置follower.replication.throttled.rate
Follower分区副本限流阈值 b/s
Topic1 单分区 2副本
测试 一个Follower Fetch Leader的数据的限流情况 限流阈值设置为 524288(512kb/s)
当前Topic1 的分配情况如下
1 { "version" : 2 , "partitions" : { "0" : [ 1 , 0 ] } , "adding_replicas" : { } , "removing_replicas" : { } }
设置Follower 限流的话,因为 Follower 在Broker-0 上。则配置如下,
1 2 3 4 5 6 sh bin/kafka-configs.sh --bootstrap-server xxx:9090 --alter --entity-type topics --entity-name Topic1 --add-config follower.replication.throttled.replicas=0:0 sh bin/kafka-configs.sh --bootstrap-server xxxx:9090 --alter --entity-type brokers --entity-name 0 --add-config follower.replication.throttled.rate=524288
当然测试方法还是跟之前一样, 先把Follower副本下线, 再给Topic1生产100M的数据, 代码里面加点日志观察Fetch情况 启动Follower副本开始进行同步, 部分Log日志如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 [2021-11-22 12:25:59,246][INFO][kafka-dp-request-handler-7]: [ReplicaFetcherManager on broker 0] Added fetcher to broker 1 for partitions Map(Topic1-0 -> (offset=5120, leaderEpoch=9)) 2021-11-22-12:25:59 TP:Topic1-0 是否可Fetch=false isReadyForFetch=false Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false [2021-11-22 12:26:00,274][INFO][ReplicaFetcherThread-0-1]: [Log partition=Topic1-0, dir=/Users/shirenchuang/work/IdeaPj/didi_source/kafka/kafka-logs-0] Truncating to 5120 has no effect as the largest offset in the log is 5119 2021-11-22-12:26:00 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false 2021-11-22-12:26:00记录Follower Fetch 流量 size: 1048576 2021-11-22-12:26:00 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:26:00 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:26:01 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false 2021-11-22-12:26:01记录Follower Fetch 流量 size: 1048576 2021-11-22-12:26:01 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true ---- 省略部分------- 2021-11-22-12:26:51 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false 2021-11-22-12:26:51记录Follower Fetch 流量 size: 1048576 2021-11-22-12:26:51 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:26:51 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:26:52 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:26:53 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false 2021-11-22-12:26:53记录Follower Fetch 流量 size: 1048576 2021-11-22-12:29:13 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:29:13 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:29:14 TP:Topic1-0 是否可Fetch=false isReadyForFetch=true Follower是否限流=true !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=true --- 最后一次Fetch就 同步完成了 2021-11-22-12:29:15 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=true quota.isThrottled=true quota.isQuotaExceeded=false 2021-11-22-12:29:15记录Follower Fetch 流量 size: 409888 2021-11-22-12:29:15 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=false quota.isThrottled=true quota.isQuotaExceeded=true 2021-11-22-12:29:16 TP:Topic1-0 是否可Fetch=true isReadyForFetch=true Follower是否限流=false !isReplicaInSync=false quota.isThrottled=true quota.isQuotaExceeded=true
可以看到日志, 开始同步之后,第一次Fetch同步了1048576(1MB)
这个数值是可以配置的,单次fetch的最大值。 Fetch之后超过了 限流阈值,则后面就开始限流了, 等过了几秒,低于阈值了又开始Fetch。最后一次Fetch成功后, Follower就跟上了ISR了,就不会再进行限流了。
最终耗时 :2021-11-22-12:29:15 - 2021-11-22 12:25:59 = 196 S 约等于我们的预期 200S (100MB/0.5M/s = 200s)。
Topic1 2分区 2 副本
2个分区 对应的副本都在同一个Broker上 进行测试。
最终结果 是在一台Broker上, 有多少个Follower的副本被限流, 那么这些副本所Fetch的数据流量都会被遗弃算入到限流中。
Topic1 多分区 多 副本
多个分区 多个副本 在不同的Broker上, 不同的Broker的流量只会算在当台Broker。
上图中的2个Leader 都是100M。
分区副本重分配的限流策略 如果你之前看过我写的 分区副本重分配源码原理分析(附配套教学视频) 的,你肯定有印象, 在执行副本重分配的时候我们会设置一个限流值 --throttle xxxx
你是否会疑问
这个限流,到底设置的是什么限流呢?
应该怎么合理的设置限流呢?
想了解更详细的内容可以 看看我那篇文章 , 这里我写2个例子来说明一下:
无副本新增 例如我有个 Topic2 单分区 3副本 分配情况如下
1 2 3 { "version" : 2 , "partitions" : { "0" : [ 0 , 1 , 2 ] } , "adding_replicas" : { } , "removing_replicas" : { } }
我对其做一个分区副本重分配的操作
最终想让它的分配方式为如下
1 2 3 { "version" : 2 , "partitions" : { "0" : [ 2 , 0 , 1 ] } , "adding_replicas" : { } , "removing_replicas" : { } }
迁移过程让它的限流值为 1 b/s
我们看下限流配置写入了什么
三个Broker配置 都新增了
1 2 3 4 5 6 7 8 { "version" : 1 , "config" : { "leader.replication.throttled.rate" : "1" , "follower.replication.throttled.rate" : "1" } }
还有Topic配置 如下
1 2 3 4 5 6 7 8 { "version" : 1 , "config" : { "leader.replication.throttled.replicas" : "" , "follower.replication.throttled.replicas" : "" } }
可以看到 这个Topic配置好像并没有设置要给哪个分区进行限流。 那是因为这个迁移 并没有新增
PS: 假如原本就有这个配置, 那么经过这样一次操作之后会被重写为空
有副本新增 Topic3 单分区 单副本 扩副本
1 { "version" : 1 , "partitions" : [ { "topic" : "Topic3" , "partition" : 0 , "replicas" : [ 0 ] } ] }
重分配至 (新增了2个副本)
1 { "version" : 1 , "partitions" : [ { "topic" : "Topic3" , "partition" : 0 , "replicas" : [ 0 , 1 , 2 ] } ] }
Broker [0,1,2] 都新增配置
1 2 3 4 5 6 7 { "version" : 1 , "config" : { "leader.replication.throttled.rate" : "1" , "follower.replication.throttled.rate" : "1" } }
Topic3配置 新增
1 2 3 4 5 6 7 { "version" : 1 , "config" : { "leader.replication.throttled.replicas" : "0:0" , "follower.replication.throttled.replicas" : "0:1,0:2" } }
可以看到 Topic配置, Leader 和 Follower 都有需要限流的
解释一下配置含义:
"leader.replication.throttled.replicas" : "0:0"
: 该Topic下的 0号分区 Leader副本 在Broker-0 上需要 进行Leader副本限流
"follower.replication.throttled.replicas" : "0:1,0:2"
该Topic下的0号分区Follower副本在 Broker-0、Broker-1 上需要进行Follower副本限流。
限流值都是 1b/s (上面设置的)
扩副本之前就有多个副本
1 { "version" : 1 , "partitions" : [ { "topic" : "Topic4" , "partition" : 0 , "replicas" : [ 2 , 0 ] } ] }
重分配成如下
1 { "version" : 1 , "partitions" : [ { "topic" : "Topic4" , "partition" : 0 , "replicas" : [ 0 , 1 , 2 ] } ] }
最终可以看到的Topic配置如下:
1 2 3 4 5 6 7 8 9 { "version" : 1 , "config" : { "leader.replication.throttled.replicas" : "0:2,0:0" , "follower.replication.throttled.replicas" : "0:1" } }
这个配置的计算方式如下:
Topic 重分配前的所有 副本 均设置成Leader副本限流 (之所以这样是因为原先的Leader有可能会切换到分配前的其他副本,为了避免这种情况,所以需要 都设置一下。)
一句话:重分配后的新增的副本 均设置成 Follower副本限流, 重分配前的所有副本 均设置成Leader限流
那么也就是说 我们用命令--throttle
限流的值
最终决定完成重分配任务的关键点是什么?
那就是 Leader端的限流 和 Follower端限流 谁先达到阈值
Leader端先达到阈值
Follower 先达到阈值
总结
1 . 重分配后的新增的副本 均设置成 Follower副本限流, 重分配前的所有副本 均设置成Leader限流
2 . 分区副本重分配 --throttle 命令会重新刷新 限流相关的配置。假如重分配没有新增的副本, 那么执行之后相关配置会变成空字符串。
如何设置限流的阈值?
请看下一篇文章 [Kafka分区副本同步限流机制三部曲( 实战篇 1 )]