kafka管控推荐使用 滴滴开源 Kafka运维管控平台 更符合国人的操作习惯 ,

更强大的管控能力 ,更高效的问题定位能力更便捷的集群运维能力更专业的资源治理更友好的运维生态

以下配置基于 kafka2.5.X 编辑

以下配置部分是直接翻译,随着我的kafka持续连载更新,对应的配置会给出更详细的解析和对应配置生效的源码文章。

可以加 szzdzhp001 添加石臻臻为好友,进【高质量滴滴技术交流群】和最新的kafka运维大全。

Broker Configs

参数名称 默认值 重要程度 参数释义 相关链接 示例
broker.id -1 broker server的唯一标识,必须唯一, 如果不设置,或者broker.id<0 则会自动计算broker.id; 相关可以看broker.id.generation.enablereserved.broker.max.id 的配置。 0
reserved.broker.max.id 1000 中等 broker.id能够配置的最大值,同时 reserved.broker.max.id+1也是自动创建broker.id的最小值。 一图全解kafka在zookeeper中的数据结构
broker.id.generation.enable true 允许server自动创建broker.id。当broker.id< 0并且当前配置是true,则自动计算broker.id,计算逻辑是 {reserved.broker.max.id} +/brokers/seqid.dataVersion , 其中的/brokers/seqid.dataVersion保证了全局自增
log.dir /tmp/kafka-logs log数据存放的目录
log.dirs null Log数据存放的目录,比如/tmp/kafka-logs. 这个可以用逗号隔开设置多个目录,主要是用来挂载到多个磁盘上的
例如:/tmp/kafka0,/tmp/kafka1 。如果没有设置,则使用 log.dir的配置,
zookeeper.connect 必填 hostname:port形式指定 ZooKeeper 连接字符串
也可以用逗号形式指定多个Zookeeper; 例如:hostname1:port1,hostname2:port2,hostname3:port3
。如果你的kafka数据放在Zookeeper的某个路径下,也可以指定对应的路径,例如指定kafka数据存放到Zookeeper全局命名空间的 /szzkafka 下。那么你可以指定 hostname:port/szzkafka
advertised.listeners
listeners null
auto.create.topics.enable true 是否能够自动创建Topic。当生产者向一个未知Topic发送消息,或者消费者从未知Topic的时候会自动创建Topic。创建的时候是按照num.partitionsdefault.replication.factor 的配置进行。
auto.leader.rebalance.enable true 是否开始自动Leader再均衡的功能,具体看扩展讲解的Leader 均衡机制
leader.imbalance.check.interval.seconds 300 Controller触发Leader再均衡的频率,默认300S一次。具体看扩展讲解的Leader 均衡机制
leader.imbalance.per.broker.percentage 10 标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader;默认比例是10%; 具体看扩展讲解的Leader 均衡机制
background.threads 10 用于各种后台处理任务的线程数,例如过期消息文件的删除等
compression.type producer 消息的压缩类型, Kafka支持的类型有(’gzip’、’snappy’、’lz4’、’zstd’), 当然还可以设置uncompressed 表示不启用压缩, 这里 默认是 producer 表示继承使用 producer端的压缩类型。具体请看 压缩类型讲解
control.plane.listener.name
delete.topic.enable true 是否允许删除Topic。 Topic删除源码解析
log.flush.interval.messages Long.MaxValue 在消息刷新到磁盘之前在日志分区上累积的消息数
log.flush.scheduler.interval.ms Long.MaxValue 定时检查是否有任何日志需要刷新到磁盘的频率(毫秒单位),该值通常搭配log.flush.interval.ms
log.flush.interval.ms null 任何Topic中的消息在刷新到磁盘之前保留在内存中的最长时间(以毫秒为单位)。如果未设置,则使用 log.flush.scheduler.interval.ms 中的值, 但是需要注意的是,如果没有设置log.flush.scheduler.interval.ms 那么这个值也是不会生效的。因为都没有去定时检查。
log.flush.start.offset.checkpoint.interval.ms 60000 有个名称为”kafka-log-start-offset-checkpoint”的定时任务去更新日志起始偏移量日志文件,这个配置就是更新的频率,被更新的文件是每个log.dirs中目录下的log-start-offset-checkpoint
log.retention.bytes -1 日志文件的最大保留大小。分区级别的
log.retention.hours 168 日志保存的最大时间(单位小时),默认7天。 log.retention.ms > log.retention.minutes>log.retention.hours
log.retention.minutes null 日志保存的最大时间(单位分钟), log.retention.ms > log.retention.minutes>log.retention.hours
log.retention.ms null 日志保存的最大时间(单位毫秒),优先级 log.retention.ms > log.retention.minutes>log.retention.hours.
log.roll.hours 168 创建新日志段之前的最长时间(以小时为单位),默认7天, 意思是如果7天没有创建新的Segment,则强制创建新的Segment用于存储Log数据,优先级 次于 log.roll.ms 属性; 日志段的切分请看 日志段切分条件详解
log.roll.ms null log.roll.hours,只是单位是毫秒,优先级大于log.roll.hours ,详情请看下面日志段切分条件详解
log.segment.bytes 1073741824(1G) 单个日志段最大值,超过这个值就创建新的日志段,详情请看下面 日志段切分条件详解
log.segment.delete.delay.ms 60000 执行删除操作,Log文件夹会先被标记为-delete后缀,这个配置就是设置被标记为-delete之后延迟多长时间去真正的从磁盘中删除该文件,
message.max.bytes 1048588(约1M) Kafka 允许的最大记录批量大小(如果启用压缩,则在压缩之后),如果增加此值并且存在早于 0.10.2 的消费者,则消费者的获取大小也必须增加,以便他们可以获取如此大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是被分组。在以前的消息格式版本中,未压缩的记录不会分组到批处理中,并且此限制仅适用于这种情况下的单个记录。这可以使用主题级别max.message.bytes配置为每个主题设置。
min.insync.replicas 1 和生产者的配置acks=all配套使用, 表示最小有多少个副本数据同步成功,它的判断条件是 min.insync.replicas<= 当前副本ISR的数量 才可以,否则会抛出异常,更详细的请看下面 min.insync.replicas 详解
num.io.threads 8 服务器用于处理请求的线程数,可能包括磁盘 I/O
num.network.threads 3 服务器用于从网络接收请求并向网络发送响应的线程数
num.recovery.threads.per.data.dir 1 每个数据目录(数据目录即指的是上述log.dirs配置的目录路径)用于日志恢复启动和关闭时的线程数量,注意,这个参数指的是每个日志目录的线程数,比如如果该参数设置为8,而log.dirs设置为了三个路径(多个路径由”,”分隔),则总共会启动24个线程来处理
num.replica.alter.log.dirs.threads null 可以在日志目录之间移动(跨目录迁移)副本的线程数,其中可能包括磁盘 I/O
num.replica.fetchers 1 每个follower从leader拉取消息进行同步数据时候的拉取线程数,数字大,可以提供follower的I/O并发度,单位时间内leader持有更多的请求,同时leader的负载也会增大。
offset.metadata.max.bytes 4096(4kb) 与偏移提交相关联的元数据的最大数据量
offsets.commit.required.acks -1 必须>=-1 并且要 <=offsets.topic.replication.factor的配置
offsets.commit.timeout.ms 5000 偏移量提交将被延迟,直到偏移量主题的所有副本都收到提交或达到此超时。这类似于生产者请求超时。
offsets.load.buffer.size 5242880 将偏移量加载到缓存中时从偏移量段读取的批量大小(软限制,如果记录太大则覆盖)
offsets.retention.check.interval.ms 600000(10分钟) 检查陈旧偏移的频率
offsets.retention.minutes 10080(7天) 在消费者组失去其所有消费者(即变空)后,其偏移量将在此保留期内保留,然后再被丢弃。对于独立消费者(使用手动分配),偏移量将在上次提交时间加上此保留期后过期
offsets.topic.compression.codec 0 偏移Topic __consumer_offsets 的压缩编解码器 - 压缩可用于实现“原子”提交
offsets.topic.num.partitions 50 偏移Topic __consumer_offsets 的分区数(部署后不应更改)
offsets.topic.replication.factor 3 偏移Topic __consumer_offsets 的副本数量(设置得更高以确保可用性)。在集群大小满足此复制因子要求之前,内部主题创建将失败。详细请看下面 __consumer_offsets 详解
offsets.topic.segment.bytes 104857600(100M) __consumer_offsets主题每个Segment 的字节应该保持相对较小,以促进更快的日志压缩和缓存加载
queued.max.requests 500 在阻塞网络线程之前允许数据平面的排队请求数
replica.fetch.min.bytes 1 每个fetch请求响应所需的最小字节数。如果没有足够的字节,则等待replicaMaxWaitTimeMs
replica.fetch.wait.max.ms 500 Follower副本发出的每个 fetcher 请求的最大等待时间。该值应始终小于replica.lag.time.max.ms 以防止频繁收缩低吞吐量主题的ISR
replica.high.watermark.checkpoint.interval.ms 5000 每个replica将最高水位进行flush的时间间隔
replica.lag.time.max.ms 30000 如果一个follower在这个时间内没有发送fetch请求,leader将从ISR重移除这个followe
replica.socket.receive.buffer.bytes 65536 同步时向leader发送网络请求时的socket receive buffer
replica.socket.timeout.ms 30000 副本同步时候网络请求的超时时间,这个值要大于replica.fetch.wait.max.ms
request.timeout.ms 30000 这个配置控制着客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽,则请求失败
socket.receive.buffer.bytes 102400(100kb) 套接字服务器套接字的 SO_RCVBUF 缓冲区。如果值为 -1,则将使用操作系统默认值。
socket.request.max.bytes 104857600(100M) 套接字请求中的最大字节数
socket.send.buffer.bytes 102400 套接字服务器套接字的 SO_SNDBUF 缓冲区。如果值为 -1,则将使用操作系统默认值。
transaction.max.timeout.ms 900000 事务的最大允许超时时间。如果客户端请求的事务时间超过此时间,则代理将在 InitProducerIdRequest 中返回错误。这可以防止客户端超时过大,这可能会阻止消费者从事务中包含的主题中读取。
transaction.state.log.load.buffer.size 5242880 将生产者 ID 和事务加载到缓存中时从事务日志段读取的批量大小(软限制,如果记录太大则覆盖)。
transaction.state.log.min.isr 2 min.insync.replicas一样,但是事务主题 __transaction_state有自己的配置,他会覆盖min.insync.replicas的配置
transaction.state.log.num.partitions 50 事务主题 __transaction_state的分区数量配置(部署后不应更改)
transaction.state.log.replication.factor 3 事务主题的副本数量(设置得更高以确保可用性)。在集群大小满足此复制因子要求之前,内部主题创建将失败。
transaction.state.log.segment.bytes 104857600(100M) 事务主题 __transaction_state 的Segment段字节应该保持相对较小,以促进更快的日志压缩和缓存加载
transactional.id.expiration.ms 604800000(7天) 事务协调器在其事务 id 到期之前等待而不接收当前事务的任何事务状态更新的时间(以毫秒为单位)。此设置还会影响生产者 ID 到期 - 在使用给定生产者 ID 的最后一次写入之后,一旦过了此时间,生产者 ID 就会过期。请注意,如果由于主题的保留设置而删除了生产者 ID 的最后一次写入,生产者 ID 可能会更快过期。
unclean.leader.election.enable false 如果开启了这个,当ISR列表中没有数据的时候,那么将会从不在ISR的副本中选择为Leader,当然这样子可能会导致数据丢失。
zookeeper.connection.timeout.ms null 客户端等待与zookeeper建立连接的最长时间。如果未设置,则使用 zookeeper.session.timeout.ms 中的值
zookeeper.max.in.flight.requests 10 客户端在阻塞之前将发送给 Zookeeper 的最大未确认请求数。
zookeeper.session.timeout.ms 18000 Zookeeper 会话超时
zookeeper.set.acl false 将客户端设置为使用安全 ACL
broker.rack null broker的机架配置信息,在进行副本分配的时候会判断是否有机架信息,选择不同的分配策略,以实现容错。
connection.max.idle.ms 600000 空闲连接超时:服务器套接字处理器线程关闭空闲超过此时间的连接
connection.max.reauth.ms 0 当明确设置为正数(默认为 0,不是正数)时,不会超过配置值的会话生存期将在 v2.2.0 或更高版本的客户端进行身份验证时进行通信。代理将断开在会话生命周期内未重新验证的任何此类连接,然后将其用于除重新验证之外的任何目的。配置名称可以选择以小写的侦听器前缀和 SASL 机制名称作为前缀。例如,listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000
controlled.shutdown.enable true
controlled.shutdown.max.retries 3 Broker关闭的时候会通知Controller执行一个受控关闭的操作,这个操作可能由于多种原因而失败。这决定了发生此类故障时的重试次数
controlled.shutdown.retry.backoff.ms 5000 在每次重试之前,系统需要时间从导致上一次故障(控制器故障转移、副本滞后等)的状态中恢复。此配置确定重试前等待的时间。
controller.socket.timeout.ms 30000 控制器Controller到broker通道的套接字超时时间
default.replication.factor 1 创建主题的默认副本因子
delegation.token.expiry.time.ms 86400000 令牌需要更新之前的令牌有效期(以毫秒为单位)。默认值 1 天。
delegation.token.master.key null 生成和验证委托令牌的主/秘密密钥。必须在所有代理上配置相同的密钥。如果密钥未设置或设置为空字符串,broker将禁用委托令牌支持。
delegation.token.max.lifetime.ms 604800000 令牌有一个最长生命周期,超过这个生命周期就不能再更新了。默认值 7 天。
delete.records.purgatory.purge.interval.requests 1 删除记录请求炼狱的清除间隔(以请求数计)
fetch.max.bytes 57671680(55M) Fetch请求返回的最大字节数。必须至少为 1024。
fetch.purgatory.purge.interval.requests 1000 提取请求炼狱的清除间隔(以请求数计)
group.initial.rebalance.delay.ms 3000 在执行第一次重新平衡之前,组协调器将等待更多消费者加入新组的时间。更长的延迟意味着可能更少的重新平衡,但会增加处理开始之前的时间。
group.max.session.timeout.ms 1800000 注册消费者的最大允许会话超时。更长的超时时间让消费者有更多的时间在心跳之间处理消息,但代价是检测故障的时间更长。
group.max.size 2147483647 单个消费组可以容纳的最大消费者数。
group.min.session.timeout.ms 6000 注册上的消费者的最小允许会话超时。更短的超时导致更快的故障检测,代价是更频繁的消费者心跳,这可能会淹没代理资源。
inter.broker.listener.name null 用于代理之间通信的侦听器名称。如果未设置,则侦听器名称由security.inter.broker.protocol定义。同时设置 这个 和security.inter.broker.protocol属性是错误的。
inter.broker.protocol.version 2.5-IV0 指定将使用哪个版本的代理间协议。在所有代理升级到新版本后,这通常会受到影响。可选的所有版本: [0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2 -IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2 -IV0、2.2-IV1、2.3-IV0、2.3-IV1、2.4-IV0、2.4-IV1、2.5-IV0]
log.cleaner.backoff.ms 15000 没有要清理的日志时的睡眠时间
log.cleaner.dedupe.buffer.size 134217728(128M)
log.cleaner.delete.retention.ms 86400000(24小时)
log.cleaner.enable true 是否开启日志情侣进程, 特殊是如果使用带有 cleanup.policy=compact 的任何主题,包括内部偏移主题,则应启用。如果禁用,这些主题将不会被压缩并不断增长
log.cleaner.io.buffer.load.factor 0.9
log.cleaner.io.buffer.size 524288 所有清理线程中用于日志清理 I/O 缓冲区的总内存
log.cleaner.io.max.bytes.per.second 1.7976931348623157E308 日志清理器将受到限制,以便其读写 I/O 的总和平均小于此值
log.cleaner.max.compaction.lag.ms 9223372036854775807(Long.MaxValue) 消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。
log.cleaner.min.cleanable.ratio 0.5 对于符合清理条件的日志,脏日志与总日志的最小比率。如果还指定了 log.cleaner.max.compaction.lag.mslog.cleaner.min.compaction.lag.ms 配置,则日志压缩器会在以下任一情况下认为该日志符合压缩条件:(i)已达到脏比率阈值并且日志至少在 log.cleaner.min.compaction.lag.ms 持续时间内有脏(未压缩)记录,或者(ii)如果日志最多有脏(未压缩)记录log.cleaner.max.compaction.lag.ms 周期。
log.cleaner.min.compaction.lag.ms 0 消息在日志中保持未压缩的最短时间。仅适用于正在压缩的日志。
log.cleaner.threads 1 用于日志清理的后台线程数
log.cleanup.policy delete 日志清理策略, delete表示删除过期日志,还有一个compact选项表示压缩日志。
log.index.interval.bytes 4096 每隔多少个字节的消息量写入就条件一条索引。
log.index.size.max.bytes 10485760(10M) 索引文件最大值
log.message.format.version 2.5-IV0 消息格式的版本; [0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2 -IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2 -IV0、2.2-IV1、2.3-IV0、2.3-IV1、2.4-IV0、2.4-IV1、2.5-IV0]
log.message.timestamp.difference.max.ms 9223372036854775807(Logn.MaxValue) 代理接收消息时的时间戳与消息中指定的时间戳之间允许的最大差异。如果 log.message.timestamp.type=CreateTime,则如果时间戳差异超过此阈值,则消息将被拒绝。如果 log.message.timestamp.type=LogAppendTime 则忽略此配置。允许的最大时间戳差异不应大于log.retention.ms以避免不必要的频繁日志滚动。
log.message.timestamp.type CreateTime 可定义消息中的时间戳是消息创建时间还是日志附加时间。该值应该是 CreateTimeLogAppendTime
log.preallocate false 创建新段时应该预先分配文件吗?如果您在 Windows 上使用 Kafka,则可能需要将其设置为 true。
log.retention.check.interval.ms 300000(5分钟) 日志清理器检查任何日志是否符合删除条件的频率(以毫秒为单位)
max.connections 2147483647(Integer.MaxValue) 我们在任何时候允许的最大连接数。除了使用 max.connections.per.ip 配置的任何 per-ip 限制之外,还应用此限制。侦听器级别的限制也可以通过在配置名称前加上侦听器前缀来配置,例如,listener.name.internal.max.connections。应根据代理容量配置代理范围限制,而应根据应用程序要求配置侦听器限制。如果达到侦听器或代理限制,则新连接将被阻止。即使达到代理范围的限制,也允许代理间侦听器上的连接。在这种情况下,另一个侦听器上最近最少使用的连接将被关闭。
max.connections.per.ip 2147483647(Integer.MaxValue) 我们允许来自每个 IP 地址的最大连接数。如果使用 max.connections.per.ip.overrides 属性配置了覆盖,则可以将其设置为 0。如果达到限制,来自 IP 地址的新连接将被丢弃。
max.connections.per.ip.overrides ”“ 以逗号分隔的 per-ip 或主机名列表覆盖默认的最大连接数。示例值为“hostName:100,127.0.0.1:200”
max.incremental.fetch.session.cache.slots 1000 维护的最大增量fetch会话数。
num.partitions 1 创建topic时候的默认分区数
password.encoder.old.secret null 用于对动态配置的密码进行编码的旧秘密。只有在更新密码时才需要这样做。如果指定,则当代理启动时,所有动态编码的密码将使用此旧密码解码,并使用password.encoder.secret重新编码。(就是你要换密码总得先用老的密码解密再用新密码进行加密呢)
password.encoder.secret null 动态配置编码的秘钥
principal.builder.class null 实现 KafkaPrincipalBuilder 接口的类的完全限定名称,用于构建授权期间使用的 KafkaPrincipal 对象。此配置还支持已弃用的 PrincipalBuilder 接口,该接口以前用于通过 SSL 进行客户端身份验证。如果未定义主体构建器,则默认行为取决于使用的安全协议。对于 SSL 身份验证,ssl.principal.mapping.rules如果提供了客户端证书,将使用应用在客户端证书的专有名称上定义的规则派生主体;否则,如果不需要客户端身份验证,则主体名称将为 ANONYMOUS。对于 SASL 身份验证,将使用定义的规则派生主体sasl.kerberos.principal.to.local.rules如果正在使用 GSSAPI,以及其他机制的 SASL 身份验证 ID。对于 PLAINTEXT,委托人将是匿名的
producer.purgatory.purge.interval.requests 1000
queued.max.request.bytes -1 在不再读取请求之前允许的排队字节数
replica.fetch.backoff.ms 1000 当发生 fetch 分区的时候 休眠的时间长度
replica.fetch.max.bytes 1048576 尝试为每个分区获取的消息字节数。这不是绝对最大值,如果fetch的第一个非空分区中的第一个记录批次大于此值,则仍然会返回该记录批次以确保可以取得进展。代理接受的最大记录批量大小是通过message.max.bytes(broker config) 或max.message.bytes(topic config) 定义的。
replica.fetch.response.max.bytes 10485760 整个Fetch响应预期的最大字节数。记录是分批获取的,如果获取的第一个非空分区中的第一个记录批次大于该值,则仍然会返回该记录批次以确保可以取得进展。因此,这不是绝对最大值。代理接受的最大记录批量大小是通过message.max.bytes(broker config) 或max.message.bytes(topic config) 定义的。
replica.selector.class null 实现 ReplicaSelector 的完全限定类名。broker 使用它来查找首选只读副本。默认情况下,我们使用返回leader的实现方式。
sasl.client.callback.handler.class null 实现 AuthenticateCallbackHandler 接口的 SASL 客户端回调处理程序类的完全限定名称
sasl.enabled.mechanisms GSSAPI Kafka 服务器中启用的 SASL 机制列表。该列表可能包含安全提供者可用的任何机制。默认情况下仅启用 GSSAPI。
sasl.jaas.config null SASL 连接的 JAAS 登录上下文参数,采用 JAAS 配置文件使用的格式。此处描述了JAAS 配置文件格式。该值的格式为:’ loginModuleClass controlFlag (optionName=optionValue)*;’。对于代理,配置必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule 需要;
sasl.kerberos.kinit.cmd /usr/bin/kinit Kerberos kinit 命令路径。
sasl.kerberos.min.time.before.relogin 60000 刷新尝试之间的登录线程休眠时间。
sasl.kerberos.principal.to.local.rules null 从主体名称到短名称(通常是操作系统用户名)的映射规则列表。规则按顺序评估,与主体名称匹配的第一个规则用于将其映射到短名称。列表中任何后面的规则都将被忽略。默认情况下,{username}/{hostname}@{REALM} 形式的主体名称映射到 {username}。有关格式的更多详细信息,请参阅安全授权和 acls。请注意,如果principal.builder.class配置提供了 KafkaPrincipalBuilder 的扩展,则此配置将被忽略。
sasl.kerberos.service.name null Kafka 运行时使用的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。
sasl.kerberos.ticket.renew.jitter 0.05 添加到更新时间的随机抖动百分比。
sasl.kerberos.ticket.renew.window.factor 0.8 登录线程将休眠,直到达到从上次刷新到票证到期的指定时间窗口因子,此时它将尝试更新票证。
sasl.login.callback.handler.class null 实现 AuthenticateCallbackHandler 接口的 SASL 登录回调处理程序类的完全限定名称。对于代理,登录回调处理程序配置必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler
sasl.login.class null 实现 Login 接口的类的完全限定名称。对于代理,登录配置必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin
sasl.login.refresh.buffer.seconds 300 刷新凭证时在凭证到期之前保持的缓冲时间,以秒为单位。如果刷新将在比缓冲秒数更接近到期时发生,则刷新将向上移动以保持尽可能多的缓冲时间。合法值介于 0 到 3600(1 小时)之间;如果未指定值,则使用默认值 300(5 分钟)。如果此值和 sasl.login.refresh.min.period.seconds 的总和超过凭证的剩余生命周期,则它们都将被忽略。目前仅适用于 OAUTHBEARER。
sasl.login.refresh.min.period.seconds 60 在刷新凭据之前登录刷新线程所需的最短等待时间,以秒为单位。合法值介于 0 到 900(15 分钟)之间;如果未指定值,则使用默认值 60(1 分钟)。如果此值和 sasl.login.refresh.buffer.seconds 的总和超过凭证的剩余生命周期,则它们都将被忽略。目前仅适用于 OAUTHBEARER。
sasl.login.refresh.window.factor 0.8 登录刷新线程将休眠,直到达到与凭证生命周期相关的指定窗口因子,此时它将尝试刷新凭证。合法值介于 0.5 (50%) 和 1.0 (100%) 之间;如果未指定值,则使用默认值 0.8 (80%)。目前仅适用于 OAUTHBEARER。
sasl.login.refresh.window.jitter 0.05 添加到登录刷新线程的睡眠时间的相对于凭证生命周期的最大随机抖动量。合法值介于 0 和 0.25 (25%) 之间;如果未指定值,则使用默认值 0.05 (5%)。目前仅适用于 OAUTHBEARER
sasl.mechanism.inter.broker.protocol GSSAPI SASL 机制用于代理间通信。默认为 GSSAPI。
sasl.server.callback.handler.class null 实现 AuthenticateCallbackHandler 接口的 SASL 服务器回调处理程序类的完全限定名称。服务器回调处理程序必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如,listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackH
security.inter.broker.protocol PLAINTEXT 用于代理之间通信的安全协议。有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。不能同时设置 这个和inter.broker.listener.name属性。
ssl.cipher.suites null 密码套件列表。这是身份验证、加密、MAC 和密钥交换算法的命名组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。默认情况下,支持所有可用的密码套件。
ssl.client.auth none 配置 kafka 代理以请求客户端身份验证。以下设置是常见的:
ssl.client.auth=required 如果设置为 required 则需要客户端身份验证。
ssl.client.auth=requested这意味着客户端身份验证是可选的。与请求不同,如果设置了此选项,客户端可以选择不提供有关自身的身份验证信息
ssl.client.auth=none 这意味着不需要客户端身份验证。
ssl.enabled.protocols TLSv1.2 为 SSL 连接启用的协议列表。
ssl.key.password null 密钥库文件中私钥的密码。这是客户端可选的。
ssl.keymanager.algorithm SunX509 密钥管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的密钥管理器工厂算法。
ssl.keystore.location null 密钥存储文件的位置。这对客户端是可选的,可用于客户端的双向身份验证。
ssl.keystore.password null 中等 密钥存储文件的存储密码。这对客户端是可选的,只有在配置了 ssl.keystore.location 时才需要。
ssl.keystore.type JKS 密钥库文件的文件格式。这是客户端可选的。
ssl.protocol TLSv1.2 用于生成 SSLContext 的 SSL 协议。默认设置为 TLSv1.2,适用于大多数情况。最近的 JVM 中允许的值为 TLSv1.2 和 TLSv1.3。旧的 JVM 可能支持 TLS、TLSv1.1、SSL、SSLv2 和 SSLv3,但由于已知的安全漏洞,不鼓励使用它们。
ssl.provider null 用于 SSL 连接的安全提供程序的名称。默认值是 JVM 的默认安全提供程序。
ssl.trustmanager.algorithm PKIX 信任管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的信任管理器工厂算法。
ssl.truststore.location null 信任存储文件的位置。
ssl.truststore.password null 信任存储文件的密码。如果未设置密码,则仍可访问信任库,但禁用完整性检查。
ssl.truststore.type JKS ssl.truststore.type
zookeeper.clientCnxnSocket 通常org.apache.zookeeper.ClientCnxnSocketNetty在使用 TLS 连接到 ZooKeeper 时设置为。覆盖通过同名zookeeper.clientCnxnSocket系统属性设置的任何显式值。
zookeeper.ssl.client.enable false 设置客户端在连接到 ZooKeeper 时使用 TLS。显式值会覆盖通过zookeeper.client.secure系统属性设置的任何值(注意不同的名称)。如果两者都没有设置,则默认为 false;当为真时,zookeeper.clientCnxnSocket必须设置(通常为org.apache.zookeeper.ClientCnxnSocketNetty);要设置的其他值可能包括zookeeper.ssl.cipher.suites、zookeeper.ssl.crl.enable、zookeeper.ssl.enabled.protocols、zookeeper.ssl.endpoint.identification.algorithm、zookeeper.ssl.keystore.location、zookeeper.ssl.keystore.password、zookeeper.ssl.keystore.type、zookeeper.ssl.ocsp.enable、zookeeper.ssl.protocol、zookeeper.ssl.truststore.location、zookeeper.ssl.truststore.password、zookeeper.ssl.truststore.type
zookeeper.ssl.keystore.location null 使用客户端证书与 ZooKeeper 的 TLS 连接时的密钥库位置。覆盖通过zookeeper.ssl.keyStore.location系统属性设置的任何显式值(注意驼峰式命名法)。
zookeeper.ssl.keystore.password null 使用客户端证书与 ZooKeeper 的 TLS 连接时的密钥库密码。覆盖通过zookeeper.ssl.keyStore.password系统属性设置的任何显式值(注意驼峰式命名法)。注意ZooKeeper不支持与keystore密码不同的key密码,所以一定要设置keystore中的key密码与keystore密码一致;否则与 Zookeeper 的连接尝试将失败。
zookeeper.ssl.keystore.type null 使用客户端证书与 ZooKeeper 的 TLS 连接时的密钥库类型。覆盖通过zookeeper.ssl.keyStore.type系统属性设置的任何显式值(注意驼峰式命名法)。默认值null表示将根据密钥库的文件扩展名自动检测类型。
zookeeper.ssl.truststore.location null 使用 TLS 连接到 ZooKeeper 时的信任库位置。覆盖通过zookeeper.ssl.trustStore.location系统属性设置的任何显式值(注意驼峰式命名法)。
zookeeper.ssl.truststore.password 使用 TLS 连接到 ZooKeeper 时的 Truststore 密码。覆盖通过zookeeper.ssl.trustStore.password系统属性设置的任何显式值(注意驼峰式命名法)。
zookeeper.ssl.truststore.type null 使用 TLS 连接到 ZooKeeper 时的 Truststore 类型。覆盖通过zookeeper.ssl.trustStore.type系统属性设置的任何显式值(注意驼峰式命名法)。默认值null表示将根据信任库的文件扩展名自动检测类型。
alter.config.policy.class.name null 应该用于验证的更改配置策略类。类应该实现org.apache.kafka.server.policy.AlterConfigPolicy接口。
alter.log.dirs.replication.quota.window.num 11 更改日志目录复制配额而在内存中保留的样本数 多图详解Kafka中的数据采集和统计机制
alter.log.dirs.replication.quota.window.size.seconds 1 更改日志目录复制配额的每个示例的时间跨度 多图详解Kafka中的数据采集和统计机制
listener.security.protocol.map PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL 侦听器名称和安全协议之间的映射。必须为同一安全协议定义这一点,才能在多个端口或 IP 中使用。例如,即使内部和外部流量都需要 SSL,也可以将两者分开。具体来说,用户可以定义名称为 INTERNAL 和 EXTERNAL 的侦听器,并且此属性为:INTERNAL:SSL,EXTERNAL:SSL。如图所示,键和值用冒号分隔,映射条目用逗号分隔。每个侦听器名称在地图中只应出现一次。通过向配置名称添加规范化前缀(侦听器名称小写),可以为每个侦听器配置不同的安全性(SSL 和 SASL)设置。例如,要为 INTERNAL 侦听器设置不同的密钥库,名称为listener.name.internal.ssl.keystore.location将被设置。如果未设置侦听器名称的配置,则配置将回退到通用配置(即ssl.keystore.location)。
log.message.downconversion.enable true 此配置控制是否启用消息格式的下转换以满足消费请求。当设置为 时false,broker 将不会为需要旧消息格式的消费者执行向下转换。代理响应UNSUPPORTED_VERSION来自此类较旧客户端的消费请求的错误。此配置不适用于复制到追随者可能需要的任何消息格式转换。
metric.reporters “” 用作指标报告器的类列表。实现该org.apache.kafka.common.metrics.MetricsReporter接口允许插入将在新指标创建时通知的类。JmxReporter 总是包含在注册 JMX 统计信息中。
metrics.num.samples 2 为计算指标而维护的样本数。
metrics.recording.level INFO 指标的最高记录级别。
metrics.sample.window.ms 30000 计算指标样本的时间窗口。
password.encoder.cipher.algorithm AES/CBC/PKCS5Padding 用于对动态配置的密码进行编码的密码算法。
password.encoder.iterations 4096 用于编码动态配置密码的迭代计数。
password.encoder.key.length 128 用于对动态配置的密码进行编码的密钥长度。
password.encoder.keyfactory.algorithm null 用于对动态配置的密码进行编码的 SecretKeyFactory 算法。如果可用,默认为 PBKDF2WithHmacSHA512,否则为 PBKDF2WithHmacSHA1。
quota.window.num 11 为客户端配额保留在内存中的样本数
quota.window.size.seconds 1 客户配额每个样本的时间跨度
replication.quota.window.num 11 为复制配额保留在内存中的样本数
replication.quota.window.size.seconds 1 每个样本的复制配额时间跨度
security.providers null 一个可配置的创建者类列表,每个类返回一个实现安全算法的提供者。这些类应该实现org.apache.kafka.common.security.auth.SecurityProviderCreator接口。
ssl.endpoint.identification.algorithm https 使用服务器证书验证服务器主机名的端点识别算法。
ssl.principal.mapping.rules DEFAULT 用于从客户端证书的专有名称映射到短名称的规则列表。规则按顺序评估,与主体名称匹配的第一个规则用于将其映射到短名称。列表中任何后面的规则都将被忽略。默认情况下,X.500 证书的专有名称将是主体。有关格式的更多详细信息,请参阅安全授权和 acls。请注意,如果principal.builder.class配置提供了 KafkaPrincipalBuilder 的扩展,则此配置将被忽略。
ssl.secure.random.implementation null 用于 SSL 加密操作的 SecureRandom PRNG 实现。
transaction.abort.timed.out.transaction.cleanup.interval.ms 10000 回滚超时事务的时间间隔
transaction.remove.expired.transaction.cleanup.interval.ms 3600000 删除由于transactional.id.expiration.ms传递而过期的事务的时间间隔
zookeeper.ssl.cipher.suites null 指定要在 ZooKeeper TLS 协商 (csv) 中使用的已启用密码套件。覆盖通过zookeeper.ssl.ciphersuites系统属性设置的任何显式值(注意单个词“密码套件”)。的默认值null意味着启用的密码套件列表由正在使用的 Java 运行时确定。
zookeeper.ssl.crl.enable false ZooKeeper TLS协议中是否开启证书吊销列表。覆盖通过zookeeper.ssl.crl系统属性设置的任何显式值(注意较短的名称)。
zookeeper.ssl.enabled.protocols null 在 ZooKeeper TLS 协商 (csv) 中指定启用的协议。覆盖通过zookeeper.ssl.enabledProtocols系统属性设置的任何显式值(注意驼峰式命名法)。的默认值null意味着启用的协议将是zookeeper.ssl.protocol配置属性的值。
zookeeper.ssl.endpoint.identification.algorithm HTTPS 指定是否在 ZooKeeper TLS 协商过程中启用主机名验证,其中(不区分大小写)“https”表示启用 ZooKeeper 主机名验证,显式空白值表示禁用(仅建议出于测试目的禁用)。显式值会覆盖通过zookeeper.ssl.hostnameVerification系统属性设置的任何“true”或“false”值(注意不同的名称和值;true 表示 https,false 表示空白)。
zookeeper.ssl.ocsp.enable false ZooKeeper TLS协议中是否开启在线证书状态协议。覆盖通过zookeeper.ssl.ocsp系统属性设置的任何显式值(注意较短的名称)。
zookeeper.ssl.protocol null 指定要在 ZooKeeper TLS 协商中使用的协议。显式值会覆盖通过同名zookeeper.ssl.protocol系统属性设置的任何值。
zookeeper.sync.time.ms 2000 ZK 追随者可以落后于 ZK 领导者多远

