• -------------------------------------------------------------
  • ====================================

《kafka中文手册》- 构架设计(一)

kafka dewbay 6年前 (2019-04-12) 2397次浏览 已收录 0个评论 扫描二维码

4. DESIGN 设计

4.1 Motivation 目的

We designed Kafka to be able to act as a unified platform for handling all the real-time data feeds a large company might have. To do this we had to think through a fairly broad set of use cases.

kafka被设计为大公司的实时在线数据处理提供一个统一的平台, 为达到这样的目标, 我们对相当广泛的用例进行考虑和衡量.

It would have to have high-throughput to support high volume event streams such as real-time log aggregation.

能具备很高的吞吐量以便支持大容积的事件流, 例如实时日志汇总系统

It would need to deal gracefully with large data backlogs to be able to support periodic data loads from offline systems.

能非常谨慎处理大量日志数据备份, 以便能定时从离线系统加载数据

It also meant the system would have to handle low-latency delivery to handle more traditional messaging use-cases.

系统必须有比较低的延迟分发机制, 才能支持传统的消息系统的使用

We wanted to support partitioned, distributed, real-time processing of these feeds to create new, derived feeds. This motivated our partitioning and consumer model.

希望能够支持可分区的, 分布式的, 实时的数据反馈处理, 并创建和分发新的反馈.

Finally in cases where the stream is fed into other data systems for serving, we knew the system would have to be able to guarantee fault-tolerance in the presence of machine failures.

最后, 如果流是反馈给其他系统的, 系统需要能在机器宕机的时候提供容错保障.

Supporting these uses led us to a design with a number of unique elements, more akin to a database log than a traditional messaging system. We will outline some elements of the design in the following sections.

为了支持这些使用情景, 我们需要设计一个更类似于数据库日志系统, 而不是传统的消息系统那样, 具有更多独特特性的系统

4.2 Persistence 存储

Don’t fear the filesystem! 不要对文件系统感到恐惧

Kafka relies heavily on the filesystem for storing and caching messages. There is a general perception that “disks are slow” which makes people skeptical that a persistent structure can offer competitive performance. In fact disks are both much slower and much faster than people expect depending on how they are used; and a properly designed disk structure can often be as fast as the network.

kafka很依赖于底层的文件系统用于保存和缓存消息记录, 一种普遍的观念是磁盘很慢, 大家都会怀疑kafka的存储结构是否能提供有竞争力的存储性能呢. 但是, 实际上磁盘比人们现象中的还快, 这就看你怎么用了. 一个合理设计的磁盘存储结构, 往往可以和网络一样快

The key fact about disk performance is that the throughput of hard drives has been diverging from the latency of a disk seek for the last decade. As a result the performance of linear writes on a JBOD configuration with six 7200rpm SATA RAID-5 array is about 600MB/sec but the performance of random writes is only about 100k/sec—a difference of over 6000X. These linear reads and writes are the most predictable of all usage patterns, and are heavily optimized by the operating system. A modern operating system provides read-ahead and write-behind techniques that prefetch data in large block multiples and group smaller logical writes into large physical writes. A further discussion of this issue can be found in this ACM Queue article; they actually find that sequential disk access can in some cases be faster than random memory access!

关于磁盘性能的关键事实是,硬盘驱动器的吞吐量在过去十年时, 磁道的寻址延迟就已经达到了极限了。使用 jaod 方式配置 6 个 7200rpm SATA RAID-5 组的磁盘阵列大概是 600MB/sec, 但是随即写性能只有 100k/sec, 差距是 6000X 万倍, 线性读写在使用上是最容易预测的方式, 所以大部分操作系统都对这方面做了很多优化措施. 现在的操作系统, 都有提前读和缓存写的技术, 从大的数据块中批量读取数据, 并汇总小的逻辑写请求后, 使用一次大的物理写请求代替. 跟多关于这方面的套路可以查看 ACM Queue article 这里, 它指出这一的一个事实, 顺序写在某些情况下比随机的内存读取还要快

To compensate for this performance divergence, modern operating systems have become increasingly aggressive in their use of main memory for disk caching. A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache. This feature cannot easily be turned off without using direct I/O, so even if a process maintains an in-process cache of the data, this data will likely be duplicated in OS pagecache, effectively storing everything twice.

