源码先锋

源码先锋

Kafka为什么这么快?

admin 86 108

题上的“快”过于口语化,准确来讲是“高吞吐量”,也就是为什么Kafka每秒钟可处理百万级别的消息。

面试官通常希望,候选人能够逻辑清晰地从Kafka底层原理的角度,盘点出若干个高吞吐量的原因。

在本文中,我们就来针对这个问题梳理一下。

批处理机制

在Kafka的内部实现中,无论是生产者发送消息给Broker,还是Broker将消息落盘持久化,以及消费者从Broker上拉取消息,都是以批处理的方式进行的。

这是Kafka实现高吞吐量的核心设计之一。

1、生产者端

Kafka生产者端有一个非常重要的参数,,默认值为16384字节,16KB。

该参数为消息发送的批次大小,在追求高吞吐量的情况下,生产者并不是一条条发送消息给同一个分区的,而是在内存缓冲区中攒成一个批次再进行发送。

如果我们把该参数值设置得大些,可以攒一个大的batch后再发送,这样吞吐量就可以进一步提升。

不过,这个参数需要另一个参数进行配合,两者相辅相成完成生产者发消息的控制,那就是,默认值为0。

该参数会跟配合使用,表示等待生产者消息攒成批次的时间。

生产者会在消息攒成大小或达到时间的情况下,将消息发送出去。

如果将的值设置为0的话,这就意味着生产者没有给消息留攒成批次的时间,还是按照一条条发送的。

如果我们想要优化生产者的吞吐量,这个值一定不能设置为默认值。

2、Broker端

Kafka的Broker端接收到生产者端发送过来的消息,会以批次追加写入(App)的方式将其保存到分区的日志分段中,这样可以减少磁盘IO次数,提升写入的吞吐量。

3、消费者端

我们来看一个消费者端的代码片段:

while(true){ConsumerRecordsString,Stringrecords=((100));for(ConsumerRecordString,Stringrecord:records){//处理消息("Offset=%d,Key=%s,Value=%s%n",(),(),());}}

从代码的处理逻辑中可以看到,消费者端一样是以批次为单位进行消息拉取并处理的。

消费者端的参数,用于控制单次调用poll()方法能够返回批次的消息数量,默认是500。

分区机制

Kafka的分区机制是实现高吞吐量的另一个核心设计,通过将一个主题的消息分散到多个Broker的分区上,以此实现消息的并行发送、接收、保存和处理。

如果Kafka消息的键值为null,并且使用了默认的分区器,分区器会使用轮询(RoundRobin)算法将消息分配到不同Broker的分区上。

反之,如果Kafka消息的键值不为null,并且使用了默认的分区器,分区器会对键进行散列,然后根据散列值将消息分配到不同Broker的分区上。

这样一来,就可以实现Kafka分区机制的负载均衡性。

零拷贝机制

Kafka的零拷贝机制,是通过减少消息数据在内核态和用户态之间的拷贝次数,来达到提升数据传输效率的。

1、传统拷贝机制

在该机制中,需经历4次数据拷贝和4次上下文切换,才能完成当消费者从Broker拉取消息时,Broker从磁盘中读取消息数据并发送到网卡缓冲区上。

如下图所示:

(1)KafkaBroker磁盘—ReadBuffer(一次DMACopy,用户态—内核态)

(2)ReadBuffer—APPBuffer(一次CPUCopy,内核态—用户态)

(3)APPBuffer—SocketBuffer(一次CPUCopy,用户态—内核态)

(4)SocketBuffer—NICBuffer(一次DMACopy,内核态—用户态)

术语解释:

DMACopy:DirectMemoryAccess,数据直接在内存磁盘、网卡之间,或内存不同区域之间传输,无需CPU参与介入,与之相对应的是CPUCopy。

ReadBuffer:操作系统的PageCache。

SocketBuffer:操作系统用来管理数据包的缓冲区。

NICBuffer:网卡缓冲区。

2、零拷贝机制

如上图所示,通过零拷贝机制,KafkaBroker磁盘上的数据读取到ReadBuffer后,不再需要拷贝到APPBuffer中,而是直接拷贝到NICBuffer中。

图中的步骤二只是通过DMA的scatter/gather操作,将ReadBuffer数据指针存储在SocketBuffer中,并让DMA直接从内存中进行地址读取。

通过零拷贝机制优化后,4次上下文切换变成了2次,4次数据拷贝只剩下2次DMA数据拷贝+一次CPU指针拷贝,而两次最消耗CPU资源的CPU数据拷贝操作则不再需要了。

双线程机制

生产者端发送消息的代码如下:

ProducerRecordString,Stringrecord=newProducerRecord("Topic1","12345","order_event");(record);

代码实现非常简单,但其底层的处理机制则复杂很多,核心是通过双线程(主线程、Ser线程)并行机制各自处理不同逻辑,并提升整体生产者吞吐量的。

主线程负责消息创建,然后会依次经过拦截器、序列化器和分区器,并将消息缓存在消息累加器中。

随后,Ser线程再从消息累加器中获取批次消息,并完成后续消息发送逻辑。

压缩机制

一般来讲,绝大多数的业务系统都不属于CPU密集型,CPU占用率不会太高,此时我们可以对消息进行压缩,以达到减少数据量,提升吞吐量的目的。

生产者端的参数用于压缩设定,其默认值为none,不进行压缩,我们可以选择gzip、snappy、lz4、zstd等压缩算法。

其中,zstd的压缩率最高,适用于磁盘存储和网络传输占用少的场景,lz4的压缩、解压速度最快,且CPU占用率低,适用于高并发的业务场景。

Kafka的压缩机制在生产者端执行,生产者将多条消息合并到一个批次中,并选择合适的压缩算法对整个批次进行压缩,再发送到Broker上。