可以在 scala 类中找到有关代理配置的更多详细信息kafka.server.KafkaConfig

扩展讲解

Leader 均衡机制(auto.leader.rebalance.enable=true)

当一个broker停止或崩溃时,这个broker中所有分区的leader将转移给其他副本。这意味着在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。

为了避免这种不平衡,Kafka有一个首选副本的概念。如果一个分区的副本列表是1,5,9,节点1将优先作为其他两个副本5和9的leader,因为它较早存在于副本中。你可以通过运行以下命令让Kafka集群尝试恢复已恢复正常的副本的leader地位:。不会导致负载不均衡和资源浪费,这就是leader的均衡机制

1
2
3
4
># kafka版本 <= 2.4
>> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
># kafka新版本
>> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

kafka平衡leader

手动运行很不优雅,可以通过配置auto.leader.rebalance.enable = true 来自动均衡

在配置文件conf/ server.properties中配置开启(默认就是开启)auto.leader.rebalance.enable = true
与其相关的配置还有

leader.imbalance.check.interval.seconds partition 检查重新 rebalance 的周期时间 ; 默认300秒;
leader.imbalance.per.broker.percentage 标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader;默认比例是10%;

这个比率的算法是 :broker不平衡率=非优先副本的leader个数/总分区数,
假如一个topic有3个分区[0,1,2],并且3个副本 ,正常情况下,[0,1,2]分别都为一个leader副本; 这个时候 0/3=0%;