为了弥补这种性能上的差距, 现代操作系统, 更多使用内存来为磁盘做缓存. 现代的操作系统更乐意使用所有的空闲内存为磁盘做缓存, 在内存的回收上只需要花费极小的代价. 所有的磁盘读写都通过统一的缓存. 如果没有使用 direct I/O 这个开关, 这种特性不会很容易被屏蔽掉. 因此,即使一个进程内部独立维持一个数据缓存, 那么数据也有可能在系统页中再被缓存一次, 所有的数据都会被存储两次

Furthermore, we are building on top of the JVM, and anyone who has spent any time with Java memory usage knows two things:

此外, 我们基于 jvm 上面构建应用, 有花费时间在 java 内存上的人都知道两件事

  1. The memory overhead of objects is very high, often doubling the size of the data stored (or worse). 内存中存有大量的对象需要消耗很高, 经常是双倍于存储到磁盘时大小(可能更多)
  2. Java garbage collection becomes increasingly fiddly and slow as the in-heap data increases. java 的垃圾收集器在内存数据增加是变得很烦琐的, 很慢

As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure—we at least double the available cache by having automatic access to all free memory, and likely double again by storing a compact byte structure rather than individual objects. Doing so will result in a cache of up to 28-30GB on a 32GB machine without GC penalties. Furthermore, this cache will stay warm even if the service is restarted, whereas the in-process cache will need to be rebuilt in memory (which for a 10GB cache may take 10 minutes) or else it will need to start with a completely cold cache (which likely means terrible initial performance). This also greatly simplifies the code as all logic for maintaining coherency between the cache and filesystem is now in the OS, which tends to do so more efficiently and more correctly than one-off in-process attempts. If your disk usage favors linear reads then read-ahead is effectively pre-populating this cache with useful data on each disk read.

考虑到这些因素, 使用文件系统并使用页缓存机制比自己去进行内存缓存或使用其他存储结构更为有效–我们访问内存的时候已经起码至少访问了两次缓存, 很有可能在写字节的时候也是两次存储而非单次. 这样做的话, 在一个缓存达到 32GB 的机器上, 可以减少 GC 的代价, 这样也可以减少代码在维护缓存和系统文件间的一致性, 比再尝试新的方法有更高的正确行. 如果你对磁盘的使用充分利用到线性读, 那么预取机制将会很有效的在每次磁盘读取时实现填充好缓存空间.

This suggests a design which is very simple: rather than maintain as much as possible in-memory and flush it all out to the filesystem in a panic when we run out of space, we invert that. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel’s pagecache.

这意味设计非常简单, 系统不是更多把数据保存到内存空间, 在内存空间耗尽时才赶紧写入到文件系统中, 相反的, 所有的数据都被马上写入到文件系统的日志文件中, 但没有必要马上进行 flush 磁盘操作. 只是把数据传输到系统内核的页面空间中去了.

This style of pagecache-centric design is described in an article on the design of Varnish here (along with a healthy dose of arrogance).

这种页面缓存风格设计可以参考这里 : article

Constant Time Suffices 常量耗时需求

The persistent data structure used in messaging systems are often a per-consumer queue with an associated BTree or other general-purpose random access data structures to maintain metadata about messages. BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system. They do come with a fairly high cost, though: Btree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix very fast cached operations with very slow physical disk operations, the observed performance of tree structures is often superlinear as data increases with fixed cache–i.e. doubling your data makes things much worse than twice as slow.

在消息系统中, 大部分持久化的数据结构通常使用一个消费者队列一个 btree 结构, 或其他随机读取的数据结构用于维持消息的元数据信息. btree 结构是最通用的数据结构类型, 它在消息系统中, 能够支广泛的事物或非事物的语义. 虽然 btree 操作的代价是 O(log N), 但是实际使用时消耗的代价却很高. 通常 O(log N) 被认为是消耗常量时间, 但是这个对硬盘操作却不是这样, 硬盘寻址需要使用 10ms 的耗时, 每次请求只能做一次硬盘寻址, 不能并发执行. 所以即使少数的几次硬盘寻址也会有很高的负载, 因为存储系统混合和快速缓存操作和慢速的物理磁盘操作, btree 树的性能一般逼近与缓存到硬盘里面的数据大小, 当数据量加倍时, 效率可能下降一半, 或更慢.

Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. This structure has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages since the performance is completely decoupled from the data size—one server can now take full advantage of a number of cheap, low-rotational speed 1+TB SATA drives. Though they have poor seek performance, these drives have acceptable performance for large reads and writes and come at 1/3 the price and 3x the capacity.

