你好,我是陈阳,今天我和你分享的主题是“Kafka是如何封装NIO的?(上篇)”。
从这一讲开始,我们就来开启Kafka客户端生产者代码的学习之旅。
对于生产者这个模块,我会采用自底向上的讲解方式:先从底层通信开始,然后讲解底层数据的整合,再讲上层业务流程,最后把整体架构都串起来。一开始可能会比较基础,甚至你会感觉跟实际工作关系不大,但正是由于这些底层的实现才真正决定了Kafka的一些特性,掌握了这些底层的机制,你在实际工作中才能更好地去驾驭、运维和优化Kafka。
Kafka是一个分布式系统,生产者和服务端在不同的机器上,这就会涉及网络通信,Java系统的网络通信一般会用Java多路复用的实现库NIO,同时Kafka内部对JavaNIO封装了统一的框架,实现了多路复用的网络IO操作。之所以Kafka要自己实现JavaNIO的封装,而不采用类似Netty那样NIO封装好的类库,就是因为自己实现的更加适合Kafka的一些特性。
为便于你更好地理解,这里我用一个思维导图大体描述下这一讲我们要讲的知识点:
JavaNIO简单介绍这里我们介绍几个JavaNIO的组件。
Buffer:缓冲区。这是一个接口,Kafka用它的ByteBuffer实现类,配合SocketChannel实现读写操作。读的时候,调用(buffer)把SocketChannel的数据读到ByteBuffer内;写的时候,调用(buffer)把Buffer中的数据写到SocketChannel内。
SocketChannel:网络连接通道。字节数据的读写都发生在这个通道上,包括从通道中读出数据、将数据写入通道。
SelectionKey:选择键。每个Channel向Selector注册标识时,都将会创建一个SelectionKey。SelectionKey里可以定义Selector监听SocketChannel的事件,包括连接、读、写事件。
Selector:选择器。SelectionKey先把SocketChannel注册到Selector上,然后就能监听网络连接、读、写事件。
Kafka对JavaNIO的封装下面我列出了Kafka对JavaNIO封装后的组件,以及和NIO组件的对应关系。
Kafka自己的Selector类:对NIO中Selector的封装。
TransportLayer:对NIO中SocketChannel和SelectionKey的封装。TransportLayer是一个接口,实现类有PlaintextTransportLayer和SslTransportLayer,其中,PlaintextTransportLayer是明文网络传输,SslTransportLayer是SSL加密网络传输,这一讲我们只涉及明文网络传输的讲解。
NetworkReceive:对NIO中读Buffer的封装。
NetworkS:对NIO中写Buffer的封装。
KafkaChannel:把TransportLayer、NetworkReceive和NetworkS又做了一次封装,这样用起来比较方便,就不用关心底层的组件了。
Kafka对NIO中的SelectionKey并没有封装,而是直接使用。
它们之间的关系如下图所示:
KafkaChannel上的读写操作图
我简单给你介绍一下Kafka封装JavaNIO后处理读写的流程:Selector监听到客户端的读写事件后,会获取绑定在选择键上的KafkaChannel;KafkaChannel会把读写操作交给TransportLayer,TransportLayer进一步会把读写操作交给SocketChannel完成数据的发送。数据的接收步骤也是类似的,你可以结合操作图对比了解一下。
接下来我主要为你介绍TransportLayer、NetworkReceive、NetworkS、KafkaChannel这几个组件,另外一个Selector的封装会在下一讲重点讲解。
TransportLayer
TransportLayer是对NIO中SocketChannel的封装。它的实现类有2个:
PlaintextTransportLayer类,对接口TransportLayer的明文传输的实现;
SslTransportLayer类,对接口TransportLayer的SSL加密传输的实现。
其中PlaintextTransportLayer是比较有代表性的,我们主要学习这个类。如果你对另一个实现感兴趣,可以自己去源码中找到SslTransportLayer类去研究相关代码。
PlaintextTransportLayer这个类的字段和定义比较简单,代码如下:
publicclassPlaintextTransportLayerimplementsTransportLayer{privatefinalSelectionKeykey;//javanio中的事件//javanio中的SocketChannelprivatefinalSocketChannelsocketChannel;//Kafka的安全相关字段privatefinalPrincipalprincipal=;publicPlaintextTransportLayer(SelectionKeykey)throwsIOException{=key;=(SocketChannel)();}我们先了解下其中的字段。
key:NIO中SelectionKey类的对象引用。
socketChannel:NIO中SocketChannel类的对象引用。
由此可以看出,PlaintextTransportLayer就是对NIO中SelectionKey和SocketChannel的封装。
在类的定义中,我们可以构造方法参数SelectionKey的类对象,构造方法会把SelectionKey的类对象赋给key,然后从key中取出对应的SocketChannel给socketChannel,这样就完成了初始化。
类初始化完成以后,下面我们看看相关的重要方法是怎么使用这两个NIO组件的。这里我将讲解finishConnect()、read()和write()这三种方法。
finishConnect()方法用于完成网络连接,代码如下所示:
这个方法首先调用NIO组件socketChannel的finishConnect()方法看是否连接成功,如果连接成功就取消对连接事件的监听,同时增加对读事件的监听(因为连接好以后就有可能接收到数据了),最后方法返回网络连接是否成功。
read()方法是把socketChannel里的数据读到缓冲区ByteBuffer中,具体是调用NIO的socketChannel的read方法,代码如下所示:
/***从channel中读一个byte序列到给定的ByteBuffer中*/@Overridepublicintread(ByteBufferdst)throwsIOException{//调用nio的通道实现数据的读取(dst);}write()方法是把缓冲区ByteBuffer的数据写到SocketChannel里,具体是调用NIO的socketChannel的Write方法,代码如下所示:
/**把ByteBuffer中Byte序列写到socketChannel中*/@Overridepublicintwrite(ByteBuffersrc)throwsIOException{(src);}对于JavaNIO来说,一次读写不一定能把数据读写完,这样就需要判断读写是否完成,没有读写完的数据还需要继续执行读写操作,这样的操作涉及的步骤过于烦琐,显然对上层逻辑不是很友好。
于是Kafka内部把ByteBuffer进行了封装,并按读和写封装成NetworkReceive和NetworkS,上层调用方不用关心读写是否完成,NetworkReceive和NetworkS自己会做判断和处理。
NetworkReceiveNIO中网络数据的读取要通过Buffer来实现,NetworkReceive这个类就是对读取时的Buffer的封装。
其中,NetworkReceive类的字段如下所示:
publicclassNetworkReceiveimplementsReceive{privatefinalStringsource;//channelidprivatefinalByteBuffersize;//存储数据长度的ByteBufferprivatefinalintmaxSize;//数据的最大长度privatefinalMemoryPoolmemoryPool;//ByteBuffer池privateByteBufferbuffer;//存储数据体的ByteBuffersource:channnelid,用来确定这个NetworkReceive是和哪个channel配套使用的。
size:存储数据长度的ByteBuffer。
maxSize:数据的最大长度,这里的数据长度是指接收数据的最大字节数。
memoryPool:用来分配和管理数据体ByteBuffer的组件。
buffer:存储数据体的ByteBuffer。
NetworkReceive类的定义如下代码所示:
publicNetworkReceive(intmaxSize,//能接收的最大消息Stringsource,//channelidMemoryPoolmemoryPool//内存池){=source;=(4);//分配4个字节大小的数据长度=null;=maxSize;=memoryPool;}这里我重点说一下size字段的初始化,其中存储数据长度的ByteBuffer是由4个字节的ByteBuffer定义的,也就是32位,和Javaint类型占用的字节相同,取值最大约等于21G,足以满足表示消息长度的数值。
说完这个类的字段和定义,下面我们再来分析下其包含的readFrom()方法。
readFrom()方法表示把channel中的数据读到NetworkReceive中的字段,包括表示消息长度的字段size和表示消息体本身的字段buffer,代码如下所示:
//把channel里的数据读到ByteBuffer中publiclongreadFrom(ScatteringByteChannelchannel)throwsIOException{//总读取数据大小intread=0;//1.判断数据长度的缓存是否读完,没有读完接着读if(()){//2.读取数据的长度intbytesRead=(size);if(bytesRead0)thrownewEOFException();//每次读取后,读取长度加到总读取长度里read+=bytesRead;//3.如果数据长度的缓存读完了if(!()){();//4.读取数据长度intreceiveSize=();//5.如果有异常就抛出if(receiveSize0)thrownewInvalidReceiveException("Invalidreceive(size="+receiveSize+")");if(maxSize!=UNLIMITEDreceiveSizemaxSize)thrownewInvalidReceiveException("Invalidreceive(size="+receiveSize+"largerthan"+maxSize+")");requestedBufferSize=receiveSize;if(receiveSize==0){buffer=EMPTY_BUFFER;}}}//6.如果数据体ByteBuffer还没有分配,且requestedBufferSize没有赋值,就分配requestedBufferSize字节大小的内存空间if(buffer==nullrequestedBufferSize!=-1){//weknowthesizewewantbuthaventbeenabletoallocateityetbuffer=(requestedBufferSize);if(buffer==null)("Brokerlowonmemory-couldnotallocatebufferofsize{}forsource{}",requestedBufferSize,source);}//7.如果ByteBuffer分配成功就把channel里的数据读到buffer中if(buffer!=null){intbytesRead=(buffer);if(bytesRead0)thrownewEOFException();read+=bytesRead;}returnread;可以看到,readFrom()方法主要是把对应channel中的数据读到ByteBuffer中,具体的步骤如下。
判断size是否读完了,如果没读完就接着读。因为接收数据的前4个字节表示响应头,而size长度也是4个字节,所以正好读完响应头,响应头表示的是数据体的长度。
通过调用JavaNIO底层的方法(size),把数据体的大小读到size中。
把读取的长度累加到总长度中。
如果表示size的数据读完了,就把size的int数值赋给receiveSize,receiveSize表示响应体的长度。
如果有异常就抛出,包括数据体的长度大于最大长度、数据体的长度无效等。
如果数据体ByteBuffer还没有分配,且requestedBufferSize没有赋值,就分配requestedBufferSize字节大小的内存空间。
如果ByteBuffer分配成功,就把channel里的数据读到表示响应体的buffer中。
NetworkS读Buffer的封装讲完了,我们接着讲讲用来写的Buffer。下图是写Buffer的相关接口和类的关系图:
写Buffer的相关接口和类的关系图
我们先看一下接口S都定义了哪些方法。
接口S定义了发送数据buffer的方法,如下所示:
publicinterfaceS{/***channelid*/Stringdestination();/***数据是否发送完成*/booleancompleted();/***把数据写到channel中*/longwriteTo(GatheringByteChannelchannel)throwsIOException;/***发送数据的大小*/longsize();这里我简单介绍下代码中所包含方法的含义。
destination():要把数据写入channelid。
completed():要发送的数据是否发送完了。
writeTo():把数据往指定的channel里写。
size():发送数据的大小。
ByteBufferS这个抽象类实现了上面的接口S,也实现了数据从ByteBuffer数组发送到channel,对应字段如下所示:
privatefinalStringdestination;//channelidprivatefinalintsize;//一共要写多少字节protectedfinalByteBuffer[]buffers;//用于写入channel里的ByteBuffer数组privateintremaining;//一共还剩多少字节没有写完privatebooleanping=false;publicByteBufferS(Stringdestination,ByteBufferbuffers){=destination;=buffers;for(ByteBufferbuffer:buffers)remaining+=();=remaining;//计算需要写入字节的总和}其中,重要的字段有以下几个。
buffers:ByteBuffer数组,承载了要写进channel的数据。
remaining:表示ByteBuffer数组内所有ByteBuffer还剩多少字节没有写。
size:需要往channel中写入多少字节。
destination:这里指channelid,就是数据写到哪里。
下面我重点介绍下writeTo()方法,这是负责真正写数据的方法,与readFrom()读数据是对应的。其他方法比较简单,你可以自行学习。
writeTo()方法是把buffer数组写入SocketChannel中,代码如下所示:
//把buffer数组写入传输层中@OverridepubliclongwriteTo(GatheringByteChannelchannel)throwsIOException{//1.调用javanio底层方法把buffers写入传输层,并返回写入的字节数longwritten=(buffers);if(written0)thrownewEOFException("'thappen.");//2.修改还剩多少字节没有写进传输层remaining-=written;ping=(channel);returnwritten;}可以看到,writeTo()方法首先把buffers写入channel中,我前面说过,写一次不一定能把数据全都写成功,底层()会返回一个“写成功了多少字节”的返回值,我们利用这个返回值就能知道调用一次写操作究竟写入了多少字节。
NetworkS这个类继承了ByteBufferS,是我们真正用于写Buffer的类,字段如下所示:
publicclassNetworkSextsByteBufferS{//实例化publicNetworkS(Stringdestination,ByteBufferbuffer){super(destination,sizeBuffer(()),buffer);}//构造4个字节的sizeBufferprivatestaticByteBuffersizeBuffer(intsize){//声明4个字节的ByteBufferByteBuffersizeBuffer=(4);(size);//写结束,更新postion的位置();returnsizeBuffer;}}NetworkS类实例化的过程是:先分配长度为4个字节的ByteBuffer的变量sizeBuffer,再把要发送的数据长度赋值给sizeBuffer。这样sizeBuffer的字节数和sizeBuffer的数据就都有了,正好对应了前面NetworkReceive类对ByteBuffer的处理。
KafkaChannel到这里,读写缓存区的内容我们就讲完了,接下来我再给你分析一下KafkaChannel,这个类封装了我们上面讲的TransportLayer、NetworkReceive、NetworkS的使用,代码如下所示:
publicclassKafkaChannelimplementsAutoCloseable{忽略privatefinalStringid;//channelidprivatefinalTransportLayertransportLayer;//传输层对象privatefinalSupplierAuthenticatorauthenticatorCreator;privatefinalintmaxReceiveSize;//能收到请求的最大字节数privatefinalMemoryPoolmemoryPool;//负责分配指定大小的ByteBuffer//读时的缓存privateNetworkReceivereceive;//写时的缓存privateSs;privatebooleandisconnected;//是否连接关闭privateChannelStatestate;//连接状态privateSocketAddressremoteAddress;//要连接的远端地址publicKafkaChannel(Stringid,TransportLayertransportLayer,SupplierAuthenticatorauthenticatorCreator,intmaxReceiveSize,MemoryPoolmemoryPool,ChannelMetadataRegistrymetadataRegistry){=id;=transportLayer;=authenticatorCreator;=();=0L;=maxReceiveSize;//可接收的最大字节数=memoryPool;=metadataRegistry;=false;=_MUTED;=_CONNECTED;其中包含的字段如下。
id:channelid。
transportLayer:传输层对象,用于调用SocketChannel的方法。
maxReceiveSize:能收到请求的最大字节数。
memoryPool:负责分配指定大小的ByteBuffer,对ByteBuffer进行管理。
receive:NetworkReceive类的实例。读时的缓存,上面介绍过,就不重复说明了。
s:NetworkS类的实例。写时的缓存,上面介绍过,就不重复说明了。
disconnected:channel连接是否关闭。
state:ChannelState类的实例,表示KafkaChannel的状态。
remoteAddress:要连接的远端地址。
KafkaChannel的状态有以下6种。
NOT_CONNECTED:表示远端服务器不可用。
AUTHENTICATE:处于SSL验证状态。这是SSL等加密连接时的状态,用于SSL握手时的状态描述,明文连接不会有这个状态。
READY:表示连接成功。
EXPIRED:表示连接超时而关闭。
FAILED_SEND:表示连接因为发送失败而关闭。
LOCAL_CLOSE:表示主动把连接关闭。
接下来我们继续讲解KafkaChannel这个类包含的方法,主要有四种:setS()、write()、read()和maybeCompleteReceive()。
我们先来看一下setS()这个方法:
write()方法是把保存在s字段上的数据真正发送出去,如下所示:
//调用写操作publiclongwrite()throwsIOException{if(s==null)return0;midWrite=true;//调用传输层把数据真正发送出去(transportLayer);}首先判断要发送的s字段是否为零,为零说明缓存在KafkaChannel的Buffer都发送出去了,就不用再发送了;如果不为零就调用上面我们讲的NetworkS类中的writeTo()方法把数据通过网络IO发送出去。
read()方法是把从网络IO中读出的数据保存到字段NetworkReceive中,我们通过代码了解下这个方法:
//接收数据publiclongread()throwsIOException{if(receive==null){receive=newNetworkReceive(maxReceiveSize,id,memoryPool);}longbytesReceived=receive();忽略returnbytesReceived;}首先实例化一个NetworkReceive对象,再调用receive()方法把channel的数据读到NetworkReceive对象中,最后返回读到的字节数。如果没有读完下次还是读这个NetworkReceive对象,如果读完了就新创建一个NetworkReceive对象。
maybeCompleteReceive()这个方法用来判断读Buffer是否读完,我们同样结合代码了解其方法逻辑:
publicNetworkReceivemaybeCompleteReceive(){if(receive!=()){().rewind();NetworkReceiveresult=receive;receive=null;returnresult;}returnnull;}判断是否读完的条件是NetworkReceive里的buffer是否用完,包括上面说过的表示buffer长度的ByteBuffer和请求本身的ByteBuffer。这两个都读完才算真正读完了。
总结这一讲我们介绍了Kafka对JavaNIO的封装,包括SocketChannel和ByteBuffer。
SocketChannel封装类TransportLayer实现了最基础的网络连接、网络读、网络写操作。其中,负责明文传输的是PlaintextTransportLayer类。
最后,KafkaChannel针对上述封装为上层提供了更加友好的网络连接、读写。从这里我们能看到Kafka对底层封装的效果,KafkaChannel的代码更加适合Kafka的实际业务,同时代码有层次、扩展性也非常好。
课后思考与预告我们知道在JavaNIO中读写数据少不了ByteBuffer这个“搬运工”,在用定长ByteBuffer读取数据时,我们不确定读一次能否用完ByteBuffer的空间,所以Buffer提供了hasRemaining()来判断是否还有剩余空间。其实,NetworkReceive类里还有一个方法也有同样的作用,你能发现吗?
欢迎你把答案放到留言区,我们一起交流和学习。