上面几个配置都是 && 的关系; 同时满足才能触发再平衡;

调优建议:考虑到leader重选举的代价比较大,可能会带来性能影响,也可能会引发客户端的阻塞,生产环境建议设置为false。或者周期设置长一点,比如一天一次;

那么如果我们关闭了 均衡机制 , 或者周期时间比较长, 也就有可能造成上面说的问题, 那么LogiKM 就提供了一个手动再平衡的操作;

compression.type 压缩类型讲解

开启压缩发送传输可以提高kafka的消息吞吐量
具体压缩类型: (‘gzip’, ‘snappy’, ‘lz4’, ‘zstd’)

producer 端设置类型

1
2
// 开启 lz4 压缩
props.put("compression.type", "lz4");

broker 端 继承使用 producer的类型

1
2
3

compression.type=producer(默认值)

Kafak的压缩方式是将多条消息一起压缩,这样可以保证比较好的压缩效果。一般情况下,生产者发送的压缩数据在Broker中也是保持要锁状态进行存储的,消费者从服务端获取的也是压缩的数据,消费组在处理消息之前才会解压消息,这样保持了端到端的压缩。

使用默认值producer, Broker会原封不动的保存这个数据, 如果设置了跟producer不一样的压缩方式,那么Broker还有先解压之后再用新的压缩方式来压缩,这样会导致cpu使用率升高, 所以使用默认值就好。 producer 端建议使用lz4的压缩方式,压缩效果比较好。

