一、Kafka 简介
本文综合了我之前写的kafka相关文章,可作为一个全面了解学习kafka的培训学习资料。
1
转载请注明出处 : 本文链接
1.1 背景历史
当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战:
如何收集这些巨大的信息
如何分析它
如何及时做到如上两点
以上几个挑战形成了一个业务需求模型,即生产者生产(produce)各种信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,需要一个沟通两者的桥梁-消息系统。从一个微观层面来说,这种需求也可理解为不同的系统之间如何传递消息。
1.2 Kafka 诞生
Kafka 由 linked-in 开源
kafka-即是解决上述这类问题的一个框架,它实现了生产者和消费者之间的无缝连接。
kafka-高产出的分布式消息系统(A high-throughput distributed messaging system)
1.3 Kafka 现在
Apache kafka 是一个分布式的基于 push-subscribe 的消息系统,它具备快速、可扩展、可持久化的特点。它现在是 Apache 旗下的一个开源系统,作为 hadoop 生态系统的一部分,被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 hadoop 的批处理系统、低延迟的实时系统、storm/spark 流式处理引擎。
二、Kafka 技术概览
2.1 Kafka 的特性
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
可扩展性:kafka 集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为 n,则允许 n-1 个节点失败)
高并发:支持数千个客户端同时读写
2.2 Kafka 一些重要设计思想
下面介绍先大体介绍一下 Kafka 的主要设计思想,可以让相关人员在短时间内了解到 kafka 相关特性,如果想深入研究,后面会对其中每一个特性都做详细介绍。
Consumergroup:各个 consumer 可以组成一个组,每个消息只能被组中的一个 consumer 消费,如果一个消息可以被多个 consumer 消费的话,那么这些 consumer 必须在不同的组。
消息状态:在 Kafka 中,消息的状态被保存在 consumer 中,broker 不会关心哪个消息被消费了被谁消费了,只记录一个 offset 值(指向 partition 中下一个要被消费的消息位置),这就意味着如果 consumer 处理不好的话,broker 上的一个消息可能会被消费多次。
消息持久化:Kafka 中会把消息持久化到本地文件系统中,并且保持极高的效率。
消息有效期:Kafka 会长久保留其中的消息,以便 consumer 可以多次消费,当然其中很多细节是可配置的。
批量发送:Kafka 支持以消息集合为单位进行批量发送,以提高 push 效率。
push-and-pull : Kafka 中的 Producer 和 consumer 采用的是 push-and-pull 模式,即 Producer 只管向 broker push 消息,consumer 只管从 broker pull 消息,两者对消息的生产和消费是异步的。
Kafka 集群中 broker 之间的关系:不是主从关系,各个 broker 在集群中地位一样,我们可以随意的增加或删除任何一个 broker 节点。
负载均衡方面: Kafka 提供了一个 metadata API 来管理 broker 之间的负载(对 Kafka0.8.x 而言,对于 0.7.x 主要靠 zookeeper 来实现负载均衡)。
同步异步:Producer 采用异步 push 方式,极大提高 Kafka 系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
分区机制 partition:Kafka 的 broker 端支持消息分区,Producer 可以决定把消息发到哪个分区,在一个分区中消息的顺序就是 Producer 发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。
离线数据装载:Kafka 由于对可拓展的数据持久化的支持,它也非常适合向 Hadoop 或者数据仓库中进行数据装载。
插件支持:现在不少活跃的社区已经开发出不少插件来拓展 Kafka 的功能,如用来配合 Storm、Hadoop、flume 相关的插件。
2.3 kafka 应用场景
日志收集:一个公司可以用 Kafka 可以收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理:比如 spark streaming 和 storm
事件源
2.4 Kafka 架构组件
Kafka 中发布订阅的对象是 topic。我们可以为每类数据创建一个 topic,把向 topic 发布消息的客户端称作 producer,从 topic 订阅消息的客户端称作 consumer。Producers 和 consumers 可以同时从多个 topic 读写数据。一个 kafka 集群由一个或多个 broker 服务器组成,它负责持久化和备份具体的 kafka 消息。
topic:消息存放的目录即主题
Producer:生产消息到 topic 的一方
Consumer:订阅 topic 消费消息的一方
Broker:Kafka 的服务实例就是一个 broker
2.5 Kafka Topic&Partition
消息发送时都被发送到一个 topic,其本质就是一个目录,而 topic 由是由一些 Partition Logs(分区日志)组成,其组织结构如下图所示:
我们可以看到,每个 Partition 中的消息都是有序的,生产的消息被不断追加到 Partition log 上,其中的每一个消息都被赋予了一个唯一的 offset 值。
Kafka 集群会保存所有的消息,不管消息有没有被消费;我们可以设定消息的过期时间,只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为 2 天,那么这 2 天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。
Kafka 需要维持的元数据只有一个–消费消息在 Partition 中的 offset 值,Consumer 每消费一个消息,offset 就会加 1。其实消息的状态完全是由 Consumer 控制的,Consumer 可以跟踪和重设这个 offset 值,这样的话 Consumer 就可以读取任意位置的消息。
把消息日志以 Partition 的形式存放有多重考虑,第一,方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;第二就是可以提高并发,因为可以以 Partition 为单位读写了。
三、Kafka 核心组件
3.1 Replications、Partitions 和 Leaders
通过上面介绍的我们可以知道,kafka 中的数据是持久化的并且能够容错的。Kafka 允许用户为每个 topic 设置副本数量,副本数量决定了有几个 broker 来存放写入的数据。如果你的副本数量设置为 3,那么一份数据就会被存放在 3 台不同的机器上,那么就允许有 2 个机器失败。一般推荐副本数量至少为 2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为 3 或者更多。
Kafka 中的 topic 是以 partition 的形式存放的,每一个 topic 都可以设置它的 partition 数量,Partition 的数量决定了组成 topic 的 log 的数量。Producer 在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到 topic 的各个 partition 中。上面将的副本都是以 partition 为单位的,不过只有一个 partition 的副本会被选举成 leader 作为读写用。
关于如何设置 partition 值需要考虑的因素。一个 partition 只能被一个消费者消费(一个消费者可以同时消费多个 partition),因此,如果设置的 partition 的数量小于 consumer 的数量,就会有消费者消费不到数据。所以,推荐 partition 的数量一定要大于同时运行的 consumer 的数量。另外一方面,建议 partition 的数量大于集群 broker 的数量,这样 leader partition 就可以均匀的分布在各个 broker 中,最终使得集群负载均衡。在 Cloudera,每个 topic 都有上百个 partition。需要注意的是,kafka 需要为每个 partition 分配一些内存来缓存消息数据,如果 partition 数量越大,就要为 kafka 分配更大的 heap space。
3.2 Producers
Producers 直接发送消息到 broker 上的 leader partition,不需要经过任何中介一系列的路由转发。为了实现这个特性,kafka 集群中的每个 broker 都可以响应 producer 的请求,并返回 topic 的一些元信息,这些元信息包括哪些机器是存活的,topic 的 leader partition 都在哪,现阶段哪些 leader partition 是可以直接被访问的。
Producer 客户端自己控制着消息被推送到哪些 partition。实现的方式可以是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka 提供了接口供用户实现自定义的分区,用户可以为每个消息指定一个 partitionKey,通过这个 key 来实现一些 hash 分区算法。比如,把 userid 作为 partitionkey 的话,相同 userid 的消息将会被推送到同一个分区。
以 Batch 的方式推送数据可以极大的提高处理效率,kafka Producer 可以将消息在内存中累计到一定数量后作为一个 batch 发送请求。Batch 的数量大小可以通过 Producer 的参数控制,参数值可以设置为累计的消息的数量(如 500 条)、累计的时间间隔(如 100ms)或者累计的数据大小(64KB)。通过增加 batch 的大小,可以减少网络请求和磁盘 IO 的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。
Producers 可以异步的并行的向 kafka 发送消息,但是通常 producer 在发送完消息之后会得到一个 future 响应,返回的是 offset 值或者发送过程中遇到的错误。这其中有个非常重要的参数“acks”,这个参数决定了 producer 要求 leader partition 收到确认的副本个数,如果 acks 设置数量为 0,表示 producer 不会等待 broker 的响应,所以,producer 无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks 值为 0 会得到最大的系统吞吐量。
若 acks 设置为 1,表示 producer 会在 leader partition 收到消息时得到 broker 的一个确认,这样会有更好的可靠性,因为客户端会等待直到 broker 确认收到消息。若设置为-1,producer 会在所有备份的 partition 收到消息时得到 broker 的确认,这个设置可以得到最高的可靠性保证。
Kafka 消息有一个定长的 header 和变长的字节数组组成。因为 kafka 消息支持字节数组,也就使得 kafka 可以支持任何用户自定义的序列号格式或者其它已有的格式如 Apache Avro、protobuf 等。Kafka 没有限定单个消息的大小,但我们推荐消息大小不要超过 1MB,通常一般消息大小都在 1~10kB 之前。
3.3 Consumers
Kafka 提供了两套 consumer api,分为 high-level api 和 sample-api。Sample-api 是一个底层的 API,它维持了一个和单一 broker 的连接,并且这个 API 是完全无状态的,每次请求都需要指定 offset 值,因此,这套 API 也是最灵活的。
在 kafka 中,当前读到消息的 offset 值是由 consumer 来维护的,因此,consumer 可以自己决定如何读取 kafka 中的数据。比如,consumer 可以通过重设 offset 值来重新消费已消费过的数据。不管有没有被消费,kafka 会保存数据一段时间,这个时间周期是可配置的,只有到了过期时间,kafka 才会删除这些数据。
High-level API 封装了对集群中一系列 broker 的访问,可以透明的消费一个 topic。它自己维持了已消费消息的状态,即每次消费的都是下一个消息。
High-level API 还支持以组的形式消费 topic,如果 consumers 有同一个组名,那么 kafka 就相当于一个队列消息服务,而各个 consumer 均衡的消费相应 partition 中的数据。若 consumers 有不同的组名,那么此时 kafka 就相当与一个广播服务,会把 topic 中的所有消息广播到每个 consumer。
四、Kafka 核心特性
4.1 压缩
我们上面已经知道了 Kafka 支持以集合(batch)为单位发送消息,在此基础上,Kafka 还支持对消息集合进行压缩,Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。Producer 端进行压缩之后,在 Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是 CPU(压缩和解压会耗掉部分 CPU 资源)。
那么如何区分消息是压缩的还是未压缩的呢,Kafka 在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,如果后两位为 0,则表示消息未被压缩。
4.2 消息可靠性
在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下三中情况:
一个消息发送失败
一个消息被发送多次
最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次
有许多系统声称它们实现了 exactly-once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个 Producer 成功发送一个消息,但是消息在发送途中丢失,或者成功发送到 broker,也被 consumer 成功取走,但是这个 consumer 在处理取过来的消息时失败了。
从 Producer 端看:Kafka 是这么处理的,当一个消息被发送后,Producer 会等待 broker 成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个 broker 挂掉,Producer 会重新发送(我们知道 Kafka 有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。
从 Consumer 端看:前面讲到过 partition,broker 端记录了 partition 中的一个 offset 值,这个值指向 Consumer 下一个即将消费 message。当 Consumer 收到了消息,但却在处理过程中挂掉,此时 Consumer 可以通过这个 offset 值重新找到上一个消息再进行处理。Consumer 还有权限控制这个 offset 值,对持久化到 broker 端的消息做任意处理。
4.3 备份机制
备份机制是 Kafka0.8 版本的新特性,备份机制的出现大大提高了 Kafka 集群的可靠性、稳定性。有了备份机制后,Kafka 允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为 n 的集群允许 n-1 个节点失败。在所有备份节点中,有一个节点作为 lead 节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。下面这幅图解释了 Kafka 的备份机制:
4.4 Kafka 高效性相关设计
4.4.1 消息的持久化
Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为磁盘是缓慢的,这导致人们对持久化结构具有竞争性持怀疑态度。其实,磁盘远比你想象的要快或者慢,这决定于我们如何使用磁盘。
一个和磁盘性能有关的关键事实是:磁盘驱动器的吞吐量跟寻到延迟是相背离的,也就是所,线性写的速度远远大于随机写。比如:在一个 6 7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是 600M/秒,但是随机写的速度只有 100K/秒,两者相差将近 6000 倍。线性读写在大多数应用场景下是可以预测的,因此,操作系统利用 read-ahead 和 write-behind 技术来从大的数据块中预取数据,或者将多个逻辑上的写操作组合成一个大写物理写操作中。更多的讨论可以在 ACMQueueArtical 中找到,他们发现,对磁盘的线性读在有些情况下可以比内存的随机访问要快一些。
为了补偿这个性能上的分歧,现代操作系统都会把空闲的内存用作磁盘缓存,尽管在内存回收的时候会有一点性能上的代价。所有的磁盘读写操作会在这个统一的缓存上进行。
此外,如果我们是在 JVM 的基础上构建的,熟悉 java 内存应用管理的人应该清楚以下两件事情:
一个对象的内存消耗是非常高的,经常是所存数据的两倍或者更多。
随着堆内数据的增多,Java 的垃圾回收会变得非常昂贵。
基于这些事实,利用文件系统并且依靠页缓存比维护一个内存缓存或者其他结构要好——我们至少要使得可用的缓存加倍,通过自动访问可用内存,并且通过存储更紧凑的字节结构而不是一个对象,这将有可能再次加倍。这么做的结果就是在一台 32GB 的机器上,如果不考虑 GC 惩罚,将最多有 28-30GB 的缓存。此外,这些缓存将会一直存在即使服务重启,然而进程内缓存需要在内存中重构(10GB 缓存需要花费 10 分钟)或者它需要一个完全冷缓存启动(非常差的初始化性能)。它同时也简化了代码,因为现在所有的维护缓存和文件系统之间内聚的逻辑都在操作系统内部了,这使得这样做比 one-off in-process attempts 更加高效与准确。如果你的磁盘应用更加倾向于顺序读取,那么 read-ahead 在每次磁盘读取中实际上获取到这人缓存中的有用数据。
以上这些建议了一个简单的设计:不同于维护尽可能多的内存缓存并且在需要的时候刷新到文件系统中,我们换一种思路。所有的数据不需要调用刷新程序,而是立刻将它写到一个持久化的日志中。事实上,这仅仅意味着,数据将被传输到内核页缓存中并稍后被刷新。我们可以增加一个配置项以让系统的用户来控制数据在什么时候被刷新到物理硬盘上。
4.4.2 常数时间性能保证
消息系统中持久化数据结构的设计通常是维护者一个和消费队列有关的 B 树或者其它能够随机存取结构的元数据信息。B 树是一个很好的结构,可以用在事务型与非事务型的语义中。但是它需要一个很高的花费,尽管 B 树的操作需要 O(logN)。通常情况下,这被认为与常数时间等价,但这对磁盘操作来说是不对的。磁盘寻道一次需要 10ms,并且一次只能寻一个,因此并行化是受限的。
直觉上来讲,一个持久化的队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。尽管和 B 树相比,这种结构不能支持丰富的语义,但是它有一个优点,所有的操作都是常数时间,并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完全无关,服务器可以充分利用廉价的硬盘来提供高效的消息服务。
事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着我们可以提供一般消息系统无法提供的特性。比如说,消息被消费后不是立马被删除,我们可以将这些消息保留一段相对比较长的时间(比如一个星期)。
4.4.3 进一步提高效率
我们已经为效率做了非常多的努力。但是有一种非常主要的应用场景是:处理 Web 活动数据,它的特点是数据量非常大,每一次的网页浏览都会产生大量的写操作。更进一步,我们假设每一个被发布的消息都会被至少一个 consumer 消费,因此我们更要怒路让消费变得更廉价。
通过上面的介绍,我们已经解决了磁盘方面的效率问题,除此之外,在此类系统中还有两类比较低效的场景:
太多小的 I/O 操作
过多的字节拷贝
为了减少大量小 I/O 操作的问题,kafka 的协议是围绕消息集合构建的。Producer 一次网络请求可以发送一个消息集合,而不是每一次只发一条消息。在 server 端是以消息块的形式追加消息到 log 中的,consumer 在查询的时候也是一次查询大量的线性数据块。消息集合即 MessageSet,实现本身是一个非常简单的 API,它将一个字节数组或者文件进行打包。所以对消息的处理,这里没有分开的序列化和反序列化的上步骤,消息的字段可以按需反序列化(如果没有需要,可以不用反序列化)。
另一个影响效率的问题就是字节拷贝。为了解决字节拷贝的问题,kafka 设计了一种“标准字节消息”,Producer、Broker、Consumer 共享这一种消息格式。Kakfa 的 message log 在 broker 端就是一些目录文件,这些日志文件都是 MessageSet 按照这种“标准字节消息”格式写入到磁盘的。
维持这种通用的格式对这些操作的优化尤为重要:持久化 log 块的网络传输。流行的 unix 操作系统提供了一种非常高效的途径来实现页面缓存和 socket 之间的数据传递。在 Linux 操作系统中,这种方式被称作:sendfile system call(Java 提供了访问这个系统调用的方法:FileChannel.transferTo api)。
为了理解 sendfile 的影响,需要理解一般的将数据从文件传到 socket 的路径:
操作系统将数据从磁盘读到内核空间的页缓存中
应用将数据从内核空间读到用户空间的缓存中
应用将数据写回内核空间的 socket 缓存中
操作系统将数据从 socket 缓存写到网卡缓存中,以便将数据经网络发出
这种操作方式明显是非常低效的,这里有四次拷贝,两次系统调用。如果使用 sendfile,就可以避免两次拷贝:操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
我们期望一个主题上有多个消费者是一种常见的应用场景。利用上述的 zero-copy,数据只被拷贝到页缓存一次,然后就可以在每次消费时被重得利用,而不需要将数据存在内存中,然后在每次读的时候拷贝到内核空间中。这使得消息消费速度可以达到网络连接的速度。这样以来,通过页面缓存和 sendfile 的结合使用,整个 kafka 集群几乎都已以缓存的方式提供服务,而且即使下游的 consumer 很多,也不会对整个集群服务造成压力。
关于 sendfile 和 zero-copy,请参考:zero-copy
五、Kafka 集群部署
5.1 集群部署
为了提高性能,推荐采用专用的服务器来部署 kafka 集群,尽量与 hadoop 集群分开,因为 kafka 依赖磁盘读写和大的页面缓存,如果和 hadoop 共享节点的话会影响其使用页面缓存的性能。
Kafka 集群的大小需要根据硬件的配置、生产者消费者的并发数量、数据的副本个数、数据的保存时长综合确定。
磁盘的吞吐量尤为重要,因为通常 kafka 的瓶颈就在磁盘上。
Kafka 依赖于 zookeeper,建议采用专用服务器来部署 zookeeper 集群,zookeeper 集群的节点采用偶数个,一般建议用 3、5、7 个。注意 zookeeper 集群越大其读写性能越慢,因为 zookeeper 需要在节点之间同步数据。一个 3 节点的 zookeeper 集群允许一个节点失败,一个 5 节点集群允许 2 个几点失败。
5.2 集群大小
有很多因素决定着 kafka 集群需要具备存储能力的大小,最准确的衡量办法就是模拟负载来测算一下,Kafka 本身也提供了负载测试的工具。
如果不想通过模拟实验来评估集群大小,最好的办法就是根据硬盘的空间需求来推算。下面我就根据网络和磁盘吞吐量需求来做一下估算。
我们做如下假设:
W:每秒写多少 MB
R :副本数
C :Consumer 的数量
一般的来说,kafka 集群瓶颈在于网络和磁盘吞吐量,所以我们先评估一下集群的网络和磁盘需求。
对于每条消息,每个副本都要写一遍,所以整体写的速度是 WR。读数据的部分主要是集群内部各个副本从 leader 同步消息读和集群外部的 consumer 读,所以集群内部读的速率是(R-1)W,同时,外部 consumer 读的速度是 C*W,因此:
Write:WRRead:(R-1)W+CW需要注意的是,我们可以在读的时候缓存部分数据来减少 IO 操作,如果一个集群有 M MB 内存,写的速度是 W MB/sec,则允许 M/(WR) 秒的写可以被缓存。如果集群有 32GB 内存,写的速度是 50MB/s 的话,则可以至少缓存 10 分钟的数据。
5.3 Kafka 性能测试
Performance testing
5.4 Kafka 在 zookeeper 中的数据结构
Kafka data structures in Zookeeper
六、Kafka 主要配置
6.1 Broker Config
属性 默认值 描述
broker.id 必填参数,broker 的唯一标识
log.dirs /tmp/kafka-logs Kafka 数据存放的目录。可以指定多个目录,中间用逗号分隔,当新 partition 被创建的时会被存放到当前存放 partition 最少的目录。
port 9092 BrokerServer 接受客户端连接的端口号
zookeeper.connect null Zookeeper 的连接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3。可以填一个或多个,为了提高可靠性,建议都填上。注意,此配置允许我们指定一个 zookeeper 路径来存放此 kafka 集群的所有数据,为了与其他应用集群区分开,建议在此配置中指定本集群存放目录,格式为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消费者的参数要和此参数一致。
message.max.bytes 1000000 服务器可以接收到的最大的消息大小。注意此参数要和 consumer 的 maximum.message.size 大小一致,否则会因为生产者生产的消息太大导致消费者无法消费。
num.io.threads 8 服务器用来执行读写请求的 IO 线程数,此参数的数量至少要等于服务器上磁盘的数量。
queued.max.requests 500 I/O 线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。
socket.send.buffer.bytes 100 * 1024 The SO_SNDBUFF buffer the server prefers for socket connections.
socket.receive.buffer.bytes 100 * 1024 The SO_RCVBUFF buffer the server prefers for socket connections.
socket.request.max.bytes 100 * 1024 * 1024 服务器允许请求的最大值, 用来防止内存溢出,其值应该小于 Java heap size.
num.partitions 1 默认 partition 数量,如果 topic 在创建时没有指定 partition 数量,默认使用此值,建议改为 5
log.segment.bytes 1024 * 1024 * 1024 Segment 文件的大小,超过此值将会自动新建一个 segment,此值可以被 topic 级别的参数覆盖。
log.roll.{ms,hours} 24 * 7 hours 新建 segment 文件的时间,此值可以被 topic 级别的参数覆盖。
log.retention.{ms,minutes,hours} 7 days Kafka segment log 的保存周期,保存周期超过此时间日志就会被删除。此参数可以被 topic 级别参数覆盖。数据量大时,建议减小此值。
log.retention.bytes -1 每个 partition 的最大容量,若数据量超过此值,partition 数据将会被删除。注意这个参数控制的是每个 partition 而不是 topic。此参数可以被 log 级别参数覆盖。
log.retention.check.interval.ms 5 minutes 删除策略的检查周期
auto.create.topics.enable true 自动创建 topic 参数,建议此值设置为 false,严格控制 topic 管理,防止生产者错写 topic。
default.replication.factor 1 默认副本数量,建议改为 2。
replica.lag.time.max.ms 10000 在此窗口时间内没有收到 follower 的 fetch 请求,leader 会将其从 ISR(in-sync replicas)中移除。
replica.lag.max.messages 4000 如果 replica 节点落后 leader 节点此值大小的消息数量,leader 节点就会将其从 ISR 中移除。
replica.socket.timeout.ms 30 * 1000 replica 向 leader 发送请求的超时时间。
replica.socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests to the leader for replicating data.
replica.fetch.max.bytes 1024 * 1024 The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.
replica.fetch.wait.max.ms 500 The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.
num.replica.fetchers 1 Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.
fetch.purgatory.purge.interval.requests 1000 The purge interval (in number of requests) of the fetch request purgatory.
zookeeper.session.timeout.ms 6000 ZooKeeper session 超时时间。如果在此时间内 server 没有向 zookeeper 发送心跳,zookeeper 就会认为此节点已挂掉。 此值太低导致节点容易被标记死亡;若太高,.会导致太迟发现节点死亡。
zookeeper.connection.timeout.ms 6000 客户端连接 zookeeper 的超时时间。
zookeeper.sync.time.ms 2000 H ZK follower 落后 ZK leader 的时间。
controlled.shutdown.enable true 允许 broker shutdown。如果启用,broker 在关闭自己之前会把它上面的所有 leaders 转移到其它 brokers 上,建议启用,增加集群稳定性。
auto.leader.rebalance.enable true If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the “preferred” replica for each partition if it is available.
leader.imbalance.per.broker.percentage 10 The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above the configured value per broker.
leader.imbalance.check.interval.seconds 300 The frequency with which to check for leader imbalance.
offset.metadata.max.bytes 4096 The maximum amount of metadata to allow clients to save with their offsets.
connections.max.idle.ms 600000 Idle connections timeout: the server socket processor threads close the connections that idle more than this.
num.recovery.threads.per.data.dir 1 The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
unclean.leader.election.enable true Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
delete.topic.enable false 启用 deletetopic 参数,建议设置为 true。
offsets.topic.num.partitions 50 The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).
offsets.topic.retention.minutes 1440 Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.
offsets.retention.check.interval.ms 600000 The frequency at which the offset manager checks for stale offsets.
offsets.topic.replication.factor 3 The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.
offsets.topic.segment.bytes 104857600 Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.
offsets.load.buffer.size 5242880 An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager’s cache.
offsets.commit.required.acks -1 The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer’s acknowledgement setting. In general, the default should not be overridden.
offsets.commit.timeout.ms 5000 The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.
6.2 Producer Config
属性 默认值 描述
metadata.broker.list 启动时 producer 查询 brokers 的列表,可以是集群中所有 brokers 的一个子集。注意,这个参数只是用来获取 topic 的元信息用,producer 会从元信息中挑选合适的 broker 并与之建立 socket 连接。格式是:host1:port1,host2:port2。
request.required.acks 0 参见 3.2 节介绍
request.timeout.ms 10000 Broker 等待 ack 的超时时间,若等待时间超过此值,会返回客户端错误信息。
producer.type sync 同步异步模式。async 表示异步,sync 表示同步。如果设置成异步模式,可以允许生产者以 batch 的形式 push 数据,这样会极大的提高 broker 性能,推荐设置为异步。
serializer.class kafka.serializer.DefaultEncoder 序列号类,.默认序列化成 byte[] 。
key.serializer.class Key 的序列化类,默认同上。
partitioner.class kafka.producer.DefaultPartitioner Partition 类,默认对 key 进行 hash。
compression.codec none 指定 producer 消息的压缩格式,可选参数为: “none”, “gzip” and “snappy”。关于压缩参见 4.1 节
compressed.topics null 启用压缩的 topic 名称。若上面参数选择了一个压缩格式,那么压缩仅对本参数指定的 topic 有效,若本参数为空,则对所有 topic 有效。
message.send.max.retries 3 Producer 发送失败时重试次数。若网络出现问题,可能会导致不断重试。
retry.backoff.ms 100 Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
topic.metadata.refresh.interval.ms 600 * 1000 The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available…). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed
queue.buffering.max.ms 5000 启用异步模式时,producer 缓存消息的时间。比如我们设置成 1000 时,它会缓存 1 秒的数据再一次发送出去,这样可以极大的增加 broker 吞吐量,但也会造成时效性的降低。
queue.buffering.max.messages 10000 采用异步模式时 producer buffer 队列里最大缓存的消息数量,如果超过这个数值,producer 就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms -1 当达到上面参数值时 producer 阻塞等待的时间。如果值设置为 0,buffer 队列满时 producer 不会阻塞,消息直接被丢掉。若值设置为-1,producer 会被阻塞,不会丢消息。
batch.num.messages 200 采用异步模式时,一个 batch 缓存的消息数量。达到这个数量值时 producer 才会发送消息。
send.buffer.bytes 100 * 1024 Socket write buffer size
client.id “” The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
6.3 Consumer Config
属性 默认值 描述
group.id Consumer 的组 ID,相同 goup.id 的 consumer 属于同一个组。
zookeeper.connect Consumer 的 zookeeper 连接串,要和 broker 的配置一致。
consumer.id null 如果不设置会自动生成。
socket.timeout.ms 30 * 1000 网络请求的 socket 超时时间。实际超时时间由 max.fetch.wait + socket.timeout.ms 确定。
socket.receive.buffer.bytes 64 * 1024 The socket receive buffer for network requests.
fetch.message.max.bytes 1024 * 1024 查询 topic-partition 时允许的最大消息大小。consumer 会为每个 partition 缓存此大小的消息到内存,因此,这个参数可以控制 consumer 的内存使用量。这个值应该至少比 server 允许的最大消息大小大,以免 producer 发送的消息大于 consumer 允许的消息。
num.consumer.fetchers 1 The number fetcher threads used to fetch data.
auto.commit.enable true 如果此值设置为 true,consumer 会周期性的把当前消费的 offset 值保存到 zookeeper。当 consumer 失败重启之后将会使用此值作为新开始消费的值。
auto.commit.interval.ms 60 * 1000 Consumer 提交 offset 值到 zookeeper 的周期。
queued.max.message.chunks 2 用来被 consumer 消费的 message chunks 数量, 每个 chunk 可以缓存 fetch.message.max.bytes 大小的数据量。
rebalance.max.retries 4 When a new consumer joins a consumer group the set of consumers attempt to “rebalance” the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry. This setting controls the maximum number of attempts before giving up.
fetch.min.bytes 1 The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request.
fetch.wait.max.ms 100 The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy fetch.min.bytes.
rebalance.backoff.ms 2000 Backoff time between retries during rebalance.
refresh.leader.backoff.ms 200 Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
auto.offset.reset largest What to do when there is no initial offset in ZooKeeper or if an offset is out of range ;smallest : automatically reset the offset to the smallest offset; largest : automatically reset the offset to the largest offset;anything else: throw exception to the consumer
consumer.timeout.ms -1 若在指定时间内没有消息消费,consumer 将会抛出异常。
exclude.internal.topics true Whether messages from internal topics (such as offsets) should be exposed to the consumer.
zookeeper.session.timeout.ms 6000 ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
zookeeper.connection.timeout.ms 6000 The max time that the client waits while establishing a connection to zookeeper.
zookeeper.sync.time.ms 2000 How far a ZK follower can be behind a ZK leader
6.4 Topic 级别的配置
topic-config
作者:Heaven-Wang
来源:CSDN
原文:https://blog.csdn.net/suifeng3051/article/details/48053965
版权声明:本文为博主原创文章,转载请附上博文链接!