直觉来看, 一个持久化队列可以使用简单的读和追加数据到文件的日志方式进行实现, 这种结构有一个好处是, 所有操作都是 O(1)性能的, 而且读和写入数据不会相互阻塞, 这样性能和数据的大小完全无关, 一台服务器可以完全充分利用了廉价, 低速的 1+TB SATA 硬盘, 虽然它们的寻道性能不高, 但是他们以 3 分之一的价格和 3 倍的容量接受大量的读写请求

Having access to virtually unlimited disk space without any performance penalty means that we can provide some features not usually found in a messaging system. For example, in Kafka, instead of attempting to delete messages as soon as they are consumed, we can retain messages for a relatively long period (say a week). This leads to a great deal of flexibility for consumers, as we will describe.

能够以微小地 性能代价存取数据到无限的硬盘中, 这意味着我们可以提供一些其他消息系统没有的特性. 例如, 在kafka中, 不需要在消费者消费了数据后马上把消息从队列中删除掉, 相反的我们可以保留一段很长的时间, 例如一个礼拜. 这对消费者来说提供了很大的灵活性, 下面我们就会讲到

4.3 Efficiency 效率

We have put significant effort into efficiency. One of our primary use cases is handling web activity data, which is very high volume: each page view may generate dozens of writes. Furthermore, we assume each message published is read by at least one consumer (often many), hence we strive to make consumption as cheap as possible.

我们在效率上投入了众多的努力, 一个我们的主要用例是具有大吞吐量的 web 活动日志, 每页面的每次访问都会产生好几十次的写, 进一步, 我们假定每次消息发布, 至少会被一个消费者读取(经常情况下是多个消费者), 因此, 我们努力使消费消息的代价尽可能小.

We have also found, from experience building and running a number of similar systems, that efficiency is a key to effective multi-tenant operations. If the downstream infrastructure service can easily become a bottleneck due to a small bump in usage by the application, such small changes will often create problems. By being very fast we help ensure that the application will tip-over under load before the infrastructure. This is particularly important when trying to run a centralized service that supports dozens or hundreds of applications on a centralized cluster as changes in usage patterns are a near-daily occurrence.

从构建一些相识的系统的经验中, 我们也发现, 有效的多租户操作是提升性能的关键. 下游的基础服务很容易由于程序的很小的使用错误成为瓶颈, 例如, 一些小的变化很常导致一些新的问题, 我们可以非常快速在程序发布到基础平台前, 进行迭代测试, 这对需要在集中式的集群里跑几十个, 几千个应用时, 程序每天都在变动时非常有用.

We discussed disk efficiency in the previous section. Once poor disk access patterns have been eliminated, there are two common causes of inefficiency in this type of system: too many small I/O operations, and excessive byte copying.

前面一个章节我们讨论了磁盘的性能, 没有效率的磁盘访问模式就忽略不说了, 这里在系统上还有两个可能会导致效率低下的地方: 很多小的 I/O 操作和过多的字节拷贝

The small I/O problem happens both between the client and the server and in the server’s own persistent operations.

小 I/O 问题在客户端和服务器端都会发生, 在服务器端有它自己的存储操作

To avoid this, our protocol is built around a “message set” abstraction that naturally groups messages together. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.

为了避免这个问题, 我们的通讯协议这是基于消息集合这个概念构建的, 很容易把多个消息组合起来. 这样允许网络组合消息后进行发送, 而不是每次发送一条信息, 减少网络的来回开销. 服务器也是每次写入一堆数据到日志中, 消费者也是每次线性读取一堆数据

This simple optimization produces orders of magnitude speed up. Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.

这种简单的优化可以提升大量的性能, 批量处理导致大的网络数据包, 大的磁盘顺序读写, 连续的内存块等等, 所有的这些能把kafka的间接性的随机消息写改成线性写入后, 发送给消费者

The other inefficiency is in byte copying. At low message rates this is not an issue, but under load the impact is significant. To avoid this we employ a standardized binary message format that is shared by the producer, the broker, and the consumer (so data chunks can be transferred without modification between them).

另外一个低效率的地方是字节拷贝。在消息吞吐量不多的时候这不是一个问题,但在高负载下的影响是非常显著。为了避免这种情况,我们在生产者、服务器和消费者间使用一个标准化的二进制消息格式(这样数据块可以在它们之间直接进行传输而不需要再做修改)。

The message log maintained by the broker is itself just a directory of files, each populated by a sequence of message sets that have been written to disk in the same format used by the producer and consumer. Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket; in Linux this is done with the sendfile system call.