日志段切分条件详解

  1. 当前日志分段文件大小超过broker端的参数log.segment.bytes,默认是1073741824 (1G)
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳差值大于log.roll.hourslog.roll.ms的值,则强制创建新的日志段Segment,log.roll.ms > log.roll.hours . log.roll.ms 默认为168 ,即7天。

min.insync.replicas 最小同步副本数

这个参数表示ISR集合中的最少副本数,默认值是1,并只有在生产者将acks=all或-1时才有效。acks与min.insync.replicas搭配使用,才能为消息提供最高的持久性保证。引入min.insync.replicas的目的就是为了保证下限:不能只满足于ISR全部写入,还要保证ISR中的写入个数不少于min.insync.replicas 。它的判断条件是 min.insync.replicas<= 当前副本ISR的数量,否则会抛出异常。常见的场景是创建一个三副本(即replication.factor=3)的topic,最少同步副本数设为2(即min.insync.replicas=2),acks设为all,那么必须至少有2个副本数据同步了(包括Leader副本)才可以,当然我们也可以设置为min.insync.replicas=3,表示所有的副本必须同步才可以,否则会抛出异常(NotEnoughReplicas 或 NotEnoughReplicasAfterAppend

__consumer_offsets 详解

专门用来保存消费组消费的偏移量的数据, 它也是一个Topic,是一个内部Topic,在第一次消费的时候会自动创建这个Topic,
分区大小是offsets.topic.num.partitions控制,默认是50,部署后不建议更改,后续可以执行扩容操作进行扩容
副本数量由offsets.topic.replication.factor控制,默认是3, 在集群大小满足此复制因子要求之前,内部主题__consumer_offsets 创建将失败。会抛出异常

1
Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.

受控关机操作

当Broker关闭的时候, 会向Controller发一起一个ControlledShutdownRequest请求, Controller收到这个请求会针对性的做一些善后事件。比如说 执行Leader重选举 等等之类的…

源码位置:KafkaServer#controlledShutdown

controlled.shutdown.enable 是否启用受控关闭操作

如果关闭了,则不会发起上面说的请求了

controlled.shutdown.max.retries 受控关机操作 重试的次数

ControlledShutdownRequest这个请求可能有多种原因执行失败, 这个配置就是重试的次数,当然,超过了这个次数仍然失败的话,那么 Broker还是会强制关机的。

controlled.shutdown.retry.backoff.ms 失败后等等多久再次重试

1
>当`ControlledShutdownRequest`请求失败后,会等等多少秒后再次重试  ,Thread.sleep(config.controlledShutdownRetryBackoffMs)

作者石臻臻,工作8年的互联网老兵,丰富的开发和管理经验,全网「 粉丝数4万 」,

先后从事 「 电商 」「 中间件 」「 大数据」 等工作

现在任职于「 滴滴技术专家 」岗位,从事开源建设工作

目前在维护 个人公众号「  石臻臻的杂货铺 」 ; 关注公众号会有「 日常送书活动 」;

欢迎进「 高质量 」 「 滴滴开源技术答疑群 」 , 群内每周技术专家轮流值班答疑

可帮忙「 内推 」一二线大厂