Files
KnowStreaming/study-kafka/3-config.md
2023-02-22 14:38:17 +08:00

464 lines
229 KiB
Markdown
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: 3.Kafka参数配置
order: 3
toc: menu
---
## 3.1 Broker 配置
基本配置如下:
| 参数 | 描述 | 默认 |
| ----------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------- |
| broker.id | broker server 的唯一标识,必须唯一, 如果不设置,或者`broker.id`<0 则会自动计算`broker.id`; 相关可以看`broker.id.generation.enable` `reserved.broker.max.id` 的配置 | -1 |
| reserved.broker.max.id | `broker.id`能够配置的最大值,同时 `reserved.broker.max.id+1`也是自动创建`broker.id`的最小值 | 1000 |
| broker.id.generation.enable | 允许 server 自动创建`broker.id``broker.id`< 0 并且当前配置是 true,则自动计算`broker.id`计算逻辑是` {reserved.broker.max.id} +/brokers/seqid.dataVersion` , 其中的`/brokers/seqid.dataVersion`保证了全局自增 | true |
| log.dir | log 数据存放的目录 | /tmp/kafka-logs |
| log.dirs | Log 数据存放的目录,比如`/tmp/kafka-logs`. 这个可以用逗号隔开设置多个目录,主要是用来挂载到多个磁盘上的<br>例如:`/tmp/kafka0,/tmp/kafka1` 。如果没有设置,则使用 `log.dir`的配置 | null |
| zookeeper.connect | 以`hostname:port`形式指定 ZooKeeper 连接字符串 <br> 也可以用逗号形式指定多个 Zookeeper; 例如:`hostname1:port1,hostname2:port2,hostname3:port3 ` <br> 。如果你的 kafka 数据放在 Zookeeper 的某个路径下,也可以指定对应的路径,例如指定 kafka 数据存放到 Zookeeper 全局命名空间的 /szzkafka 下。那么你可以指定 ` hostname:port/szzkafka` | |
| advertised.listeners | 对外发布的监听器列表,这个配置会存放在 Zookeeper 中,其他 Broker/客户端可以通过元数据拿到这个监听器列表,从而可以对该 Broker 进行访问。如果这个为空,则默认用 listeners 的配置数据. 简单理解:你可能有多个监听器列表,但是你只想告诉其他客户端你只有某一个/多个监听器列表,其他客户端只能通过其发布的监听器列表该 Broker 进行访问 | null |
| listeners | 监听器列表,这个配置的监听器将被用于监听网络请求。简单理解就是你建立监听一个通道,别人能够通过这个通道跟你沟通。 | null |
| auto.create.topics.enable | 是否能够自动创建 Topic。当生产者向一个未知 Topic 发送消息,或者消费者从未知 Topic 的时候会自动创建 Topic。创建的时候是按照`num.partitions``default.replication.factor` 的配置进行 | true |
| auto.leader.rebalance.enable | 是否开始自动 Leader 再均衡的功能,具体看扩展讲解的**Leader 均衡机制** | true |
| leader.imbalance.check.interval.seconds | Controller 触发 Leader 再均衡的频率,默认 300S 一次。具体看扩展讲解的**Leader 均衡机制** | 300 |
| leader.imbalance.per.broker.percentage | 标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader默认比例是 10%; 具体看扩展讲解的**Leader 均衡机制** | 10 |
| background.threads | 用于各种后台处理任务的线程数,例如过期消息文件的删除等 | 10 |
| compression.type | 消息的压缩类型, Kafka 支持的类型有('gzip'、'snappy'、'lz4'、'zstd', 当然还可以设置`uncompressed` 表示不启用压缩, 这里 默认是 `producer` 表示继承使用 `producer`端的压缩类型。具体请看 **压缩类型讲解** | producer |
| control.plane.listener.name | Controller 与 Broker 之间的通信监听器名称, 默认为空,则 Controller 的请求与 Broker 请求都是使用同一个监听器,但是一般 Controller 的请求需要优先级更高一点,则需要单独给 Controller 配置网络请求模块,那么 Controller 的请求就不会被其他请求给影响 | null |
| delete.topic.enable | 是否允许删除 Topic。 | true |
| log.flush.interval.messages | 在消息刷新到磁盘之前在日志分区上累积的消息数 | Long.MaxValue |
| log.flush.scheduler.interval.ms | 定时检查是否有任何日志需要刷新到磁盘的频率(毫秒单位),该值通常搭配`log.flush.interval.ms` | Long.MaxValue |
| log.flush.interval.ms | 任何 Topic 中的消息在刷新到磁盘之前保留在内存中的最长时间(以毫秒为单位)。如果未设置,则使用 `log.flush.scheduler.interval.ms` 中的值, 但是需要注意的是,如果没有设置`log.flush.scheduler.interval.ms` 那么这个值也是不会生效的。因为都没有去定时检查。 | null |
| log.flush.start.offset.checkpoint.interval.ms | 有个名称为"kafka-log-start-offset-checkpoint"的定时任务去更新日志起始偏移量日志文件,这个配置就是更新的频率,被更新的文件是每个`log.dirs`中目录下的`log-start-offset-checkpoint` | 60000 |
| log.retention.bytes | 日志文件的最大保留大小。分区级别的 | -1 |
| log.retention.hours | 日志保存的最大时间(单位小时),默认 7 天。优先级 `log.retention.ms` > `log.retention.minutes`>`log.retention.hours` | 168 |
| log.retention.minutes | 日志保存的最大时间(单位分钟), 优先级 `log.retention.ms` > `log.retention.minutes`>`log.retention.hours` | null |
| log.retention.ms | 日志保存的最大时间(单位毫秒),优先级 `log.retention.ms` > `log.retention.minutes`>`log.retention.hours`. | null |
| log.roll.hours | 创建新日志段之前的最长时间(以小时为单位),默认 7 天, 意思是如果 7 天没有创建新的 Segment,则强制创建新的 Segment 用于存储 Log 数据,优先级 次于 log.roll.ms 属性; 日志段的切分请看 **日志段切分条件详解** | 168 |
| log.roll.ms | 同`log.roll.hours`,只是单位是毫秒,优先级大于`log.roll.hours` ,详情请看下面**日志段切分条件详解** | null |
| log.segment.bytes | 单个日志段最大值,超过这个值就创建新的日志段,详情请看下面 **日志段切分条件详解** | 1073741824(1G) |
| log.segment.delete.delay.ms | 执行删除操作,Log 文件夹会先被标记为`-delete`后缀,这个配置就是设置被标记为`-delete`之后延迟多长时间去真正的从磁盘中删除该文件 | 60000 |
| message.max.bytes | Kafka 允许的最大记录批量大小(如果启用压缩,则在压缩之后),如果增加此值并且存在早于 0.10.2 的消费者,则消费者的获取大小也必须增加,以便他们可以获取如此大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是被分组。在以前的消息格式版本中,未压缩的记录不会分组到批处理中,并且此限制仅适用于这种情况下的单个记录。这可以使用主题级别`max.message.bytes`配置为每个主题设置。 | 1048588(约 1M) |
| min.insync.replicas | 和生产者的配置`acks=all`配套使用, 表示最小有多少个副本数据同步成功,它的判断条件是 `min.insync.replicas<= 当前副本ISR的数量` 才可以,否则会抛出异常,更详细的请看下面 **min.insync.replicas 详解** | 1 |
| num.io.threads | 服务器用于处理请求的线程数,可能包括磁盘 I/O | 8 |
| num.network.threads | 服务器用于从网络接收请求并向网络发送响应的线程数 | 3 |
| num.recovery.threads.per.data.dir | 每个数据目录(数据目录即指的是上述 log.dirs 配置的目录路径)用于日志恢复启动和关闭时的线程数量,注意,这个参数指的是每个日志目录的线程数,比如如果该参数设置为 8而 log.dirs 设置为了三个路径(多个路径由","分隔),则总共会启动 24 个线程来处理 | 1 |
| num.replica.alter.log.dirs.threads | 可以在日志目录之间移动(跨目录迁移)副本的线程数,其中可能包括磁盘 I/O | null |
| num.replica.fetchers | 每个 follower 从 leader 拉取消息进行同步数据时候的拉取线程数,数字大,可以提提高 follower 的 I/O 并发度,单位时间内 leader 持有更多的请求,同时 leader 的负载也会增大 | 1 |
| offset.metadata.max.bytes | 与偏移提交相关联的元数据的最大数据量 | 4096(4kb) |
| offsets.commit.required.acks | 偏移量相关数据的 ack 设置,0、1、-1 为可选项,默认为-1不建议你去修改此值 | -1 |
| offsets.commit.timeout.ms | 偏移量提交将被延迟,直到偏移量主题的所有副本都收到提交或达到此超时。这类似于生产者请求超时 | 5000 |
| offsets.load.buffer.size | 将偏移量加载到缓存中时从偏移量段读取的批量大小(软限制,如果记录太大则覆盖) | 5242880 |
| offsets.retention.check.interval.ms | 检查陈旧偏移的频率 | 600000(10 分钟) |
| offsets.retention.minutes | 在消费者组失去其所有消费者(即变空)后,其偏移量将在此保留期内保留,然后再被丢弃。对于独立消费者(使用手动分配),偏移量将在上次提交时间加上此保留期后过期 | 100807 天) |
| offsets.topic.compression.codec | 偏移 Topic `__consumer_offsets` 的压缩编解码器 - 压缩可用于实现“原子”提交,默认是 0; 对应的压缩 k-v 映射关系为{ 0:NONE、1:GZIP、2:SNAPPY、3:LZ4、4: ZSTD} | 0 |
| offsets.topic.num.partitions | 偏移 Topic `__consumer_offsets` 的分区数(部署后不应更改) | 50 |
| offsets.topic.replication.factor | 偏移 Topic `__consumer_offsets` 的副本数量(设置得更高以确保可用性)。在集群大小满足此复制因子要求之前,内部主题创建将失败。详细请看下面 **`__consumer_offsets` 详解** | 3 |
| offsets.topic.segment.bytes | `__consumer_offsets`主题每个 Segment 的字节应该保持相对较小,以促进更快的日志压缩和缓存加载 | 104857600100M |
| queued.max.requests | 在阻塞网络线程之前允许数据平面的排队请求数 | 500 |
| replica.fetch.min.bytes | 每个 fetch 请求响应所需的最小字节数。如果没有足够的字节,则等待 replicaMaxWaitTimeMs | 1 |
| replica.fetch.wait.max.ms | Follower 副本发出的每个 fetcher 请求的最大等待时间。该值应始终小于`replica.lag.time.max.ms `以防止频繁收缩低吞吐量主题的 ISR | 500 |
| replica.high.watermark.checkpoint.interval.ms | 每个 replica 将最高水位进行 flush 的时间间隔 | 5000 |
| replica.lag.time.max.ms | 如果一个 follower 在这个时间内没有发送 fetch 请求leader 将从 ISR 重移除这个 follow | 30000 |
| replica.socket.receive.buffer.bytes | 同步时向 leader 发送网络请求时的 socket receive buffer | 65536 |
| replica.socket.timeout.ms | 副本同步时候网络请求的超时时间,这个值要大于`replica.fetch.wait.max.ms` | 30000 |
| request.timeout.ms | 这个配置控制着客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽,则请求失败 | 30000 |
| socket.receive.buffer.bytes | 套接字服务器套接字的 SO_RCVBUF 缓冲区。如果值为 -1则将使用操作系统默认值。 | 102400100kb |
| socket.request.max.bytes | 套接字请求中的最大字节数 | 104857600(100M) |
| socket.send.buffer.bytes | 套接字服务器套接字的 SO_SNDBUF 缓冲区。如果值为 -1则将使用操作系统默认值 | 102400 |
| transaction.max.timeout.ms | 事务的最大允许超时时间。如果客户端请求的事务时间超过此时间,则代理将在 InitProducerIdRequest 中返回错误。这可以防止客户端超时过大,这可能会阻止消费者从事务中包含的主题中读取。 | 900000 |
| transaction.state.log.load.buffer.size | 将生产者 ID 和事务加载到缓存中时从事务日志段读取的批量大小(软限制,如果记录太大则覆盖) | 5242880 |
| transaction.state.log.min.isr | 跟`min.insync.replicas`一样,但是事务主题` __transaction_state`有自己的配置,他会覆盖`min.insync.replicas`的配置 | 2 |
| transaction.state.log.num.partitions | 事务主题` __transaction_state`的分区数量配置(部署后不应更改) | 50 |
| transaction.state.log.replication.factor | 事务主题的副本数量(设置得更高以确保可用性)。在集群大小满足此复制因子要求之前,内部主题创建将失败。 | 3 |
| transaction.state.log.segment.bytes | 事务主题` __transaction_state` 的 Segment 段字节应该保持相对较小,以促进更快的日志压缩和缓存加载 | 104857600100M |
| transactional.id.expiration.ms | 事务协调器在其事务 id 到期之前等待而不接收当前事务的任何事务状态更新的时间(以毫秒为单位)。此设置还会影响生产者 ID 到期 - 在使用给定生产者 ID 的最后一次写入之后,一旦过了此时间,生产者 ID 就会过期。请注意,如果由于主题的保留设置而删除了生产者 ID 的最后一次写入,生产者 ID 可能会更快过期 | 6048000007 天) |
| unclean.leader.election.enable | 如果开启了这个,当 ISR 列表中没有数据的时候,那么将会从不在 ISR 的副本中选择为 Leader,当然这样子可能会导致数据丢失 | false |
| zookeeper.connection.timeout.ms | 客户端等待与 zookeeper 建立连接的最长时间。如果未设置,则使用` zookeeper.session.timeout.ms` 中的值 | null |
| zookeeper.max.in.flight.requests | 客户端在阻塞之前将发送给 Zookeeper 的最大未确认请求数。 | 10 |
| zookeeper.session.timeout.ms | Zookeeper 会话超时 | 18000 |
| zookeeper.set.acl | 将客户端设置为使用安全 ACL | false |
| broker.rack | broker 的机架配置信息,在进行副本分配的时候会判断是否有机架信息,选择不同的分配策略,以实现容错。 | null |
| connection.max.idle.ms | 空闲连接超时:服务器套接字处理器线程关闭空闲超过此时间的连接 | 600000 |
| connection.max.reauth.ms | 当明确设置为正数(默认为 0不是正数不会超过配置值的会话生存期将在 v2.2.0 或更高版本的客户端进行身份验证时进行通信。代理将断开在会话生命周期内未重新验证的任何此类连接,然后将其用于除重新验证之外的任何目的。配置名称可以选择以小写的侦听器前缀和 SASL 机制名称作为前缀。例如listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000 | 0 |
| controlled.shutdown.enable | 当该值为 true, 然后当 Broker 关闭的时候, 会向 Controller 发一起一个`ControlledShutdownRequest`请求, Controller 收到这个请求会针对性的做一些善后事件。比如说 **执行 Leader 重选举** 等等之类的 。如果改值为 false则不会有善后请求事件 | true |
| controlled.shutdown.max.retries | Broker 关闭的时候会通知 Controller 执行一个受控关闭的操作,这个操作可能由于多种原因而失败。这决定了发生此类故障时的重试次数 | 3 |
| controlled.shutdown.retry.backoff.ms | 在每次重试之前,系统需要时间从导致上一次故障(控制器故障转移、副本滞后等)的状态中恢复。此配置确定重试前等待的时间。 | 5000 |
| controller.socket.timeout.ms | 控制器 Controller 到 broker 通道的套接字超时时间 | 30000 |
| default.replication.factor | 创建主题的默认副本因子 | 1 |
| delegation.token.expiry.time.ms | 令牌需要更新之前的令牌有效期(以毫秒为单位)。默认值 1 天。 | 86400000 |
| delegation.token.master.key | 生成和验证委托令牌的主/秘密密钥。必须在所有代理上配置相同的密钥。如果密钥未设置或设置为空字符串broker 将禁用委托令牌支持。 | null |
| delegation.token.max.lifetime.ms | 令牌有一个最长生命周期,超过这个生命周期就不能再更新了。默认值 7 天。 | 604800000 |
| delete.records.purgatory.purge.interval.requests | 删除记录请求炼狱的清除间隔(以请求数计) | 1 |
| fetch.max.bytes | Fetch 请求返回的最大字节数。必须至少为 1024。 | 57671680(55M) |
| fetch.purgatory.purge.interval.requests | 提取请求炼狱的清除间隔(以请求数计) | 1000 |
| group.initial.rebalance.delay.ms | 在执行第一次重新平衡之前,组协调器将等待更多消费者加入新组的时间。更长的延迟意味着可能更少的重新平衡,但会增加处理开始之前的时间。 | 3000 |
| group.max.session.timeout.ms | 注册消费者的最大允许会话超时。更长的超时时间让消费者有更多的时间在心跳之间处理消息,但代价是检测故障的时间更长。 | 1800000 |
| group.max.size | 单个消费组可以容纳的最大消费者数。 | 2147483647 |
| group.min.session.timeout.ms | 注册上的消费者的最小允许会话超时。更短的超时导致更快的故障检测,代价是更频繁的消费者心跳,这可能会淹没代理资源。 | 6000 |
| inter.broker.listener.name | 用于代理之间通信的侦听器名称。如果未设置,则侦听器名称由 security.inter.broker.protocol 定义。同时设置 这个 和 security.inter.broker.protocol 属性是错误的。 | null |
| inter.broker.protocol.version | 指定将使用哪个版本的代理间协议。在所有代理升级到新版本后,这通常会受到影响。可选的所有版本: [0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2 -IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2 -IV0、2.2-IV1、2.3-IV0、2.3-IV1、2.4-IV0、2.4-IV1、2.5-IV0] | 根据 kafka 版本会有不同的默认值,例如 2.5-IV0 |
| log.cleaner.backoff.ms | 没有要清理的日志时的睡眠时间 | 15000 |
| log.cleaner.dedupe.buffer.size | | 134217728(128M) |
| log.cleaner.delete.retention.ms | | 8640000024 小时) |
| log.cleaner.enable | 是否开启日志情侣进程, 特殊是如果使用带有 cleanup.policy=compact 的任何主题,包括内部偏移主题,则应启用。如果禁用,这些主题将不会被压缩并不断增长 | true |
| log.cleaner.io.buffer.load.factor | | 0.9 |
| log.cleaner.io.buffer.size | 所有清理线程中用于日志清理 I/O 缓冲区的总内存 | 524288 |
| log.cleaner.io.max.bytes.per.second | 日志清理器将受到限制,以便其读写 I/O 的总和平均小于此值 | 1.7976931348623157E308 |
| log.cleaner.max.compaction.lag.ms | 消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。 | 9223372036854775807Long.MaxValue |
| log.cleaner.min.cleanable.ratio | 对于符合清理条件的日志,脏日志与总日志的最小比率。如果还指定了 `log.cleaner.max.compaction.lag.ms``log.cleaner.min.compaction.lag.ms `配置则日志压缩器会在以下任一情况下认为该日志符合压缩条件i已达到脏比率阈值并且日志至少在 `log.cleaner.min.compaction.lag.ms` 持续时间内有脏未压缩记录或者ii如果日志最多有脏未压缩记录`log.cleaner.max.compaction.lag.ms `周期 | 0.5 |
| log.cleaner.min.compaction.lag.ms | 消息在日志中保持未压缩的最短时间。仅适用于正在压缩的日志。 | 0 |
| log.cleaner.threads | 用于日志清理的后台线程数 | 1 |
| log.cleanup.policy | 日志清理策略, delete 表示删除过期日志,还有一个 compact 选项表示压缩日志。 | delete |
| log.index.interval.bytes | 每隔多少个字节的消息量写入就条件一条索引。 | 4096 |
| log.index.size.max.bytes | 索引文件最大值 | 1048576010M |
| log.message.format.version | 消息格式的版本; [0.8.0, 0.8.1, 0.8.2, 0.9.0, 0.10.0-IV0, 0.10.0-IV1, 0.10.1-IV0, 0.10.1-IV1, 0.10.1-IV2, 0.10.2 -IV0, 0.11.0-IV0, 0.11.0-IV1, 0.11.0-IV2, 1.0-IV0, 1.1-IV0, 2.0-IV0, 2.0-IV1, 2.1-IV0, 2.1-IV1, 2.1-IV2, 2.2 -IV0、2.2-IV1、2.3-IV0、2.3-IV1、2.4-IV0、2.4-IV1、2.5-IV0] | |
| log.message.timestamp.difference.max.ms | 代理接收消息时的时间戳与消息中指定的时间戳之间允许的最大差异。如果` log.message.timestamp.type=CreateTime`,则如果时间戳差异超过此阈值,则消息将被拒绝。如果 `log.message.timestamp.type=LogAppendTime `则忽略此配置。允许的最大时间戳差异不应大于`log.retention.ms`以避免不必要的频繁日志滚动 | 9223372036854775807Logn.MaxValue |
| log.message.timestamp.type | 可定义消息中的时间戳是消息创建时间还是日志附加时间。该值应该是 CreateTime 或 LogAppendTime | CreateTime |
| log.preallocate | 创建新段时应该预先分配文件吗?如果您在 Windows 上使用 Kafka则可能需要将其设置为 true。 | false |
| log.retention.check.interval.ms | 日志清理器检查任何日志是否符合删除条件的频率(以毫秒为单位) | 3000005 分钟) |
| max.connections | 我们在任何时候允许的最大连接数。除了使用 max.connections.per.ip 配置的任何 per-ip 限制之外还应用此限制。侦听器级别的限制也可以通过在配置名称前加上侦听器前缀来配置例如listener.name.internal.max.connections。应根据代理容量配置代理范围限制而应根据应用程序要求配置侦听器限制。如果达到侦听器或代理限制则新连接将被阻止。即使达到代理范围的限制也允许代理间侦听器上的连接。在这种情况下另一个侦听器上最近最少使用的连接将被关闭。 | 2147483647(Integer.MaxValue) |
| max.connections.per.ip | 我们允许来自每个 IP 地址的最大连接数。如果使用 max.connections.per.ip.overrides 属性配置了覆盖,则可以将其设置为 0。如果达到限制来自 IP 地址的新连接将被丢弃。 | 2147483647(Integer.MaxValue) |
| max.connections.per.ip.overrides | 以逗号分隔的 per-ip 或主机名列表覆盖默认的最大连接数。示例值为“hostName:100,127.0.0.1:200” | “” |
| max.incremental.fetch.session.cache.slots | 维护的最大增量 fetch 会话数。 | 1000 |
| num.partitions | 创建 topic 时候的默认分区数 | 1 |
| password.encoder.old.secret | 用于对动态配置的密码进行编码的旧秘密。只有在更新密码时才需要这样做。如果指定,则当代理启动时,所有动态编码的密码将使用此旧密码解码,并使用 password.encoder.secret 重新编码。(就是你要换密码总得先用老的密码解密再用新密码进行加密呢) | null |
| password.encoder.secret | 动态配置编码的秘钥 | null |
| principal.builder.class | 实现 KafkaPrincipalBuilder 接口的类的完全限定名称,用于构建授权期间使用的 KafkaPrincipal 对象。此配置还支持已弃用的 PrincipalBuilder 接口,该接口以前用于通过 SSL 进行客户端身份验证。如果未定义主体构建器,则默认行为取决于使用的安全协议。对于 SSL 身份验证ssl.principal.mapping.rules 如果提供了客户端证书,将使用应用在客户端证书的专有名称上定义的规则派生主体;否则,如果不需要客户端身份验证,则主体名称将为 ANONYMOUS。对于 SASL 身份验证,将使用定义的规则派生主体 sasl.kerberos.principal.to.local.rules 如果正在使用 GSSAPI以及其他机制的 SASL 身份验证 ID。对于 PLAINTEXT委托人将是匿名的 | null |
| producer.purgatory.purge.interval.requests | | 1000 |
| queued.max.request.bytes | 在不再读取请求之前允许的排队字节数 | -1 |
| replica.fetch.backoff.ms | 当发生 fetch 分区的时候 休眠的时间长度 | 1000 |
| replica.fetch.max.bytes | 尝试为每个分区获取的消息字节数。这不是绝对最大值,如果 fetch 的第一个非空分区中的第一个记录批次大于此值,则仍然会返回该记录批次以确保可以取得进展。代理接受的最大记录批量大小是通过 message.max.bytes(broker config) 或 max.message.bytes(topic config) 定义的。 | 1048576 |
| replica.fetch.response.max.bytes | 整个 Fetch 响应预期的最大字节数。记录是分批获取的,如果获取的第一个非空分区中的第一个记录批次大于该值,则仍然会返回该记录批次以确保可以取得进展。因此,这不是绝对最大值。代理接受的最大记录批量大小是通过 message.max.bytes(broker config) 或 max.message.bytes(topic config) 定义的。 | 10485760 |
| replica.selector.class | 实现 ReplicaSelector 的完全限定类名。broker 使用它来查找首选只读副本。默认情况下,我们使用返回 leader 的实现方式。 | null |
| sasl.client.callback.handler.class | | 实现 AuthenticateCallbackHandler 接口的 SASL 客户端回调处理程序类的完全限定名称 |
| sasl.enabled.mechanisms | Kafka 服务器中启用的 SASL 机制列表。该列表可能包含安全提供者可用的任何机制。默认情况下仅启用 GSSAPI。 | GSSAPI |
| sasl.jaas.config | SASL 连接的 JAAS 登录上下文参数,采用 JAAS 配置文件使用的格式。此处描述了 JAAS 配置文件格式。该值的格式为:’ loginModuleClass controlFlag (optionName=optionValue)\*;'。对于代理,配置必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule 需要; | null |
| sasl.kerberos.kinit.cmd | Kerberos kinit 命令路径。 | /usr/bin/kinit |
| sasl.kerberos.min.time.before.relogin | 刷新尝试之间的登录线程休眠时间。 | 60000 |
| sasl.kerberos.principal.to.local.rules | 从主体名称到短名称(通常是操作系统用户名)的映射规则列表。规则按顺序评估,与主体名称匹配的第一个规则用于将其映射到短名称。列表中任何后面的规则都将被忽略。默认情况下,{username}/{hostname}@{REALM} 形式的主体名称映射到 {username}。有关格式的更多详细信息,请参阅安全授权和 acls。请注意如果 principal.builder.class 配置提供了 KafkaPrincipalBuilder 的扩展,则此配置将被忽略。 | null |
| sasl.kerberos.service.name | Kafka 运行时使用的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。 | null |
| sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | 0.05 |
| sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到达到从上次刷新到票证到期的指定时间窗口因子,此时它将尝试更新票证。 | 0.8 |
| sasl.login.callback.handler.class | 实现 AuthenticateCallbackHandler 接口的 SASL 登录回调处理程序类的完全限定名称。对于代理,登录回调处理程序配置必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler | null |
| sasl.login.class | 实现 Login 接口的类的完全限定名称。对于代理,登录配置必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin | null |
| sasl.login.refresh.buffer.seconds | 刷新凭证时在凭证到期之前保持的缓冲时间,以秒为单位。如果刷新将在比缓冲秒数更接近到期时发生,则刷新将向上移动以保持尽可能多的缓冲时间。合法值介于 0 到 36001 小时)之间;如果未指定值,则使用默认值 3005 分钟)。如果此值和 sasl.login.refresh.min.period.seconds 的总和超过凭证的剩余生命周期,则它们都将被忽略。目前仅适用于 OAUTHBEARER。 | 300 |
| sasl.login.refresh.min.period.seconds | 在刷新凭据之前登录刷新线程所需的最短等待时间,以秒为单位。合法值介于 0 到 90015 分钟)之间;如果未指定值,则使用默认值 601 分钟)。如果此值和 sasl.login.refresh.buffer.seconds 的总和超过凭证的剩余生命周期,则它们都将被忽略。目前仅适用于 OAUTHBEARER。 | 60 |
| sasl.login.refresh.window.factor | 登录刷新线程将休眠,直到达到与凭证生命周期相关的指定窗口因子,此时它将尝试刷新凭证。合法值介于 0.5 (50%) 和 1.0 (100%) 之间;如果未指定值,则使用默认值 0.8 (80%)。目前仅适用于 OAUTHBEARER。 | 0.8 |
| sasl.login.refresh.window.jitter | 添加到登录刷新线程的睡眠时间的相对于凭证生命周期的最大随机抖动量。合法值介于 0 和 0.25 (25%) 之间;如果未指定值,则使用默认值 0.05 (5%)。目前仅适用于 OAUTHBEARER | 0.05 |
| sasl.mechanism.inter.broker.protocol | SASL 机制用于代理间通信。默认为 GSSAPI。 | GSSAPI |
| sasl.server.callback.handler.class | 实现 AuthenticateCallbackHandler 接口的 SASL 服务器回调处理程序类的完全限定名称。服务器回调处理程序必须以侦听器前缀和小写的 SASL 机制名称作为前缀。例如listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackH | null |
| security.inter.broker.protocol | 用于代理之间通信的安全协议。有效值为PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。不能同时设置 这个和 inter.broker.listener.name 属性。 | PLAINTEXT |
| ssl.cipher.suites | 密码套件列表。这是身份验证、加密、MAC 和密钥交换算法的命名组合,用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置。默认情况下,支持所有可用的密码套件。 | |
| ssl.client.auth | 配置 kafka 代理以请求客户端身份验证。以下设置是常见的ssl.client.auth=required 如果设置为 required 则需要客户端身份验证。ssl.client.auth=requested 这意味着客户端身份验证是可选的。与请求不同,如果设置了此选项,客户端可以选择不提供有关自身的身份验证信息 ssl.client.auth=none 这意味着不需要客户端身份验证。 | none |
| ssl.enabled.protocols | 为 SSL 连接启用的协议列表。 | TLSv1.2 |
| ssl.key.password | 密钥库文件中私钥的密码。这是客户端可选的。 | |
| ssl.keymanager.algorithm | 密钥管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的密钥管理器工厂算法。 | SunX509 |
| ssl.keystore.location | 密钥存储文件的位置。这对客户端是可选的,可用于客户端的双向身份验证。 | null |
| ssl.keystore.password | 密钥存储文件的存储密码。这对客户端是可选的,只有在配置了 ssl.keystore.location 时才需要。 | null |
| ssl.keystore.type | 密钥库文件的文件格式。这是客户端可选的。 | JKS |
| ssl.protocol | 用于生成 SSLContext 的 SSL 协议。默认设置为 TLSv1.2,适用于大多数情况。最近的 JVM 中允许的值为 TLSv1.2 和 TLSv1.3。旧的 JVM 可能支持 TLS、TLSv1.1、SSL、SSLv2 和 SSLv3但由于已知的安全漏洞不鼓励使用它们。 | TLSv1.2 |
| ssl.provider | 用于 SSL 连接的安全提供程序的名称。默认值是 JVM 的默认安全提供程序。 | null |
| ssl.trustmanager.algorithm | 信任管理器工厂用于 SSL 连接的算法。默认值是为 Java 虚拟机配置的信任管理器工厂算法。 | PKIX |
| ssl.truststore.location | 信任存储文件的位置。 | null |
| ssl.truststore.password | 信任存储文件的密码。如果未设置密码,则仍可访问信任库,但禁用完整性检查。 | null |
| ssl.truststore.type | ssl.truststore.type | JKS |
| zookeeper.clientCnxnSocket | 通常 org.apache.zookeeper.ClientCnxnSocketNetty 在使用 TLS 连接到 ZooKeeper 时设置为。覆盖通过同名 zookeeper.clientCnxnSocket 系统属性设置的任何显式值。 | |
| zookeeper.ssl.client.enable | 设置客户端在连接到 ZooKeeper 时使用 TLS。显式值会覆盖通过 zookeeper.client.secure 系统属性设置的任何值(注意不同的名称)。如果两者都没有设置,则默认为 false当为真时zookeeper.clientCnxnSocket 必须设置(通常为 org.apache.zookeeper.ClientCnxnSocketNetty要设置的其他值可能包括 zookeeper.ssl.cipher.suites、zookeeper.ssl.crl.enable、zookeeper.ssl.enabled.protocols、zookeeper.ssl.endpoint.identification.algorithm、zookeeper.ssl.keystore.location、zookeeper.ssl.keystore.password、zookeeper.ssl.keystore.type、zookeeper.ssl.ocsp.enable、zookeeper.ssl.protocol、zookeeper.ssl.truststore.location、zookeeper.ssl.truststore.password、zookeeper.ssl.truststore.typell | false |
| zookeeper.ssl.keystore.location | 使用客户端证书与 ZooKeeper 的 TLS 连接时的密钥库位置。覆盖通过 zookeeper.ssl.keyStore.location 系统属性设置的任何显式值(注意驼峰式命名法)。 | null |
| zookeeper.ssl.keystore.password | 使用客户端证书与 ZooKeeper 的 TLS 连接时的密钥库密码。覆盖通过 zookeeper.ssl.keyStore.password 系统属性设置的任何显式值(注意驼峰式命名法)。注意 ZooKeeper 不支持与 keystore 密码不同的 key 密码,所以一定要设置 keystore 中的 key 密码与 keystore 密码一致;否则与 Zookeeper 的连接尝试将失败。 | null |
| zookeeper.ssl.keystore.type | 使用客户端证书与 ZooKeeper 的 TLS 连接时的密钥库类型。覆盖通过 zookeeper.ssl.keyStore.type 系统属性设置的任何显式值(注意驼峰式命名法)。默认值 null 表示将根据密钥库的文件扩展名自动检测类型。 | |
| zookeeper.ssl.truststore.location | 使用 TLS 连接到 ZooKeeper 时的信任库位置。覆盖通过 zookeeper.ssl.trustStore.location 系统属性设置的任何显式值(注意驼峰式命名法)。 | null |
| zookeeper.ssl.truststore.password | 使用 TLS 连接到 ZooKeeper 时的 Truststore 密码。覆盖通过 zookeeper.ssl.trustStore.password 系统属性设置的任何显式值(注意驼峰式命名法)。 | null |
| zookeeper.ssl.truststore.type | 使用 TLS 连接到 ZooKeeper 时的 Truststore 类型。覆盖通过 zookeeper.ssl.trustStore.type 系统属性设置的任何显式值(注意驼峰式命名法)。默认值 null 表示将根据信任库的文件扩展名自动检测类型。 | null |
| alter.config.policy.class.name | 应该用于验证的更改配置策略类。类应该实现 org.apache.kafka.server.policy.AlterConfigPolicy 接口。 | null |
| alter.log.dirs.replication.quota.window.num | 更改日志目录复制配额而在内存中保留的样本数 | 11 |
| alter.log.dirs.replication.quota.window.size.seconds | 更改日志目录复制配额的每个示例的时间跨度 | 1 |
| listener.security.protocol.map | 侦听器名称和安全协议之间的映射。必须为同一安全协议定义这一点,才能在多个端口或 IP 中使用。例如,即使内部和外部流量都需要 SSL也可以将两者分开。具体来说用户可以定义名称为 INTERNAL 和 EXTERNAL 的侦听器并且此属性为INTERNAL:SSL,EXTERNAL:SSL。如图所示键和值用冒号分隔映射条目用逗号分隔。每个侦听器名称在地图中只应出现一次。通过向配置名称添加规范化前缀侦听器名称小写可以为每个侦听器配置不同的安全性SSL 和 SASL设置。例如要为 INTERNAL 侦听器设置不同的密钥库,名称为 listener.name.internal.ssl.keystore.location 将被设置。如果未设置侦听器名称的配置,则配置将回退到通用配置(即 ssl.keystore.location | PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL |
| log.message.downconversion.enable | 此配置控制是否启用消息格式的下转换以满足消费请求。当设置为 时 falsebroker 将不会为需要旧消息格式的消费者执行向下转换。代理响应 UNSUPPORTED_VERSION 来自此类较旧客户端的消费请求的错误。此配置不适用于复制到追随者可能需要的任何消息格式转换。 | true |
| password.encoder.cipher.algorithm | 用于对动态配置的密码进行编码的密码算法。 | AES/CBC/PKCS5Padding |
| password.encoder.iterations | 用于编码动态配置密码的迭代计数。 | 4096 |
| password.encoder.key.length | 用于对动态配置的密码进行编码的密钥长度。 | 128 |
| password.encoder.keyfactory.algorithm | 用于对动态配置的密码进行编码的 SecretKeyFactory 算法。如果可用,默认为 PBKDF2WithHmacSHA512否则为 PBKDF2WithHmacSHA1 | null |
| security.providers | 一个可配置的创建者类列表,每个类返回一个实现安全算法的提供者。这些类应该实现 org.apache.kafka.common.security.auth.SecurityProviderCreator 接口。 | null |
| ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | https |
| ssl.principal.mapping.rules | 用于从客户端证书的专有名称映射到短名称的规则列表。规则按顺序评估与主体名称匹配的第一个规则用于将其映射到短名称。列表中任何后面的规则都将被忽略。默认情况下X.500 证书的专有名称将是主体。有关格式的更多详细信息,请参阅安全授权和 acls。请注意如果 principal.builder.class 配置提供了 KafkaPrincipalBuilder 的扩展,则此配置将被忽略。 | DEFAULT |
| ssl.secure.random.implementation | 用于 SSL 加密操作的 SecureRandom PRNG 实现。 | null |
| transaction.abort.timed.out.transaction.cleanup.interval.ms | 回滚超时事务的时间间隔 | 10000 |
| transaction.remove.expired.transaction.cleanup.interval.ms | 删除由于 transactional.id.expiration.ms 传递而过期的事务的时间间隔 | 3600000 |
| zookeeper.ssl.cipher.suites | 指定要在 ZooKeeper TLS 协商 (csv) 中使用的已启用密码套件。覆盖通过 zookeeper.ssl.ciphersuites 系统属性设置的任何显式值(注意单个词“密码套件”)。的默认值 null 意味着启用的密码套件列表由正在使用的 Java 运行时确定 | null |
| zookeeper.ssl.crl.enable | ZooKeeper TLS 协议中是否开启证书吊销列表。覆盖通过 zookeeper.ssl.crl 系统属性设置的任何显式值(注意较短的名称) | false |
| zookeeper.ssl.enabled.protocols | 在 ZooKeeper TLS 协商 (csv) 中指定启用的协议。覆盖通过 zookeeper.ssl.enabledProtocols 系统属性设置的任何显式值(注意驼峰式命名法)。的默认值 null 意味着启用的协议将是 zookeeper.ssl.protocol 配置属性的值 | null |
| zookeeper.ssl.endpoint.identification.algorithm | 指定是否在 ZooKeeper TLS 协商过程中启用主机名验证其中不区分大小写“https”表示启用 ZooKeeper 主机名验证,显式空白值表示禁用(仅建议出于测试目的禁用)。显式值会覆盖通过 zookeeper.ssl.hostnameVerification 系统属性设置的任何“true”或“false”值注意不同的名称和值true 表示 httpsfalse 表示空白) | HTTPS |
| zookeeper.ssl.ocsp.enable | ZooKeeper TLS 协议中是否开启在线证书状态协议。覆盖通过 zookeeper.ssl.ocsp 系统属性设置的任何显式值(注意较短的名称) | false |
| zookeeper.ssl.protocol | 指定要在 ZooKeeper TLS 协商中使用的协议。显式值会覆盖通过同名 zookeeper.ssl.protocol 系统属性设置的任何值 | null |
| zookeeper.sync.time.ms | ZK 追随者可以落后于 ZK 领导者多远 | 2000 |
| ZK 追随者可以落后于 ZK 领导者多远 | 2000 |
可以在 scala 类中找到有关代理配置的更多详细信息`kafka.server.KafkaConfig`
#### 扩展讲解
**Leader 均衡机制auto.leader.rebalance.enable=true**
> 当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。这意味着在默认情况下,当这个 broker 重新启动之后,它的所有分区都将仅作为 follower不再用于客户端的读写操作。
>
> 为了避免这种不平衡Kafka 有一个首选副本的概念。如果一个分区的副本列表是 159节点 1 将优先作为其他两个副本 5 和 9 的 leader因为它较早存在于副本中。你可以通过运行以下命令让 Kafka 集群尝试恢复已恢复正常的副本的 leader 地位:。不会导致负载不均衡和资源浪费,**这就是 leader 的均衡机制**
>
> ```
> # kafka版本 <= 2.4
> > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
> # kafka新版本
> > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
> ```
>
> [kafka 平衡 leader](https://www.orchome.com/33)
>
> 手动运行很不优雅,可以通过配置`auto.leader.rebalance.enable = true` 来自动均衡
在配置文件 conf/ server.properties 中配置开启(默认就是开启)`auto.leader.rebalance.enable = true`
与其相关的配置还有
`leader.imbalance.check.interval.seconds` partition 检查重新 rebalance 的周期时间 ; 默认 300 秒;
`leader.imbalance.per.broker.percentage` 标识每个 Broker 失去平衡的比率,如果超过该比率,则执行重新选举 Broker 的 leader默认值是 10(百分之 10 的意思);
这个比率的算法是 broker 不平衡率=本应该是 Leader 但是却不是 Leader 的分区总数 /应该是 Leader 的分区总数
例如,下面表格是集群的分配情况, 问 **Broker-0** 上的不平衡率是多少?
| TopicPartition | AR | Leader |
| -------------- | ----- | ------ |
| Topic1-0 | 0,1,2 | 0 |
| Topic1-1 | 2,1,0 | 2 |
| Topic1-2 | 1,0,2 | 1 |
| Topic2-0 | 0,1,2 | 1 |
| Topic2-1 | 0,2,1 | 0 |
| Topic2-2 | 0,2,1 | 0 |
**应该是 Leader 的分区数** = 4 (所有首个副本为这个 Broker 数量)
**本应该是 Leader 但是却不是的分区数** = 1 (Topic2-0)
**不平衡率** = 1/4 = 25%
上面几个配置都是 && 的关系; 同时满足才能触发再平衡;
<font color=red> 调优建议:考虑到 leader 重选举的代价比较大,可能会带来性能影响,也可能会引发客户端的阻塞,生产环境建议设置为 false。或者周期设置长一点,比如一天一次;</font>
那么如果我们关闭了 均衡机制 , 或者周期时间比较长, 也就有可能造成上面说的问题, 那么[LogiKM](https://blog.csdn.net/u010634066/article/details/115631758) 就提供了一个手动再平衡的操作;
**compression.type 压缩类型讲解**
开启压缩发送传输可以提高 kafka 的消息吞吐量
具体压缩类型: `(gzip, snappy, lz4, zstd)`
**producer** 端设置类型
```java
// 开启 lz4 压缩
props.put("compression.type", "lz4");
```
**broker 端** 继承使用 producer 的类型
```java
compression.type=producer(默认值)
```
Kafak 的压缩方式是将多条消息一起压缩,这样可以保证比较好的压缩效果。一般情况下,生产者发送的压缩数据在 Broker 中也是保持要锁状态进行存储的,消费者从服务端获取的也是压缩的数据,消费组在处理消息之前才会解压消息,这样保持了端到端的压缩。
使用默认值`producer`, Broker 会原封不动的保存这个数据, 如果设置了跟`producer`不一样的压缩方式,那么 Broker 还有先解压之后再用新的压缩方式来压缩,这样会导致 cpu 使用率升高, 所以使用默认值就好。 **producer** 端建议使用`lz4`的压缩方式,压缩效果比较好。
**日志段切分条件详解**
> 1. 当前日志分段文件大小超过 broker 端的参数`log.segment.bytes`,默认是**1073741824** (1G)
> 2. 当前日志分段中消息的最大时间戳与当前系统的时间戳差值大于`log.roll.hours`或`log.roll.ms`的值,则强制创建新的日志段 Segment,`log.roll.ms` > `log.roll.hours` . `log.roll.ms` 默认为 168 ,即 7 天。
**min.insync.replicas 最小同步副本数**
> 这个参数表示 ISR 集合中的最少副本数,默认值是 1并只有在生产者将`acks=all`或-1 时才有效。acks 与 min.insync.replicas 搭配使用,才能为消息提供最高的持久性保证。引入`min.insync.replicas`的目的就是为了保证下限:不能只满足于 ISR 全部写入,还要保证 ISR 中的写入个数不少于`min.insync.replicas` 。它的判断条件是 `min.insync.replicas<= 当前副本ISR的数量`,否则会抛出异常。常见的场景是创建一个三副本(即 replication.factor=3的 topic最少同步副本数设为 2即 min.insync.replicas=2acks 设为 all那么必须至少有 2 个副本数据同步了(包括 Leader 副本)才可以,当然我们也可以设置为`min.insync.replicas=3`,表示所有的副本必须同步才可以,否则会抛出异常NotEnoughReplicas 或 NotEnoughReplicasAfterAppend
**`__consumer_offsets` 详解**
> 专门用来保存消费组消费的  偏移量的数据, 它也是一个 Topic是一个内部 Topic在第一次消费的时候会自动创建这个 Topic,
> 分区大小是`offsets.topic.num.partitions`控制,默认是 50,部署后不建议更改,后续可以执行扩容操作进行扩容
> 副本数量由`offsets.topic.replication.factor`控制,默认是 3, 在集群大小满足此复制因子要求之前,内部主题`__consumer_offsets` 创建将失败。会抛出异常
>
> ```java
> Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
> ```
</article>
</div>
**受控关机操作**
> 当 Broker 关闭的时候, 会向 Controller 发一起一个`ControlledShutdownRequest`请求, Controller 收到这个请求会针对性的做一些善后事件。比如说 **执行 Leader 重选举** 等等之类的...
>
> 源码位置:**KafkaServer#controlledShutdown**
**controlled.shutdown.enable 是否启用受控关闭操作**
> 如果关闭了,则不会发起上面说的请求了
**controlled.shutdown.max.retries 受控关机操作 重试的次数**
> `ControlledShutdownRequest`这个请求可能有多种原因执行失败, 这个配置就是重试的次数,当然,超过了这个次数仍然失败的话,那么 Broker 还是会强制关机的。
**controlled.shutdown.retry.backoff.ms 失败后等等多久再次重试**
> 当`ControlledShutdownRequest`请求失败后,会等等多少秒后再次重试 ,Thread.sleep(config.controlledShutdownRetryBackoffMs)
## 3.2 Topic 配置
与主题相关的配置既有服务器默认值,也有可选的每个主题覆盖。
如果没有给出每个主题的配置,则使用服务器默认值。可以在创建主题时通过提供一个或多个`--config`选项来设置覆盖。此示例创建一个名为 my-topic 的主题,具有自定义的最大消息大小和刷新率:
```
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
```
也可以稍后使用 alter configs 命令更改或设置覆盖。此示例更新 my-topic 的最大消息大小:
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
```
要检查在主题上设置的覆盖,您可以执行
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe
```
要删除覆盖,您可以执行
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
--alter --delete-config max.message.bytes
```
这本质上是什么呢?这是 Kafka 的动态配置,针对 Topic 级别来控制的都是在动态配置来设置的
关于 Kafka 的动态配置可以看 [Kafka 中的动态配置源码分析]()
Topic 相关可选配置
| key | 服务器默认属性(server.properties) | value | 默认 |
| --------------------------------------- | --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------- |
| cleanup.policy | log.cleanup.policy | 此配置指定要在旧日志段使用的保留策略, 可选项[compact, delete],默认是 delete,表示直接删除旧数据, compact 表示压缩,一个 topic 的一个分区中,只保留最近的某个 key 对应的 value | delete |
| compression.type | compression.type | 指定给定 Topic 的最终压缩类型,它不仅可以配置('gzip'、'snappy'、'lz4'、'zstd'),还可以配置'uncompressed' 不压缩,和'producer' 保留生产者设置的原始压缩格式 | producer |
| delete.retention.ms | log.cleaner.delete.retention.ms | cleanup.policy 在 compact 模式下 , 墓碑消息的保存时间,墓碑消息是 value 为空的消息,当紧缩线程发现了这个消息,那么紧缩线程会把该 Key 之前的消息都紧缩掉只留下了墓碑消息,这条墓碑消息会被保存 delete.retention.ms 的时间然后再被删除 | 864000001 天) |
| file.delete.delay.ms | log.segment.delete.delay.ms | topic 删除被标记为--delete 文件之后延迟多长时间删除该 Log 文件 | 600001 分钟) |
| flush.messages | log.flush.interval.messages | 调用强制写入数据方法(fsync)的间隔。例如,如果将其设置为 1我们将在每条消息后进行 fsync如果是 5我们将在每 5 条消息后进行 fsync。一般来说我们建议您不要设置此选项并使用复制来提高持久性并允许操作系统的后台刷新功能因为它更有效 | 9223372036854775807 |
| flush.ms | log.flush.interval.ms | 调用强制写入数据方法(fsync)的时间间隔。例如, 如果将其设置为 1000我们将在 1000 毫秒后进行 fsync。一般来说我们建议您不要设置此选项并使用复制来提高持久性并允许操作系统的后台刷新功能因为它更有效 | 9223372036854775807 |
| follower.replication.throttled.replicas | follower.replication.throttled.replicas | flowwer 副本限流 格式:分区号:副本 follower 号,分区号:副本 follower 号,例如 0:1,1:1 ,或者可以使用通配符“\*”来限制该主题的所有副本,更详细内容请看 [分区副本同步限流机制]() | "" |
| index.interval.bytes | log.index.interval.bytes | 此设置控制 Kafka 将索引消息添加到其偏移索引的频率。默认设置确保我们大约每 4096 个字节索引一条消息。更多的索引允许读取更接近日志中的确切位置,但会使索引更大。您可能不需要更改此设置。 | 4096 |
| leader.replication.throttled.replicas | leader.replication.throttled.replicas | leader 副本限流 格式:分区号:副本 Leader 号,或者可以使用通配符“\*”来限制该主题的所有副本 | “” |
| max.compaction.lag.ms | log.cleaner.max.compaction.lag.ms | 当 cleanup.policy 在 compact 模式下,一条消息从生产后到被紧缩的时间间隔最大值 | 9223372036854775807 |
| min.compaction.lag.ms | log.cleaner.min.compaction.lag.ms | 当 cleanup.policy 在 compact 模式下,一条消息从生产后到被紧缩的时间间隔最小值 | 0 |
| min.cleanable.dirty.ratio | log.cleaner.min.cleanable.ratio | 当 cleanup.policy 在 compact 模式下,此配置控制日志紧缩器尝试清理日志的评论 | 0.5 |
| max.message.bytes | message.max.bytes | 最大的 batch 的 message 大小 | 1048588 |
| message.downconversion.enable | log.message.downconversion.enable | 此配置控制是否启用消息格式的下转换以满足消费请求。当设置为 时 falseBroker 不会为期望旧消息格式的消费者执行下转换。Broker 对 UNSUPPORTED_VERSION 来自这些旧客户端的消费请求做出错误响应。此配置不适用于复制到追随者可能需要的任何消息格式转换。 | true |
| min.insync.replicas | min.insync.replicas | 当生产者的 ack 设置为 all 或者-1 的时候,该值表示的是 ISR 的最少数量。如果 ISR 里面的数量少于此值则消息的写入会抛异常NotEnoughReplicas 或 NotEnoughReplicasAfterAppendack=all 保证了所有 ISR 都成功写入, 但是如果 ISR 只有一个的话就退变成了 ack=1 的情况了, 如果要保证高可用, 还需要配置此值>1 | 1 |
| preallocate | log.preallocate | 创建新的日志段 Segment 是否需要预先分配文件,如果您再 windows 上使用 kafka您可能需要设置为 true | false |
| retention.bytes | log.retention.bytes | 如果我们使用的是`cleanup.policy=delete`策略,此配置控制分区在丢弃旧的日志段 Segment 之前可以增长到的最大大小,默认情况下,没有大小限制,只有时间限制。注意这个配置是控制分区的大小的。不是整个 Topic 所有分区加起来的大小 | -1 |
| retention.ms | log.retention.ms | 如果我们使用的是`cleanup.policy=delete`策略,日志段 Segment 能够保留的最长时间。只要日志段最近一条消息写入的时间距离当前时间超过了这个值,则该日志段将会被删除,如果设置为-1则不应用时间限制 | 6048000007 天) |
| segment.bytes | log.segment.bytes | 每个日志段 Segment 的最大大小,该值越大,那么 Segment 数量就越小。 | 1073741824(1GB) |
| segment.index.bytes | log.index.size.max.bytes | 索引文件的大小,我们会预先分配这个索引文件并且仅仅在日志滚动后收缩它,一般您不需要更改此值 | 10485760(10MB) |
| segment.jitter.ms | log.roll.jitter.ms | 从计划的段滚动时间中减去的最大随机抖动,以避免雷鸣般的段滚动群 | 0 |
| segment.ms | log.roll.ms | 此配置控制 Kafka 将强制日志滚动的时间段,即使段文件未满,以确保保留可以删除或压缩旧数据。 | 6048000007 天) |
| unclean.leader.election.enable | unclean.leader.election.enable | 指示是否启用不在 ISR 集中的副本作为最后的选择作为领导者,即使这样做可能会导致数据丢失。 | false |
#### 扩展讲解
**墓碑消息**
当我们的清理策略`cleanup.policy``compact`的时候:
针对有 key 的 topic对于每个 key只存储最近一次的 value既满足应用的需求又加快了日志清理的速度。没有指定 key 的 topic 无法进行紧缩。
然后当你想把该 key 所有的消息都要清理掉的时候 , 你只需要发送一个墓碑消息, key 为你指定的 key但是 value 为空。
那么紧缩线程就知道你想把该 Key 的所有消息都清理掉, 但是这个目标消息还是为你保留一段时间,以防你可能有一些业务上的处理。 这个保留的时间为 `delete.retention.ms`
## 3.3 Producer 配置
| 属性 | 描述 | 默认值 |
| -------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| key.serializer | 实现 org.apache.kafka.common.serialization.Serializer 接口的键的序列化程序类。 | |
| value.serializer | org.apache.kafka.common.serialization.Serializer 实现接口的值的序列化程序类。 | |
| acks | 生产者要求 Leader 在决定是否完成请求之前收到的确认数量. 这控制了发送的记录的持久性 可配置的参数如下:<br>1. `acks=0` 如果为 0, 生产者不会等待服务器的任何确认, 会被立即视为已发送,这种情况下不能保证服务器是否真的已经收到了消息。这个时候`retries`配置不会生效(客户端都不管服务端的返回了,所以客户端一般是不知道有故障的) <br> 2. `acks=1` Leader 会将消息写入到它的本地日志中,但是不会等待所有的 Follower 完全确认就会返回发送成功状态。 这种情况下, 当 Follower 成功同步数据之前 Leader 挂掉了会造成数据丢失。<br> 3.`acks=all` Leader 将等待所有的 ISR 中的副本完成同步之后返回成功状态, 这样子数据就不会丢失,是最高级别的保证。 | 1 |
| bootstrap.servers | 用于连接 kafka 集群 初始连接 host/port 列表,此列表仅用于发现集群, 并不强制在这里配置集群所有服务器列表 | |
| buffer.memory | 生产者的消息缓冲区内存大小, 关于缓冲区请看[图解 Kafka Producer 消息缓存模型]() | 33554432(32M) |
| compression.type | 生产者生成的所有数据的压缩类型。默认值为 none即不压缩。有效值为 none、gzip、snappy、lz4 或 zstd。 | none |
| retries | 生产者重试次数,当`max.in.flight.requests.per.connection>1`的情况发生重试可能会导致顺序问题. | 2147483647 |
| batch.size | 每当多个记录被发送到同一个分区的时候,生产者会尝试将消息放到一个 Batch 里面处理减少请求数。这个配置就是控制一个 Batch 的内存大小。具体请看[多图详解 kafka 生产者消息发送过程](https://blog.csdn.net/u010634066/article/details/120887135) | 16384 |
| client.dns.lookup | 控制客户端如何使用 DNS 查找。如果设置为 use_all_dns_ips则依次连接到每个返回的 IP 地址,直到建立成功的连接。断开连接后,使用下一个 IP。一旦所有 IP 都被使用过一次,客户端会再次从主机名解析 IP但是JVM 和操作系统都会缓存 DNS 名称查找)。如果设置为 resolve_canonical_bootstrap_servers_only则将每个引导地址解析为规范名称列表。在引导阶段之后它的行为与 use_all_dns_ips. 如果设置为 default不推荐使用则尝试连接到查找返回的第一个 IP 地址,即使查找返回多个 IP 地址也是如此。 | use_all_dns_ips |
| client.id | 生产者客户端 ID | |
| connections.max.idle.ms | 在此配置指定的毫秒数后关闭空闲连接 | 540000 (9 minutes) |
| delivery.timeout.ms | 最大交付时间, 调用 send()方法后不管是成功还是失败的时间上限。例如重试太多次之后达到次配置时间的时候也会停止重试了。此配置值应该大于等于`request.timeout.ms ``linger.ms`总和 | 120000 (2 minutes). 如果这个值你没有主动设置并且`request.timeout.ms `+`linger.ms` > 120000(默认值) ,那么它最终的值是`request.timeout.ms `+`linger.ms` |
| linger.ms | kafka 生产者会在 Batch 填满或 linger.ms 达到上限时把批次发送出去 | 0 |
| max.block.ms | 生产者发送消息过程中,获取元信息的最大超时时间 | 60000 (1 minute) |
| max.request.size | 请求的最大大小(以字节为单位)。此设置将限制生产者在单个请求中发送的记录批次的总数据量,以避免发送大量请求。这实际上也是最大未压缩记录批量大小的上限。请注意,服务器对记录批量大小有自己的上限(如果启用压缩,则在压缩之后),这可能与此不同。 | 1048576 |
| retries | 生产者重试次数,当`max.in.flight.requests.per.connection>1`的情况发生重试可能会导致顺序问题. | 1048576 |
| partitioner.class | 生产者分区分配策略, 详情请看:[Kafka 中生产消息时的三种分区分配策略]() | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
| receive.buffer.bytes | 读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。如果值为 -1将使用操作系统默认值。 | 32768 (32 kb) |
| request.timeout.ms | 控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败 | 30000 (30 seconds) |
| security.protocol | | PLAINTEXT |
| send.buffer.bytes | 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。如果值为 -1将使用操作系统默认值。 | 131072 (128 kb) |
| socket.connection.setup.timeout.max.ms | 客户端等待建立套接字连接的最长时间。对于每个连续的连接失败,连接设置超时将成倍增加,直至达到此最大值。为避免连接风暴,将对超时应用 0.2 的随机化因子,从而产生低于计算值 20% 到高于 20% 的随机范围。 | 127000 (127 seconds) |
| socket.connection.setup.timeout.ms | 客户端等待套接字连接建立的时间。如果在超时之前没有建立连接,客户端将关闭套接字通道。 | 10000 (10 seconds) |
| enable.idempotence | 是否启动幂等。当设置为 true 时候, 生产者将确保每条消息被最多写入一个副本,如果未 false,生产者由于 Broker 失败等原因重试,可能会写入到多个副本中。注意:启动幂等性的要求`max.in.flight.requests.per.connection<=5` `retries>0`并且 `acks=all` .如果设置了不兼容的值则会抛出异常 | false |
| interceptor.classes | 生产者拦截器配置,填写全路径类名,可用逗号隔开配置多个,执行顺序就是配置的顺序。 | |
| max.in.flight.requests.per.connection | 客户端能够允许的最大未完成请求(在请求中)的请求数量, 如果该值大于 1, 并且请求发送失败可可能导致消息重排序的风险(如果重试启用的话) | 5 |
| metadata.max.age.ms | 即使我们没有看到任何分区领导层更改以主动发现任何新代理或分区,我们也强制刷新元数据的时间段(以毫秒为单位) | 300000 (5 minutes) |
| metadata.max.idle.ms | Topic 的最大空闲时间. 如果一个主题在这么多毫秒内没有被访问过,它就会从缓存中删除。并且下一次对其的访问将强制执行元数据获取请求。 | 300000 (5 minutes) |
| reconnect.backoff.max.ms | 重新连接到反复连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,每台主机的退避将在每次连续连接失败时呈指数增长,直至达到此最大值。在计算回退增加后,添加 20% 的随机抖动以避免连接风暴。 | 1000 (1 second) |
| reconnect.backoff.ms | 在尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧密循环中重复连接到主机。此退避适用于客户端到代理的所有连接尝试 | 50 |
| retry.backoff.ms | 在尝试重试对给定主题分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧密循环中重复发送请求。 | 100 |
## 3.4 Consumer 配置
| 属性 | 描述 | 默认 |
| ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------- |
| key.deserializer | 实现 org.apache.kafka.common.serialization.Deserializer 接口的键的反序列化器类 | |
| value.deserializer | 实现 org.apache.kafka.common.serialization.Deserializer 接口的值的反序列化器类。 | |
| bootstrap.servers | 用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器,无论此处指定哪些服务器进行引导——此列表仅影响用于发现完整服务器集的初始主机 | |
| fetch.min.bytes | 服务器应为获取请求返回的最小数据量。如果可用数据不足则请求将在响应请求之前等待累积那么多数据。1 字节的默认设置意味着只要有一个字节的数据可用或获取请求超时等待数据到达,就会响应获取请求。将此设置为大于 1 的值将导致服务器等待大量数据累积,这可以以一些额外的延迟为代价提高服务器吞吐量。 | 1 |
| group.id | 标识此消费者所属的消费者组的唯一字符串。 | |
| heartbeat.interval.ms | 使用 Kafka 的 Group 管理设施时,与消费者协调器之间的心跳间隔时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 1`session.timeout.ms`,但通常应设置为不高于该值的 1/3。它可以调整得更低以控制正常重新平衡的预期时间。 | 30003 秒) |
| max.partition.fetch.bytes | 服务器将返回的每个分区的最大数据量。记录由消费者分批获取。如果 fetch 的第一个非空分区中的第一个记录批大于此限制,则该批仍将返回以确保消费者可以进行。代理接受的最大记录批量大小是通过 message.max.bytes代理配置或 max.message.bytes主题配置定义的。有关限制消费者请求大小的信息请参阅 fetch.max.bytes | 10485761 MB |
| session.timeout.ms | 使用 Kafka 的组管理工具时用于检测客户端故障的超时。客户端定期发送心跳以向代理指示其活跃性。如果在此会话超时到期之前代理未收到任何心跳,则代理将从组中删除此客户端并启动重新平衡。请注意,该值必须在代理配置中由 group.min.session.timeout.ms 和配置的允许范围内 group.max.session.timeout.ms。 | 1000010 秒) |
| allow.auto.create.topics | 订阅或分配主题时,允许在代理上自动创建主题。仅当代理允许使用 `auto.create.topics.enable` 代理配置时,才会自动创建订阅的主题。使用早于 0.11.0 的代理时,此配置必须设置为 `false` | true |
| auto.offset.reset | 如果 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量(例如,因为该数据已被删除),可选项:**earliest**:自动将偏移量重置为最早的偏移量 **latest**:自动将偏移量重置为最新的偏移量 **none**:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常 其他任何事情:向消费者抛出异常。 | latest |
| client.dns.lookup | 控制客户端如何使用 DNS 查找。如果设置为 use_all_dns_ips则依次连接到每个返回的 IP 地址,直到建立成功的连接。断开连接后,使用下一个 IP。一旦所有 IP 都被使用过一次,客户端会再次从主机名解析 IP但是JVM 和操作系统都会缓存 DNS 名称查找)。如果设置为 resolve_canonical_bootstrap_servers_only则将每个引导地址解析为规范名称列表。在引导阶段之后它的行为与 use_all_dns_ips. 如果设置为 default不推荐使用则尝试连接到查找返回的第一个 IP 地址,即使查找返回多个 IP 地址也是如此。 | use_all_dns_ips |
| connections.max.idle.ms | 在此配置指定的毫秒数后关闭空闲连接。 | 540000 (9 minutes) |
| default.api.timeout.ms | 指定客户端 API 的超时时间(以毫秒为单位)。此配置用作所有未指定 timeout 参数的客户端操作的默认超时。 | 60000 (1 minute) |
| enable.auto.commit | 如果为 true消费者的偏移量将在后台定期提交。 | true |
| exclude.internal.topics | 是否应从订阅中排除与订阅模式匹配的内部主题。始终可以显式订阅内部主题 | true |
| fetch.max.bytes | 服务器应为获取请求返回的最大数据量。记录由消费者分批获取,如果获取的第一个非空分区的第一个记录批大于该值,仍然会返回记录批,以确保消费者可以进行。因此,这不是绝对最大值。代理接受的最大记录批量大小是通过 message.max.bytes代理配置或 max.message.bytes主题配置定义的。请注意消费者并行执行多个提取。 | 52428800(50MB) |
| group.instance.id | 最终用户提供的消费者实例的唯一标识符。只允许非空字符串。如果设置,消费者将被视为静态成员,这意味着在任何时候都只允许具有此 ID 的一个实例在消费者组中。这可以与更大的会话超时结合使用,以避免由于暂时不可用(例如进程重新启动)导致的组重新平衡。如果不设置,消费者将作为动态成员加入群组,这是传统的行为 | |
| isolation.level | 控制如何读取以事务方式编写的消息。如果设置为 read_committedconsumer.poll() 将只返回已提交的事务性消息。如果设置为 read_uncommitted默认值consumer.poll() 将返回所有消息,甚至是已中止的事务消息。在任一模式下都将无条件返回非事务性消息。消息将始终按偏移顺序返回。因此,在 read_committedmode 下consumer.poll() 将只返回直到最后一个稳定偏移量 (LSO) 的消息该偏移量小于第一个打开事务的偏移量。特别是在属于正在进行的交易的消息之后出现的任何消息都将被保留直到相关交易完成。结果read_committed 当存在飞行交易时,消费者将无法读取到高水位线。此外,当在 read_committedseekToEnd 方法中将返回 LSO | read_uncommitted |
| max.poll.interval.ms | 使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。对于使用达到此超时的非 null 的消费者 group.instance.id不会立即重新分配分区。相反消费者将停止发送心跳并且分区将在 session.timeout.ms. 这反映了已关闭的静态消费者的行为。 | 3000005 分钟) |
| max.poll.records | 在一次 poll() 调用中返回的最大记录数。 | 500 |
| partition.assignment.strategy | 支持的分区分配策略的类名称或类类型列表,当使用组管理时,客户端将使用这些策略在消费者实例之间分配分区所有权。除了下面指定的默认类之外,您还可以使用 org.apache.kafka.clients.consumer.RoundRobinAssignor 该类将分区循环分配给消费者。实现该 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 接口允许您插入自定义分配策略。 | org.apache.kafka.clients.consumer.RangeAssignor |
| receive.buffer.bytes | 读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。如果值为 -1将使用操作系统默认值。 | 6553664 千字节) |
| request.timeout.ms | 配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败。 | 3000030 秒) |
| security.protocol | 用于与 Broker 通信的协议。有效值为PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。 | PLAINTEXT |
| auto.commit.interval.ms | enable.auto.commit 如果设置为 true ,则该值为消费者偏移量自动提交到 Kafka 的频率(以毫秒为单位) | 5000 (5 seconds) |
| check.crcs | 自动检查消费记录的 CRC32。这可确保消息不会发生在线或磁盘损坏。此检查会增加一些开销因此在寻求极端性能的情况下可能会被禁用。 | true |
| client.id | 发出请求时传递给服务器的 id 字符串。这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志中来跟踪请求的来源,而不仅仅是 ip/port。 | |
| client.rack | 此客户端的机架标识符。这可以是任何字符串值指示此客户端的物理位置。它对应于代理配置“broker.rack” | |
| fetch.max.wait.ms | 如果没有足够的数据立即满足 fetch.min.bytes 给出的要求,服务器将在响应 fetch 请求之前阻塞的最长时间。 | 500 |
| interceptor.classes | 用作拦截器的类列表。实现该 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口允许您拦截(并可能改变)消费者收到的记录。默认情况下,没有拦截器。 | |
| metadata.max.age.ms | 即使我们没有看到任何分区领导层更改以主动发现任何新代理或分区,我们也强制刷新元数据的时间段(以毫秒为单位)。 | 3000005 分钟) |
| retry.backoff.ms | 如果上次更新失败,发起重试的间隔时间 | 100 |