服务器端使用文件的形式维护消息日志, 所有的消息都按提供者和消费者使用的格式顺序写入到磁盘中, 维护这样的格式需要优化最常用的一些操作: 对持久日志块的网络传输. 现在的 unix 操作系列通常都有提供高效的优化代码直接把数据从缓存页发送到 socket, 在 linux 下使用 sendfile 的系统调用

To understand the impact of sendfile, it is important to understand the common data path for transfer of data from file to socket: 如果要理解一下 sendfile 调用的功效, 需要了解下正常情况下数据从文件发送到 socket 的过程

  1. The operating system reads data from the disk into pagecache in kernel space  操作系统从磁盘读取数据到系统内核空间的缓存页中
  2. The application reads the data from kernel space into a user-space buffer 应用从内核空间读取数据到用户空间缓冲区中
  3. The application writes the data back into kernel space into a socket buffer 应用从把数据写回到内核空间的 socket 缓冲区中
  4. The operating system copies the data from the socket buffer to the NIC buffer where it is sent over the network 系统拷贝 socket 缓冲区的数据到网卡缓冲区, 然后由网卡发送数据到网络中

This is clearly inefficient, there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from pagecache to the network directly. So in this optimized path, only the final copy to the NIC buffer is needed.

这很明显很没效率, 有 4 次拷贝还有 2 次系统调用, 如果使用 sendfile 命令, 重新拷贝运行系统直接把数据从缓存页拷贝到网络, 优化后, 最终只需要一次从缓存页到网卡缓冲区拷贝

We expect a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into pagecache exactly once and reused on each consumption instead of being stored in memory and copied out to kernel space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.

我们预期一个常见消费方式是使用多个消费者同时消费一个主题, 使用上面提到的 zero-copy 的优化方式, 数据只被拷贝到页缓存一次, 并被多次消费, 而不是缓存到(用户空间的)内存中, 然后在每次消费时拷贝到系统内核空间中. 这可以使消费者消费消息的速度达到网络连接的速度.

This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache.

组合页缓存和 sendfile 机制后, kafka集群在跟上消费者消费的同时, 让你觉得好像没有多少的磁盘读活动, 因为大部分的数据响应需求都是从缓存获取的.

For more background on the sendfile and zero-copy support in Java, see this article.

如果想要知道更多关于 java 对 sendfile 和 zero-copy 的支持, 可以阅读这篇文章 article.

End-to-end Batch Compression 端到端的数据压缩

In some cases the bottleneck is actually not CPU or disk but network bandwidth. This is particularly true for a data pipeline that needs to send messages between data centers over a wide-area network. Of course, the user can always compress its messages one at a time without any support needed from Kafka, but this can lead to very poor compression ratios as much of the redundancy is due to repetition between messages of the same type (e.g. field names in JSON or user agents in web logs or common string values). Efficient compression requires compressing multiple messages together rather than compressing each message individually.

在大部分情况下, 瓶颈不会是 cpu 或磁盘, 而是网络带宽. 这个在数据中心之间建立需要跨越广域网发送消息的数据管道时更为明显, 当然用户可以独立于 kafka 自己做消息压缩, 但是这有可能由于消息类型冗余, 导致压缩比例很低(例如, json 的字段名, 或 web 中的用户代理日志, 或常用的字符串值), 有效的压缩方式应该是允许压缩重复的消息, 而不是分别压缩单个消息

Kafka supports this by allowing recursive message sets. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will remain compressed in the log and will only be decompressed by the consumer.

kafka 通过递归的消息集合支持这样的操作. 一批的消息可以被收集在一起后压缩, 并发送到服务器端. 这样被压缩的一批数据, 在日志也是使用压缩的格式, 只有在消费者消费的时候才会被解压

Kafka supports GZIP, Snappy and LZ4 compression protocols. More details on compression can be found here.

kafka 支持 GZIP, Snappy and LZ4 压缩协议, 更多关于压缩的细节可以查看这里 here.

4.4 The Producer 发布者

Load balancing 负载均衡

The producer sends data directly to the broker that is the leader for the partition without any intervening routing tier. To help the producer do this all Kafka nodes can answer a request for metadata about which servers are alive and where the leaders for the partitions of a topic are at any given time to allow the producer to appropriately direct its requests. 为了让生产者实现这个功能, 所有的 kafka 服务器节点都能响应这样的元数据请求: 哪些服务器是活着的, 主题的哪些分区是主分区, 分配在哪个服务器上, 这样提供者就能适当地直接发送它的请求到服务器上.

生产者之间发送数据到主分区的服务器上, 不需要经过任何中间路由.

The client controls which partition it publishes messages to. This can be done at random, implementing a kind of random load balancing, or it can be done by some semantic partitioning function. We expose the interface for semantic partitioning by allowing the user to specify a key to partition by and using this to hash to a partition (there is also an option to override the partition function if need be). For example if the key chosen was a user id then all data for a given user would be sent to the same partition. This in turn will allow consumers to make locality assumptions about their consumption. This style of partitioning is explicitly designed to allow locality-sensitive processing in consumers.

客户端控制消息发送数据到哪个分区,  这个可以实现随机的负载均衡方式. 或者使用一些特定语义的分区函数, 我们有提供特定分区的接口让用于根据指定的键值进行 hash 分区(当然也有选项可以重写分区函数), 例如, 如果键值使用用户 ID, 则用户相关的所有数据都会被分发到同一个分区上. 这允许消费者, 在消费数据时做一些特定的本地化处理. 这样的分区风格经常被设计用于一些本地处理比较敏感的消费者

Asynchronous send 异步发送

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.

批处理是提升性能的一个主要驱动, 为了允许批量处理, kafka 提供者会尝试在内存中汇总数据, 并用一次请求批次提交信息. 批处理, 不仅仅可以配置指定的消息数量, 也可以指定等待特定的延迟时间(如 64k 或 10ms), 这允许汇总更多的数据后再发送, 在服务器端也会减少更多的 IO 操作. 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量.

Details on configuration and the api for the producer can be found elsewhere in the documentation.

更多的细节信息可以在提供者的 configuration 和 api 这里找到.

4.5 The Consumer 订阅者

The Kafka consumer works by issuing “fetch” requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.消费者通过从主分区的服务器获取数据进行消费. 消费者指定每次请求时日志的偏移量, 然后从这个位置开启批量获取数据. 消费者对位移量有绝对的控制权, 这样消费者可以重新设置位移位置, 并在有需要的时重新消费.

Push vs. pull 推送 vs 拉取

An initial question we considered is whether consumers should pull data from brokers or brokers should push data to the consumer. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some logging-centric systems, such as Scribe and Apache Flume, follow a very different push-based path where data is pushed downstream. There are pros and cons to both approaches. However, a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately, in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. This can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model.

一个基本的问题是, 我们在考虑, 消费者是否主动从服务器那里拉去数据, 还是服务器应该主动推送数据到消费者端. 在这方面, kafka 和传统的消息吸引设计一样, 生产者推送消息到服务器, 消费者从服务器拉去消息. 在一些日志中心系统, 像 Scribe and Apache Flume, 使用一种特殊的推送流数据推送机制, 这些方式都有利有弊, 但是, 在一个基于推送方式消息系统, 很难处理大量的消费者, 因为服务器需要控制数据的传输速率. 目标是为了让消费者尽可能多消费数据;不幸的是,在一个推送系统,这意味着消费者往往被消息淹没,如果消费率低于生产速度(例如密集的服务攻击). 基于拉去的系统往往比较优雅些, 消息处理只是落后, 消费者在后面尽可能赶上.

Another advantage of a pull-based system is that it lends itself to aggressive batching of data sent to the consumer. A push-based system must choose to either send a request immediately or accumulate more data and then send it later without knowledge of whether the downstream consumer will be able to immediately process it. If tuned for low latency, this will result in sending a single message at a time only for the transfer to end up being buffered anyway, which is wasteful. A pull-based design fixes this as the consumer always pulls all available messages after its current position in the log (or up to some configurable max size). So one gets optimal batching without introducing unnecessary latency.

使用基于拉取方式的系统还有一个好处就是容易汇集批量数据后发给消费者. 基于推送的系统, 要么马上发送请求, 要么汇总数据后再发送, 而不光下游的消费者是否能够处理得上. 如果为了进一步降低延迟, 这会导致缓存还没有结束时就传输单条数据过去, 这样很浪费. 基于拉的方式可以从当前日志位置拉去可用的消息(或者根据配置的大小). 这样能在没有引入不必要的延迟的情况下, 获取到比较好的批处理性能.

The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a “long poll” waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).

基于拉取方式的系统不足的地方是如果没有任何数据, 消费者就要循环检测, 使用空轮询的繁忙检测方式等候数据到来.为了避免这一点,我们可以设置拉请求的参数,允许消费者请求在“长轮询”时阻塞,直到数据到达.

You could imagine other possible designs which would be only pull, end-to-end. The producer would locally write to a local log, and brokers would pull from that with consumers pulling from them. A similar type of “store-and-forward” producer is often proposed. This is intriguing but we felt not very suitable for our target use cases which have thousands of producers. Our experience running persistent data systems at scale led us to feel that involving thousands of disks in the system across many applications would not actually make things more reliable and would be a nightmare to operate. And in practice we have found that we can run a pipeline with strong SLAs at large scale without a need for producer persistence.

你可以想象一些其他从端到端的一些可能性设计. 生产者把记录写入到本地日志中, 服务器将从消费者拉取的数据中拉取. 一种类似的储存和转发的生产者模型经常被提议. 这虽然挺有趣的, 但不适合有成千上万生产者的情况. 在我们大规模运行数据储存系统的经验来看, 成千上万的磁盘跨越多个应用并不让系统更为可靠, 操作起来将会是一个噩梦. 在实践中, 我们发现可以创建具有很强壮的 SLAs 保障的, 大规模的管道, 并且不需要提供者有持久化能力.Keeping track of what has been consumed is, surprisingly, one of the key performance points of a messaging system.令人惊讶的是,跟踪已消耗的内容是消息传递系统的关键性能点之一.。

Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structures used for storage in many messaging systems scale poorly, this is also a pragmatic choice–since the broker knows what is consumed it can immediately delete it, keeping the data size small.

大部分的消息系统在服务器端记录哪些消息被消费的元数据信息.  那就是, 消息被发送给消费者时, 服务器要么在本地马上记录日志, 要么等待消费者反馈后记录. 这样的话相当不直观, 事实上,对于一台服务器, 很难理清楚这个状态到底去哪里了. 因为在大部分的消息储存系统中, 数据结构很难被扩展, 这也依赖于编程的语义, 如果服务器知道消息被消费后可以马上删除, 那么就可以维持比较小的数据集.

What is perhaps not obvious is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.

碰巧不太明显的是, 让服务器和消费者对已经消费的数据达成一致并不是一件简单的事情. 如果服务器在每次数据分发出去后, 马上标记消息已经被消费了, 如果消费者处理消息失败了(例如宕机了), 那么消息可能会丢失. 为了解决这个问题, 很多消息系统添加了反馈机制, 用于标记消息已经被发送, 而不是被消费, 服务器等待消费者发送一个反馈来确认消息已经 被消费. 这个策略解决消息丢失的问题, 但是同时也引发新的问题. 首先, 如果消费者已经消费了记录, 但是在反馈时失败, 则有可能重复消费两次. 其次, 是多一个来回的性能损耗, 现在服务器就要为每个消息保存不同的状态(先锁定, 这样不会发送第二次, 然后标记为永久消费后, 才能把它删除). 还有些麻烦的问题需要处理, 比如消息被发送 了, 但是从来没有接受到反馈.

Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by exactly one consumer within each subscribing consumer group at any given time. This means that the position of a consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.

kafka 使用不一样的处理方式, 主题被划分成一系列有序的分区集合, 每个分区在一个时刻仅被订阅分组中的一个消费者消费. 这意味这每个消费者在一个分区位置就只是一个数值, 用于记录下一次消息要被消费的位置. 这意味着记录消费者状态的代价非常小, 只是每个分区一个数值. 这个状态可以定期做检查点, 这使等价的消息反馈代价非常小.

There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.

这个方案还有另外的好处, 消费者可以优雅地重新指定一个旧的位移位置, 并重新消费数据. 这个和通常的队列观念有点相悖, 但是对很多消费者来说是一个很重要的特性. 例如,如果消费代码有 bug,并且在一些消息被消费后发现,一旦 bug 被修复,消费者可以重新使用这些消息.。

Offline Data Load 离线数据加载

Scalable persistence allows for the possibility of consumers that only periodically consume such as batch data loads that periodically bulk-load data into an offline system such as Hadoop or a relational data warehouse.可扩展的持久性储存能力, 使得消费者能定期批量把数据导入到离线系统中, 如:Hadoop 或关系型数据仓库.

In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data—they simply restart from their original position.

在 hadoop 的例子中, 我们通过把数据分发到独立的任务集中进行并行处理, 每个的单位是按服务器/主题/分区, 这样可以允许很好的并发数据加载处理. Hadoop 提供任务管理, 任务可以在失败是重新启动, 而不用担心会重复处理数据–只需要简单从他们原来处理的位置重新开始.


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:《kafka中文手册》- 构架设计(一)
喜欢 (0)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址